summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-01 13:11:33 -0500
committerKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-01 13:11:33 -0500
commita226e7d888df3342f26e7eaaf1a24d0397d4dbad (patch)
tree5809c2044c3233d8ec7bb2bfa940d112563b126c
parenta646b465246f737ea371e2acb30830180078e61d (diff)
First steps of importer/db/strava integration
-rw-r--r--src/db.rs10
-rw-r--r--src/importer.rs55
-rw-r--r--src/server.rs4
-rw-r--r--src/strava.rs32
4 files changed, 87 insertions, 14 deletions
diff --git a/src/db.rs b/src/db.rs
index 8b39c49..091bc64 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -86,3 +86,13 @@ pub fn get_user(conn: &PgConnection, username: &str) -> Result<models::User, Err
user.password = "".to_string();
Ok(user)
}
+
+pub fn get_strava_token(conn: &PgConnection, user: &models::User)
+ -> Result<models::StravaToken, Error> {
+ use crate::schema::strava_tokens;
+
+ let token = strava_tokens::table
+ .filter(strava_tokens::username.eq(&user.username))
+ .get_result::<models::StravaToken>(conn)?;
+ Ok(token)
+}
diff --git a/src/importer.rs b/src/importer.rs
index 2966449..e1f73e2 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -1,12 +1,19 @@
-use crate::models;
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::sync::Mutex;
+use std::sync::RwLock;
use threadpool::ThreadPool;
+use diesel::PgConnection;
+
+use crate::db;
+use crate::strava;
+use crate::models;
+use crate::strava::StravaApi;
pub const WORKERS: usize = 10;
+pub const EMPTY_PARAMS: &[(&str, &str)] = &[];
#[derive(Debug, Clone)]
pub enum Command {
@@ -14,19 +21,39 @@ pub enum Command {
Quit,
}
-fn handle_command(pool: ThreadPool, command: Command) {
- info!("handle {:?}", command);
+#[derive(Clone)]
+struct ImporterState {
+ pool: ThreadPool,
+ conn: Arc<Mutex<PgConnection>>,
+ strava: Arc<RwLock<strava::StravaImpl>>,
+ rx: Arc<Mutex<Receiver<Command>>>,
+}
+
+fn import_strava_user(state: ImporterState, user: models::User) {
+ let strava = state.strava.read().expect("FIX");
+ let conn = state.conn.lock().expect("FIX");
+ let token = db::get_strava_token(&conn, &user).expect("FIX");
+ let result = strava.get("/athlete/activities", &token.access_token, EMPTY_PARAMS).expect("ok");
+ info!("Imported user. Got result: {:#?}", result);
+}
+
+fn handle_command(state: ImporterState, command: Command) {
+ info!("handle_command {:?}", command);
+ match command {
+ Command::ImportStravaUser(user) => import_strava_user(state, user),
+ Command::Quit => (),
+ }
}
-fn receive_commands(pool: ThreadPool, rx: Arc<Mutex<Receiver<Command>>>) {
+fn receive_commands(state: ImporterState) {
info!("receive_commands");
- match (move || -> Result<(), Box<dyn std::error::Error>> {
- let rx = rx.lock().expect("channel");
+ match (|| -> Result<(), Box<dyn std::error::Error>> {
+ let rx = state.rx.lock()?;
let mut command = rx.recv()?;
loop {
info!("got command: {:?}", command);
- let pool0 = pool.clone();
- pool.execute(|| handle_command(pool0, command));
+ let state0 = state.clone();
+ state.pool.execute(move || handle_command(state0, command));
command = rx.recv()?;
}
})() {
@@ -38,10 +65,14 @@ fn receive_commands(pool: ThreadPool, rx: Arc<Mutex<Receiver<Command>>>) {
}
}
-pub fn run(pool: ThreadPool) -> Sender<Command> {
+pub fn run(pool: ThreadPool, conn: PgConnection) -> Sender<Command> {
let (tx, rx0) = channel();
- let rx = Arc::new(Mutex::new(rx0));
- let pool0 = pool.clone();
- pool.execute(move || receive_commands(pool0, rx));
+ let state = ImporterState {
+ pool: pool.clone(),
+ conn: Arc::new(Mutex::new(conn)),
+ strava: Arc::new(RwLock::new(strava::StravaImpl::new())),
+ rx: Arc::new(Mutex::new(rx0)),
+ };
+ pool.execute(move || receive_commands(state));
tx
}
diff --git a/src/server.rs b/src/server.rs
index 994d059..79c3226 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -141,7 +141,7 @@ fn import_strava(
tx: State<Mutex<Sender<importer::Command>>>,
user: LoggedInUser) -> Result<(), Error> {
let user = db::get_user(&*conn, &user.username)?;
- tx.lock().expect("FIX").send(importer::Command::ImportStravaUser(user));
+ tx.lock().expect("FIX").send(importer::Command::ImportStravaUser(user)).expect("FIX");
Ok(())
}
@@ -181,7 +181,7 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) {
.unwrap();
let importer_pool = ThreadPool::with_name("import".to_string(), importer::WORKERS);
- let tx = importer::run(importer_pool.clone());
+ let tx = importer::run(importer_pool.clone(), conn);
rocket::custom(config)
.manage(params)
diff --git a/src/strava.rs b/src/strava.rs
index 7df728c..6be5466 100644
--- a/src/strava.rs
+++ b/src/strava.rs
@@ -8,6 +8,38 @@ use serde::Serialize;
use serde_json::from_value;
use serde_json::Value;
+pub trait StravaApi {
+ fn get<T: Serialize + ?Sized>(&self, method: &str, access_token: &str, parasm: &T) -> Result<Value, Error>;
+}
+
+pub struct StravaImpl {
+ client: reqwest::blocking::Client,
+ base_url: String,
+}
+
+impl StravaImpl {
+ pub fn new() -> StravaImpl {
+ StravaImpl {
+ client: reqwest::blocking::Client::new(),
+ base_url: "https://www.strava.com/api/v3".to_string(),
+ }
+ }
+}
+
+impl StravaApi for StravaImpl {
+ fn get<T: Serialize + ?Sized>(&self, method: &str, access_token: &str,
+ params: &T) -> Result<Value, Error> {
+ let uri = format!("{}{}", self.base_url, method);
+ let response = self.client.get(&uri)
+ .bearer_auth(access_token)
+ .query(params)
+ .send()?;
+ info!("StravaApi::get({}) returned {:?}", method, response);
+ let json = response.json()?;
+ Ok(json)
+ }
+}
+
#[derive(Serialize, Deserialize, Debug)]
pub struct AthleteSummary {
id: i64,