From a226e7d888df3342f26e7eaaf1a24d0397d4dbad Mon Sep 17 00:00:00 2001 From: Kjetil Orbekk Date: Sat, 1 Feb 2020 13:11:33 -0500 Subject: First steps of importer/db/strava integration --- src/db.rs | 10 ++++++++++ src/importer.rs | 55 +++++++++++++++++++++++++++++++++++++++++++------------ src/server.rs | 4 ++-- src/strava.rs | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 14 deletions(-) diff --git a/src/db.rs b/src/db.rs index 8b39c49..091bc64 100644 --- a/src/db.rs +++ b/src/db.rs @@ -86,3 +86,13 @@ pub fn get_user(conn: &PgConnection, username: &str) -> Result Result { + use crate::schema::strava_tokens; + + let token = strava_tokens::table + .filter(strava_tokens::username.eq(&user.username)) + .get_result::(conn)?; + Ok(token) +} diff --git a/src/importer.rs b/src/importer.rs index 2966449..e1f73e2 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -1,12 +1,19 @@ -use crate::models; use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; use std::sync::mpsc::Sender; use std::sync::Arc; use std::sync::Mutex; +use std::sync::RwLock; use threadpool::ThreadPool; +use diesel::PgConnection; + +use crate::db; +use crate::strava; +use crate::models; +use crate::strava::StravaApi; pub const WORKERS: usize = 10; +pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; #[derive(Debug, Clone)] pub enum Command { @@ -14,19 +21,39 @@ pub enum Command { Quit, } -fn handle_command(pool: ThreadPool, command: Command) { - info!("handle {:?}", command); +#[derive(Clone)] +struct ImporterState { + pool: ThreadPool, + conn: Arc>, + strava: Arc>, + rx: Arc>>, +} + +fn import_strava_user(state: ImporterState, user: models::User) { + let strava = state.strava.read().expect("FIX"); + let conn = state.conn.lock().expect("FIX"); + let token = db::get_strava_token(&conn, &user).expect("FIX"); + let result = strava.get("/athlete/activities", &token.access_token, EMPTY_PARAMS).expect("ok"); + info!("Imported user. Got result: {:#?}", result); +} + +fn handle_command(state: ImporterState, command: Command) { + info!("handle_command {:?}", command); + match command { + Command::ImportStravaUser(user) => import_strava_user(state, user), + Command::Quit => (), + } } -fn receive_commands(pool: ThreadPool, rx: Arc>>) { +fn receive_commands(state: ImporterState) { info!("receive_commands"); - match (move || -> Result<(), Box> { - let rx = rx.lock().expect("channel"); + match (|| -> Result<(), Box> { + let rx = state.rx.lock()?; let mut command = rx.recv()?; loop { info!("got command: {:?}", command); - let pool0 = pool.clone(); - pool.execute(|| handle_command(pool0, command)); + let state0 = state.clone(); + state.pool.execute(move || handle_command(state0, command)); command = rx.recv()?; } })() { @@ -38,10 +65,14 @@ fn receive_commands(pool: ThreadPool, rx: Arc>>) { } } -pub fn run(pool: ThreadPool) -> Sender { +pub fn run(pool: ThreadPool, conn: PgConnection) -> Sender { let (tx, rx0) = channel(); - let rx = Arc::new(Mutex::new(rx0)); - let pool0 = pool.clone(); - pool.execute(move || receive_commands(pool0, rx)); + let state = ImporterState { + pool: pool.clone(), + conn: Arc::new(Mutex::new(conn)), + strava: Arc::new(RwLock::new(strava::StravaImpl::new())), + rx: Arc::new(Mutex::new(rx0)), + }; + pool.execute(move || receive_commands(state)); tx } diff --git a/src/server.rs b/src/server.rs index 994d059..79c3226 100644 --- a/src/server.rs +++ b/src/server.rs @@ -141,7 +141,7 @@ fn import_strava( tx: State>>, user: LoggedInUser) -> Result<(), Error> { let user = db::get_user(&*conn, &user.username)?; - tx.lock().expect("FIX").send(importer::Command::ImportStravaUser(user)); + tx.lock().expect("FIX").send(importer::Command::ImportStravaUser(user)).expect("FIX"); Ok(()) } @@ -181,7 +181,7 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) { .unwrap(); let importer_pool = ThreadPool::with_name("import".to_string(), importer::WORKERS); - let tx = importer::run(importer_pool.clone()); + let tx = importer::run(importer_pool.clone(), conn); rocket::custom(config) .manage(params) diff --git a/src/strava.rs b/src/strava.rs index 7df728c..6be5466 100644 --- a/src/strava.rs +++ b/src/strava.rs @@ -8,6 +8,38 @@ use serde::Serialize; use serde_json::from_value; use serde_json::Value; +pub trait StravaApi { + fn get(&self, method: &str, access_token: &str, parasm: &T) -> Result; +} + +pub struct StravaImpl { + client: reqwest::blocking::Client, + base_url: String, +} + +impl StravaImpl { + pub fn new() -> StravaImpl { + StravaImpl { + client: reqwest::blocking::Client::new(), + base_url: "https://www.strava.com/api/v3".to_string(), + } + } +} + +impl StravaApi for StravaImpl { + fn get(&self, method: &str, access_token: &str, + params: &T) -> Result { + let uri = format!("{}{}", self.base_url, method); + let response = self.client.get(&uri) + .bearer_auth(access_token) + .query(params) + .send()?; + info!("StravaApi::get({}) returned {:?}", method, response); + let json = response.json()?; + Ok(json) + } +} + #[derive(Serialize, Deserialize, Debug)] pub struct AthleteSummary { id: i64, -- cgit v1.2.3