summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-01 09:10:54 -0500
committerKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-01 09:10:54 -0500
commitfb9f143f2353dc8c64a18be84c12b53cdad847e7 (patch)
tree5d7d2e5f80aa5aff705e6b9e05753a3005bb3bcf
parent0773347daf9dd5b1433884aeabd007f2f605adeb (diff)
Add strava importer threadpool
-rw-r--r--Cargo.lock10
-rw-r--r--Cargo.toml3
-rw-r--r--src/importer.rs48
-rw-r--r--src/lib.rs3
-rw-r--r--src/main.rs38
-rw-r--r--src/models.rs13
-rw-r--r--src/server.rs8
7 files changed, 117 insertions, 6 deletions
diff --git a/Cargo.lock b/Cargo.lock
index c39255e..f501f35 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1031,6 +1031,7 @@ dependencies = [
"rocket_contrib 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)",
+ "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1507,6 +1508,14 @@ dependencies = [
]
[[package]]
+name = "threadpool"
+version = "1.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "time"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2055,6 +2064,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
"checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b"
"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
+"checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865"
"checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f"
"checksum tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b"
"checksum tokio-tls 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7bde02a3a5291395f59b06ec6945a3077602fac2b07eeeaf0dee2122f3619828"
diff --git a/Cargo.toml b/Cargo.toml
index 848585b..a89bd57 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,4 +18,5 @@ base64 = "0.11"
rand = "0.7"
chrono = { version = "0.4", features = ["serde"] }
log = "0.4"
-fern = { version = "0.5", features = ["colored"] } \ No newline at end of file
+fern = { version = "0.5", features = ["colored"] }
+threadpool = "1.7" \ No newline at end of file
diff --git a/src/importer.rs b/src/importer.rs
new file mode 100644
index 0000000..8513b54
--- /dev/null
+++ b/src/importer.rs
@@ -0,0 +1,48 @@
+use crate::models;
+use std::sync::mpsc::channel;
+use std::sync::mpsc::Receiver;
+use std::sync::mpsc::Sender;
+use std::sync::Arc;
+use std::sync::Mutex;
+use threadpool::ThreadPool;
+
+pub const WORKERS: usize = 10;
+
+#[derive(Debug, Clone)]
+pub enum Command {
+ ImportStravaUser(models::User),
+ Quit,
+}
+
+fn handle_command(pool: ThreadPool,
+ command: Command) {
+ info!("handle {:?}", command);
+}
+
+fn receive_commands(pool: ThreadPool,
+ rx: Arc<Mutex<Receiver<Command>>>) {
+ info!("receive_commands");
+ match (move || -> Result<(), Box<dyn std::error::Error>> {
+ let rx = rx.lock().expect("channel");
+ let mut command = rx.recv()?;
+ loop {
+ info!("got command: {:?}", command);
+ let pool0 = pool.clone();
+ pool.execute(|| handle_command(pool0, command));
+ command = rx.recv()?;
+ }})() {
+ Ok(()) => (),
+ Err(e) => {
+ error!("receive_commands: {:?}", e);
+ ()
+ }
+ }
+}
+
+pub fn run(pool: ThreadPool) -> Sender<Command> {
+ let (tx, rx0) = channel();
+ let rx = Arc::new(Mutex::new(rx0));
+ let pool0 = pool.clone();
+ pool.execute(move || receive_commands(pool0, rx));
+ tx
+}
diff --git a/src/lib.rs b/src/lib.rs
index 5d3ff06..f55fef1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,10 +9,13 @@ extern crate rocket_contrib;
#[macro_use]
extern crate diesel;
+#[macro_use]
+extern crate log;
extern crate fern;
pub mod db;
pub mod error;
+pub mod importer;
pub mod models;
mod schema;
pub mod server;
diff --git a/src/main.rs b/src/main.rs
index 6e11fc4..7ced591 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,4 @@
+#![feature(str_strip)]
extern crate fern;
#[macro_use]
extern crate log;
@@ -9,13 +10,42 @@ use diesel::connection::Connection;
use diesel::pg::PgConnection;
fn setup_logger() -> Result<(), fern::InitError> {
- use fern::colors::ColoredLevelConfig;
+ use fern::colors::{Color, ColoredLevelConfig};
let colors = ColoredLevelConfig::new();
+
fern::Dispatch::new()
.format(move |out, message, record| {
- out.finish(format_args!("[{}] {}",
- colors.color(record.level()),
- message))
+ let thread = std::thread::current();
+
+ let thread_id = &format!("{:?}", thread.id())[..];
+ let prefix = "ThreadId(";
+ let thread_id = if thread_id.find(prefix).is_some() {
+ &thread_id[prefix.len() .. thread_id.len() - 1]
+ } else {
+ thread_id
+ };
+
+ let thread_colors = [Color::Red, Color::Green, Color::Magenta,
+ Color::Cyan, Color::White, Color::BrightRed,
+ Color::BrightGreen, Color::BrightMagenta,
+ Color::BrightWhite];
+ use std::collections::hash_map::DefaultHasher;
+ use std::hash::{Hash, Hasher};
+ let mut hasher = DefaultHasher::new();
+ thread_id.hash(&mut hasher);
+ let thread_color = thread_colors[hasher.finish() as usize % thread_colors.len()];
+
+ // if thread_id.find("ThreadId(") {
+ // }
+ // TODO: Make a random color based on the thread name.
+ out.finish(format_args!(
+ "[{}] \x1B[{}m{}@{}\x1B[0m {}",
+ colors.color(record.level()),
+ thread_color.to_fg_str(),
+ thread.name().unwrap_or(""),
+ thread_id,
+ message
+ ))
})
.level(log::LevelFilter::Info)
.chain(std::io::stdout())
diff --git a/src/models.rs b/src/models.rs
index b3d653f..3837b3d 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -3,6 +3,7 @@ use crate::schema::strava_tokens;
use crate::schema::users;
use chrono::DateTime;
use chrono::Utc;
+use std::fmt;
#[derive(Insertable, Queryable)]
#[table_name = "config"]
@@ -20,12 +21,22 @@ pub struct NewUser<'a> {
pub password: &'a str,
}
-#[derive(Queryable)]
+#[derive(Queryable, Clone)]
pub struct User {
pub username: String,
pub password: String,
}
+impl fmt::Debug for User {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "User {{ username: {}, password: <redacted> }}",
+ self.username
+ )
+ }
+}
+
#[derive(Insertable, Queryable)]
#[table_name = "strava_tokens"]
pub struct StravaToken {
diff --git a/src/server.rs b/src/server.rs
index e2ac7dd..c5a59f5 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -14,9 +14,11 @@ use rocket::response::Redirect;
use rocket::State;
use rocket_contrib::templates::Template;
use std::collections::HashMap;
+use threadpool::ThreadPool;
use crate::db;
use crate::error::Error;
+use crate::importer;
use crate::models;
use crate::strava;
@@ -165,6 +167,10 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) {
.finalize()
.unwrap();
+ let importer_pool = ThreadPool::with_name("import".to_string(), importer::WORKERS);
+ let tx = importer::run(importer_pool.clone());
+ tx.send(importer::Command::Quit).expect("send");
+
rocket::custom(config)
.manage(params)
.mount(
@@ -180,4 +186,6 @@ pub fn start(conn: diesel::PgConnection, db_url: &str, base_url: &str) {
.attach(Template::fairing())
.attach(Db::fairing())
.launch();
+
+ importer_pool.join();
}