summaryrefslogtreecommitdiff
path: root/src/importer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/importer.rs')
-rw-r--r--src/importer.rs123
1 files changed, 33 insertions, 90 deletions
diff --git a/src/importer.rs b/src/importer.rs
index 6909350..05d56fd 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -1,26 +1,19 @@
+use chrono::Utc;
use diesel::PgConnection;
-use std::sync::mpsc::channel;
-use std::sync::mpsc::Receiver;
-use std::sync::mpsc::Sender;
+use serde::Deserialize;
+use serde::Serialize;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
-use threadpool::ThreadPool;
-use chrono::Utc;
-use timer::Timer;
-use timer::Guard;
-use std::time::Instant;
-use std::time::Duration;
use std::thread;
-use serde::Deserialize;
-use serde::Serialize;
+use std::time::Duration;
+use std::time::Instant;
+use threadpool::ThreadPool;
-use crate::error::Error;
use crate::db;
+use crate::error::Error;
use crate::models;
use crate::strava;
-use crate::strava::StravaApi;
-use crate::Params;
pub const WORKERS: usize = 10;
pub const EMPTY_PARAMS: &[(&str, &str)] = &[];
@@ -30,15 +23,6 @@ pub enum Command {
ImportStravaUser { username: String },
}
-macro_rules! clone {
- ( [ $( $i:ident ),* ] $e:expr ) => {
- {
- $(let $i = $i.clone();)*
- $e
- }
- }
-}
-
pub struct ImporterSharedData<StravaApi: strava::StravaApi + 'static> {
strava: RwLock<StravaApi>,
pool: Mutex<ThreadPool>,
@@ -50,9 +34,7 @@ pub struct Importer<StravaApi: strava::StravaApi + 'static> {
shared: Arc<ImporterSharedData<StravaApi>>,
}
-fn run_periodically<S: strava::StravaApi>(
- shared: Arc<ImporterSharedData<S>>,
- period: Duration) {
+fn run_periodically<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>, period: Duration) {
let sleep_time = Duration::from_millis(1000);
let mut now = Instant::now();
loop {
@@ -69,40 +51,34 @@ fn run_periodically<S: strava::StravaApi>(
}
}
-
fn handle_one_task<S: strava::StravaApi>(
- shared: Arc<ImporterSharedData<S>>) -> Result<models::Task, Error> {
+ shared: Arc<ImporterSharedData<S>>,
+) -> Result<models::Task, Error> {
let task = {
let conn = shared.conn.lock().unwrap();
let now = Utc::now();
let eta = now + chrono::Duration::seconds(5);
- db::take_task(&conn,
- models::TaskState::NEW,
- now,
- eta)?
+ db::take_task(&conn, models::TaskState::NEW, now, eta)?
};
let command = serde_json::from_value(task.payload.clone())?;
match command {
- Command::ImportStravaUser{ username } => {
- import_strava_user(shared, username.as_str())?
- },
+ Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?,
}
Ok(task)
}
-fn handle_tasks<S: strava::StravaApi>(
- shared: Arc<ImporterSharedData<S>>) {
+fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) {
let mut done = false;
while !done {
match handle_one_task(shared.clone()) {
Err(Error::NotFound) => {
info!("No more tasks");
done = true;
- },
+ }
Err(e) => {
error!("Error handling task: {}", e);
}
@@ -138,13 +114,15 @@ impl<StravaApi: strava::StravaApi> Importer<StravaApi> {
}
pub fn join(&self) {
- self.shared.pool.lock().expect("FIX").join()
+ *self.shared.running.lock().unwrap() = false;
+ self.shared.pool.lock().unwrap().join();
}
}
fn import_strava_user<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
- username: &str) -> Result<(), Error> {
+ username: &str,
+) -> Result<(), Error> {
let strava = shared.strava.read().unwrap();
let user = db::get_user(&shared.conn.lock().unwrap(), username)?;
@@ -157,29 +135,36 @@ fn import_strava_user<S: strava::StravaApi>(
for page in 1.. {
let params = [
("page", &format!("{}", page)[..]),
- ("per_page", &format!("{}", per_page)[..])
+ ("per_page", &format!("{}", per_page)[..]),
];
- let result = strava
- .get("/athlete/activities", &token.access_token, &params[..])?;
+ let result = strava.get("/athlete/activities", &token.access_token, &params[..])?;
- let result = result.as_array().ok_or(
- Error::UnexpectedJson(result.clone()))?;
+ let result = result
+ .as_array()
+ .ok_or(Error::UnexpectedJson(result.clone()))?;
for activity in result {
- info!("activity id: {} start: {}", activity["id"], activity["start_date"]);
+ info!(
+ "activity id: {} start: {}",
+ activity["id"], activity["start_date"]
+ );
}
if result.len() < per_page {
break;
}
thread::sleep(Duration::from_secs(1));
- };
+ }
Err(Error::InternalError)
}
-fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result<models::StravaToken, Error> {
+fn get_or_refresh_token<Strava: strava::StravaApi>(
+ strava: &Strava,
+ conn: &PgConnection,
+ user: &models::User,
+) -> Result<models::StravaToken, Error> {
let mut token = db::get_strava_token(&conn, &user).expect("FIX");
if token.expires_at < Utc::now() {
@@ -190,45 +175,3 @@ fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgCon
Ok(token)
}
-
-// fn handle_command(state: Importer, command: Command) {
-// info!("handle_command {:?}", command);
-// match command {
-// Command::ImportStravaUser(user) => import_strava_user(state, user),
-// Command::Quit => (),
-// }
-// }
-
-// fn receive_commands(state: Importer) {
-// info!("receive_commands");
-// match (|| -> Result<(), Box<dyn std::error::Error>> {
-// let rx = state.rx.lock()?;
-// let mut command = rx.recv()?;
-// loop {
-// info!("got command: {:?}", command);
-// let state0 = state.clone();
-// state.pool.execute(move || handle_command(state0, command));
-// command = rx.recv()?;
-// }
-// })() {
-// Ok(()) => (),
-// Err(e) => {
-// error!("receive_commands: {:?}", e);
-// ()
-// }
-// }
-// }
-
-// pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender<Command> {
-// let (tx, rx0) = channel();
-// let importer = Arc::new(Importer {
-// pool: Mutex::new(pool.clone()),
-// conn: Mutex::new(conn),
-// strava: RwLock::new(strava::StravaImpl::new(
-// params.strava_client_id.clone(), params.strava_client_secret.clone())),
-// rx: Mutex::new(rx0),
-// });
-// // pool.execute(move || receive_commands(state));
-// pool.execute(clone! { [importer] move || importer.run() });
-// tx
-// }