summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-11 20:32:29 -0500
committerKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-11 20:32:29 -0500
commit02eca80d8e357e80585f766b873ef3efceabfe42 (patch)
tree201707371ee11b660934216ed8262bdcc4871180
parent1be4dce20256f22d0be648fad016e819a050e95d (diff)
Import detailed activities from Strava
-rw-r--r--src/db.rs30
-rw-r--r--src/importer.rs132
-rw-r--r--src/models.rs2
-rw-r--r--src/template.rs8
-rw-r--r--templates/profile.hbs3
5 files changed, 150 insertions, 25 deletions
diff --git a/src/db.rs b/src/db.rs
index 5b165c9..7e271bc 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -137,6 +137,16 @@ pub fn get_strava_token(
Ok(token)
}
+pub fn update_strava_token(
+ conn: &PgConnection,
+ token: &models::StravaToken) -> Result<(), Error> {
+ use crate::schema::strava_tokens;
+ diesel::update(strava_tokens::table).set(token)
+ .execute(conn)?;
+ Ok(())
+}
+
+
pub fn insert_task(conn: &PgConnection, task: &models::NewTask) -> Result<i64, Error> {
use crate::schema::tasks;
let id = diesel::insert_into(tasks::table)
@@ -192,6 +202,25 @@ pub fn take_task(
})
}
+pub fn find_missing_data(conn: &PgConnection, username: &str, data_type: models::DataType, ids: &[i64])
+ -> Result<Vec<i64>, Error> {
+ use diesel::pg::expression::dsl::any;
+ use crate::schema::raw_data;
+ use std::collections::HashSet;
+
+ let present: HashSet<i64> = raw_data::table
+ .select(raw_data::id)
+ .filter(raw_data::username.eq(username)
+ .and(raw_data::data_type.eq(data_type))
+ .and(raw_data::id.eq(any(ids))))
+ .get_results::<i64>(conn)?
+ .into_iter().collect();
+
+ let ids: HashSet<i64> = ids.iter().map(|v| *v).collect();
+ let missing = ids.difference(&present);
+ Ok(missing.map(|v| *v).collect())
+}
+
pub fn insert_data(conn: &PgConnection, data: &models::RawData) -> Result<usize, Error> {
use crate::schema::raw_data;
let rows = diesel::insert_into(raw_data::table)
@@ -253,6 +282,7 @@ pub fn get_entries(
.eq(username)
.and(entries::entry_type.eq(entry_type)),
)
+ .order(entries::timestamp.desc())
.get_results::<models::Entry>(conn)?;
Ok(r)
}
diff --git a/src/importer.rs b/src/importer.rs
index 799b6bb..644f3b0 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -12,6 +12,7 @@ use std::thread;
use std::time::Duration;
use std::time::Instant;
use threadpool::ThreadPool;
+use std::collections::HashSet;
use crate::db;
use crate::diesel::Connection;
@@ -26,6 +27,7 @@ pub const EMPTY_PARAMS: &[(&str, &str)] = &[];
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
ImportStravaUser { username: String },
+ ImportStravaActivity { username: String, id: i64, },
ProcessAllRawData,
ProcessRawData(models::RawDataKey),
}
@@ -65,7 +67,8 @@ fn handle_one_task<S: strava::StravaApi>(
let command = serde_json::from_value(task.payload.clone())?;
match command {
- Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?,
+ Command::ImportStravaUser { username } => import_strava_user(shared, task, username.as_str())?,
+ Command::ImportStravaActivity { username, id } => import_strava_activity(shared, task, username.as_str(), id)?,
Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?,
Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?,
}
@@ -158,6 +161,7 @@ fn to_run(data: &Value) -> Result<Value, Error> {
"moving_time": data["moving_time"],
"elapsed_time": data["elapsed_time"],
"name": data["name"],
+ "description": data["description"],
}))
}
@@ -167,7 +171,10 @@ fn process_strava_activity<S: strava::StravaApi>(
mut task: models::Task,
) -> Result<(), Error> {
if data.entry_type.is_some() && data.entry_id.is_some() {
- return Err(Error::InternalError);
+ task.state = models::TaskState::SUCCESSFUL;
+ let conn = &shared.conn.lock().unwrap();
+ db::update_task(conn, task)?;
+ return Ok(())
}
let json_error = || Error::UnexpectedJson(data.payload.clone());
@@ -175,7 +182,13 @@ fn process_strava_activity<S: strava::StravaApi>(
let entry_payload = match strava_type {
"Run" => to_run(&data.payload),
- &_ => Err(Error::NotFound),
+ t => {
+ info!("Ignoring strava activity with type {:?}", t);
+ task.state = models::TaskState::SUCCESSFUL;
+ let conn = &shared.conn.lock().unwrap();
+ db::update_task(conn, task)?;
+ return Ok(())
+ }
}?;
let entry_type = entry_payload["type"]
.as_str()
@@ -220,7 +233,7 @@ fn process_raw_data<S: strava::StravaApi>(
models::DataType::StravaActivity => process_strava_activity(shared.clone(), data, task)?,
};
- info!("Process finished.");
+ info!("Finished processing {:?}", key);
Ok(())
}
@@ -254,8 +267,58 @@ fn process_all_raw_data<S: strava::StravaApi>(
Ok(())
}
+fn import_strava_activity<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ mut task: models::Task,
+ username: &str,
+ id: i64
+ ) -> Result<(), Error> {
+ let strava = shared.strava.read().unwrap();
+ let user = db::get_user(&shared.conn.lock().unwrap(), username)?;
+
+ let token = {
+ let conn = shared.conn.lock().unwrap();
+ get_or_refresh_token(&*strava, &conn, &user)?
+ };
+
+ let activity = strava.get(&format!("/activities/{}", id), &token.access_token, &[])?;
+
+ let now = Utc::now();
+ let conn = &shared.conn.lock().unwrap();
+ conn.transaction::<(), Error, _>(|| {
+ db::insert_data(
+ conn,
+ &models::RawData {
+ data_type: models::DataType::StravaActivity,
+ id: id,
+ username: username.to_string(),
+ payload: activity,
+ entry_type: None,
+ entry_id: None,
+ })?;
+ db::insert_task(
+ conn,
+ &models::NewTask {
+ start_at: now,
+ state: models::TaskState::NEW,
+ username: username,
+ payload: &to_value(
+ Command::ProcessRawData(models::RawDataKey {
+ data_type: models::DataType::StravaActivity,
+ id: id,
+ username: username.to_string(),
+ }))?
+ })?;
+ task.state = models::TaskState::SUCCESSFUL;
+ db::update_task(conn, task)?;
+ Ok(())
+ })?;
+ Ok(())
+}
+
fn import_strava_user<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
+ mut task: models::Task,
username: &str,
) -> Result<(), Error> {
let strava = shared.strava.read().unwrap();
@@ -266,6 +329,7 @@ fn import_strava_user<S: strava::StravaApi>(
get_or_refresh_token(&*strava, &conn, &user)?
};
+ let mut missing_ids = HashSet::new();
let per_page = 30;
for page in 1.. {
let params = [
@@ -274,25 +338,27 @@ fn import_strava_user<S: strava::StravaApi>(
];
let result = strava.get("/athlete/activities", &token.access_token, &params[..])?;
+ if page == 1 {
+ info!("{:#?}", result);
+ }
let json_error = || Error::UnexpectedJson(result.clone());
let result = result.as_array().ok_or_else(json_error)?;
- for activity in result {
- let id = activity["id"].as_i64().ok_or_else(json_error)?;
- info!("activity id: {} start: {}", id, activity["start_date"]);
-
- db::insert_data(
- &shared.conn.lock().unwrap(),
- &models::RawData {
- data_type: models::DataType::StravaActivity,
- id: id,
- username: username.to_string(),
- payload: activity.clone(),
- entry_type: None,
- entry_id: None,
- },
- )?;
+ let activity_ids = result
+ .iter()
+ .map(|a| a["id"].as_i64().ok_or_else(json_error))
+ .collect::<Result<Vec<_>, _>>()?;
+
+ let missing_on_page = db::find_missing_data(
+ &shared.conn.lock().unwrap(),
+ username,
+ models::DataType::StravaActivity,
+ &activity_ids)?;
+ missing_ids.extend(missing_on_page.iter());
+
+ if missing_on_page.len() < per_page / 3 {
+ break;
}
if result.len() < per_page {
@@ -301,7 +367,30 @@ fn import_strava_user<S: strava::StravaApi>(
thread::sleep(Duration::from_secs(1));
}
- Err(Error::InternalError)
+ let conn = &shared.conn.lock().unwrap();
+ let now = Utc::now();
+ conn.transaction::<(), Error, _>(|| {
+ for &missing_id in &missing_ids {
+ db::insert_task(
+ conn,
+ &models::NewTask {
+ start_at: now,
+ state: models::TaskState::NEW,
+ username: username,
+ payload: &to_value(
+ Command::ImportStravaActivity {
+ username: username.to_string(),
+ id: missing_id,
+ })?,
+ })?;
+ }
+ task.state = models::TaskState::SUCCESSFUL;
+ db::update_task(conn, task)?;
+ info!("scheduled {} tasks for {}", missing_ids.len(), username);
+ Ok(())
+ })?;
+
+ Ok(())
}
fn get_or_refresh_token<Strava: strava::StravaApi>(
@@ -309,12 +398,13 @@ fn get_or_refresh_token<Strava: strava::StravaApi>(
conn: &PgConnection,
user: &models::User,
) -> Result<models::StravaToken, Error> {
- let mut token = db::get_strava_token(&conn, &user).expect("FIX");
+ let mut token = db::get_strava_token(&conn, &user)?;
if token.expires_at < Utc::now() {
info!("refresh expired token: {:?}", token.expires_at);
let new_token = strava.refresh_token(&From::from(&token))?;
new_token.update_model(&mut token);
+ db::update_strava_token(conn, &token)?;
}
Ok(token)
diff --git a/src/models.rs b/src/models.rs
index 83153fd..4a85369 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -100,7 +100,7 @@ impl fmt::Debug for User {
}
}
-#[derive(Insertable, Queryable)]
+#[derive(AsChangeset, Insertable, Queryable)]
#[table_name = "strava_tokens"]
pub struct StravaToken {
pub username: String,
diff --git a/src/template.rs b/src/template.rs
index 0ac67fb..e50ad21 100644
--- a/src/template.rs
+++ b/src/template.rs
@@ -42,6 +42,14 @@ pub fn running_template() -> TemplateSpec {
)],
),
},
+ // Column {
+ // display_name: "Title".to_string(),
+ // field: FieldSpec::Field("name".to_string()),
+ // },
+ // Column {
+ // display_name: "Description".to_string(),
+ // field: FieldSpec::Field("description".to_string()),
+ // },
])
}
diff --git a/templates/profile.hbs b/templates/profile.hbs
index cd69e61..f01e4fe 100644
--- a/templates/profile.hbs
+++ b/templates/profile.hbs
@@ -38,9 +38,6 @@ function toggleDetails(e) {
{{#each columns ~}}
<td>{{ this }}</td>
{{/each ~}}
- <td class="details">
- Here are the details about this activity.
- </td>
</tr>
{{/each ~}}
</tbody>