summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-03 22:55:36 -0500
committerKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-03 22:55:36 -0500
commit6d0a4d03705b96b252a6b29d3b8c188b9c903b89 (patch)
treeb8ea3f7459ae4c9b22a976259e637cc7a3d695c7
parentc459b5e85ef9b695b3c9a107b7cf7f08847c608f (diff)
Refactor importer to store tasks in postgresql
-rw-r--r--Cargo.lock60
-rw-r--r--Cargo.toml6
-rw-r--r--src/db.rs62
-rw-r--r--src/error.rs7
-rw-r--r--src/importer.rs257
-rw-r--r--src/models.rs59
-rw-r--r--src/schema.rs18
-rw-r--r--src/server.rs28
-rw-r--r--src/strava.rs10
9 files changed, 423 insertions, 84 deletions
diff --git a/Cargo.lock b/Cargo.lock
index f501f35..eaca1d3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -84,6 +84,16 @@ dependencies = [
]
[[package]]
+name = "bigdecimal"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "num-bigint 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -265,12 +275,21 @@ name = "diesel"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
+ "bigdecimal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel_derives 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "ipnetwork 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-bigint 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"pq-sys 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)",
+ "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
+ "uuid 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -633,6 +652,14 @@ dependencies = [
]
[[package]]
+name = "ipnetwork"
+version = "0.15.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "itoa"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -834,6 +861,16 @@ dependencies = [
]
[[package]]
+name = "num-bigint"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "num-integer"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1032,6 +1069,8 @@ dependencies = [
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)",
"threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
+ "timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1526,6 +1565,14 @@ dependencies = [
]
[[package]]
+name = "timer"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "tokio"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1689,6 +1736,14 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "uuid"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "vcpkg"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1905,6 +1960,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
"checksum base64 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "489d6c0ed21b11d038c31b6ceccca973e65d73ba3bd8ecb9a2babf5546164643"
"checksum bcrypt 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28dff1c1a22f9401213d983f6c309e807e72c33d5dc5514fe5005b0205c46e8f"
+"checksum bigdecimal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "460825c9e21708024d67c07057cd5560e5acdccac85de0de624a81d3de51bacb"
"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
"checksum block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
"checksum block-cipher-trait 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1c924d49bd09e7c06003acda26cd9742e796e34282ec6c1189404dee0c1f4774"
@@ -1968,6 +2024,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum inotify 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24e40d6fd5d64e2082e0c796495c8ef5ad667a96d03e5aaa0becfd9d47bcbfb8"
"checksum inotify-sys 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e74a1aa87c59aeff6ef2cc2fa62d41bc43f54952f55652656b18a02fd5e356c0"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
+"checksum ipnetwork 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a69dd5e3613374e74da81c251750153abe3bd0ad17641ea63d43d1e21d0dbd4d"
"checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e"
"checksum js-sys 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "7889c7c36282151f6bf465be4700359318aef36baa951462382eae49e9577cf9"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
@@ -1992,6 +2049,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88"
"checksum nom 4.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2ad2a91a8e869eeb30b9cb3119ae87773a8f4ae617f41b1eb9c154b2905f7bd6"
"checksum notify 4.0.15 (registry+https://github.com/rust-lang/crates.io-index)" = "80ae4a7688d1fab81c5bf19c64fc8db920be8d519ce6336ed4e7efe024724dbd"
+"checksum num-bigint 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
"checksum num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba"
"checksum num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096"
"checksum num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6"
@@ -2066,6 +2124,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
"checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865"
"checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f"
+"checksum timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "31d42176308937165701f50638db1c31586f183f1aab416268216577aec7306b"
"checksum tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b"
"checksum tokio-tls 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7bde02a3a5291395f59b06ec6945a3077602fac2b07eeeaf0dee2122f3619828"
"checksum tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930"
@@ -2089,6 +2148,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a"
"checksum url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb"
"checksum utf8-ranges 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b4ae116fef2b7fea257ed6440d3cfcff7f190865f170cdad00bb6465bf18ecba"
+"checksum uuid 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e1436e58182935dcd9ce0add9ea0b558e8a87befe01c1a301e6020aeb0876363"
"checksum vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168"
"checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a"
"checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"
diff --git a/Cargo.toml b/Cargo.toml
index a89bd57..d68ab83 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,7 +11,7 @@ reqwest = { version = "0.10.1", features = ["blocking", "json"] }
clap = "2"
rocket = "0.4.2"
rocket_contrib = { version = "0.4.2", default-features = false, features = ["handlebars_templates", "diesel_postgres_pool"] }
-diesel = { version = "1.0.0", features = ["postgres", "chrono"] }
+diesel = { version = "1.0.0", features = ["postgres", "chrono", "extras"] }
dotenv = "0.9.0"
bcrypt = "0.6"
base64 = "0.11"
@@ -19,4 +19,6 @@ rand = "0.7"
chrono = { version = "0.4", features = ["serde"] }
log = "0.4"
fern = { version = "0.5", features = ["colored"] }
-threadpool = "1.7" \ No newline at end of file
+threadpool = "1.7"
+timer = "0.2"
+time = "0.1" \ No newline at end of file
diff --git a/src/db.rs b/src/db.rs
index 20123bf..f3a261c 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -6,6 +6,9 @@ use diesel::pg::PgConnection;
use diesel::ExpressionMethods;
use diesel::QueryDsl;
use diesel::RunQueryDsl;
+use std::time::Duration;
+use chrono::DateTime;
+use chrono::Utc;
pub const COST: u32 = 10;
@@ -98,3 +101,62 @@ pub fn get_strava_token(
.get_result::<models::StravaToken>(conn)?;
Ok(token)
}
+
+pub fn insert_task(
+ conn: &PgConnection,
+ task: &models::NewTask) -> Result<i64, Error> {
+ use crate::schema::tasks;
+ let id = diesel::insert_into(tasks::table)
+ .values(task)
+ .returning(tasks::id)
+ .get_result(conn)?;
+ Ok(id)
+}
+
+fn update_task_inner(conn: &PgConnection, task: &models::Task)
+ -> Result<models::Task, Error> {
+ use crate::schema::tasks;
+
+ diesel::delete(tasks::table.filter(tasks::columns::id.eq(task.id)))
+ .execute(conn)?;
+
+ let new_id = insert_task(conn, &models::NewTask {
+ start_at: task.start_at,
+ state: task.state,
+ username: &task.username,
+ payload: &task.payload,
+ })?;
+
+ let new_task = tasks::table.find(new_id)
+ .get_result::<models::Task>(conn)?;
+
+ Ok(new_task)
+}
+
+fn update_task(conn: &PgConnection, task: &models::Task) -> Result<models::Task, Error> {
+ conn.transaction(|| {
+ update_task_inner(conn, task)
+ })
+}
+
+pub fn take_task(
+ conn: &PgConnection,
+ state: models::TaskState,
+ start_before: DateTime<Utc>,
+ eta: DateTime<Utc>)
+ -> Result<models::Task, Error> {
+ use crate::schema::tasks;
+
+ conn.transaction(|| {
+ let mut task = tasks::table
+ .filter(tasks::state.eq(state))
+ .filter(tasks::start_at.lt(start_before))
+ .order(tasks::start_at.asc())
+ .first::<models::Task>(conn)?;
+
+ task.start_at = eta;
+ let task = update_task_inner(conn, &task)?;
+
+ Ok(task)
+ })
+}
diff --git a/src/error.rs b/src/error.rs
index 4ae2995..75a7568 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -67,6 +67,7 @@ pub enum Error {
CommunicationError(reqwest::Error),
ParseError(serde_json::error::Error),
StravaApiError(StravaApiError),
+ UnexpectedJson(Value),
AlreadyExists,
NotFound,
InternalError,
@@ -79,6 +80,7 @@ impl fmt::Display for Error {
Error::PasswordError(ref e) => e.fmt(f),
Error::CommunicationError(ref e) => e.fmt(f),
Error::ParseError(ref e) => e.fmt(f),
+ Error::UnexpectedJson(_) => f.write_str("UnexpectedJson"),
Error::StravaApiError(ref e) => e.fmt(f),
Error::AlreadyExists => f.write_str("AlreadyExists"),
Error::NotFound => f.write_str("NotFound"),
@@ -107,7 +109,10 @@ impl From<reqwest::Error> for Error {
impl From<DieselErr> for Error {
fn from(e: DieselErr) -> Error {
- Error::DieselError(e)
+ match e {
+ DieselErr::NotFound => Error::NotFound,
+ e => Error::DieselError(e)
+ }
}
}
diff --git a/src/importer.rs b/src/importer.rs
index 9ea7e35..6909350 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -7,6 +7,13 @@ use std::sync::Mutex;
use std::sync::RwLock;
use threadpool::ThreadPool;
use chrono::Utc;
+use timer::Timer;
+use timer::Guard;
+use std::time::Instant;
+use std::time::Duration;
+use std::thread;
+use serde::Deserialize;
+use serde::Serialize;
use crate::error::Error;
use crate::db;
@@ -18,92 +25,210 @@ use crate::Params;
pub const WORKERS: usize = 10;
pub const EMPTY_PARAMS: &[(&str, &str)] = &[];
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
- ImportStravaUser(models::User),
- Quit,
+ ImportStravaUser { username: String },
}
-#[derive(Clone)]
-struct ImporterState {
- pool: ThreadPool,
- conn: Arc<Mutex<PgConnection>>,
- strava: Arc<RwLock<strava::StravaImpl>>,
- rx: Arc<Mutex<Receiver<Command>>>,
+macro_rules! clone {
+ ( [ $( $i:ident ),* ] $e:expr ) => {
+ {
+ $(let $i = $i.clone();)*
+ $e
+ }
+ }
}
-fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result<models::StravaToken, Error> {
- let mut token = db::get_strava_token(&conn, &user).expect("FIX");
+pub struct ImporterSharedData<StravaApi: strava::StravaApi + 'static> {
+ strava: RwLock<StravaApi>,
+ pool: Mutex<ThreadPool>,
+ conn: Mutex<PgConnection>,
+ running: Mutex<bool>,
+}
- if token.expires_at < Utc::now() {
- info!("refresh expired token: {:?}", token.expires_at);
- let new_token = strava.refresh_token(&From::from(&token))?;
- new_token.update_model(&mut token);
+pub struct Importer<StravaApi: strava::StravaApi + 'static> {
+ shared: Arc<ImporterSharedData<StravaApi>>,
+}
+
+fn run_periodically<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ period: Duration) {
+ let sleep_time = Duration::from_millis(1000);
+ let mut now = Instant::now();
+ loop {
+ while now.elapsed() < period {
+ if !*shared.running.lock().unwrap() {
+ return;
+ }
+ thread::sleep(sleep_time);
+ }
+ now = Instant::now();
+
+ info!("run_periodically: wakeup");
+ handle_tasks(shared.clone())
}
+}
- Ok(token)
+
+fn handle_one_task<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>) -> Result<models::Task, Error> {
+ let task = {
+ let conn = shared.conn.lock().unwrap();
+ let now = Utc::now();
+ let eta = now + chrono::Duration::seconds(5);
+
+ db::take_task(&conn,
+ models::TaskState::NEW,
+ now,
+ eta)?
+ };
+
+ let command = serde_json::from_value(task.payload.clone())?;
+
+ match command {
+ Command::ImportStravaUser{ username } => {
+ import_strava_user(shared, username.as_str())?
+ },
+ }
+
+ Ok(task)
}
-fn import_strava_user(state: ImporterState, user: models::User) {
- use std::thread::sleep;
- use std::time::Duration;
+fn handle_tasks<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>) {
+ let mut done = false;
+ while !done {
+ match handle_one_task(shared.clone()) {
+ Err(Error::NotFound) => {
+ info!("No more tasks");
+ done = true;
+ },
+ Err(e) => {
+ error!("Error handling task: {}", e);
+ }
+ Ok(t) => {
+ info!("Successfully handled task: {:?}", t);
+ }
+ };
+ }
+}
+
+impl<StravaApi: strava::StravaApi> Importer<StravaApi> {
+ pub fn new(conn: PgConnection, strava: StravaApi) -> Importer<StravaApi> {
+ let shared = Arc::new(ImporterSharedData {
+ pool: Mutex::new(ThreadPool::with_name("importer".to_string(), WORKERS)),
+ conn: Mutex::new(conn),
+ strava: RwLock::new(strava),
+ running: Mutex::new(false),
+ });
+ Importer { shared: shared }
+ }
- let strava = state.strava.read().expect("FIX");
- let conn = state.conn.lock().expect("FIX");
- let token = get_or_refresh_token(&*strava, &conn, &user).expect("FIX");
+ pub fn run(&self) {
+ info!("run()");
+ let pool = self.shared.pool.lock().unwrap();
+ let mut running = self.shared.running.lock().unwrap();
+ if !*running {
+ *running = true;
+ pool.execute({
+ let shared = self.shared.clone();
+ move || run_periodically(shared, Duration::from_secs(10))
+ });
+ }
+ }
+
+ pub fn join(&self) {
+ self.shared.pool.lock().expect("FIX").join()
+ }
+}
+
+fn import_strava_user<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ username: &str) -> Result<(), Error> {
+ let strava = shared.strava.read().unwrap();
+ let user = db::get_user(&shared.conn.lock().unwrap(), username)?;
+
+ let token = {
+ let conn = shared.conn.lock().unwrap();
+ get_or_refresh_token(&*strava, &conn, &user)?
+ };
+
+ let per_page = 30;
for page in 1.. {
let params = [
- ("page", &format!("{}", page)),
- ("per_page", &format!("{}", 200)),
+ ("page", &format!("{}", page)[..]),
+ ("per_page", &format!("{}", per_page)[..])
];
+
let result = strava
- .get("/athlete/activities", &token.access_token, &params)
- .expect("ok");
- // info!("import_strava_user: Got result: {:#?}", result);
- for activity in result.as_array().expect("FIX") {
+ .get("/athlete/activities", &token.access_token, &params[..])?;
+
+ let result = result.as_array().ok_or(
+ Error::UnexpectedJson(result.clone()))?;
+
+ for activity in result {
info!("activity id: {} start: {}", activity["id"], activity["start_date"]);
}
- sleep(Duration::from_secs(1));
- }
-}
-fn handle_command(state: ImporterState, command: Command) {
- info!("handle_command {:?}", command);
- match command {
- Command::ImportStravaUser(user) => import_strava_user(state, user),
- Command::Quit => (),
- }
+ if result.len() < per_page {
+ break;
+ }
+ thread::sleep(Duration::from_secs(1));
+ };
+
+ Err(Error::InternalError)
}
-fn receive_commands(state: ImporterState) {
- info!("receive_commands");
- match (|| -> Result<(), Box<dyn std::error::Error>> {
- let rx = state.rx.lock()?;
- let mut command = rx.recv()?;
- loop {
- info!("got command: {:?}", command);
- let state0 = state.clone();
- state.pool.execute(move || handle_command(state0, command));
- command = rx.recv()?;
- }
- })() {
- Ok(()) => (),
- Err(e) => {
- error!("receive_commands: {:?}", e);
- ()
- }
+fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result<models::StravaToken, Error> {
+ let mut token = db::get_strava_token(&conn, &user).expect("FIX");
+
+ if token.expires_at < Utc::now() {
+ info!("refresh expired token: {:?}", token.expires_at);
+ let new_token = strava.refresh_token(&From::from(&token))?;
+ new_token.update_model(&mut token);
}
-}
-pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender<Command> {
- let (tx, rx0) = channel();
- let state = ImporterState {
- pool: pool.clone(),
- conn: Arc::new(Mutex::new(conn)),
- strava: Arc::new(RwLock::new(strava::StravaImpl::new(
- params.strava_client_id.clone(), params.strava_client_secret.clone()))),
- rx: Arc::new(Mutex::new(rx0)),
- };
- pool.execute(move || receive_commands(state));
- tx
+ Ok(token)
}
+
+// fn handle_command(state: Importer, command: Command) {
+// info!("handle_command {:?}", command);
+// match command {
+// Command::ImportStravaUser(user) => import_strava_user(state, user),
+// Command::Quit => (),
+// }
+// }
+
+// fn receive_commands(state: Importer) {
+// info!("receive_commands");
+// match (|| -> Result<(), Box<dyn std::error::Error>> {
+// let rx = state.rx.lock()?;
+// let mut command = rx.recv()?;
+// loop {
+// info!("got command: {:?}", command);
+// let state0 = state.clone();
+// state.pool.execute(move || handle_command(state0, command));
+// command = rx.recv()?;
+// }
+// })() {
+// Ok(()) => (),
+// Err(e) => {
+// error!("receive_commands: {:?}", e);
+// ()
+// }
+// }
+// }
+
+// pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender<Command> {
+// let (tx, rx0) = channel();
+// let importer = Arc::new(Importer {
+// pool: Mutex::new(pool.clone()),
+// conn: Mutex::new(conn),
+// strava: RwLock::new(strava::StravaImpl::new(
+// params.strava_client_id.clone(), params.strava_client_secret.clone())),
+// rx: Mutex::new(rx0),
+// });
+// // pool.execute(move || receive_commands(state));
+// pool.execute(clone! { [importer] move || importer.run() });
+// tx
+// }
diff --git a/src/models.rs b/src/models.rs
index ce3dd19..0b7e5db 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -1,9 +1,68 @@
+use crate::schema::tasks;
use crate::schema::config;
use crate::schema::strava_tokens;
use crate::schema::users;
use chrono::DateTime;
use chrono::Utc;
use std::fmt;
+use serde_json::Value;
+use diesel::pg::Pg;
+use diesel::deserialize;
+use diesel::deserialize::FromSql;
+use diesel::serialize;
+use diesel::serialize::Output;
+use diesel::serialize::ToSql;
+use diesel::sql_types;
+use std::io::Write;
+
+#[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow)]
+#[sql_type = "sql_types::Text"]
+pub enum TaskState {
+ NEW = 0,
+ SUCCESSFUL,
+ FAILED,
+}
+
+impl ToSql<sql_types::Text, Pg> for TaskState {
+ fn to_sql<W: Write>(&self, out: &mut Output<W, Pg>) -> serialize::Result {
+ let t = match *self {
+ TaskState::NEW => "new".to_string(),
+ TaskState::SUCCESSFUL => "success".to_string(),
+ TaskState::FAILED => "failed".to_string(),
+ };
+ <String as ToSql<sql_types::Text, Pg>>::to_sql(&t, out)
+ }
+}
+
+impl FromSql<sql_types::Text, Pg> for TaskState {
+ fn from_sql(bytes: Option<&[u8]>) -> deserialize::Result<Self> {
+ let s = <String as FromSql<sql_types::Text, Pg>>::from_sql(bytes)?;
+ match s.as_str() {
+ "new" => Ok(TaskState::NEW),
+ "success" => Ok(TaskState::SUCCESSFUL),
+ "failed" => Ok(TaskState::FAILED),
+ &_ => Err("Unrecognized task state".into()),
+ }
+ }
+}
+
+#[derive(Insertable)]
+#[table_name = "tasks"]
+pub struct NewTask<'a> {
+ pub start_at: DateTime<Utc>,
+ pub state: TaskState,
+ pub username: &'a str,
+ pub payload: &'a Value,
+}
+
+#[derive(Queryable, Debug, Clone)]
+pub struct Task {
+ pub id: i64,
+ pub state: TaskState,
+ pub start_at: DateTime<Utc>,
+ pub username: String,
+ pub payload: Value,
+}
#[derive(Insertable, Queryable)]
#[table_name = "config"]
diff --git a/src/schema.rs b/src/schema.rs
index 7cf2892..8748f3c 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -17,6 +17,16 @@ table! {
}
table! {
+ tasks (id) {
+ id -> Int8,
+ state -> Varchar,
+ start_at -> Timestamptz,
+ username -> Varchar,
+ payload -> Jsonb,
+ }
+}
+
+table! {
users (username) {
username -> Varchar,
password -> Varchar,
@@ -24,5 +34,11 @@ table! {
}
joinable!(strava_tokens -> users (username));
+joinable!(tasks -> users (username));
-allow_tables_to_appear_in_same_query!(config, strava_tokens, users,);
+allow_tables_to_appear_in_same_query!(
+ config,
+ strava_tokens,
+ tasks,
+ users,
+);
diff --git a/src/server.rs b/src/server.rs
index abc430b..f0dd591 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -17,6 +17,8 @@ use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::Mutex;
use threadpool::ThreadPool;
+use chrono::Utc;
+use serde_json::to_value;
use crate::db;
use crate::error::Error;
@@ -133,14 +135,18 @@ fn link_strava_callback(
#[get("/import_strava")]
fn import_strava(
conn: Db,
- tx: State<Mutex<Sender<importer::Command>>>,
user: LoggedInUser,
) -> Result<(), Error> {
let user = db::get_user(&*conn, &user.username)?;
- tx.lock()
- .expect("FIX")
- .send(importer::Command::ImportStravaUser(user))
- .expect("FIX");
+ let command =
+ importer::Command::ImportStravaUser { username: user.username.clone() };
+ db::insert_task(&conn,
+ &models::NewTask {
+ start_at: Utc::now(),
+ state: models::TaskState::NEW,
+ username: user.username.as_str(),
+ payload: &to_value(command)?,
+ })?;
Ok(())
}
@@ -179,12 +185,16 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) {
.finalize()
.unwrap();
- let importer_pool = ThreadPool::with_name("import".to_string(), importer::WORKERS);
- let tx = importer::run(importer_pool.clone(), conn, &params);
+ let strava = strava::StravaImpl::new(
+ params.strava_client_id.clone(),
+ params.strava_client_secret.clone(),
+ );
+
+ let importer = importer::Importer::new(conn, strava);
+ importer.run();
rocket::custom(config)
.manage(params)
- .manage(Mutex::new(tx))
.mount(
"/",
routes![
@@ -200,5 +210,5 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) {
.attach(Db::fairing())
.launch();
- importer_pool.join();
+ importer.join();
}
diff --git a/src/strava.rs b/src/strava.rs
index 284d8b1..ff59c66 100644
--- a/src/strava.rs
+++ b/src/strava.rs
@@ -36,12 +36,12 @@ impl From<&models::StravaToken> for Token {
}
}
-pub trait StravaApi {
- fn get<T: Serialize + ?Sized>(
+pub trait StravaApi: Sync + Send {
+ fn get(
&self,
method: &str,
access_token: &str,
- parasm: &T,
+ params: &[(&str, &str)],
) -> Result<Value, Error>;
fn refresh_token(
@@ -70,11 +70,11 @@ impl StravaImpl {
}
impl StravaApi for StravaImpl {
- fn get<T: Serialize + ?Sized>(
+ fn get(
&self,
method: &str,
access_token: &str,
- params: &T,
+ params: &[(&str, &str)],
) -> Result<Value, Error> {
let uri = format!("{}{}{}", self.base_url, self.api_url, method);
let response = self