// Background importer: pulls tasks from the database and executes them.
//
// A task's JSON payload is decoded into a `Command` and dispatched to one of
// the handlers below. Handlers fetch data from Strava, store it as raw data,
// convert raw data into entries, and enqueue follow-up tasks.
//
// NOTE(review): in this copy of the file several angle-bracketed generic
// arguments appear to be missing (e.g. `Arc>`, `RwLock,`, `-> Result {`,
// `.collect::, _>>()`), and one `&params` looks mangled into `¶ms`. This
// looks like extraction damage rather than intent - confirm against the
// original source before building.
use chrono::DateTime;
use chrono::Utc;
use diesel::PgConnection;
use serde::Deserialize;
use serde::Serialize;
use serde_json::to_value;
use serde_json::Value;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use threadpool::ThreadPool;
use std::collections::HashSet;
use crate::db;
use crate::diesel::Connection;
use crate::error::Error;
use crate::models;
use crate::schema;
use crate::strava;

/// Number of threads given to the "importer" thread pool in `Importer::new`.
pub const WORKERS: usize = 10;

/// An empty list of `(key, value)` string pairs.
///
/// Not referenced in this file; presumably a convenience for API calls that
/// take no query parameters (TODO confirm against callers).
pub const EMPTY_PARAMS: &[(&str, &str)] = &[];

/// Unit of work stored as the JSON payload of a task.
///
/// Values are serialised with `to_value` when a task is enqueued and decoded
/// again in `handle_one_task`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
    /// List a user's Strava activities and enqueue an import for each one
    /// that is not stored yet (see `import_strava_user`).
    ImportStravaUser { username: String },
    /// Download one Strava activity and store it as raw data
    /// (see `import_strava_activity`).
    ImportStravaActivity { username: String, id: i64, },
    /// Enqueue a `ProcessRawData` task for every raw data key
    /// (see `process_all_raw_data`).
    ProcessAllRawData,
    /// Convert one raw data record into an entry (see `process_raw_data`).
    ProcessRawData(models::RawDataKey),
}

/// State shared between the `Importer` handle and the worker thread.
///
/// NOTE(review): the type arguments of `RwLock`/`Mutex` are missing in this
/// copy. Judging by `Importer::new` the wrapped values are a Strava client,
/// a `ThreadPool`, a `PgConnection` and a `bool` - confirm.
pub struct ImporterSharedData {
    /// Strava client; only ever read-locked in this file.
    strava: RwLock,
    /// Thread pool on which `run_periodically` is scheduled.
    pool: Mutex,
    /// The single database connection; locked around each database access.
    conn: Mutex,
    /// Set to `true` by `Importer::run`, to `false` by `Importer::join`, and
    /// polled by `run_periodically`.
    running: Mutex,
}

/// Handle used to start and stop the background importer.
pub struct Importer {
    // NOTE(review): `Arc>` has lost its type argument; `Importer::new` stores
    // an `Arc` of `ImporterSharedData` here.
    shared: Arc>,
}

/// Calls `handle_tasks` once every `period` until the `running` flag is
/// cleared.
///
/// The flag is only checked while waiting, roughly once per second. The
/// period is measured from the start of the previous `handle_tasks` call, so
/// if that call takes longer than `period` the next one starts immediately
/// without the flag being checked in between.
fn run_periodically(shared: Arc>, period: Duration) {
    // Granularity at which the `running` flag is polled.
    let sleep_time = Duration::from_millis(1000);
    let mut now = Instant::now();
    loop {
        while now.elapsed() < period {
            if !*shared.running.lock().unwrap() {
                return;
            }
            thread::sleep(sleep_time);
        }
        // Restart the timer before doing the work (see function docs).
        now = Instant::now();
        info!("run_periodically: wakeup");
        handle_tasks(shared.clone())
    }
}

/// Decodes `task.payload` into a `Command` and runs the matching handler.
///
/// Returns an error if the payload cannot be decoded or if the handler
/// fails. Each handler is handed the task so that it can mark it successful.
fn handle_one_task(
    shared: Arc>,
    task: models::Task,
) -> Result<(), Error> {
    // The target type (`Command`) is inferred from the `match` below.
    let command = serde_json::from_value(task.payload.clone())?;
    match command {
        Command::ImportStravaUser { username } =>
            import_strava_user(shared, task, username.as_str())?,
        Command::ImportStravaActivity { username, id } =>
            import_strava_activity(shared, task, username.as_str(), id)?,
        Command::ProcessAllRawData =>
            process_all_raw_data(shared, task.clone())?,
        Command::ProcessRawData(ref d) =>
            process_raw_data(shared, d, task.clone())?,
    }
    Ok(())
}

