diff options
Diffstat (limited to 'src/importer.rs')
-rw-r--r-- | src/importer.rs | 129 |
1 files changed, 75 insertions, 54 deletions
diff --git a/src/importer.rs b/src/importer.rs index c831ca9..f039d03 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -2,6 +2,9 @@ use chrono::Utc; use diesel::PgConnection; use serde::Deserialize; use serde::Serialize; +use serde_json::to_value; +use serde_json::Value; +use std::collections::HashMap; use std::sync::Arc; use std::sync::Mutex; use std::sync::RwLock; @@ -9,13 +12,12 @@ use std::thread; use std::time::Duration; use std::time::Instant; use threadpool::ThreadPool; -use serde_json::to_value; -use serde_json::Value; -use crate::diesel::Connection; use crate::db; +use crate::diesel::Connection; use crate::error::Error; use crate::models; +use crate::schema; use crate::strava; pub const WORKERS: usize = 10; @@ -58,7 +60,7 @@ fn run_periodically<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>, pe fn handle_one_task<S: strava::StravaApi>( shared: Arc<ImporterSharedData<S>>, - task: models::Task + task: models::Task, ) -> Result<(), Error> { let command = serde_json::from_value(task.payload.clone())?; @@ -67,7 +69,7 @@ fn handle_one_task<S: strava::StravaApi>( Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?, Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?, } - + Ok(()) } @@ -86,18 +88,18 @@ fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) { Err(Error::NotFound) => { info!("No more tasks"); done = true; - }, + } Err(e) => { error!("Failed to get task: {:?}", e); done = true; - }, - Ok(t) => { - match handle_one_task(shared.clone(), t) { - Err(e) => error!("Task failed: {:?}", e), - _ => () - } } + Ok(t) => match handle_one_task(shared.clone(), t) { + Err(e) => error!("Task failed: {:?}", e), + _ => (), + }, } + + thread::sleep(Duration::from_secs(10)); } } @@ -135,13 +137,15 @@ fn to_run(data: &Value) -> Result<Value, Error> { info!("to_run"); macro_rules! get { ($id:ident str $e:expr) => { - $id[$e].as_str().ok_or_else( - || Error::UnexpectedJson($id.clone())) + $id[$e] + .as_str() + .ok_or_else(|| Error::UnexpectedJson($id.clone())) }; ($id:ident f64 $e:expr) => { - $id[$e].as_f64().ok_or_else( - || Error::UnexpectedJson($id.clone())) + $id[$e] + .as_f64() + .ok_or_else(|| Error::UnexpectedJson($id.clone())) }; }; @@ -154,46 +158,65 @@ fn to_run(data: &Value) -> Result<Value, Error> { fn process_strava_activity<S: strava::StravaApi>( shared: Arc<ImporterSharedData<S>>, - data: models::RawData) -> Result<Value, Error> { - + data: models::RawData, + mut task: models::Task, +) -> Result<(), Error> { let json_error = || Error::UnexpectedJson(data.payload.clone()); - let dtype = data.payload["type"].as_str().ok_or_else(json_error)?; + let strava_type = data.payload["type"].as_str().ok_or_else(json_error)?; - let payload = match dtype { - "Run" => { - to_run(&data.payload)? - }, - &_ => Err(Error::InternalError)?, - }; + let entry_type = match strava_type { + "Run" => Ok("run".to_string()), + &_ => Err(Error::NotFound), + }?; - Ok(payload) + let conn = &shared.conn.lock().unwrap(); + conn.transaction::<(), Error, _>(|| { + let entry = models::NewEntry { + username: data.username.as_str(), + entry_type: entry_type.as_str(), + timestamp: None, + payload: json!({}), + }; + + let id = insert!(entries <= conn, &entry)?; + + let entry_data = models::EntryData { + username: entry.username.to_string(), + entry_type: entry.entry_type.to_string(), + entry_id: id, + data_type: data.data_type, + data_id: data.id, + }; + insert!(entry_data <= conn, &entry_data)?; + + task.state = models::TaskState::SUCCESSFUL; + db::update_task(conn, task)?; + Ok(()) + })?; + + Ok(()) } fn process_raw_data<S: strava::StravaApi>( shared: Arc<ImporterSharedData<S>>, key: &models::RawDataKey, - mut task: models::Task) -> Result<(), Error> { - + mut task: models::Task, +) -> Result<(), Error> { let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?; println!("Process raw data: {:#?}", data); - let payload = match data.data_type { - models::DataType::StravaActivity => { - process_strava_activity(shared.clone(), data)? - } + match data.data_type { + models::DataType::StravaActivity => process_strava_activity(shared.clone(), data, task)?, }; - info!("Process finished. Payload: {:#?}", payload); - unimplemented!(); - - task.state = models::TaskState::SUCCESSFUL; - db::update_task(&shared.conn.lock().unwrap(), task)?; + info!("Process finished."); Ok(()) } fn process_all_raw_data<S: strava::StravaApi>( shared: Arc<ImporterSharedData<S>>, - mut task: models::Task) -> Result<(), Error> { + mut task: models::Task, +) -> Result<(), Error> { let now = Utc::now(); let conn = &shared.conn.lock().unwrap(); @@ -209,7 +232,8 @@ fn process_all_raw_data<S: strava::StravaApi>( state: models::TaskState::NEW, username: key.username.as_str(), payload: &to_value(Command::ProcessRawData(key.clone()))?, - })?; + }, + )?; } task.state = models::TaskState::SUCCESSFUL; db::update_task(conn, task)?; @@ -241,24 +265,21 @@ fn import_strava_user<S: strava::StravaApi>( let result = strava.get("/athlete/activities", &token.access_token, ¶ms[..])?; let json_error = || Error::UnexpectedJson(result.clone()); - let result = result - .as_array() - .ok_or_else(json_error)?; + let result = result.as_array().ok_or_else(json_error)?; for activity in result { let id = activity["id"].as_i64().ok_or_else(json_error)?; - info!( - "activity id: {} start: {}", - id, activity["start_date"] - ); - - db::insert_data(&shared.conn.lock().unwrap(), - &models::RawData { - data_type: models::DataType::StravaActivity, - id: id, - username: username.to_string(), - payload: activity.clone(), - })?; + info!("activity id: {} start: {}", id, activity["start_date"]); + + db::insert_data( + &shared.conn.lock().unwrap(), + &models::RawData { + data_type: models::DataType::StravaActivity, + id: id, + username: username.to_string(), + payload: activity.clone(), + }, + )?; } if result.len() < per_page { |