summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-01 09:10:54 -0500
committerKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-01 09:10:54 -0500
commitfb9f143f2353dc8c64a18be84c12b53cdad847e7 (patch)
tree5d7d2e5f80aa5aff705e6b9e05753a3005bb3bcf /src
parent0773347daf9dd5b1433884aeabd007f2f605adeb (diff)
Add strava importer threadpool
Diffstat (limited to 'src')
-rw-r--r--src/importer.rs48
-rw-r--r--src/lib.rs3
-rw-r--r--src/main.rs38
-rw-r--r--src/models.rs13
-rw-r--r--src/server.rs8
5 files changed, 105 insertions, 5 deletions
diff --git a/src/importer.rs b/src/importer.rs
new file mode 100644
index 0000000..8513b54
--- /dev/null
+++ b/src/importer.rs
@@ -0,0 +1,48 @@
+use crate::models;
+use std::sync::mpsc::channel;
+use std::sync::mpsc::Receiver;
+use std::sync::mpsc::Sender;
+use std::sync::Arc;
+use std::sync::Mutex;
+use threadpool::ThreadPool;
+
+pub const WORKERS: usize = 10;
+
+#[derive(Debug, Clone)]
+pub enum Command {
+ ImportStravaUser(models::User),
+ Quit,
+}
+
+fn handle_command(pool: ThreadPool,
+ command: Command) {
+ info!("handle {:?}", command);
+}
+
+fn receive_commands(pool: ThreadPool,
+ rx: Arc<Mutex<Receiver<Command>>>) {
+ info!("receive_commands");
+ match (move || -> Result<(), Box<dyn std::error::Error>> {
+ let rx = rx.lock().expect("channel");
+ let mut command = rx.recv()?;
+ loop {
+ info!("got command: {:?}", command);
+ let pool0 = pool.clone();
+ pool.execute(|| handle_command(pool0, command));
+ command = rx.recv()?;
+ }})() {
+ Ok(()) => (),
+ Err(e) => {
+ error!("receive_commands: {:?}", e);
+ ()
+ }
+ }
+}
+
+pub fn run(pool: ThreadPool) -> Sender<Command> {
+ let (tx, rx0) = channel();
+ let rx = Arc::new(Mutex::new(rx0));
+ let pool0 = pool.clone();
+ pool.execute(move || receive_commands(pool0, rx));
+ tx
+}
diff --git a/src/lib.rs b/src/lib.rs
index 5d3ff06..f55fef1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,10 +9,13 @@ extern crate rocket_contrib;
#[macro_use]
extern crate diesel;
+#[macro_use]
+extern crate log;
extern crate fern;
pub mod db;
pub mod error;
+pub mod importer;
pub mod models;
mod schema;
pub mod server;
diff --git a/src/main.rs b/src/main.rs
index 6e11fc4..7ced591 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,4 @@
+#![feature(str_strip)]
extern crate fern;
#[macro_use]
extern crate log;
@@ -9,13 +10,42 @@ use diesel::connection::Connection;
use diesel::pg::PgConnection;
fn setup_logger() -> Result<(), fern::InitError> {
- use fern::colors::ColoredLevelConfig;
+ use fern::colors::{Color, ColoredLevelConfig};
let colors = ColoredLevelConfig::new();
+
fern::Dispatch::new()
.format(move |out, message, record| {
- out.finish(format_args!("[{}] {}",
- colors.color(record.level()),
- message))
+ let thread = std::thread::current();
+
+ let thread_id = &format!("{:?}", thread.id())[..];
+ let prefix = "ThreadId(";
+ let thread_id = if thread_id.find(prefix).is_some() {
+ &thread_id[prefix.len() .. thread_id.len() - 1]
+ } else {
+ thread_id
+ };
+
+ let thread_colors = [Color::Red, Color::Green, Color::Magenta,
+ Color::Cyan, Color::White, Color::BrightRed,
+ Color::BrightGreen, Color::BrightMagenta,
+ Color::BrightWhite];
+ use std::collections::hash_map::DefaultHasher;
+ use std::hash::{Hash, Hasher};
+ let mut hasher = DefaultHasher::new();
+ thread_id.hash(&mut hasher);
+ let thread_color = thread_colors[hasher.finish() as usize % thread_colors.len()];
+
+ // if thread_id.find("ThreadId(") {
+ // }
+ // TODO: Make a random color based on the thread name.
+ out.finish(format_args!(
+ "[{}] \x1B[{}m{}@{}\x1B[0m {}",
+ colors.color(record.level()),
+ thread_color.to_fg_str(),
+ thread.name().unwrap_or(""),
+ thread_id,
+ message
+ ))
})
.level(log::LevelFilter::Info)
.chain(std::io::stdout())
diff --git a/src/models.rs b/src/models.rs
index b3d653f..3837b3d 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -3,6 +3,7 @@ use crate::schema::strava_tokens;
use crate::schema::users;
use chrono::DateTime;
use chrono::Utc;
+use std::fmt;
#[derive(Insertable, Queryable)]
#[table_name = "config"]
@@ -20,12 +21,22 @@ pub struct NewUser<'a> {
pub password: &'a str,
}
-#[derive(Queryable)]
+#[derive(Queryable, Clone)]
pub struct User {
pub username: String,
pub password: String,
}
+impl fmt::Debug for User {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "User {{ username: {}, password: <redacted> }}",
+ self.username
+ )
+ }
+}
+
#[derive(Insertable, Queryable)]
#[table_name = "strava_tokens"]
pub struct StravaToken {
diff --git a/src/server.rs b/src/server.rs
index e2ac7dd..c5a59f5 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -14,9 +14,11 @@ use rocket::response::Redirect;
use rocket::State;
use rocket_contrib::templates::Template;
use std::collections::HashMap;
+use threadpool::ThreadPool;
use crate::db;
use crate::error::Error;
+use crate::importer;
use crate::models;
use crate::strava;
@@ -165,6 +167,10 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) {
.finalize()
.unwrap();
+ let importer_pool = ThreadPool::with_name("import".to_string(), importer::WORKERS);
+ let tx = importer::run(importer_pool.clone());
+ tx.send(importer::Command::Quit).expect("send");
+
rocket::custom(config)
.manage(params)
.mount(
@@ -180,4 +186,6 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) {
.attach(Template::fairing())
.attach(Db::fairing())
.launch();
+
+ importer_pool.join();
}