From fb9f143f2353dc8c64a18be84c12b53cdad847e7 Mon Sep 17 00:00:00 2001 From: Kjetil Orbekk Date: Sat, 1 Feb 2020 09:10:54 -0500 Subject: Add strava importer threadpool --- Cargo.lock | 10 ++++++++++ Cargo.toml | 3 ++- src/importer.rs | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 +++ src/main.rs | 38 ++++++++++++++++++++++++++++++++++---- src/models.rs | 13 ++++++++++++- src/server.rs | 8 ++++++++ 7 files changed, 117 insertions(+), 6 deletions(-) create mode 100644 src/importer.rs diff --git a/Cargo.lock b/Cargo.lock index c39255e..f501f35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1031,6 +1031,7 @@ dependencies = [ "rocket_contrib 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)", + "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1506,6 +1507,14 @@ dependencies = [ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "threadpool" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "time" version = "0.1.42" @@ -2055,6 +2064,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" "checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +"checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865" "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" "checksum tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b" "checksum tokio-tls 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7bde02a3a5291395f59b06ec6945a3077602fac2b07eeeaf0dee2122f3619828" diff --git a/Cargo.toml b/Cargo.toml index 848585b..a89bd57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,4 +18,5 @@ base64 = "0.11" rand = "0.7" chrono = { version = "0.4", features = ["serde"] } log = "0.4" -fern = { version = "0.5", features = ["colored"] } \ No newline at end of file +fern = { version = "0.5", features = ["colored"] } +threadpool = "1.7" \ No newline at end of file diff --git a/src/importer.rs b/src/importer.rs new file mode 100644 index 0000000..8513b54 --- /dev/null +++ b/src/importer.rs @@ -0,0 +1,48 @@ +use crate::models; +use std::sync::mpsc::channel; +use std::sync::mpsc::Receiver; +use std::sync::mpsc::Sender; +use std::sync::Arc; +use std::sync::Mutex; +use threadpool::ThreadPool; + +pub const WORKERS: usize = 10; + +#[derive(Debug, Clone)] +pub enum Command { + ImportStravaUser(models::User), + Quit, +} + +fn handle_command(pool: ThreadPool, + command: Command) { + info!("handle {:?}", command); +} + +fn receive_commands(pool: ThreadPool, + rx: Arc>>) { + info!("receive_commands"); + match (move || -> Result<(), Box> { + let rx = rx.lock().expect("channel"); + let mut command = rx.recv()?; + loop { + info!("got command: {:?}", command); + let pool0 = pool.clone(); + pool.execute(|| handle_command(pool0, command)); + command = rx.recv()?; + }})() { + Ok(()) => (), + Err(e) => { + error!("receive_commands: {:?}", e); + () + } + } +} + +pub fn run(pool: ThreadPool) -> Sender { + let (tx, rx0) = channel(); + let rx = Arc::new(Mutex::new(rx0)); + let pool0 = pool.clone(); + pool.execute(move || receive_commands(pool0, rx)); + tx +} diff --git a/src/lib.rs b/src/lib.rs index 5d3ff06..f55fef1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,10 +9,13 @@ extern crate rocket_contrib; #[macro_use] extern crate diesel; +#[macro_use] +extern crate log; extern crate fern; pub mod db; pub mod error; +pub mod importer; pub mod models; mod schema; pub mod server; diff --git a/src/main.rs b/src/main.rs index 6e11fc4..7ced591 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#![feature(str_strip)] extern crate fern; #[macro_use] extern crate log; @@ -9,13 +10,42 @@ use diesel::connection::Connection; use diesel::pg::PgConnection; fn setup_logger() -> Result<(), fern::InitError> { - use fern::colors::ColoredLevelConfig; + use fern::colors::{Color, ColoredLevelConfig}; let colors = ColoredLevelConfig::new(); + fern::Dispatch::new() .format(move |out, message, record| { - out.finish(format_args!("[{}] {}", - colors.color(record.level()), - message)) + let thread = std::thread::current(); + + let thread_id = &format!("{:?}", thread.id())[..]; + let prefix = "ThreadId("; + let thread_id = if thread_id.find(prefix).is_some() { + &thread_id[prefix.len() .. thread_id.len() - 1] + } else { + thread_id + }; + + let thread_colors = [Color::Red, Color::Green, Color::Magenta, + Color::Cyan, Color::White, Color::BrightRed, + Color::BrightGreen, Color::BrightMagenta, + Color::BrightWhite]; + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut hasher = DefaultHasher::new(); + thread_id.hash(&mut hasher); + let thread_color = thread_colors[hasher.finish() as usize % thread_colors.len()]; + + // if thread_id.find("ThreadId(") { + // } + // TODO: Make a random color based on the thread name. + out.finish(format_args!( + "[{}] \x1B[{}m{}@{}\x1B[0m {}", + colors.color(record.level()), + thread_color.to_fg_str(), + thread.name().unwrap_or(""), + thread_id, + message + )) }) .level(log::LevelFilter::Info) .chain(std::io::stdout()) diff --git a/src/models.rs b/src/models.rs index b3d653f..3837b3d 100644 --- a/src/models.rs +++ b/src/models.rs @@ -3,6 +3,7 @@ use crate::schema::strava_tokens; use crate::schema::users; use chrono::DateTime; use chrono::Utc; +use std::fmt; #[derive(Insertable, Queryable)] #[table_name = "config"] @@ -20,12 +21,22 @@ pub struct NewUser<'a> { pub password: &'a str, } -#[derive(Queryable)] +#[derive(Queryable, Clone)] pub struct User { pub username: String, pub password: String, } +impl fmt::Debug for User { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "User {{ username: {}, password: }}", + self.username + ) + } +} + #[derive(Insertable, Queryable)] #[table_name = "strava_tokens"] pub struct StravaToken { diff --git a/src/server.rs b/src/server.rs index e2ac7dd..c5a59f5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -14,9 +14,11 @@ use rocket::response::Redirect; use rocket::State; use rocket_contrib::templates::Template; use std::collections::HashMap; +use threadpool::ThreadPool; use crate::db; use crate::error::Error; +use crate::importer; use crate::models; use crate::strava; @@ -165,6 +167,10 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) { .finalize() .unwrap(); + let importer_pool = ThreadPool::with_name("import".to_string(), importer::WORKERS); + let tx = importer::run(importer_pool.clone()); + tx.send(importer::Command::Quit).expect("send"); + rocket::custom(config) .manage(params) .mount( @@ -180,4 +186,6 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) { .attach(Template::fairing()) .attach(Db::fairing()) .launch(); + + importer_pool.join(); } -- cgit v1.2.3