summaryrefslogtreecommitdiff
path: root/src/importer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/importer.rs')
-rw-r--r--src/importer.rs129
1 files changed, 75 insertions, 54 deletions
diff --git a/src/importer.rs b/src/importer.rs
index c831ca9..f039d03 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -2,6 +2,9 @@ use chrono::Utc;
use diesel::PgConnection;
use serde::Deserialize;
use serde::Serialize;
+use serde_json::to_value;
+use serde_json::Value;
+use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
@@ -9,13 +12,12 @@ use std::thread;
use std::time::Duration;
use std::time::Instant;
use threadpool::ThreadPool;
-use serde_json::to_value;
-use serde_json::Value;
-use crate::diesel::Connection;
use crate::db;
+use crate::diesel::Connection;
use crate::error::Error;
use crate::models;
+use crate::schema;
use crate::strava;
pub const WORKERS: usize = 10;
@@ -58,7 +60,7 @@ fn run_periodically<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>, pe
fn handle_one_task<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
- task: models::Task
+ task: models::Task,
) -> Result<(), Error> {
let command = serde_json::from_value(task.payload.clone())?;
@@ -67,7 +69,7 @@ fn handle_one_task<S: strava::StravaApi>(
Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?,
Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?,
}
-
+
Ok(())
}
@@ -86,18 +88,18 @@ fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) {
Err(Error::NotFound) => {
info!("No more tasks");
done = true;
- },
+ }
Err(e) => {
error!("Failed to get task: {:?}", e);
done = true;
- },
- Ok(t) => {
- match handle_one_task(shared.clone(), t) {
- Err(e) => error!("Task failed: {:?}", e),
- _ => ()
- }
}
+ Ok(t) => match handle_one_task(shared.clone(), t) {
+ Err(e) => error!("Task failed: {:?}", e),
+ _ => (),
+ },
}
+
+ thread::sleep(Duration::from_secs(10));
}
}
@@ -135,13 +137,15 @@ fn to_run(data: &Value) -> Result<Value, Error> {
info!("to_run");
macro_rules! get {
($id:ident str $e:expr) => {
- $id[$e].as_str().ok_or_else(
- || Error::UnexpectedJson($id.clone()))
+ $id[$e]
+ .as_str()
+ .ok_or_else(|| Error::UnexpectedJson($id.clone()))
};
($id:ident f64 $e:expr) => {
- $id[$e].as_f64().ok_or_else(
- || Error::UnexpectedJson($id.clone()))
+ $id[$e]
+ .as_f64()
+ .ok_or_else(|| Error::UnexpectedJson($id.clone()))
};
};
@@ -154,46 +158,65 @@ fn to_run(data: &Value) -> Result<Value, Error> {
fn process_strava_activity<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
- data: models::RawData) -> Result<Value, Error> {
-
+ data: models::RawData,
+ mut task: models::Task,
+) -> Result<(), Error> {
let json_error = || Error::UnexpectedJson(data.payload.clone());
- let dtype = data.payload["type"].as_str().ok_or_else(json_error)?;
+ let strava_type = data.payload["type"].as_str().ok_or_else(json_error)?;
- let payload = match dtype {
- "Run" => {
- to_run(&data.payload)?
- },
- &_ => Err(Error::InternalError)?,
- };
+ let entry_type = match strava_type {
+ "Run" => Ok("run".to_string()),
+ &_ => Err(Error::NotFound),
+ }?;
- Ok(payload)
+ let conn = &shared.conn.lock().unwrap();
+ conn.transaction::<(), Error, _>(|| {
+ let entry = models::NewEntry {
+ username: data.username.as_str(),
+ entry_type: entry_type.as_str(),
+ timestamp: None,
+ payload: json!({}),
+ };
+
+ let id = insert!(entries <= conn, &entry)?;
+
+ let entry_data = models::EntryData {
+ username: entry.username.to_string(),
+ entry_type: entry.entry_type.to_string(),
+ entry_id: id,
+ data_type: data.data_type,
+ data_id: data.id,
+ };
+ insert!(entry_data <= conn, &entry_data)?;
+
+ task.state = models::TaskState::SUCCESSFUL;
+ db::update_task(conn, task)?;
+ Ok(())
+ })?;
+
+ Ok(())
}
fn process_raw_data<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
key: &models::RawDataKey,
- mut task: models::Task) -> Result<(), Error> {
-
+ mut task: models::Task,
+) -> Result<(), Error> {
let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?;
println!("Process raw data: {:#?}", data);
- let payload = match data.data_type {
- models::DataType::StravaActivity => {
- process_strava_activity(shared.clone(), data)?
- }
+ match data.data_type {
+ models::DataType::StravaActivity => process_strava_activity(shared.clone(), data, task)?,
};
- info!("Process finished. Payload: {:#?}", payload);
- unimplemented!();
-
- task.state = models::TaskState::SUCCESSFUL;
- db::update_task(&shared.conn.lock().unwrap(), task)?;
+ info!("Process finished.");
Ok(())
}
fn process_all_raw_data<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
- mut task: models::Task) -> Result<(), Error> {
+ mut task: models::Task,
+) -> Result<(), Error> {
let now = Utc::now();
let conn = &shared.conn.lock().unwrap();
@@ -209,7 +232,8 @@ fn process_all_raw_data<S: strava::StravaApi>(
state: models::TaskState::NEW,
username: key.username.as_str(),
payload: &to_value(Command::ProcessRawData(key.clone()))?,
- })?;
+ },
+ )?;
}
task.state = models::TaskState::SUCCESSFUL;
db::update_task(conn, task)?;
@@ -241,24 +265,21 @@ fn import_strava_user<S: strava::StravaApi>(
let result = strava.get("/athlete/activities", &token.access_token, &params[..])?;
let json_error = || Error::UnexpectedJson(result.clone());
- let result = result
- .as_array()
- .ok_or_else(json_error)?;
+ let result = result.as_array().ok_or_else(json_error)?;
for activity in result {
let id = activity["id"].as_i64().ok_or_else(json_error)?;
- info!(
- "activity id: {} start: {}",
- id, activity["start_date"]
- );
-
- db::insert_data(&shared.conn.lock().unwrap(),
- &models::RawData {
- data_type: models::DataType::StravaActivity,
- id: id,
- username: username.to_string(),
- payload: activity.clone(),
- })?;
+ info!("activity id: {} start: {}", id, activity["start_date"]);
+
+ db::insert_data(
+ &shared.conn.lock().unwrap(),
+ &models::RawData {
+ data_type: models::DataType::StravaActivity,
+ id: id,
+ username: username.to_string(),
+ payload: activity.clone(),
+ },
+ )?;
}
if result.len() < per_page {