summaryrefslogtreecommitdiff
path: root/src/importer.rs
blob: ab77837ff6e929d33fddef467657089b4313b9f0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use diesel::PgConnection;
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use threadpool::ThreadPool;
use chrono::Utc;

use crate::error::Error;
use crate::db;
use crate::models;
use crate::strava;
use crate::strava::StravaApi;
use crate::Params;

pub const WORKERS: usize = 10;
pub const EMPTY_PARAMS: &[(&str, &str)] = &[];

#[derive(Debug, Clone)]
pub enum Command {
    ImportStravaUser(models::User),
    Quit,
}

#[derive(Clone)]
struct ImporterState {
    pool: ThreadPool,
    conn: Arc<Mutex<PgConnection>>,
    strava: Arc<RwLock<strava::StravaImpl>>,
    rx: Arc<Mutex<Receiver<Command>>>,
}

fn get_or_refresh_token<Strava: strava::StravaApi>(strava: &Strava, conn: &PgConnection, user: &models::User) -> Result<models::StravaToken, Error> {
    let mut token = db::get_strava_token(&conn, &user).expect("FIX");

    if token.expires_at < Utc::now() {
        info!("refresh expired token: {:?}", token.expires_at);
        let new_token = strava.refresh_token(&From::from(&token))?;
        new_token.update_model(&mut token);
    }

    Ok(token)
}

fn import_strava_user(state: ImporterState, user: models::User) {
    let strava = state.strava.read().expect("FIX");
    let conn = state.conn.lock().expect("FIX");
    let token = get_or_refresh_token(&*strava, &conn, &user).expect("FIX");
    let result = strava
        .get("/athlete/activities", &token.access_token, EMPTY_PARAMS)
        .expect("ok");
    info!("import_strava_user: Got result: {:#?}", result);
}

fn handle_command(state: ImporterState, command: Command) {
    info!("handle_command {:?}", command);
    match command {
        Command::ImportStravaUser(user) => import_strava_user(state, user),
        Command::Quit => (),
    }
}

fn receive_commands(state: ImporterState) {
    info!("receive_commands");
    match (|| -> Result<(), Box<dyn std::error::Error>> {
        let rx = state.rx.lock()?;
        let mut command = rx.recv()?;
        loop {
            info!("got command: {:?}", command);
            let state0 = state.clone();
            state.pool.execute(move || handle_command(state0, command));
            command = rx.recv()?;
        }
    })() {
        Ok(()) => (),
        Err(e) => {
            error!("receive_commands: {:?}", e);
            ()
        }
    }
}

pub fn run(pool: ThreadPool, conn: PgConnection, params: &Params) -> Sender<Command> {
    let (tx, rx0) = channel();
    let state = ImporterState {
        pool: pool.clone(),
        conn: Arc::new(Mutex::new(conn)),
        strava: Arc::new(RwLock::new(strava::StravaImpl::new(
        params.strava_client_id.clone(), params.strava_client_secret.clone()))),
        rx: Arc::new(Mutex::new(rx0)),
    };
    pool.execute(move || receive_commands(state));
    tx
}