summaryrefslogtreecommitdiff
path: root/src/importer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/importer.rs')
-rw-r--r--src/importer.rs257
1 files changed, 191 insertions, 66 deletions
diff --git a/src/importer.rs b/src/importer.rs
index 9ea7e35..6909350 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -7,6 +7,13 @@ use std::sync::Mutex;
use std::sync::RwLock;
use threadpool::ThreadPool;
use chrono::Utc;
+use timer::Timer;
+use timer::Guard;
+use std::time::Instant;
+use std::time::Duration;
+use std::thread;
+use serde::Deserialize;
+use serde::Serialize;
use crate::error::Error;
use crate::db;
@@ -18,92 +25,210 @@ use crate::Params;
pub const WORKERS: usize = 10;
pub const EMPTY_PARAMS: &[(&str, &str)] = &[];
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
- ImportStravaUser(models::User),
- Quit,
+ ImportStravaUser { username: String },
}
-#[derive(Clone)]
-struct ImporterState {
- pool: ThreadPool,
- conn: Arc<Mutex<PgConnection>>,
- strava: Arc<RwLock<strava::StravaImpl>>,
- rx: Arc<Mutex<Receiver<Command>>>,
+macro_rules! clone {
+ ( [ $( $i:ident ),* ] $e:expr ) => {
+ {
+ $(let $i = $i.clone();)*
+ $e
+ }
+ }
}
-fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result<models::StravaToken, Error> {
- let mut token = db::get_strava_token(&conn, &user).expect("FIX");
+pub struct ImporterSharedData<StravaApi: strava::StravaApi + 'static> {
+ strava: RwLock<StravaApi>,
+ pool: Mutex<ThreadPool>,
+ conn: Mutex<PgConnection>,
+ running: Mutex<bool>,
+}
- if token.expires_at < Utc::now() {
- info!("refresh expired token: {:?}", token.expires_at);
- let new_token = strava.refresh_token(&From::from(&token))?;
- new_token.update_model(&mut token);
+pub struct Importer<StravaApi: strava::StravaApi + 'static> {
+ shared: Arc<ImporterSharedData<StravaApi>>,
+}
+
+fn run_periodically<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ period: Duration) {
+ let sleep_time = Duration::from_millis(1000);
+ let mut now = Instant::now();
+ loop {
+ while now.elapsed() < period {
+ if !*shared.running.lock().unwrap() {
+ return;
+ }
+ thread::sleep(sleep_time);
+ }
+ now = Instant::now();
+
+ info!("run_periodically: wakeup");
+ handle_tasks(shared.clone())
}
+}
- Ok(token)
+
+fn handle_one_task<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>) -> Result<models::Task, Error> {
+ let task = {
+ let conn = shared.conn.lock().unwrap();
+ let now = Utc::now();
+ let eta = now + chrono::Duration::seconds(5);
+
+ db::take_task(&conn,
+ models::TaskState::NEW,
+ now,
+ eta)?
+ };
+
+ let command = serde_json::from_value(task.payload.clone())?;
+
+ match command {
+ Command::ImportStravaUser{ username } => {
+ import_strava_user(shared, username.as_str())?
+ },
+ }
+
+ Ok(task)
}
-fn import_strava_user(state: ImporterState, user: models::User) {
- use std::thread::sleep;
- use std::time::Duration;
+fn handle_tasks<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>) {
+ let mut done = false;
+ while !done {
+ match handle_one_task(shared.clone()) {
+ Err(Error::NotFound) => {
+ info!("No more tasks");
+ done = true;
+ },
+ Err(e) => {
+ error!("Error handling task: {}", e);
+ }
+ Ok(t) => {
+ info!("Successfully handled task: {:?}", t);
+ }
+ };
+ }
+}
+
+impl<StravaApi: strava::StravaApi> Importer<StravaApi> {
+ pub fn new(conn: PgConnection, strava: StravaApi) -> Importer<StravaApi> {
+ let shared = Arc::new(ImporterSharedData {
+ pool: Mutex::new(ThreadPool::with_name("importer".to_string(), WORKERS)),
+ conn: Mutex::new(conn),
+ strava: RwLock::new(strava),
+ running: Mutex::new(false),
+ });
+ Importer { shared: shared }
+ }
- let strava = state.strava.read().expect("FIX");
- let conn = state.conn.lock().expect("FIX");
- let token = get_or_refresh_token(&*strava, &conn, &user).expect("FIX");
+ pub fn run(&self) {
+ info!("run()");
+ let pool = self.shared.pool.lock().unwrap();
+ let mut running = self.shared.running.lock().unwrap();
+ if !*running {
+ *running = true;
+ pool.execute({
+ let shared = self.shared.clone();
+ move || run_periodically(shared, Duration::from_secs(10))
+ });
+ }
+ }
+
+ pub fn join(&self) {
+ self.shared.pool.lock().expect("FIX").join()
+ }
+}
+
+fn import_strava_user<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ username: &str) -> Result<(), Error> {
+ let strava = shared.strava.read().unwrap();
+ let user = db::get_user(&shared.conn.lock().unwrap(), username)?;
+
+ let token = {
+ let conn = shared.conn.lock().unwrap();
+ get_or_refresh_token(&*strava, &conn, &user)?
+ };
+
+ let per_page = 30;
for page in 1.. {
let params = [
- ("page", &format!("{}", page)),
- ("per_page", &format!("{}", 200)),
+ ("page", &format!("{}", page)[..]),
+ ("per_page", &format!("{}", per_page)[..])
];
+
let result = strava
- .get("/athlete/activities", &token.access_token, &params)
- .expect("ok");
- // info!("import_strava_user: Got result: {:#?}", result);
- for activity in result.as_array().expect("FIX") {
+ .get("/athlete/activities", &token.access_token, &params[..])?;
+
+ let result = result.as_array().ok_or(
+ Error::UnexpectedJson(result.clone()))?;
+
+ for activity in result {
info!("activity id: {} start: {}", activity["id"], activity["start_date"]);
}
- sleep(Duration::from_secs(1));
- }
-}
-fn handle_command(state: ImporterState, command: Command) {
- info!("handle_command {:?}", command);
- match command {
- Command::ImportStravaUser(user) => import_strava_user(state, user),
- Command::Quit => (),
- }
+ if result.len() < per_page {
+ break;
+ }
+ thread::sleep(Duration::from_secs(1));
+ };
+
+ Err(Error::InternalError)
}
-fn receive_commands(state: ImporterState) {
- info!("receive_commands");
- match (|| -> Result<(), Box<dyn std::error::Error>> {
- let rx = state.rx.lock()?;
- let mut command = rx.recv()?;
- loop {
- info!("got command: {:?}", command);
- let state0 = state.clone();
- state.pool.execute(move || handle_command(state0, command));
- command = rx.recv()?;
- }
- })() {
- Ok(()) => (),
- Err(e) => {
- error!("receive_commands: {:?}", e);
- ()
- }
+fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result<models::StravaToken, Error> {
+ let mut token = db::get_strava_token(&conn, &user).expect("FIX");
+
+ if token.expires_at < Utc::now() {
+ info!("refresh expired token: {:?}", token.expires_at);
+ let new_token = strava.refresh_token(&From::from(&token))?;
+ new_token.update_model(&mut token);
}
-}
-pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender<Command> {
- let (tx, rx0) = channel();
- let state = ImporterState {
- pool: pool.clone(),
- conn: Arc::new(Mutex::new(conn)),
- strava: Arc::new(RwLock::new(strava::StravaImpl::new(
- params.strava_client_id.clone(), params.strava_client_secret.clone()))),
- rx: Arc::new(Mutex::new(rx0)),
- };
- pool.execute(move || receive_commands(state));
- tx
+ Ok(token)
}
+
+// fn handle_command(state: Importer, command: Command) {
+// info!("handle_command {:?}", command);
+// match command {
+// Command::ImportStravaUser(user) => import_strava_user(state, user),
+// Command::Quit => (),
+// }
+// }
+
+// fn receive_commands(state: Importer) {
+// info!("receive_commands");
+// match (|| -> Result<(), Box<dyn std::error::Error>> {
+// let rx = state.rx.lock()?;
+// let mut command = rx.recv()?;
+// loop {
+// info!("got command: {:?}", command);
+// let state0 = state.clone();
+// state.pool.execute(move || handle_command(state0, command));
+// command = rx.recv()?;
+// }
+// })() {
+// Ok(()) => (),
+// Err(e) => {
+// error!("receive_commands: {:?}", e);
+// ()
+// }
+// }
+// }
+
+// pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender<Command> {
+// let (tx, rx0) = channel();
+// let importer = Arc::new(Importer {
+// pool: Mutex::new(pool.clone()),
+// conn: Mutex::new(conn),
+// strava: RwLock::new(strava::StravaImpl::new(
+// params.strava_client_id.clone(), params.strava_client_secret.clone())),
+// rx: Mutex::new(rx0),
+// });
+// // pool.execute(move || receive_commands(state));
+// pool.execute(clone! { [importer] move || importer.run() });
+// tx
+// }