summaryrefslogtreecommitdiff
path: root/src/importer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/importer.rs')
-rw-r--r--src/importer.rs55
1 files changed, 43 insertions, 12 deletions
diff --git a/src/importer.rs b/src/importer.rs
index 2966449..e1f73e2 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -1,12 +1,19 @@
-use crate::models;
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::sync::Mutex;
+use std::sync::RwLock;
use threadpool::ThreadPool;
+use diesel::PgConnection;
+
+use crate::db;
+use crate::strava;
+use crate::models;
+use crate::strava::StravaApi;
pub const WORKERS: usize = 10;
+pub const EMPTY_PARAMS: &[(&str, &str)] = &[];
#[derive(Debug, Clone)]
pub enum Command {
@@ -14,19 +21,39 @@ pub enum Command {
Quit,
}
-fn handle_command(pool: ThreadPool, command: Command) {
- info!("handle {:?}", command);
+#[derive(Clone)]
+struct ImporterState {
+ pool: ThreadPool,
+ conn: Arc<Mutex<PgConnection>>,
+ strava: Arc<RwLock<strava::StravaImpl>>,
+ rx: Arc<Mutex<Receiver<Command>>>,
+}
+
+fn import_strava_user(state: ImporterState, user: models::User) {
+ let strava = state.strava.read().expect("FIX");
+ let conn = state.conn.lock().expect("FIX");
+ let token = db::get_strava_token(&conn, &user).expect("FIX");
+ let result = strava.get("/athlete/activities", &token.access_token, EMPTY_PARAMS).expect("ok");
+ info!("Imported user. Got result: {:#?}", result);
+}
+
+fn handle_command(state: ImporterState, command: Command) {
+ info!("handle_command {:?}", command);
+ match command {
+ Command::ImportStravaUser(user) => import_strava_user(state, user),
+ Command::Quit => (),
+ }
}
-fn receive_commands(pool: ThreadPool, rx: Arc<Mutex<Receiver<Command>>>) {
+fn receive_commands(state: ImporterState) {
info!("receive_commands");
- match (move || -> Result<(), Box<dyn std::error::Error>> {
- let rx = rx.lock().expect("channel");
+ match (|| -> Result<(), Box<dyn std::error::Error>> {
+ let rx = state.rx.lock()?;
let mut command = rx.recv()?;
loop {
info!("got command: {:?}", command);
- let pool0 = pool.clone();
- pool.execute(|| handle_command(pool0, command));
+ let state0 = state.clone();
+ state.pool.execute(move || handle_command(state0, command));
command = rx.recv()?;
}
})() {
@@ -38,10 +65,14 @@ fn receive_commands(pool: ThreadPool, rx: Arc<Mutex<Receiver<Command>>>) {
}
}
-pub fn run(pool: ThreadPool) -> Sender<Command> {
+pub fn run(pool: ThreadPool, conn: PgConnection) -> Sender<Command> {
let (tx, rx0) = channel();
- let rx = Arc::new(Mutex::new(rx0));
- let pool0 = pool.clone();
- pool.execute(move || receive_commands(pool0, rx));
+ let state = ImporterState {
+ pool: pool.clone(),
+ conn: Arc::new(Mutex::new(conn)),
+ strava: Arc::new(RwLock::new(strava::StravaImpl::new())),
+ rx: Arc::new(Mutex::new(rx0)),
+ };
+ pool.execute(move || receive_commands(state));
tx
}