diff options
author | Kjetil Orbekk <kjetil.orbekk@gmail.com> | 2020-02-01 13:11:33 -0500 |
---|---|---|
committer | Kjetil Orbekk <kjetil.orbekk@gmail.com> | 2020-02-01 13:11:33 -0500 |
commit | a226e7d888df3342f26e7eaaf1a24d0397d4dbad (patch) | |
tree | 5809c2044c3233d8ec7bb2bfa940d112563b126c | |
parent | a646b465246f737ea371e2acb30830180078e61d (diff) |
First steps of importer/db/strava integration
-rw-r--r-- | src/db.rs | 10 | ||||
-rw-r--r-- | src/importer.rs | 55 | ||||
-rw-r--r-- | src/server.rs | 4 | ||||
-rw-r--r-- | src/strava.rs | 32 |
4 files changed, 87 insertions, 14 deletions
@@ -86,3 +86,13 @@ pub fn get_user(conn: &PgConnection, username: &str) -> Result<models::User, Err user.password = "".to_string(); Ok(user) } + +pub fn get_strava_token(conn: &PgConnection, user: &models::User) + -> Result<models::StravaToken, Error> { + use crate::schema::strava_tokens; + + let token = strava_tokens::table + .filter(strava_tokens::username.eq(&user.username)) + .get_result::<models::StravaToken>(conn)?; + Ok(token) +} diff --git a/src/importer.rs b/src/importer.rs index 2966449..e1f73e2 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -1,12 +1,19 @@ -use crate::models; use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; use std::sync::mpsc::Sender; use std::sync::Arc; use std::sync::Mutex; +use std::sync::RwLock; use threadpool::ThreadPool; +use diesel::PgConnection; + +use crate::db; +use crate::strava; +use crate::models; +use crate::strava::StravaApi; pub const WORKERS: usize = 10; +pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; #[derive(Debug, Clone)] pub enum Command { @@ -14,19 +21,39 @@ pub enum Command { Quit, } -fn handle_command(pool: ThreadPool, command: Command) { - info!("handle {:?}", command); +#[derive(Clone)] +struct ImporterState { + pool: ThreadPool, + conn: Arc<Mutex<PgConnection>>, + strava: Arc<RwLock<strava::StravaImpl>>, + rx: Arc<Mutex<Receiver<Command>>>, +} + +fn import_strava_user(state: ImporterState, user: models::User) { + let strava = state.strava.read().expect("FIX"); + let conn = state.conn.lock().expect("FIX"); + let token = db::get_strava_token(&conn, &user).expect("FIX"); + let result = strava.get("/athlete/activities", &token.access_token, EMPTY_PARAMS).expect("ok"); + info!("Imported user. Got result: {:#?}", result); +} + +fn handle_command(state: ImporterState, command: Command) { + info!("handle_command {:?}", command); + match command { + Command::ImportStravaUser(user) => import_strava_user(state, user), + Command::Quit => (), + } } -fn receive_commands(pool: ThreadPool, rx: Arc<Mutex<Receiver<Command>>>) { +fn receive_commands(state: ImporterState) { info!("receive_commands"); - match (move || -> Result<(), Box<dyn std::error::Error>> { - let rx = rx.lock().expect("channel"); + match (|| -> Result<(), Box<dyn std::error::Error>> { + let rx = state.rx.lock()?; let mut command = rx.recv()?; loop { info!("got command: {:?}", command); - let pool0 = pool.clone(); - pool.execute(|| handle_command(pool0, command)); + let state0 = state.clone(); + state.pool.execute(move || handle_command(state0, command)); command = rx.recv()?; } })() { @@ -38,10 +65,14 @@ fn receive_commands(pool: ThreadPool, rx: Arc<Mutex<Receiver<Command>>>) { } } -pub fn run(pool: ThreadPool) -> Sender<Command> { +pub fn run(pool: ThreadPool, conn: PgConnection) -> Sender<Command> { let (tx, rx0) = channel(); - let rx = Arc::new(Mutex::new(rx0)); - let pool0 = pool.clone(); - pool.execute(move || receive_commands(pool0, rx)); + let state = ImporterState { + pool: pool.clone(), + conn: Arc::new(Mutex::new(conn)), + strava: Arc::new(RwLock::new(strava::StravaImpl::new())), + rx: Arc::new(Mutex::new(rx0)), + }; + pool.execute(move || receive_commands(state)); tx } diff --git a/src/server.rs b/src/server.rs index 994d059..79c3226 100644 --- a/src/server.rs +++ b/src/server.rs @@ -141,7 +141,7 @@ fn import_strava( tx: State<Mutex<Sender<importer::Command>>>, user: LoggedInUser) -> Result<(), Error> { let user = db::get_user(&*conn, &user.username)?; - tx.lock().expect("FIX").send(importer::Command::ImportStravaUser(user)); + tx.lock().expect("FIX").send(importer::Command::ImportStravaUser(user)).expect("FIX"); Ok(()) } @@ -181,7 +181,7 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) { .unwrap(); let importer_pool = ThreadPool::with_name("import".to_string(), importer::WORKERS); - let tx = importer::run(importer_pool.clone()); + let tx = importer::run(importer_pool.clone(), conn); rocket::custom(config) .manage(params) diff --git a/src/strava.rs b/src/strava.rs index 7df728c..6be5466 100644 --- a/src/strava.rs +++ b/src/strava.rs @@ -8,6 +8,38 @@ use serde::Serialize; use serde_json::from_value; use serde_json::Value; +pub trait StravaApi { + fn get<T: Serialize + ?Sized>(&self, method: &str, access_token: &str, parasm: &T) -> Result<Value, Error>; +} + +pub struct StravaImpl { + client: reqwest::blocking::Client, + base_url: String, +} + +impl StravaImpl { + pub fn new() -> StravaImpl { + StravaImpl { + client: reqwest::blocking::Client::new(), + base_url: "https://www.strava.com/api/v3".to_string(), + } + } +} + +impl StravaApi for StravaImpl { + fn get<T: Serialize + ?Sized>(&self, method: &str, access_token: &str, + params: &T) -> Result<Value, Error> { + let uri = format!("{}{}", self.base_url, method); + let response = self.client.get(&uri) + .bearer_auth(access_token) + .query(params) + .send()?; + info!("StravaApi::get({}) returned {:?}", method, response); + let json = response.json()?; + Ok(json) + } +} + #[derive(Serialize, Deserialize, Debug)] pub struct AthleteSummary { id: i64, |