From 02eca80d8e357e80585f766b873ef3efceabfe42 Mon Sep 17 00:00:00 2001 From: Kjetil Orbekk Date: Tue, 11 Feb 2020 20:32:29 -0500 Subject: Import detailed activities from Strava --- src/db.rs | 30 ++++++++++++ src/importer.rs | 132 ++++++++++++++++++++++++++++++++++++++++++-------- src/models.rs | 2 +- src/template.rs | 8 +++ templates/profile.hbs | 3 -- 5 files changed, 150 insertions(+), 25 deletions(-) diff --git a/src/db.rs b/src/db.rs index 5b165c9..7e271bc 100644 --- a/src/db.rs +++ b/src/db.rs @@ -137,6 +137,16 @@ pub fn get_strava_token( Ok(token) } +pub fn update_strava_token( + conn: &PgConnection, + token: &models::StravaToken) -> Result<(), Error> { + use crate::schema::strava_tokens; + diesel::update(strava_tokens::table).set(token) + .execute(conn)?; + Ok(()) +} + + pub fn insert_task(conn: &PgConnection, task: &models::NewTask) -> Result { use crate::schema::tasks; let id = diesel::insert_into(tasks::table) @@ -192,6 +202,25 @@ pub fn take_task( }) } +pub fn find_missing_data(conn: &PgConnection, username: &str, data_type: models::DataType, ids: &[i64]) + -> Result, Error> { + use diesel::pg::expression::dsl::any; + use crate::schema::raw_data; + use std::collections::HashSet; + + let present: HashSet = raw_data::table + .select(raw_data::id) + .filter(raw_data::username.eq(username) + .and(raw_data::data_type.eq(data_type)) + .and(raw_data::id.eq(any(ids)))) + .get_results::(conn)? + .into_iter().collect(); + + let ids: HashSet = ids.iter().map(|v| *v).collect(); + let missing = ids.difference(&present); + Ok(missing.map(|v| *v).collect()) +} + pub fn insert_data(conn: &PgConnection, data: &models::RawData) -> Result { use crate::schema::raw_data; let rows = diesel::insert_into(raw_data::table) @@ -253,6 +282,7 @@ pub fn get_entries( .eq(username) .and(entries::entry_type.eq(entry_type)), ) + .order(entries::timestamp.desc()) .get_results::(conn)?; Ok(r) } diff --git a/src/importer.rs b/src/importer.rs index 799b6bb..644f3b0 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -12,6 +12,7 @@ use std::thread; use std::time::Duration; use std::time::Instant; use threadpool::ThreadPool; +use std::collections::HashSet; use crate::db; use crate::diesel::Connection; @@ -26,6 +27,7 @@ pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Command { ImportStravaUser { username: String }, + ImportStravaActivity { username: String, id: i64, }, ProcessAllRawData, ProcessRawData(models::RawDataKey), } @@ -65,7 +67,8 @@ fn handle_one_task( let command = serde_json::from_value(task.payload.clone())?; match command { - Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?, + Command::ImportStravaUser { username } => import_strava_user(shared, task, username.as_str())?, + Command::ImportStravaActivity { username, id } => import_strava_activity(shared, task, username.as_str(), id)?, Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?, Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?, } @@ -158,6 +161,7 @@ fn to_run(data: &Value) -> Result { "moving_time": data["moving_time"], "elapsed_time": data["elapsed_time"], "name": data["name"], + "description": data["description"], })) } @@ -167,7 +171,10 @@ fn process_strava_activity( mut task: models::Task, ) -> Result<(), Error> { if data.entry_type.is_some() && data.entry_id.is_some() { - return Err(Error::InternalError); + task.state = models::TaskState::SUCCESSFUL; + let conn = &shared.conn.lock().unwrap(); + db::update_task(conn, task)?; + return Ok(()) } let json_error = || Error::UnexpectedJson(data.payload.clone()); @@ -175,7 +182,13 @@ fn process_strava_activity( let entry_payload = match strava_type { "Run" => to_run(&data.payload), - &_ => Err(Error::NotFound), + t => { + info!("Ignoring strava activity with type {:?}", t); + task.state = models::TaskState::SUCCESSFUL; + let conn = &shared.conn.lock().unwrap(); + db::update_task(conn, task)?; + return Ok(()) + } }?; let entry_type = entry_payload["type"] .as_str() @@ -220,7 +233,7 @@ fn process_raw_data( models::DataType::StravaActivity => process_strava_activity(shared.clone(), data, task)?, }; - info!("Process finished."); + info!("Finished processing {:?}", key); Ok(()) } @@ -254,8 +267,58 @@ fn process_all_raw_data( Ok(()) } +fn import_strava_activity( + shared: Arc>, + mut task: models::Task, + username: &str, + id: i64 + ) -> Result<(), Error> { + let strava = shared.strava.read().unwrap(); + let user = db::get_user(&shared.conn.lock().unwrap(), username)?; + + let token = { + let conn = shared.conn.lock().unwrap(); + get_or_refresh_token(&*strava, &conn, &user)? + }; + + let activity = strava.get(&format!("/activities/{}", id), &token.access_token, &[])?; + + let now = Utc::now(); + let conn = &shared.conn.lock().unwrap(); + conn.transaction::<(), Error, _>(|| { + db::insert_data( + conn, + &models::RawData { + data_type: models::DataType::StravaActivity, + id: id, + username: username.to_string(), + payload: activity, + entry_type: None, + entry_id: None, + })?; + db::insert_task( + conn, + &models::NewTask { + start_at: now, + state: models::TaskState::NEW, + username: username, + payload: &to_value( + Command::ProcessRawData(models::RawDataKey { + data_type: models::DataType::StravaActivity, + id: id, + username: username.to_string(), + }))? + })?; + task.state = models::TaskState::SUCCESSFUL; + db::update_task(conn, task)?; + Ok(()) + })?; + Ok(()) +} + fn import_strava_user( shared: Arc>, + mut task: models::Task, username: &str, ) -> Result<(), Error> { let strava = shared.strava.read().unwrap(); @@ -266,6 +329,7 @@ fn import_strava_user( get_or_refresh_token(&*strava, &conn, &user)? }; + let mut missing_ids = HashSet::new(); let per_page = 30; for page in 1.. { let params = [ @@ -274,25 +338,27 @@ fn import_strava_user( ]; let result = strava.get("/athlete/activities", &token.access_token, ¶ms[..])?; + if page == 1 { + info!("{:#?}", result); + } let json_error = || Error::UnexpectedJson(result.clone()); let result = result.as_array().ok_or_else(json_error)?; - for activity in result { - let id = activity["id"].as_i64().ok_or_else(json_error)?; - info!("activity id: {} start: {}", id, activity["start_date"]); - - db::insert_data( - &shared.conn.lock().unwrap(), - &models::RawData { - data_type: models::DataType::StravaActivity, - id: id, - username: username.to_string(), - payload: activity.clone(), - entry_type: None, - entry_id: None, - }, - )?; + let activity_ids = result + .iter() + .map(|a| a["id"].as_i64().ok_or_else(json_error)) + .collect::, _>>()?; + + let missing_on_page = db::find_missing_data( + &shared.conn.lock().unwrap(), + username, + models::DataType::StravaActivity, + &activity_ids)?; + missing_ids.extend(missing_on_page.iter()); + + if missing_on_page.len() < per_page / 3 { + break; } if result.len() < per_page { @@ -301,7 +367,30 @@ fn import_strava_user( thread::sleep(Duration::from_secs(1)); } - Err(Error::InternalError) + let conn = &shared.conn.lock().unwrap(); + let now = Utc::now(); + conn.transaction::<(), Error, _>(|| { + for &missing_id in &missing_ids { + db::insert_task( + conn, + &models::NewTask { + start_at: now, + state: models::TaskState::NEW, + username: username, + payload: &to_value( + Command::ImportStravaActivity { + username: username.to_string(), + id: missing_id, + })?, + })?; + } + task.state = models::TaskState::SUCCESSFUL; + db::update_task(conn, task)?; + info!("scheduled {} tasks for {}", missing_ids.len(), username); + Ok(()) + })?; + + Ok(()) } fn get_or_refresh_token( @@ -309,12 +398,13 @@ fn get_or_refresh_token( conn: &PgConnection, user: &models::User, ) -> Result { - let mut token = db::get_strava_token(&conn, &user).expect("FIX"); + let mut token = db::get_strava_token(&conn, &user)?; if token.expires_at < Utc::now() { info!("refresh expired token: {:?}", token.expires_at); let new_token = strava.refresh_token(&From::from(&token))?; new_token.update_model(&mut token); + db::update_strava_token(conn, &token)?; } Ok(token) diff --git a/src/models.rs b/src/models.rs index 83153fd..4a85369 100644 --- a/src/models.rs +++ b/src/models.rs @@ -100,7 +100,7 @@ impl fmt::Debug for User { } } -#[derive(Insertable, Queryable)] +#[derive(AsChangeset, Insertable, Queryable)] #[table_name = "strava_tokens"] pub struct StravaToken { pub username: String, diff --git a/src/template.rs b/src/template.rs index 0ac67fb..e50ad21 100644 --- a/src/template.rs +++ b/src/template.rs @@ -42,6 +42,14 @@ pub fn running_template() -> TemplateSpec { )], ), }, + // Column { + // display_name: "Title".to_string(), + // field: FieldSpec::Field("name".to_string()), + // }, + // Column { + // display_name: "Description".to_string(), + // field: FieldSpec::Field("description".to_string()), + // }, ]) } diff --git a/templates/profile.hbs b/templates/profile.hbs index cd69e61..f01e4fe 100644 --- a/templates/profile.hbs +++ b/templates/profile.hbs @@ -38,9 +38,6 @@ function toggleDetails(e) { {{#each columns ~}} {{ this }} {{/each ~}} - - Here are the details about this activity. - {{/each ~}} -- cgit v1.2.3