/// Takes and executes tasks one by one until `db::take_task` reports
/// `Error::NotFound` or fails with any other error.
///
/// A failing task does not stop the loop: its error is logged and the next
/// task is taken. The failed task's state is not changed here.
fn handle_tasks(shared: Arc>) {
    let mut done = false;
    while !done {
        // The closure limits the connection lock to the `take_task` call, so
        // the handler below can lock the connection itself.
        let task = (|| {
            let conn = shared.conn.lock().unwrap();
            let now = Utc::now();
            // Five minutes from now. The meaning is defined by
            // `db::take_task`; presumably the time at which an unfinished
            // task becomes eligible again (TODO confirm).
            let eta = now + chrono::Duration::minutes(5);
            db::take_task(&conn, 
models::TaskState::NEW, now, eta)
        })();
        match task {
            Err(Error::NotFound) => {
                info!("No more tasks");
                done = true;
            }
            Err(e) => {
                error!("Failed to get task: {:?}", e);
                done = true;
            }
            Ok(t) => match handle_one_task(shared.clone(), t) {
                Err(e) => error!("Task failed: {:?}", e),
                _ => (),
            },
        }
        // Runs after every iteration, including the final one.
        thread::sleep(Duration::from_millis(10));
    }
}

// NOTE(review): `StravaApi` is used as a type below but is not declared in
// this copy; the `impl` header has presumably lost its generic parameter
// list - confirm.
impl Importer {
    /// Creates an importer that owns `conn` and `strava`.
    ///
    /// Builds a thread pool named "importer" with `WORKERS` threads. Nothing
    /// is scheduled until `run` is called; `running` starts as `false`.
    pub fn new(conn: PgConnection, strava: StravaApi) -> Importer {
        let shared = Arc::new(ImporterSharedData {
            pool: Mutex::new(ThreadPool::with_name("importer".to_string(), WORKERS)),
            conn: Mutex::new(conn),
            strava: RwLock::new(strava),
            running: Mutex::new(false),
        });
        Importer { shared: shared }
    }

    /// Starts the periodic task loop (period: 5 seconds) on the thread pool.
    ///
    /// Does nothing if the `running` flag is already set. Locks `pool` first
    /// and `running` second, and holds both until the function returns.
    pub fn run(&self) {
        info!("run()");
        let pool = self.shared.pool.lock().unwrap();
        let mut running = self.shared.running.lock().unwrap();
        if !*running {
            *running = true;
            pool.execute({
                let shared = self.shared.clone();
                move || run_periodically(shared, Duration::from_secs(5))
            });
        }
    }

    /// Clears the `running` flag and then calls `join()` on the thread pool.
    ///
    /// The `running` lock is released before the pool is locked; the pool
    /// lock is held for the duration of the `join()` call.
    pub fn join(&self) {
        *self.shared.running.lock().unwrap() = false;
        self.shared.pool.lock().unwrap().join();
    }
}

/// Builds the payload of a "run" entry from a raw Strava activity.
///
/// Copies `start_date` (as `start_timestamp`), `distance`, `moving_time`,
/// `elapsed_time`, `name` and `description` from `data` and sets `type` to
/// `"run"`. No field is validated, and every path through this function
/// returns `Ok`.
///
/// NOTE(review): the return type reads `Result` without type arguments in
/// this copy; the body yields a JSON value on success - confirm.
fn to_run(data: &Value) -> Result {
    info!("to_run");
    // Helper for typed field access; currently unused, hence the `allow`.
    #[allow(unused_macros)]
    macro_rules! 
get {
        ($id:ident str $e:expr) => {
            $id[$e]
                .as_str()
                .ok_or_else(|| Error::UnexpectedJson($id.clone()))
        };
        ($id:ident f64 $e:expr) => {
            $id[$e]
                .as_f64()
                .ok_or_else(|| Error::UnexpectedJson($id.clone()))
        };
    };
    Ok(json!({
        "type": "run",
        "start_timestamp": data["start_date"],
        "distance": data["distance"],
        "moving_time": data["moving_time"],
        "elapsed_time": data["elapsed_time"],
        "name": data["name"],
        "description": data["description"],
    }))
}

