From 6d0a4d03705b96b252a6b29d3b8c188b9c903b89 Mon Sep 17 00:00:00 2001 From: Kjetil Orbekk Date: Mon, 3 Feb 2020 22:55:36 -0500 Subject: Refactor importer to store tasks in postgresql --- Cargo.lock | 60 +++++++++++++ Cargo.toml | 6 +- src/db.rs | 62 ++++++++++++++ src/error.rs | 7 +- src/importer.rs | 257 +++++++++++++++++++++++++++++++++++++++++--------------- src/models.rs | 59 +++++++++++++ src/schema.rs | 18 +++- src/server.rs | 28 ++++-- src/strava.rs | 10 +-- 9 files changed, 423 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f501f35..eaca1d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,6 +83,16 @@ dependencies = [ "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "bigdecimal" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-bigint 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "bitflags" version = "1.2.1" @@ -265,12 +275,21 @@ name = "diesel" version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ + "bigdecimal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "diesel_derives 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "ipnetwork 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "num-bigint 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "pq-sys 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "uuid 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -632,6 +651,14 @@ dependencies = [ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ipnetwork" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "itoa" version = "0.4.5" @@ -833,6 +860,16 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "num-bigint" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "num-integer" version = "0.1.42" @@ -1032,6 +1069,8 @@ dependencies = [ "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)", "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1525,6 +1564,14 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "timer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio" version = "0.2.11" @@ -1688,6 +1735,14 @@ name = "utf8-ranges" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "uuid" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "vcpkg" version = "0.2.8" @@ -1905,6 +1960,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" "checksum base64 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "489d6c0ed21b11d038c31b6ceccca973e65d73ba3bd8ecb9a2babf5546164643" "checksum bcrypt 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28dff1c1a22f9401213d983f6c309e807e72c33d5dc5514fe5005b0205c46e8f" +"checksum bigdecimal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "460825c9e21708024d67c07057cd5560e5acdccac85de0de624a81d3de51bacb" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" "checksum block-cipher-trait 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1c924d49bd09e7c06003acda26cd9742e796e34282ec6c1189404dee0c1f4774" @@ -1968,6 +2024,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum inotify 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24e40d6fd5d64e2082e0c796495c8ef5ad667a96d03e5aaa0becfd9d47bcbfb8" "checksum inotify-sys 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e74a1aa87c59aeff6ef2cc2fa62d41bc43f54952f55652656b18a02fd5e356c0" "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +"checksum ipnetwork 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a69dd5e3613374e74da81c251750153abe3bd0ad17641ea63d43d1e21d0dbd4d" "checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" "checksum js-sys 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "7889c7c36282151f6bf465be4700359318aef36baa951462382eae49e9577cf9" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" @@ -1992,6 +2049,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" "checksum nom 4.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2ad2a91a8e869eeb30b9cb3119ae87773a8f4ae617f41b1eb9c154b2905f7bd6" "checksum notify 4.0.15 (registry+https://github.com/rust-lang/crates.io-index)" = "80ae4a7688d1fab81c5bf19c64fc8db920be8d519ce6336ed4e7efe024724dbd" +"checksum num-bigint 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" "checksum num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba" "checksum num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" "checksum num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6" @@ -2066,6 +2124,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" "checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865" "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" +"checksum timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "31d42176308937165701f50638db1c31586f183f1aab416268216577aec7306b" "checksum tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b" "checksum tokio-tls 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7bde02a3a5291395f59b06ec6945a3077602fac2b07eeeaf0dee2122f3619828" "checksum tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" @@ -2089,6 +2148,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" "checksum url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" "checksum utf8-ranges 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b4ae116fef2b7fea257ed6440d3cfcff7f190865f170cdad00bb6465bf18ecba" +"checksum uuid 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e1436e58182935dcd9ce0add9ea0b558e8a87befe01c1a301e6020aeb0876363" "checksum vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168" "checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" diff --git a/Cargo.toml b/Cargo.toml index a89bd57..d68ab83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ reqwest = { version = "0.10.1", features = ["blocking", "json"] } clap = "2" rocket = "0.4.2" rocket_contrib = { version = "0.4.2", default-features = false, features = ["handlebars_templates", "diesel_postgres_pool"] } -diesel = { version = "1.0.0", features = ["postgres", "chrono"] } +diesel = { version = "1.0.0", features = ["postgres", "chrono", "extras"] } dotenv = "0.9.0" bcrypt = "0.6" base64 = "0.11" @@ -19,4 +19,6 @@ rand = "0.7" chrono = { version = "0.4", features = ["serde"] } log = "0.4" fern = { version = "0.5", features = ["colored"] } -threadpool = "1.7" \ No newline at end of file +threadpool = "1.7" +timer = "0.2" +time = "0.1" \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 20123bf..f3a261c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -6,6 +6,9 @@ use diesel::pg::PgConnection; use diesel::ExpressionMethods; use diesel::QueryDsl; use diesel::RunQueryDsl; +use std::time::Duration; +use chrono::DateTime; +use chrono::Utc; pub const COST: u32 = 10; @@ -98,3 +101,62 @@ pub fn get_strava_token( .get_result::(conn)?; Ok(token) } + +pub fn insert_task( + conn: &PgConnection, + task: &models::NewTask) -> Result { + use crate::schema::tasks; + let id = diesel::insert_into(tasks::table) + .values(task) + .returning(tasks::id) + .get_result(conn)?; + Ok(id) +} + +fn update_task_inner(conn: &PgConnection, task: &models::Task) + -> Result { + use crate::schema::tasks; + + diesel::delete(tasks::table.filter(tasks::columns::id.eq(task.id))) + .execute(conn)?; + + let new_id = insert_task(conn, &models::NewTask { + start_at: task.start_at, + state: task.state, + username: &task.username, + payload: &task.payload, + })?; + + let new_task = tasks::table.find(new_id) + .get_result::(conn)?; + + Ok(new_task) +} + +fn update_task(conn: &PgConnection, task: &models::Task) -> Result { + conn.transaction(|| { + update_task_inner(conn, task) + }) +} + +pub fn take_task( + conn: &PgConnection, + state: models::TaskState, + start_before: DateTime, + eta: DateTime) + -> Result { + use crate::schema::tasks; + + conn.transaction(|| { + let mut task = tasks::table + .filter(tasks::state.eq(state)) + .filter(tasks::start_at.lt(start_before)) + .order(tasks::start_at.asc()) + .first::(conn)?; + + task.start_at = eta; + let task = update_task_inner(conn, &task)?; + + Ok(task) + }) +} diff --git a/src/error.rs b/src/error.rs index 4ae2995..75a7568 100644 --- a/src/error.rs +++ b/src/error.rs @@ -67,6 +67,7 @@ pub enum Error { CommunicationError(reqwest::Error), ParseError(serde_json::error::Error), StravaApiError(StravaApiError), + UnexpectedJson(Value), AlreadyExists, NotFound, InternalError, @@ -79,6 +80,7 @@ impl fmt::Display for Error { Error::PasswordError(ref e) => e.fmt(f), Error::CommunicationError(ref e) => e.fmt(f), Error::ParseError(ref e) => e.fmt(f), + Error::UnexpectedJson(_) => f.write_str("UnexpectedJson"), Error::StravaApiError(ref e) => e.fmt(f), Error::AlreadyExists => f.write_str("AlreadyExists"), Error::NotFound => f.write_str("NotFound"), @@ -107,7 +109,10 @@ impl From for Error { impl From for Error { fn from(e: DieselErr) -> Error { - Error::DieselError(e) + match e { + DieselErr::NotFound => Error::NotFound, + e => Error::DieselError(e) + } } } diff --git a/src/importer.rs b/src/importer.rs index 9ea7e35..6909350 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -7,6 +7,13 @@ use std::sync::Mutex; use std::sync::RwLock; use threadpool::ThreadPool; use chrono::Utc; +use timer::Timer; +use timer::Guard; +use std::time::Instant; +use std::time::Duration; +use std::thread; +use serde::Deserialize; +use serde::Serialize; use crate::error::Error; use crate::db; @@ -18,92 +25,210 @@ use crate::Params; pub const WORKERS: usize = 10; pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum Command { - ImportStravaUser(models::User), - Quit, + ImportStravaUser { username: String }, } -#[derive(Clone)] -struct ImporterState { - pool: ThreadPool, - conn: Arc>, - strava: Arc>, - rx: Arc>>, +macro_rules! clone { + ( [ $( $i:ident ),* ] $e:expr ) => { + { + $(let $i = $i.clone();)* + $e + } + } } -fn get_or_refresh_token(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result { - let mut token = db::get_strava_token(&conn, &user).expect("FIX"); +pub struct ImporterSharedData { + strava: RwLock, + pool: Mutex, + conn: Mutex, + running: Mutex, +} - if token.expires_at < Utc::now() { - info!("refresh expired token: {:?}", token.expires_at); - let new_token = strava.refresh_token(&From::from(&token))?; - new_token.update_model(&mut token); +pub struct Importer { + shared: Arc>, +} + +fn run_periodically( + shared: Arc>, + period: Duration) { + let sleep_time = Duration::from_millis(1000); + let mut now = Instant::now(); + loop { + while now.elapsed() < period { + if !*shared.running.lock().unwrap() { + return; + } + thread::sleep(sleep_time); + } + now = Instant::now(); + + info!("run_periodically: wakeup"); + handle_tasks(shared.clone()) } +} - Ok(token) + +fn handle_one_task( + shared: Arc>) -> Result { + let task = { + let conn = shared.conn.lock().unwrap(); + let now = Utc::now(); + let eta = now + chrono::Duration::seconds(5); + + db::take_task(&conn, + models::TaskState::NEW, + now, + eta)? + }; + + let command = serde_json::from_value(task.payload.clone())?; + + match command { + Command::ImportStravaUser{ username } => { + import_strava_user(shared, username.as_str())? + }, + } + + Ok(task) } -fn import_strava_user(state: ImporterState, user: models::User) { - use std::thread::sleep; - use std::time::Duration; +fn handle_tasks( + shared: Arc>) { + let mut done = false; + while !done { + match handle_one_task(shared.clone()) { + Err(Error::NotFound) => { + info!("No more tasks"); + done = true; + }, + Err(e) => { + error!("Error handling task: {}", e); + } + Ok(t) => { + info!("Successfully handled task: {:?}", t); + } + }; + } +} + +impl Importer { + pub fn new(conn: PgConnection, strava: StravaApi) -> Importer { + let shared = Arc::new(ImporterSharedData { + pool: Mutex::new(ThreadPool::with_name("importer".to_string(), WORKERS)), + conn: Mutex::new(conn), + strava: RwLock::new(strava), + running: Mutex::new(false), + }); + Importer { shared: shared } + } - let strava = state.strava.read().expect("FIX"); - let conn = state.conn.lock().expect("FIX"); - let token = get_or_refresh_token(&*strava, &conn, &user).expect("FIX"); + pub fn run(&self) { + info!("run()"); + let pool = self.shared.pool.lock().unwrap(); + let mut running = self.shared.running.lock().unwrap(); + if !*running { + *running = true; + pool.execute({ + let shared = self.shared.clone(); + move || run_periodically(shared, Duration::from_secs(10)) + }); + } + } + + pub fn join(&self) { + self.shared.pool.lock().expect("FIX").join() + } +} + +fn import_strava_user( + shared: Arc>, + username: &str) -> Result<(), Error> { + let strava = shared.strava.read().unwrap(); + let user = db::get_user(&shared.conn.lock().unwrap(), username)?; + + let token = { + let conn = shared.conn.lock().unwrap(); + get_or_refresh_token(&*strava, &conn, &user)? + }; + + let per_page = 30; for page in 1.. { let params = [ - ("page", &format!("{}", page)), - ("per_page", &format!("{}", 200)), + ("page", &format!("{}", page)[..]), + ("per_page", &format!("{}", per_page)[..]) ]; + let result = strava - .get("/athlete/activities", &token.access_token, ¶ms) - .expect("ok"); - // info!("import_strava_user: Got result: {:#?}", result); - for activity in result.as_array().expect("FIX") { + .get("/athlete/activities", &token.access_token, ¶ms[..])?; + + let result = result.as_array().ok_or( + Error::UnexpectedJson(result.clone()))?; + + for activity in result { info!("activity id: {} start: {}", activity["id"], activity["start_date"]); } - sleep(Duration::from_secs(1)); - } -} -fn handle_command(state: ImporterState, command: Command) { - info!("handle_command {:?}", command); - match command { - Command::ImportStravaUser(user) => import_strava_user(state, user), - Command::Quit => (), - } + if result.len() < per_page { + break; + } + thread::sleep(Duration::from_secs(1)); + }; + + Err(Error::InternalError) } -fn receive_commands(state: ImporterState) { - info!("receive_commands"); - match (|| -> Result<(), Box> { - let rx = state.rx.lock()?; - let mut command = rx.recv()?; - loop { - info!("got command: {:?}", command); - let state0 = state.clone(); - state.pool.execute(move || handle_command(state0, command)); - command = rx.recv()?; - } - })() { - Ok(()) => (), - Err(e) => { - error!("receive_commands: {:?}", e); - () - } +fn get_or_refresh_token(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result { + let mut token = db::get_strava_token(&conn, &user).expect("FIX"); + + if token.expires_at < Utc::now() { + info!("refresh expired token: {:?}", token.expires_at); + let new_token = strava.refresh_token(&From::from(&token))?; + new_token.update_model(&mut token); } -} -pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender { - let (tx, rx0) = channel(); - let state = ImporterState { - pool: pool.clone(), - conn: Arc::new(Mutex::new(conn)), - strava: Arc::new(RwLock::new(strava::StravaImpl::new( - params.strava_client_id.clone(), params.strava_client_secret.clone()))), - rx: Arc::new(Mutex::new(rx0)), - }; - pool.execute(move || receive_commands(state)); - tx + Ok(token) } + +// fn handle_command(state: Importer, command: Command) { +// info!("handle_command {:?}", command); +// match command { +// Command::ImportStravaUser(user) => import_strava_user(state, user), +// Command::Quit => (), +// } +// } + +// fn receive_commands(state: Importer) { +// info!("receive_commands"); +// match (|| -> Result<(), Box> { +// let rx = state.rx.lock()?; +// let mut command = rx.recv()?; +// loop { +// info!("got command: {:?}", command); +// let state0 = state.clone(); +// state.pool.execute(move || handle_command(state0, command)); +// command = rx.recv()?; +// } +// })() { +// Ok(()) => (), +// Err(e) => { +// error!("receive_commands: {:?}", e); +// () +// } +// } +// } + +// pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender { +// let (tx, rx0) = channel(); +// let importer = Arc::new(Importer { +// pool: Mutex::new(pool.clone()), +// conn: Mutex::new(conn), +// strava: RwLock::new(strava::StravaImpl::new( +// params.strava_client_id.clone(), params.strava_client_secret.clone())), +// rx: Mutex::new(rx0), +// }); +// // pool.execute(move || receive_commands(state)); +// pool.execute(clone! { [importer] move || importer.run() }); +// tx +// } diff --git a/src/models.rs b/src/models.rs index ce3dd19..0b7e5db 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,9 +1,68 @@ +use crate::schema::tasks; use crate::schema::config; use crate::schema::strava_tokens; use crate::schema::users; use chrono::DateTime; use chrono::Utc; use std::fmt; +use serde_json::Value; +use diesel::pg::Pg; +use diesel::deserialize; +use diesel::deserialize::FromSql; +use diesel::serialize; +use diesel::serialize::Output; +use diesel::serialize::ToSql; +use diesel::sql_types; +use std::io::Write; + +#[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow)] +#[sql_type = "sql_types::Text"] +pub enum TaskState { + NEW = 0, + SUCCESSFUL, + FAILED, +} + +impl ToSql for TaskState { + fn to_sql(&self, out: &mut Output) -> serialize::Result { + let t = match *self { + TaskState::NEW => "new".to_string(), + TaskState::SUCCESSFUL => "success".to_string(), + TaskState::FAILED => "failed".to_string(), + }; + >::to_sql(&t, out) + } +} + +impl FromSql for TaskState { + fn from_sql(bytes: Option<&[u8]>) -> deserialize::Result { + let s = >::from_sql(bytes)?; + match s.as_str() { + "new" => Ok(TaskState::NEW), + "success" => Ok(TaskState::SUCCESSFUL), + "failed" => Ok(TaskState::FAILED), + &_ => Err("Unrecognized task state".into()), + } + } +} + +#[derive(Insertable)] +#[table_name = "tasks"] +pub struct NewTask<'a> { + pub start_at: DateTime, + pub state: TaskState, + pub username: &'a str, + pub payload: &'a Value, +} + +#[derive(Queryable, Debug, Clone)] +pub struct Task { + pub id: i64, + pub state: TaskState, + pub start_at: DateTime, + pub username: String, + pub payload: Value, +} #[derive(Insertable, Queryable)] #[table_name = "config"] diff --git a/src/schema.rs b/src/schema.rs index 7cf2892..8748f3c 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -16,6 +16,16 @@ table! { } } +table! { + tasks (id) { + id -> Int8, + state -> Varchar, + start_at -> Timestamptz, + username -> Varchar, + payload -> Jsonb, + } +} + table! { users (username) { username -> Varchar, @@ -24,5 +34,11 @@ table! { } joinable!(strava_tokens -> users (username)); +joinable!(tasks -> users (username)); -allow_tables_to_appear_in_same_query!(config, strava_tokens, users,); +allow_tables_to_appear_in_same_query!( + config, + strava_tokens, + tasks, + users, +); diff --git a/src/server.rs b/src/server.rs index abc430b..f0dd591 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,6 +17,8 @@ use std::collections::HashMap; use std::sync::mpsc::Sender; use std::sync::Mutex; use threadpool::ThreadPool; +use chrono::Utc; +use serde_json::to_value; use crate::db; use crate::error::Error; @@ -133,14 +135,18 @@ fn link_strava_callback( #[get("/import_strava")] fn import_strava( conn: Db, - tx: State>>, user: LoggedInUser, ) -> Result<(), Error> { let user = db::get_user(&*conn, &user.username)?; - tx.lock() - .expect("FIX") - .send(importer::Command::ImportStravaUser(user)) - .expect("FIX"); + let command = + importer::Command::ImportStravaUser { username: user.username.clone() }; + db::insert_task(&conn, + &models::NewTask { + start_at: Utc::now(), + state: models::TaskState::NEW, + username: user.username.as_str(), + payload: &to_value(command)?, + })?; Ok(()) } @@ -179,12 +185,16 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) { .finalize() .unwrap(); - let importer_pool = ThreadPool::with_name("import".to_string(), importer::WORKERS); - let tx = importer::run(importer_pool.clone(), conn, ¶ms); + let strava = strava::StravaImpl::new( + params.strava_client_id.clone(), + params.strava_client_secret.clone(), + ); + + let importer = importer::Importer::new(conn, strava); + importer.run(); rocket::custom(config) .manage(params) - .manage(Mutex::new(tx)) .mount( "/", routes![ @@ -200,5 +210,5 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) { .attach(Db::fairing()) .launch(); - importer_pool.join(); + importer.join(); } diff --git a/src/strava.rs b/src/strava.rs index 284d8b1..ff59c66 100644 --- a/src/strava.rs +++ b/src/strava.rs @@ -36,12 +36,12 @@ impl From<&models::StravaToken> for Token { } } -pub trait StravaApi { - fn get( +pub trait StravaApi: Sync + Send { + fn get( &self, method: &str, access_token: &str, - parasm: &T, + params: &[(&str, &str)], ) -> Result; fn refresh_token( @@ -70,11 +70,11 @@ impl StravaImpl { } impl StravaApi for StravaImpl { - fn get( + fn get( &self, method: &str, access_token: &str, - params: &T, + params: &[(&str, &str)], ) -> Result { let uri = format!("{}{}{}", self.base_url, self.api_url, method); let response = self -- cgit v1.2.3