diff options
Diffstat (limited to 'src/importer.rs')
-rw-r--r-- | src/importer.rs | 257 |
1 files changed, 191 insertions, 66 deletions
diff --git a/src/importer.rs b/src/importer.rs index 9ea7e35..6909350 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -7,6 +7,13 @@ use std::sync::Mutex; use std::sync::RwLock; use threadpool::ThreadPool; use chrono::Utc; +use timer::Timer; +use timer::Guard; +use std::time::Instant; +use std::time::Duration; +use std::thread; +use serde::Deserialize; +use serde::Serialize; use crate::error::Error; use crate::db; @@ -18,92 +25,210 @@ use crate::Params; pub const WORKERS: usize = 10; pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum Command { - ImportStravaUser(models::User), - Quit, + ImportStravaUser { username: String }, } -#[derive(Clone)] -struct ImporterState { - pool: ThreadPool, - conn: Arc<Mutex<PgConnection>>, - strava: Arc<RwLock<strava::StravaImpl>>, - rx: Arc<Mutex<Receiver<Command>>>, +macro_rules! clone { + ( [ $( $i:ident ),* ] $e:expr ) => { + { + $(let $i = $i.clone();)* + $e + } + } } -fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result<models::StravaToken, Error> { - let mut token = db::get_strava_token(&conn, &user).expect("FIX"); +pub struct ImporterSharedData<StravaApi: strava::StravaApi + 'static> { + strava: RwLock<StravaApi>, + pool: Mutex<ThreadPool>, + conn: Mutex<PgConnection>, + running: Mutex<bool>, +} - if token.expires_at < Utc::now() { - info!("refresh expired token: {:?}", token.expires_at); - let new_token = strava.refresh_token(&From::from(&token))?; - new_token.update_model(&mut token); +pub struct Importer<StravaApi: strava::StravaApi + 'static> { + shared: Arc<ImporterSharedData<StravaApi>>, +} + +fn run_periodically<S: strava::StravaApi>( + shared: Arc<ImporterSharedData<S>>, + period: Duration) { + let sleep_time = Duration::from_millis(1000); + let mut now = Instant::now(); + loop { + while now.elapsed() < period { + if !*shared.running.lock().unwrap() { + return; + } + thread::sleep(sleep_time); + } + now = Instant::now(); + + info!("run_periodically: wakeup"); + handle_tasks(shared.clone()) } +} - Ok(token) + +fn handle_one_task<S: strava::StravaApi>( + shared: Arc<ImporterSharedData<S>>) -> Result<models::Task, Error> { + let task = { + let conn = shared.conn.lock().unwrap(); + let now = Utc::now(); + let eta = now + chrono::Duration::seconds(5); + + db::take_task(&conn, + models::TaskState::NEW, + now, + eta)? + }; + + let command = serde_json::from_value(task.payload.clone())?; + + match command { + Command::ImportStravaUser{ username } => { + import_strava_user(shared, username.as_str())? + }, + } + + Ok(task) } -fn import_strava_user(state: ImporterState, user: models::User) { - use std::thread::sleep; - use std::time::Duration; +fn handle_tasks<S: strava::StravaApi>( + shared: Arc<ImporterSharedData<S>>) { + let mut done = false; + while !done { + match handle_one_task(shared.clone()) { + Err(Error::NotFound) => { + info!("No more tasks"); + done = true; + }, + Err(e) => { + error!("Error handling task: {}", e); + } + Ok(t) => { + info!("Successfully handled task: {:?}", t); + } + }; + } +} + +impl<StravaApi: strava::StravaApi> Importer<StravaApi> { + pub fn new(conn: PgConnection, strava: StravaApi) -> Importer<StravaApi> { + let shared = Arc::new(ImporterSharedData { + pool: Mutex::new(ThreadPool::with_name("importer".to_string(), WORKERS)), + conn: Mutex::new(conn), + strava: RwLock::new(strava), + running: Mutex::new(false), + }); + Importer { shared: shared } + } - let strava = state.strava.read().expect("FIX"); - let conn = state.conn.lock().expect("FIX"); - let token = get_or_refresh_token(&*strava, &conn, &user).expect("FIX"); + pub fn run(&self) { + info!("run()"); + let pool = self.shared.pool.lock().unwrap(); + let mut running = self.shared.running.lock().unwrap(); + if !*running { + *running = true; + pool.execute({ + let shared = self.shared.clone(); + move || run_periodically(shared, Duration::from_secs(10)) + }); + } + } + + pub fn join(&self) { + self.shared.pool.lock().expect("FIX").join() + } +} + +fn import_strava_user<S: strava::StravaApi>( + shared: Arc<ImporterSharedData<S>>, + username: &str) -> Result<(), Error> { + let strava = shared.strava.read().unwrap(); + let user = db::get_user(&shared.conn.lock().unwrap(), username)?; + + let token = { + let conn = shared.conn.lock().unwrap(); + get_or_refresh_token(&*strava, &conn, &user)? + }; + + let per_page = 30; for page in 1.. { let params = [ - ("page", &format!("{}", page)), - ("per_page", &format!("{}", 200)), + ("page", &format!("{}", page)[..]), + ("per_page", &format!("{}", per_page)[..]) ]; + let result = strava - .get("/athlete/activities", &token.access_token, ¶ms) - .expect("ok"); - // info!("import_strava_user: Got result: {:#?}", result); - for activity in result.as_array().expect("FIX") { + .get("/athlete/activities", &token.access_token, ¶ms[..])?; + + let result = result.as_array().ok_or( + Error::UnexpectedJson(result.clone()))?; + + for activity in result { info!("activity id: {} start: {}", activity["id"], activity["start_date"]); } - sleep(Duration::from_secs(1)); - } -} -fn handle_command(state: ImporterState, command: Command) { - info!("handle_command {:?}", command); - match command { - Command::ImportStravaUser(user) => import_strava_user(state, user), - Command::Quit => (), - } + if result.len() < per_page { + break; + } + thread::sleep(Duration::from_secs(1)); + }; + + Err(Error::InternalError) } -fn receive_commands(state: ImporterState) { - info!("receive_commands"); - match (|| -> Result<(), Box<dyn std::error::Error>> { - let rx = state.rx.lock()?; - let mut command = rx.recv()?; - loop { - info!("got command: {:?}", command); - let state0 = state.clone(); - state.pool.execute(move || handle_command(state0, command)); - command = rx.recv()?; - } - })() { - Ok(()) => (), - Err(e) => { - error!("receive_commands: {:?}", e); - () - } +fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result<models::StravaToken, Error> { + let mut token = db::get_strava_token(&conn, &user).expect("FIX"); + + if token.expires_at < Utc::now() { + info!("refresh expired token: {:?}", token.expires_at); + let new_token = strava.refresh_token(&From::from(&token))?; + new_token.update_model(&mut token); } -} -pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender<Command> { - let (tx, rx0) = channel(); - let state = ImporterState { - pool: pool.clone(), - conn: Arc::new(Mutex::new(conn)), - strava: Arc::new(RwLock::new(strava::StravaImpl::new( - params.strava_client_id.clone(), params.strava_client_secret.clone()))), - rx: Arc::new(Mutex::new(rx0)), - }; - pool.execute(move || receive_commands(state)); - tx + Ok(token) } + +// fn handle_command(state: Importer, command: Command) { +// info!("handle_command {:?}", command); +// match command { +// Command::ImportStravaUser(user) => import_strava_user(state, user), +// Command::Quit => (), +// } +// } + +// fn receive_commands(state: Importer) { +// info!("receive_commands"); +// match (|| -> Result<(), Box<dyn std::error::Error>> { +// let rx = state.rx.lock()?; +// let mut command = rx.recv()?; +// loop { +// info!("got command: {:?}", command); +// let state0 = state.clone(); +// state.pool.execute(move || handle_command(state0, command)); +// command = rx.recv()?; +// } +// })() { +// Ok(()) => (), +// Err(e) => { +// error!("receive_commands: {:?}", e); +// () +// } +// } +// } + +// pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender<Command> { +// let (tx, rx0) = channel(); +// let importer = Arc::new(Importer { +// pool: Mutex::new(pool.clone()), +// conn: Mutex::new(conn), +// strava: RwLock::new(strava::StravaImpl::new( +// params.strava_client_id.clone(), params.strava_client_secret.clone())), +// rx: Mutex::new(rx0), +// }); +// // pool.execute(move || receive_commands(state)); +// pool.execute(clone! { [importer] move || importer.run() }); +// tx +// } |