diff options
Diffstat (limited to 'src/importer.rs')
-rw-r--r-- | src/importer.rs | 123 |
1 files changed, 33 insertions, 90 deletions
diff --git a/src/importer.rs b/src/importer.rs index 6909350..05d56fd 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -1,26 +1,19 @@ +use chrono::Utc; use diesel::PgConnection; -use std::sync::mpsc::channel; -use std::sync::mpsc::Receiver; -use std::sync::mpsc::Sender; +use serde::Deserialize; +use serde::Serialize; use std::sync::Arc; use std::sync::Mutex; use std::sync::RwLock; -use threadpool::ThreadPool; -use chrono::Utc; -use timer::Timer; -use timer::Guard; -use std::time::Instant; -use std::time::Duration; use std::thread; -use serde::Deserialize; -use serde::Serialize; +use std::time::Duration; +use std::time::Instant; +use threadpool::ThreadPool; -use crate::error::Error; use crate::db; +use crate::error::Error; use crate::models; use crate::strava; -use crate::strava::StravaApi; -use crate::Params; pub const WORKERS: usize = 10; pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; @@ -30,15 +23,6 @@ pub enum Command { ImportStravaUser { username: String }, } -macro_rules! clone { - ( [ $( $i:ident ),* ] $e:expr ) => { - { - $(let $i = $i.clone();)* - $e - } - } -} - pub struct ImporterSharedData<StravaApi: strava::StravaApi + 'static> { strava: RwLock<StravaApi>, pool: Mutex<ThreadPool>, @@ -50,9 +34,7 @@ pub struct Importer<StravaApi: strava::StravaApi + 'static> { shared: Arc<ImporterSharedData<StravaApi>>, } -fn run_periodically<S: strava::StravaApi>( - shared: Arc<ImporterSharedData<S>>, - period: Duration) { +fn run_periodically<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>, period: Duration) { let sleep_time = Duration::from_millis(1000); let mut now = Instant::now(); loop { @@ -69,40 +51,34 @@ fn run_periodically<S: strava::StravaApi>( } } - fn handle_one_task<S: strava::StravaApi>( - shared: Arc<ImporterSharedData<S>>) -> Result<models::Task, Error> { + shared: Arc<ImporterSharedData<S>>, +) -> Result<models::Task, Error> { let task = { let conn = shared.conn.lock().unwrap(); let now = Utc::now(); let eta = now + chrono::Duration::seconds(5); - db::take_task(&conn, - models::TaskState::NEW, - now, - eta)? + db::take_task(&conn, models::TaskState::NEW, now, eta)? }; let command = serde_json::from_value(task.payload.clone())?; match command { - Command::ImportStravaUser{ username } => { - import_strava_user(shared, username.as_str())? - }, + Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?, } Ok(task) } -fn handle_tasks<S: strava::StravaApi>( - shared: Arc<ImporterSharedData<S>>) { +fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) { let mut done = false; while !done { match handle_one_task(shared.clone()) { Err(Error::NotFound) => { info!("No more tasks"); done = true; - }, + } Err(e) => { error!("Error handling task: {}", e); } @@ -138,13 +114,15 @@ impl<StravaApi: strava::StravaApi> Importer<StravaApi> { } pub fn join(&self) { - self.shared.pool.lock().expect("FIX").join() + *self.shared.running.lock().unwrap() = false; + self.shared.pool.lock().unwrap().join(); } } fn import_strava_user<S: strava::StravaApi>( shared: Arc<ImporterSharedData<S>>, - username: &str) -> Result<(), Error> { + username: &str, +) -> Result<(), Error> { let strava = shared.strava.read().unwrap(); let user = db::get_user(&shared.conn.lock().unwrap(), username)?; @@ -157,29 +135,36 @@ fn import_strava_user<S: strava::StravaApi>( for page in 1.. { let params = [ ("page", &format!("{}", page)[..]), - ("per_page", &format!("{}", per_page)[..]) + ("per_page", &format!("{}", per_page)[..]), ]; - let result = strava - .get("/athlete/activities", &token.access_token, ¶ms[..])?; + let result = strava.get("/athlete/activities", &token.access_token, ¶ms[..])?; - let result = result.as_array().ok_or( - Error::UnexpectedJson(result.clone()))?; + let result = result + .as_array() + .ok_or(Error::UnexpectedJson(result.clone()))?; for activity in result { - info!("activity id: {} start: {}", activity["id"], activity["start_date"]); + info!( + "activity id: {} start: {}", + activity["id"], activity["start_date"] + ); } if result.len() < per_page { break; } thread::sleep(Duration::from_secs(1)); - }; + } Err(Error::InternalError) } -fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result<models::StravaToken, Error> { +fn get_or_refresh_token<Strava: strava::StravaApi>( + strava: &Strava, + conn: &PgConnection, + user: &models::User, +) -> Result<models::StravaToken, Error> { let mut token = db::get_strava_token(&conn, &user).expect("FIX"); if token.expires_at < Utc::now() { @@ -190,45 +175,3 @@ fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgCon Ok(token) } - -// fn handle_command(state: Importer, command: Command) { -// info!("handle_command {:?}", command); -// match command { -// Command::ImportStravaUser(user) => import_strava_user(state, user), -// Command::Quit => (), -// } -// } - -// fn receive_commands(state: Importer) { -// info!("receive_commands"); -// match (|| -> Result<(), Box<dyn std::error::Error>> { -// let rx = state.rx.lock()?; -// let mut command = rx.recv()?; -// loop { -// info!("got command: {:?}", command); -// let state0 = state.clone(); -// state.pool.execute(move || handle_command(state0, command)); -// command = rx.recv()?; -// } -// })() { -// Ok(()) => (), -// Err(e) => { -// error!("receive_commands: {:?}", e); -// () -// } -// } -// } - -// pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender<Command> { -// let (tx, rx0) = channel(); -// let importer = Arc::new(Importer { -// pool: Mutex::new(pool.clone()), -// conn: Mutex::new(conn), -// strava: RwLock::new(strava::StravaImpl::new( -// params.strava_client_id.clone(), params.strava_client_secret.clone())), -// rx: Mutex::new(rx0), -// }); -// // pool.execute(move || receive_commands(state)); -// pool.execute(clone! { [importer] move || importer.run() }); -// tx -// } |