summaryrefslogtreecommitdiff
path: root/src/importer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/importer.rs')
-rw-r--r--src/importer.rs59
1 files changed, 35 insertions, 24 deletions
diff --git a/src/importer.rs b/src/importer.rs
index f039d03..7cbc1db 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -1,10 +1,10 @@
+use chrono::DateTime;
use chrono::Utc;
use diesel::PgConnection;
use serde::Deserialize;
use serde::Serialize;
use serde_json::to_value;
use serde_json::Value;
-use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
@@ -79,7 +79,7 @@ fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) {
let task = (|| {
let conn = shared.conn.lock().unwrap();
let now = Utc::now();
- let eta = now + chrono::Duration::seconds(5);
+ let eta = now + chrono::Duration::minutes(5);
db::take_task(&conn, models::TaskState::NEW, now, eta)
})();
@@ -99,7 +99,7 @@ fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) {
},
}
- thread::sleep(Duration::from_secs(10));
+ thread::sleep(Duration::from_millis(10));
}
}
@@ -151,8 +151,11 @@ fn to_run(data: &Value) -> Result<Value, Error> {
Ok(json!({
"type": "run",
- "distance": get!(data f64 "distance")?,
- "elapsed_time": get!(data f64 "elapsed_time")?,
+ "start_timestamp": data["start_date"],
+ "distance": data["distance"],
+ "moving_time": data["moving_time"],
+ "elapsed_time": data["elapsed_time"],
+ "name": data["name"],
}))
}
@@ -161,33 +164,41 @@ fn process_strava_activity<S: strava::StravaApi>(
data: models::RawData,
mut task: models::Task,
) -> Result<(), Error> {
+ if data.entry_type.is_some() && data.entry_id.is_some() {
+ return Err(Error::InternalError);
+ }
+
let json_error = || Error::UnexpectedJson(data.payload.clone());
let strava_type = data.payload["type"].as_str().ok_or_else(json_error)?;
- let entry_type = match strava_type {
- "Run" => Ok("run".to_string()),
+ let entry_payload = match strava_type {
+ "Run" => to_run(&data.payload),
&_ => Err(Error::NotFound),
}?;
+ let entry_type = entry_payload["type"]
+ .as_str()
+ .ok_or_else(json_error)?
+ .to_string();
+ let timestamp = entry_payload["start_timestamp"]
+ .as_str()
+ .map(|t| DateTime::parse_from_rfc3339(t).map(|t| t.with_timezone(&Utc)))
+ .transpose()?;
let conn = &shared.conn.lock().unwrap();
conn.transaction::<(), Error, _>(|| {
- let entry = models::NewEntry {
- username: data.username.as_str(),
- entry_type: entry_type.as_str(),
- timestamp: None,
- payload: json!({}),
+ let (entry_type, id) = {
+ let entry = models::NewEntry {
+ username: data.username.as_str(),
+ entry_type: entry_type.as_str(),
+ timestamp: timestamp,
+ payload: entry_payload,
+ };
+ info!("Inserting entry: {:#?}", entry);
+ let id = insert!(entries <= conn, &entry)?;
+ (entry.entry_type.to_string(), id)
};
- let id = insert!(entries <= conn, &entry)?;
-
- let entry_data = models::EntryData {
- username: entry.username.to_string(),
- entry_type: entry.entry_type.to_string(),
- entry_id: id,
- data_type: data.data_type,
- data_id: data.id,
- };
- insert!(entry_data <= conn, &entry_data)?;
+ db::link_data(conn, data, &entry_type, id)?;
task.state = models::TaskState::SUCCESSFUL;
db::update_task(conn, task)?;
@@ -203,8 +214,6 @@ fn process_raw_data<S: strava::StravaApi>(
mut task: models::Task,
) -> Result<(), Error> {
let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?;
- println!("Process raw data: {:#?}", data);
-
match data.data_type {
models::DataType::StravaActivity => process_strava_activity(shared.clone(), data, task)?,
};
@@ -278,6 +287,8 @@ fn import_strava_user<S: strava::StravaApi>(
id: id,
username: username.to_string(),
payload: activity.clone(),
+ entry_type: None,
+ entry_id: None,
},
)?;
}