diff options
Diffstat (limited to 'src/importer.rs')
-rw-r--r-- | src/importer.rs | 55 |
1 files changed, 43 insertions, 12 deletions
diff --git a/src/importer.rs b/src/importer.rs index 2966449..e1f73e2 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -1,12 +1,19 @@ -use crate::models; use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; use std::sync::mpsc::Sender; use std::sync::Arc; use std::sync::Mutex; +use std::sync::RwLock; use threadpool::ThreadPool; +use diesel::PgConnection; + +use crate::db; +use crate::strava; +use crate::models; +use crate::strava::StravaApi; pub const WORKERS: usize = 10; +pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; #[derive(Debug, Clone)] pub enum Command { @@ -14,19 +21,39 @@ pub enum Command { Quit, } -fn handle_command(pool: ThreadPool, command: Command) { - info!("handle {:?}", command); +#[derive(Clone)] +struct ImporterState { + pool: ThreadPool, + conn: Arc<Mutex<PgConnection>>, + strava: Arc<RwLock<strava::StravaImpl>>, + rx: Arc<Mutex<Receiver<Command>>>, +} + +fn import_strava_user(state: ImporterState, user: models::User) { + let strava = state.strava.read().expect("FIX"); + let conn = state.conn.lock().expect("FIX"); + let token = db::get_strava_token(&conn, &user).expect("FIX"); + let result = strava.get("/athlete/activities", &token.access_token, EMPTY_PARAMS).expect("ok"); + info!("Imported user. Got result: {:#?}", result); +} + +fn handle_command(state: ImporterState, command: Command) { + info!("handle_command {:?}", command); + match command { + Command::ImportStravaUser(user) => import_strava_user(state, user), + Command::Quit => (), + } } -fn receive_commands(pool: ThreadPool, rx: Arc<Mutex<Receiver<Command>>>) { +fn receive_commands(state: ImporterState) { info!("receive_commands"); - match (move || -> Result<(), Box<dyn std::error::Error>> { - let rx = rx.lock().expect("channel"); + match (|| -> Result<(), Box<dyn std::error::Error>> { + let rx = state.rx.lock()?; let mut command = rx.recv()?; loop { info!("got command: {:?}", command); - let pool0 = pool.clone(); - pool.execute(|| handle_command(pool0, command)); + let state0 = state.clone(); + state.pool.execute(move || handle_command(state0, command)); command = rx.recv()?; } })() { @@ -38,10 +65,14 @@ fn receive_commands(pool: ThreadPool, rx: Arc<Mutex<Receiver<Command>>>) { } } -pub fn run(pool: ThreadPool) -> Sender<Command> { +pub fn run(pool: ThreadPool, conn: PgConnection) -> Sender<Command> { let (tx, rx0) = channel(); - let rx = Arc::new(Mutex::new(rx0)); - let pool0 = pool.clone(); - pool.execute(move || receive_commands(pool0, rx)); + let state = ImporterState { + pool: pool.clone(), + conn: Arc::new(Mutex::new(conn)), + strava: Arc::new(RwLock::new(strava::StravaImpl::new())), + rx: Arc::new(Mutex::new(rx0)), + }; + pool.execute(move || receive_commands(state)); tx } |