/// Converts one raw Strava activity into an entry and marks `task` as
/// successful.
///
/// * If `data` already has both `entry_type` and `entry_id`, only the task
///   is updated.
/// * Activities whose `type` is not `"Run"` are logged and skipped; the task
///   is still marked successful.
/// * Otherwise the entry is inserted, the raw data is linked to it and the
///   task is updated, all inside one database transaction.
///
/// Returns `Error::UnexpectedJson` (carrying the raw payload) if a `type`
/// field is missing or not a string, and propagates timestamp parse errors
/// and database errors.
fn process_strava_activity(
    shared: Arc>,
    data: models::RawData,
    mut task: models::Task,
) -> Result<(), Error> {
    // Already linked to an entry: nothing to convert.
    if data.entry_type.is_some() && data.entry_id.is_some() {
        task.state = models::TaskState::SUCCESSFUL;
        let conn = &shared.conn.lock().unwrap();
        db::update_task(conn, task)?;
        return Ok(())
    }

    let json_error = || Error::UnexpectedJson(data.payload.clone());
    let strava_type = data.payload["type"].as_str().ok_or_else(json_error)?;

    let entry_payload = match strava_type {
        "Run" => to_run(&data.payload),
        t => {
            info!("Ignoring strava activity with type {:?}", t);
            task.state = models::TaskState::SUCCESSFUL;
            let conn = &shared.conn.lock().unwrap();
            db::update_task(conn, task)?;
            return Ok(())
        }
    }?;

    // On failure this reports the raw Strava payload, not `entry_payload`.
    let entry_type = entry_payload["type"]
        .as_str()
        .ok_or_else(json_error)? 
.to_string();
    // Optional: a missing or non-string timestamp yields `None`, while a
    // string that is not valid RFC 3339 is an error.
    let timestamp = entry_payload["start_timestamp"]
        .as_str()
        .map(|t| DateTime::parse_from_rfc3339(t).map(|t| t.with_timezone(&Utc)))
        .transpose()?;

    let conn = &shared.conn.lock().unwrap();
    conn.transaction::<(), Error, _>(|| {
        let (entry_type, id) = {
            let entry = models::NewEntry {
                username: data.username.as_str(),
                entry_type: entry_type.as_str(),
                timestamp: timestamp,
                payload: entry_payload,
            };
            info!("Inserting entry: {:#?}", entry);
            // `insert!` is a project macro; presumably inserts into the
            // `entries` table and returns the new row id (TODO confirm).
            let id = insert!(entries <= conn, &entry)?;
            (entry.entry_type.to_string(), id)
        };
        db::link_data(conn, data, &entry_type, id)?;
        task.state = models::TaskState::SUCCESSFUL;
        db::update_task(conn, task)?;
        Ok(())
    })?;
    Ok(())
}

/// Loads the raw data record identified by `key` and processes it according
/// to its `data_type`.
///
/// The connection lock taken for the lookup is released at the end of that
/// statement, before the type-specific handler locks the connection again.
fn process_raw_data(
    shared: Arc>,
    key: &models::RawDataKey,
    task: models::Task,
) -> Result<(), Error> {
    let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?;
    match data.data_type {
        models::DataType::StravaActivity =>
            process_strava_activity(shared.clone(), data, task)?,
    };
    info!("Finished processing {:?}", key);
    Ok(())
}

/// Enqueues one `ProcessRawData` task per key returned by
/// `db::get_raw_data_keys` and marks `task` as successful, all in one
/// transaction.
///
/// The connection lock is held for the whole function.
fn process_all_raw_data(
    shared: Arc>,
    mut task: models::Task,
) -> Result<(), Error> {
    let now = Utc::now();
    let conn = &shared.conn.lock().unwrap();
    // Number of enqueued tasks; only used for the log line below.
    let mut tasks = 0;
    conn.transaction::<(), Error, _>(|| {
        let keys = db::get_raw_data_keys(conn)?;
        tasks = keys.len();
        for key in keys {
            db::insert_task(
                conn,
                &models::NewTask {
                    start_at: now,
                    state: models::TaskState::NEW,
                    username: key.username.as_str(),
                    payload: &to_value(Command::ProcessRawData(key.clone()))?,
                },
            )?;
        }
        task.state = models::TaskState::SUCCESSFUL;
        db::update_task(conn, task)?;
        Ok(())
    })?;
    info!("process_all_raw_data: Added {} tasks", tasks);
    Ok(())
}

/// Downloads Strava activity `id` for `username` and stores it as raw data.
///
/// In one transaction this inserts the raw record, enqueues a
/// `ProcessRawData` task for it and marks `task` as successful. The Strava
/// read lock is held for the whole function; the connection lock is taken
/// and released separately for each step.
fn import_strava_activity(
    shared: Arc>,
    mut task: models::Task,
    username: &str,
    id: i64
) -> Result<(), Error> {
    let strava = shared.strava.read().unwrap();
    let user = db::get_user(&shared.conn.lock().unwrap(), username)?;

    let token = {
        let conn = shared.conn.lock().unwrap();
        get_or_refresh_token(&*strava, &conn, &user)? 
};

    // The HTTP request is made without holding the connection lock.
    let activity = strava.get(&format!("/activities/{}", id), &token.access_token, &[])?;
    let now = Utc::now();
    let conn = &shared.conn.lock().unwrap();
    conn.transaction::<(), Error, _>(|| {
        db::insert_data(
            conn,
            &models::RawData {
                data_type: models::DataType::StravaActivity,
                id: id,
                username: username.to_string(),
                payload: activity,
                entry_type: None,
                entry_id: None,
            })?;
        db::insert_task(
            conn,
            &models::NewTask {
                start_at: now,
                state: models::TaskState::NEW,
                username: username,
                payload: &to_value(
                    Command::ProcessRawData(models::RawDataKey {
                        data_type: models::DataType::StravaActivity,
                        id: id,
                        username: username.to_string(),
                    }))?
            })?;
        task.state = models::TaskState::SUCCESSFUL;
        db::update_task(conn, task)?;
        Ok(())
    })?;
    Ok(())
}

