From ffc459d5bcca732474fd770d97f7bbd55223ca9a Mon Sep 17 00:00:00 2001 From: Kjetil Orbekk Date: Tue, 4 Feb 2020 09:07:12 -0500 Subject: Add command to batch process all raw data --- src/db.rs | 24 ++++++++++++++--- src/importer.rs | 84 +++++++++++++++++++++++++++++++++++++++++++++------------ src/main.rs | 25 ++++++++++++++--- src/models.rs | 14 ++++++++-- src/schema.rs | 2 +- 5 files changed, 123 insertions(+), 26 deletions(-) diff --git a/src/db.rs b/src/db.rs index 5135b91..24cf98d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -110,7 +110,7 @@ pub fn insert_task(conn: &PgConnection, task: &models::NewTask) -> Result Result { +fn update_task_inner(conn: &PgConnection, task: models::Task) -> Result { use crate::schema::tasks; diesel::delete(tasks::table.filter(tasks::columns::id.eq(task.id))).execute(conn)?; @@ -130,7 +130,7 @@ fn update_task_inner(conn: &PgConnection, task: &models::Task) -> Result Result { +pub fn update_task(conn: &PgConnection, task: models::Task) -> Result { conn.transaction(|| update_task_inner(conn, task)) } @@ -150,7 +150,7 @@ pub fn take_task( .first::(conn)?; task.start_at = eta; - let task = update_task_inner(conn, &task)?; + let task = update_task_inner(conn, task)?; Ok(task) }) @@ -163,3 +163,21 @@ pub fn insert_data(conn: &PgConnection, data: &models::RawData) -> Result Result { + use crate::schema::raw_data; + let data = raw_data::table + .find((key.data_type, key.id)) + .get_result::(conn)?; + Ok(data) +} + +pub fn get_raw_data_keys(conn: &PgConnection) -> Result, Error> { + use crate::schema::raw_data; + let rows = raw_data::table + .select((raw_data::data_type, + raw_data::id, + raw_data::username)) + .get_results::(conn)?; + Ok(rows) +} diff --git a/src/importer.rs b/src/importer.rs index 5f5790a..b1c1b18 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -9,7 +9,9 @@ use std::thread; use std::time::Duration; use std::time::Instant; use threadpool::ThreadPool; +use serde_json::to_value; +use crate::diesel::Connection; use crate::db; use crate::error::Error; use crate::models; @@ -21,6 +23,8 @@ pub const EMPTY_PARAMS: &[(&str, &str)] = &[]; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Command { ImportStravaUser { username: String }, + ProcessAllRawData, + ProcessRawData(models::RawDataKey), } pub struct ImporterSharedData { @@ -53,39 +57,46 @@ fn run_periodically(shared: Arc>, pe fn handle_one_task( shared: Arc>, -) -> Result { - let task = { - let conn = shared.conn.lock().unwrap(); - let now = Utc::now(); - let eta = now + chrono::Duration::seconds(5); - - db::take_task(&conn, models::TaskState::NEW, now, eta)? - }; - + task: models::Task +) -> Result<(), Error> { let command = serde_json::from_value(task.payload.clone())?; match command { Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?, + Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?, + Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?, } - - Ok(task) + + Ok(()) } fn handle_tasks(shared: Arc>) { let mut done = false; while !done { - match handle_one_task(shared.clone()) { + let task = (|| { + let conn = shared.conn.lock().unwrap(); + let now = Utc::now(); + let eta = now + chrono::Duration::seconds(5); + + db::take_task(&conn, models::TaskState::NEW, now, eta) + })(); + + match task { Err(Error::NotFound) => { info!("No more tasks"); done = true; - } + }, Err(e) => { - error!("Error handling task: {}", e); - } + error!("Failed to get task: {:?}", e); + done = true; + }, Ok(t) => { - info!("Successfully handled task: {:?}", t); + match handle_one_task(shared.clone(), t) { + Err(e) => error!("Task failed: {:?}", e), + _ => () + } } - }; + } } } @@ -119,6 +130,45 @@ impl Importer { } } +fn process_raw_data( + shared: Arc>, + key: &models::RawDataKey, + mut task: models::Task) -> Result<(), Error> { + + let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?; + println!("Process raw data: {:#?}", data); + + unimplemented!(); +} + +fn process_all_raw_data( + shared: Arc>, + mut task: models::Task) -> Result<(), Error> { + let now = Utc::now(); + let conn = &shared.conn.lock().unwrap(); + + let mut tasks = 0; + conn.transaction::<(), Error, _>(|| { + let keys = db::get_raw_data_keys(conn)?; + tasks = keys.len(); + for key in keys { + db::insert_task( + conn, + &models::NewTask { + start_at: now, + state: models::TaskState::NEW, + username: key.username.as_str(), + payload: &to_value(Command::ProcessRawData(key.clone()))?, + })?; + } + task.state = models::TaskState::SUCCESSFUL; + db::update_task(conn, task)?; + Ok(()) + })?; + info!("process_all_raw_data: Added {} tasks", tasks); + Ok(()) +} + fn import_strava_user( shared: Arc>, username: &str, diff --git a/src/main.rs b/src/main.rs index 235c71f..c745816 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,11 +3,17 @@ extern crate fern; #[macro_use] extern crate log; extern crate clap; + use clap::App; use clap::Arg; use clap::SubCommand; use diesel::connection::Connection; use diesel::pg::PgConnection; +use pjournal::importer; +use pjournal::models; +use pjournal::db; +use chrono::Utc; +use serde_json::to_value; fn setup_logger() -> Result<(), fern::InitError> { use fern::colors::{Color, ColoredLevelConfig}; @@ -112,6 +118,10 @@ fn main() { .arg(Arg::with_name("USERNAME").required(true).index(1)) .arg(Arg::with_name("PASSWORD").required(true).index(2)), ) + .subcommand( + SubCommand::with_name("process_all_data") + .about("create a ProcessAllRawData task") + ) .get_matches(); setup_logger().expect("logger"); @@ -124,7 +134,7 @@ fn main() { let conn = PgConnection::establish(db_url).unwrap(); if let Some(matches) = matches.subcommand_matches("init") { - let config = pjournal::models::Config { + let config = models::Config { strava_client_id: matches.value_of("strava_client_id").unwrap().to_string(), strava_client_secret: matches .value_of("strava_client_secret") @@ -134,11 +144,20 @@ fn main() { singleton: true, }; - pjournal::db::create_config(&conn, &config).unwrap(); + db::create_config(&conn, &config).unwrap(); } else if let Some(matches) = matches.subcommand_matches("adduser") { let user = matches.value_of("USERNAME").unwrap(); let password = matches.value_of("PASSWORD").unwrap(); - pjournal::db::adduser(&conn, user, password).unwrap(); + db::adduser(&conn, user, password).unwrap(); + } else if let Some(_matches) = matches.subcommand_matches("process_all_data") { + let command = importer::Command::ProcessAllRawData; + db::insert_task( + &conn, &models::NewTask { + start_at: Utc::now(), + state: models::TaskState::NEW, + username: "system", + payload: &to_value(command).unwrap(), + }).expect("insert"); } else { info!("Start server"); pjournal::server::start(conn, db_url, base_url); diff --git a/src/models.rs b/src/models.rs index cd8ddf1..50e1877 100644 --- a/src/models.rs +++ b/src/models.rs @@ -15,6 +15,8 @@ use diesel::sql_types; use serde_json::Value; use std::fmt; use std::io::Write; +use serde::Deserialize; +use serde::Serialize; #[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow)] #[sql_type = "sql_types::Text"] @@ -106,7 +108,7 @@ pub struct StravaToken { pub expires_at: DateTime, } -#[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow)] +#[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow, Serialize, Deserialize)] #[sql_type = "sql_types::Text"] pub enum DataType { StravaActivity = 0, @@ -131,7 +133,15 @@ impl FromSql for DataType { } } -#[derive(Insertable, Queryable)] +#[derive(Insertable, Queryable, Debug, Serialize, Deserialize, Clone)] +#[table_name = "raw_data"] +pub struct RawDataKey { + pub data_type: DataType, + pub id: i64, + pub username: String, +} + +#[derive(Insertable, Queryable, Debug, Serialize, Deserialize, Clone)] #[table_name = "raw_data"] pub struct RawData { pub data_type: DataType, diff --git a/src/schema.rs b/src/schema.rs index df20c4f..1584424 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -12,7 +12,7 @@ table! { data_type -> Varchar, id -> Int8, username -> Varchar, - payload -> Nullable, + payload -> Jsonb, } } -- cgit v1.2.3