From 02eca80d8e357e80585f766b873ef3efceabfe42 Mon Sep 17 00:00:00 2001 From: Kjetil Orbekk Date: Tue, 11 Feb 2020 20:32:29 -0500 Subject: Import detailed activities from Strava --- src/importer.rs | 132 +++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 111 insertions(+), 21 deletions(-) (limited to 'src/importer.rs') diff --git a/src/importer.rs b/src/importer.rs index 799b6bb..644f3b0 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -12,6 +12,7 @@ use std::thread; use std::time::Duration; use std::time::Instant; use threadpool::ThreadPool; +use std::collections::HashSet; use crate::db; use crate::diesel::Connection; @@ -26,6 +27,7 @@ pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Command { ImportStravaUser { username: String }, + ImportStravaActivity { username: String, id: i64, }, ProcessAllRawData, ProcessRawData(models::RawDataKey), } @@ -65,7 +67,8 @@ fn handle_one_task( let command = serde_json::from_value(task.payload.clone())?; match command { - Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?, + Command::ImportStravaUser { username } => import_strava_user(shared, task, username.as_str())?, + Command::ImportStravaActivity { username, id } => import_strava_activity(shared, task, username.as_str(), id)?, Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?, Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?, } @@ -158,6 +161,7 @@ fn to_run(data: &Value) -> Result { "moving_time": data["moving_time"], "elapsed_time": data["elapsed_time"], "name": data["name"], + "description": data["description"], })) } @@ -167,7 +171,10 @@ fn process_strava_activity( mut task: models::Task, ) -> Result<(), Error> { if data.entry_type.is_some() && data.entry_id.is_some() { - return Err(Error::InternalError); + task.state = models::TaskState::SUCCESSFUL; + let conn = &shared.conn.lock().unwrap(); + db::update_task(conn, task)?; + return Ok(()) } let json_error = || Error::UnexpectedJson(data.payload.clone()); @@ -175,7 +182,13 @@ fn process_strava_activity( let entry_payload = match strava_type { "Run" => to_run(&data.payload), - &_ => Err(Error::NotFound), + t => { + info!("Ignoring strava activity with type {:?}", t); + task.state = models::TaskState::SUCCESSFUL; + let conn = &shared.conn.lock().unwrap(); + db::update_task(conn, task)?; + return Ok(()) + } }?; let entry_type = entry_payload["type"] .as_str() @@ -220,7 +233,7 @@ fn process_raw_data( models::DataType::StravaActivity => process_strava_activity(shared.clone(), data, task)?, }; - info!("Process finished."); + info!("Finished processing {:?}", key); Ok(()) } @@ -254,8 +267,58 @@ fn process_all_raw_data( Ok(()) } +fn import_strava_activity( + shared: Arc>, + mut task: models::Task, + username: &str, + id: i64 + ) -> Result<(), Error> { + let strava = shared.strava.read().unwrap(); + let user = db::get_user(&shared.conn.lock().unwrap(), username)?; + + let token = { + let conn = shared.conn.lock().unwrap(); + get_or_refresh_token(&*strava, &conn, &user)? + }; + + let activity = strava.get(&format!("/activities/{}", id), &token.access_token, &[])?; + + let now = Utc::now(); + let conn = &shared.conn.lock().unwrap(); + conn.transaction::<(), Error, _>(|| { + db::insert_data( + conn, + &models::RawData { + data_type: models::DataType::StravaActivity, + id: id, + username: username.to_string(), + payload: activity, + entry_type: None, + entry_id: None, + })?; + db::insert_task( + conn, + &models::NewTask { + start_at: now, + state: models::TaskState::NEW, + username: username, + payload: &to_value( + Command::ProcessRawData(models::RawDataKey { + data_type: models::DataType::StravaActivity, + id: id, + username: username.to_string(), + }))? + })?; + task.state = models::TaskState::SUCCESSFUL; + db::update_task(conn, task)?; + Ok(()) + })?; + Ok(()) +} + fn import_strava_user( shared: Arc>, + mut task: models::Task, username: &str, ) -> Result<(), Error> { let strava = shared.strava.read().unwrap(); @@ -266,6 +329,7 @@ fn import_strava_user( get_or_refresh_token(&*strava, &conn, &user)? }; + let mut missing_ids = HashSet::new(); let per_page = 30; for page in 1.. { let params = [ @@ -274,25 +338,27 @@ fn import_strava_user( ]; let result = strava.get("/athlete/activities", &token.access_token, ¶ms[..])?; + if page == 1 { + info!("{:#?}", result); + } let json_error = || Error::UnexpectedJson(result.clone()); let result = result.as_array().ok_or_else(json_error)?; - for activity in result { - let id = activity["id"].as_i64().ok_or_else(json_error)?; - info!("activity id: {} start: {}", id, activity["start_date"]); - - db::insert_data( - &shared.conn.lock().unwrap(), - &models::RawData { - data_type: models::DataType::StravaActivity, - id: id, - username: username.to_string(), - payload: activity.clone(), - entry_type: None, - entry_id: None, - }, - )?; + let activity_ids = result + .iter() + .map(|a| a["id"].as_i64().ok_or_else(json_error)) + .collect::, _>>()?; + + let missing_on_page = db::find_missing_data( + &shared.conn.lock().unwrap(), + username, + models::DataType::StravaActivity, + &activity_ids)?; + missing_ids.extend(missing_on_page.iter()); + + if missing_on_page.len() < per_page / 3 { + break; } if result.len() < per_page { @@ -301,7 +367,30 @@ fn import_strava_user( thread::sleep(Duration::from_secs(1)); } - Err(Error::InternalError) + let conn = &shared.conn.lock().unwrap(); + let now = Utc::now(); + conn.transaction::<(), Error, _>(|| { + for &missing_id in &missing_ids { + db::insert_task( + conn, + &models::NewTask { + start_at: now, + state: models::TaskState::NEW, + username: username, + payload: &to_value( + Command::ImportStravaActivity { + username: username.to_string(), + id: missing_id, + })?, + })?; + } + task.state = models::TaskState::SUCCESSFUL; + db::update_task(conn, task)?; + info!("scheduled {} tasks for {}", missing_ids.len(), username); + Ok(()) + })?; + + Ok(()) } fn get_or_refresh_token( @@ -309,12 +398,13 @@ fn get_or_refresh_token( conn: &PgConnection, user: &models::User, ) -> Result { - let mut token = db::get_strava_token(&conn, &user).expect("FIX"); + let mut token = db::get_strava_token(&conn, &user)?; if token.expires_at < Utc::now() { info!("refresh expired token: {:?}", token.expires_at); let new_token = strava.refresh_token(&From::from(&token))?; new_token.update_model(&mut token); + db::update_strava_token(conn, &token)?; } Ok(token) -- cgit v1.2.3