/// Finds Strava activities of `username` that are not stored yet and
/// enqueues an `ImportStravaActivity` task for each.
///
/// Pages through `/athlete/activities` (30 per page, starting at page 1) and
/// asks `db::find_missing_data` which ids are missing. Paging stops when a
/// page has fewer than `per_page / 3` (i.e. 10) missing ids, or fewer than
/// `per_page` activities. Between pages the thread sleeps for one second.
/// The tasks and the update of `task` are written in one transaction.
fn import_strava_user(
    shared: Arc>,
    mut task: models::Task,
    username: &str,
) -> Result<(), Error> {
    let strava = shared.strava.read().unwrap();
    let user = db::get_user(&shared.conn.lock().unwrap(), username)?;

    let token = {
        let conn = shared.conn.lock().unwrap();
        get_or_refresh_token(&*strava, &conn, &user)?
    };

    let mut missing_ids = HashSet::new();

    let per_page = 30;
    for page in 1.. 
{
        let params = [
            ("page", &format!("{}", page)[..]),
            ("per_page", &format!("{}", per_page)[..]),
        ];
        // NOTE(review): `¶ms` below looks like a mangled `&params` (the
        // `&para` prefix read as an HTML entity) - confirm.
        let result = strava.get("/athlete/activities", &token.access_token, ¶ms[..])?;
        // Dump only the first page to the log.
        if page == 1 {
            info!("{:#?}", result);
        }

        let json_error = || Error::UnexpectedJson(result.clone());
        let result = result.as_array().ok_or_else(json_error)?;

        // Every activity must carry an integer `id`.
        // NOTE(review): the turbofish `collect::, _>>` has lost part of its
        // type; presumably a `Result` wrapping a `Vec` - confirm.
        let activity_ids = result
            .iter()
            .map(|a| a["id"].as_i64().ok_or_else(json_error))
            .collect::, _>>()?;

        let missing_on_page = db::find_missing_data(
            &shared.conn.lock().unwrap(),
            username,
            models::DataType::StravaActivity,
            &activity_ids)?;

        missing_ids.extend(missing_on_page.iter());

        // Mostly-known page: presumably later pages are assumed to be
        // imported already (TODO confirm the intent of the threshold).
        if missing_on_page.len() < per_page / 3 {
            break;
        }
        // Short page: this was the last one.
        if result.len() < per_page {
            break;
        }
        thread::sleep(Duration::from_secs(1));
    }

    let conn = &shared.conn.lock().unwrap();
    let now = Utc::now();
    conn.transaction::<(), Error, _>(|| {
        for &missing_id in &missing_ids {
            db::insert_task(
                conn,
                &models::NewTask {
                    start_at: now,
                    state: models::TaskState::NEW,
                    username: username,
                    payload: &to_value(
                        Command::ImportStravaActivity {
                            username: username.to_string(),
                            id: missing_id,
                        })?,
                })?;
        }
        task.state = models::TaskState::SUCCESSFUL;
        db::update_task(conn, task)?;
        info!("scheduled {} tasks for {}", missing_ids.len(), username);
        Ok(())
    })?;
    Ok(())
}

/// Returns the stored Strava token for `user`, refreshing it first if it has
/// expired.
///
/// A token counts as expired when `expires_at` is earlier than the current
/// UTC time. A refreshed token is written back with
/// `db::update_strava_token` before it is returned.
///
/// NOTE(review): `Strava` is not declared in this copy and the return type
/// reads `Result` without type arguments; the generic parameter list and the
/// token type have presumably been lost - confirm.
fn get_or_refresh_token(
    strava: &Strava,
    conn: &PgConnection,
    user: &models::User,
) -> Result {
    let mut token = db::get_strava_token(&conn, &user)?;
    if token.expires_at < Utc::now() {
        info!("refresh expired token: {:?}", token.expires_at);
        let new_token = strava.refresh_token(&From::from(&token))?;
        new_token.update_model(&mut token);
        db::update_strava_token(conn, &token)?;
    }
    Ok(token)
}