summaryrefslogtreecommitdiff
path: root/src/importer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/importer.rs')
-rw-r--r--src/importer.rs84
1 files changed, 67 insertions, 17 deletions
diff --git a/src/importer.rs b/src/importer.rs
index 5f5790a..b1c1b18 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -9,7 +9,9 @@ use std::thread;
use std::time::Duration;
use std::time::Instant;
use threadpool::ThreadPool;
+use serde_json::to_value;
+use crate::diesel::Connection;
use crate::db;
use crate::error::Error;
use crate::models;
@@ -21,6 +23,8 @@ pub const EMPTY_PARAMS: &[(&str, &str)] = &[];
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
ImportStravaUser { username: String },
+ ProcessAllRawData,
+ ProcessRawData(models::RawDataKey),
}
pub struct ImporterSharedData<StravaApi: strava::StravaApi + 'static> {
@@ -53,39 +57,46 @@ fn run_periodically<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>, pe
fn handle_one_task<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
-) -> Result<models::Task, Error> {
- let task = {
- let conn = shared.conn.lock().unwrap();
- let now = Utc::now();
- let eta = now + chrono::Duration::seconds(5);
-
- db::take_task(&conn, models::TaskState::NEW, now, eta)?
- };
-
+ task: models::Task
+) -> Result<(), Error> {
let command = serde_json::from_value(task.payload.clone())?;
match command {
Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?,
+ Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?,
+ Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?,
}
-
- Ok(task)
+
+ Ok(())
}
fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) {
let mut done = false;
while !done {
- match handle_one_task(shared.clone()) {
+ let task = (|| {
+ let conn = shared.conn.lock().unwrap();
+ let now = Utc::now();
+ let eta = now + chrono::Duration::seconds(5);
+
+ db::take_task(&conn, models::TaskState::NEW, now, eta)
+ })();
+
+ match task {
Err(Error::NotFound) => {
info!("No more tasks");
done = true;
- }
+ },
Err(e) => {
- error!("Error handling task: {}", e);
- }
+ error!("Failed to get task: {:?}", e);
+ done = true;
+ },
Ok(t) => {
- info!("Successfully handled task: {:?}", t);
+ match handle_one_task(shared.clone(), t) {
+ Err(e) => error!("Task failed: {:?}", e),
+ _ => ()
+ }
}
- };
+ }
}
}
@@ -119,6 +130,45 @@ impl<StravaApi: strava::StravaApi> Importer<StravaApi> {
}
}
+fn process_raw_data<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ key: &models::RawDataKey,
+ mut task: models::Task) -> Result<(), Error> {
+
+ let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?;
+ println!("Process raw data: {:#?}", data);
+
+ unimplemented!();
+}
+
+fn process_all_raw_data<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ mut task: models::Task) -> Result<(), Error> {
+ let now = Utc::now();
+ let conn = &shared.conn.lock().unwrap();
+
+ let mut tasks = 0;
+ conn.transaction::<(), Error, _>(|| {
+ let keys = db::get_raw_data_keys(conn)?;
+ tasks = keys.len();
+ for key in keys {
+ db::insert_task(
+ conn,
+ &models::NewTask {
+ start_at: now,
+ state: models::TaskState::NEW,
+ username: key.username.as_str(),
+ payload: &to_value(Command::ProcessRawData(key.clone()))?,
+ })?;
+ }
+ task.state = models::TaskState::SUCCESSFUL;
+ db::update_task(conn, task)?;
+ Ok(())
+ })?;
+ info!("process_all_raw_data: Added {} tasks", tasks);
+ Ok(())
+}
+
fn import_strava_user<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
username: &str,