summaryrefslogtreecommitdiff
path: root/src/importer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/importer.rs')
-rw-r--r--src/importer.rs132
1 files changed, 111 insertions, 21 deletions
diff --git a/src/importer.rs b/src/importer.rs
index 799b6bb..644f3b0 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -12,6 +12,7 @@ use std::thread;
use std::time::Duration;
use std::time::Instant;
use threadpool::ThreadPool;
+use std::collections::HashSet;
use crate::db;
use crate::diesel::Connection;
@@ -26,6 +27,7 @@ pub const EMPTY_PARAMS: &[(&str, &str)] = &[];
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
ImportStravaUser { username: String },
+ ImportStravaActivity { username: String, id: i64, },
ProcessAllRawData,
ProcessRawData(models::RawDataKey),
}
@@ -65,7 +67,8 @@ fn handle_one_task<S: strava::StravaApi>(
let command = serde_json::from_value(task.payload.clone())?;
match command {
- Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?,
+ Command::ImportStravaUser { username } => import_strava_user(shared, task, username.as_str())?,
+ Command::ImportStravaActivity { username, id } => import_strava_activity(shared, task, username.as_str(), id)?,
Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?,
Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?,
}
@@ -158,6 +161,7 @@ fn to_run(data: &Value) -> Result<Value, Error> {
"moving_time": data["moving_time"],
"elapsed_time": data["elapsed_time"],
"name": data["name"],
+ "description": data["description"],
}))
}
@@ -167,7 +171,10 @@ fn process_strava_activity<S: strava::StravaApi>(
mut task: models::Task,
) -> Result<(), Error> {
if data.entry_type.is_some() && data.entry_id.is_some() {
- return Err(Error::InternalError);
+ task.state = models::TaskState::SUCCESSFUL;
+ let conn = &shared.conn.lock().unwrap();
+ db::update_task(conn, task)?;
+ return Ok(())
}
let json_error = || Error::UnexpectedJson(data.payload.clone());
@@ -175,7 +182,13 @@ fn process_strava_activity<S: strava::StravaApi>(
let entry_payload = match strava_type {
"Run" => to_run(&data.payload),
- &_ => Err(Error::NotFound),
+ t => {
+ info!("Ignoring strava activity with type {:?}", t);
+ task.state = models::TaskState::SUCCESSFUL;
+ let conn = &shared.conn.lock().unwrap();
+ db::update_task(conn, task)?;
+ return Ok(())
+ }
}?;
let entry_type = entry_payload["type"]
.as_str()
@@ -220,7 +233,7 @@ fn process_raw_data<S: strava::StravaApi>(
models::DataType::StravaActivity => process_strava_activity(shared.clone(), data, task)?,
};
- info!("Process finished.");
+ info!("Finished processing {:?}", key);
Ok(())
}
@@ -254,8 +267,58 @@ fn process_all_raw_data<S: strava::StravaApi>(
Ok(())
}
+fn import_strava_activity<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ mut task: models::Task,
+ username: &str,
+ id: i64
+ ) -> Result<(), Error> {
+ let strava = shared.strava.read().unwrap();
+ let user = db::get_user(&shared.conn.lock().unwrap(), username)?;
+
+ let token = {
+ let conn = shared.conn.lock().unwrap();
+ get_or_refresh_token(&*strava, &conn, &user)?
+ };
+
+ let activity = strava.get(&format!("/activities/{}", id), &token.access_token, &[])?;
+
+ let now = Utc::now();
+ let conn = &shared.conn.lock().unwrap();
+ conn.transaction::<(), Error, _>(|| {
+ db::insert_data(
+ conn,
+ &models::RawData {
+ data_type: models::DataType::StravaActivity,
+ id: id,
+ username: username.to_string(),
+ payload: activity,
+ entry_type: None,
+ entry_id: None,
+ })?;
+ db::insert_task(
+ conn,
+ &models::NewTask {
+ start_at: now,
+ state: models::TaskState::NEW,
+ username: username,
+ payload: &to_value(
+ Command::ProcessRawData(models::RawDataKey {
+ data_type: models::DataType::StravaActivity,
+ id: id,
+ username: username.to_string(),
+ }))?
+ })?;
+ task.state = models::TaskState::SUCCESSFUL;
+ db::update_task(conn, task)?;
+ Ok(())
+ })?;
+ Ok(())
+}
+
fn import_strava_user<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
+ mut task: models::Task,
username: &str,
) -> Result<(), Error> {
let strava = shared.strava.read().unwrap();
@@ -266,6 +329,7 @@ fn import_strava_user<S: strava::StravaApi>(
get_or_refresh_token(&*strava, &conn, &user)?
};
+ let mut missing_ids = HashSet::new();
let per_page = 30;
for page in 1.. {
let params = [
@@ -274,25 +338,27 @@ fn import_strava_user<S: strava::StravaApi>(
];
let result = strava.get("/athlete/activities", &token.access_token, &params[..])?;
+ if page == 1 {
+ info!("{:#?}", result);
+ }
let json_error = || Error::UnexpectedJson(result.clone());
let result = result.as_array().ok_or_else(json_error)?;
- for activity in result {
- let id = activity["id"].as_i64().ok_or_else(json_error)?;
- info!("activity id: {} start: {}", id, activity["start_date"]);
-
- db::insert_data(
- &shared.conn.lock().unwrap(),
- &models::RawData {
- data_type: models::DataType::StravaActivity,
- id: id,
- username: username.to_string(),
- payload: activity.clone(),
- entry_type: None,
- entry_id: None,
- },
- )?;
+ let activity_ids = result
+ .iter()
+ .map(|a| a["id"].as_i64().ok_or_else(json_error))
+ .collect::<Result<Vec<_>, _>>()?;
+
+ let missing_on_page = db::find_missing_data(
+ &shared.conn.lock().unwrap(),
+ username,
+ models::DataType::StravaActivity,
+ &activity_ids)?;
+ missing_ids.extend(missing_on_page.iter());
+
+ if missing_on_page.len() < per_page / 3 {
+ break;
}
if result.len() < per_page {
@@ -301,7 +367,30 @@ fn import_strava_user<S: strava::StravaApi>(
thread::sleep(Duration::from_secs(1));
}
- Err(Error::InternalError)
+ let conn = &shared.conn.lock().unwrap();
+ let now = Utc::now();
+ conn.transaction::<(), Error, _>(|| {
+ for &missing_id in &missing_ids {
+ db::insert_task(
+ conn,
+ &models::NewTask {
+ start_at: now,
+ state: models::TaskState::NEW,
+ username: username,
+ payload: &to_value(
+ Command::ImportStravaActivity {
+ username: username.to_string(),
+ id: missing_id,
+ })?,
+ })?;
+ }
+ task.state = models::TaskState::SUCCESSFUL;
+ db::update_task(conn, task)?;
+ info!("scheduled {} tasks for {}", missing_ids.len(), username);
+ Ok(())
+ })?;
+
+ Ok(())
}
fn get_or_refresh_token<Strava: strava::StravaApi>(
@@ -309,12 +398,13 @@ fn get_or_refresh_token<Strava: strava::StravaApi>(
conn: &PgConnection,
user: &models::User,
) -> Result<models::StravaToken, Error> {
- let mut token = db::get_strava_token(&conn, &user).expect("FIX");
+ let mut token = db::get_strava_token(&conn, &user)?;
if token.expires_at < Utc::now() {
info!("refresh expired token: {:?}", token.expires_at);
let new_token = strava.refresh_token(&From::from(&token))?;
new_token.update_model(&mut token);
+ db::update_strava_token(conn, &token)?;
}
Ok(token)