diff options
Diffstat (limited to 'src/importer.rs')
-rw-r--r-- | src/importer.rs | 84 |
1 files changed, 67 insertions, 17 deletions
diff --git a/src/importer.rs b/src/importer.rs index 5f5790a..b1c1b18 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -9,7 +9,9 @@ use std::thread; use std::time::Duration; use std::time::Instant; use threadpool::ThreadPool; +use serde_json::to_value; +use crate::diesel::Connection; use crate::db; use crate::error::Error; use crate::models; @@ -21,6 +23,8 @@ pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Command { ImportStravaUser { username: String }, + ProcessAllRawData, + ProcessRawData(models::RawDataKey), } pub struct ImporterSharedData<StravaApi: strava::StravaApi + 'static> { @@ -53,39 +57,46 @@ fn run_periodically<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>, pe fn handle_one_task<S: strava::StravaApi>( shared: Arc<ImporterSharedData<S>>, -) -> Result<models::Task, Error> { - let task = { - let conn = shared.conn.lock().unwrap(); - let now = Utc::now(); - let eta = now + chrono::Duration::seconds(5); - - db::take_task(&conn, models::TaskState::NEW, now, eta)? - }; - + task: models::Task +) -> Result<(), Error> { let command = serde_json::from_value(task.payload.clone())?; match command { Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?, + Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?, + Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?, } - - Ok(task) + + Ok(()) } fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) { let mut done = false; while !done { - match handle_one_task(shared.clone()) { + let task = (|| { + let conn = shared.conn.lock().unwrap(); + let now = Utc::now(); + let eta = now + chrono::Duration::seconds(5); + + db::take_task(&conn, models::TaskState::NEW, now, eta) + })(); + + match task { Err(Error::NotFound) => { info!("No more tasks"); done = true; - } + }, Err(e) => { - error!("Error handling task: {}", e); - } + error!("Failed to get task: {:?}", e); + done = true; + }, Ok(t) => { - info!("Successfully handled task: {:?}", t); + match handle_one_task(shared.clone(), t) { + Err(e) => error!("Task failed: {:?}", e), + _ => () + } } - }; + } } } @@ -119,6 +130,45 @@ impl<StravaApi: strava::StravaApi> Importer<StravaApi> { } } +fn process_raw_data<S: strava::StravaApi>( + shared: Arc<ImporterSharedData<S>>, + key: &models::RawDataKey, + mut task: models::Task) -> Result<(), Error> { + + let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?; + println!("Process raw data: {:#?}", data); + + unimplemented!(); +} + +fn process_all_raw_data<S: strava::StravaApi>( + shared: Arc<ImporterSharedData<S>>, + mut task: models::Task) -> Result<(), Error> { + let now = Utc::now(); + let conn = &shared.conn.lock().unwrap(); + + let mut tasks = 0; + conn.transaction::<(), Error, _>(|| { + let keys = db::get_raw_data_keys(conn)?; + tasks = keys.len(); + for key in keys { + db::insert_task( + conn, + &models::NewTask { + start_at: now, + state: models::TaskState::NEW, + username: key.username.as_str(), + payload: &to_value(Command::ProcessRawData(key.clone()))?, + })?; + } + task.state = models::TaskState::SUCCESSFUL; + db::update_task(conn, task)?; + Ok(()) + })?; + info!("process_all_raw_data: Added {} tasks", tasks); + Ok(()) +} + fn import_strava_user<S: strava::StravaApi>( shared: Arc<ImporterSharedData<S>>, username: &str, |