From 97b82520aacacbe600c8918b26b5b29b8d47d4d1 Mon Sep 17 00:00:00 2001
From: Kjetil Orbekk
Date: Tue, 4 Feb 2020 19:58:15 -0500
Subject: Add entry table and link with raw data

---
 migrations/2020-02-04-150559_entry/down.sql      |   1 +
 migrations/2020-02-04-150559_entry/up.sql        |   9 ++
 migrations/2020-02-04-223649_entry_data/down.sql |   1 +
 migrations/2020-02-04-223649_entry_data/up.sql   |  15 +++
 src/db.rs                                        |  59 ++++++++++-
 src/importer.rs                                  | 129 +++++++++++++----------
 src/lib.rs                                       |   1 +
 src/main.rs                                      |  14 +--
 src/models.rs                                    |  17 ++-
 src/schema.rs                                    |  11 ++
 10 files changed, 190 insertions(+), 67 deletions(-)
 create mode 100644 migrations/2020-02-04-150559_entry/down.sql
 create mode 100644 migrations/2020-02-04-150559_entry/up.sql
 create mode 100644 migrations/2020-02-04-223649_entry_data/down.sql
 create mode 100644 migrations/2020-02-04-223649_entry_data/up.sql

diff --git a/migrations/2020-02-04-150559_entry/down.sql b/migrations/2020-02-04-150559_entry/down.sql
new file mode 100644
index 0000000..ca787a3
--- /dev/null
+++ b/migrations/2020-02-04-150559_entry/down.sql
@@ -0,0 +1 @@
+drop table entries;
diff --git a/migrations/2020-02-04-150559_entry/up.sql b/migrations/2020-02-04-150559_entry/up.sql
new file mode 100644
index 0000000..122dec7
--- /dev/null
+++ b/migrations/2020-02-04-150559_entry/up.sql
@@ -0,0 +1,9 @@
+create table entries (
+    username varchar not null references users(username),
+    entry_type varchar(16) not null,
+    id bigserial not null,
+    timestamp timestamptz,
+    payload jsonb not null,
+
+    primary key(username, entry_type, id)
+);
diff --git a/migrations/2020-02-04-223649_entry_data/down.sql b/migrations/2020-02-04-223649_entry_data/down.sql
new file mode 100644
index 0000000..a10c419
--- /dev/null
+++ b/migrations/2020-02-04-223649_entry_data/down.sql
@@ -0,0 +1 @@
+drop table entry_data;
diff --git a/migrations/2020-02-04-223649_entry_data/up.sql b/migrations/2020-02-04-223649_entry_data/up.sql
new file mode 100644
index 0000000..dde9288
--- /dev/null
+++ b/migrations/2020-02-04-223649_entry_data/up.sql
@@ -0,0 +1,15 @@
+create table entry_data (
+    username varchar not null,
+    entry_type varchar(16) not null,
+    entry_id bigint not null,
+    data_type varchar(8) not null,
+    data_id bigint not null,
+
+    primary key(username, entry_type, entry_id, data_type, data_id),
+
+    foreign key (username, entry_type, entry_id)
+        references entries(username, entry_type, id),
+
+    foreign key (data_type, data_id)
+        references raw_data(data_type, id)
+);
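
Taken together, the two migrations give each derived entry a composite key (username, entry_type, id) and record its provenance in entry_data, which references both entries and the existing raw_data table. As an illustration only (this query is not part of the patch, and the 'demo' username is a made-up value), the raw Strava payloads behind one entry could be recovered with:

    select e.username, e.entry_type, e.id, r.payload
    from entries e
    join entry_data d
      on (d.username, d.entry_type, d.entry_id) = (e.username, e.entry_type, e.id)
    join raw_data r
      on (r.data_type, r.id) = (d.data_type, d.data_id)
    where e.username = 'demo' and e.entry_type = 'run';

Because both foreign keys are composite, a link row cannot outlive either side: an entry or a raw_data row has to shed its entry_data references before it can be deleted.
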
diff --git a/src/db.rs b/src/db.rs
index 24cf98d..5865231 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -6,11 +6,50 @@ use chrono::Utc;
 use diesel::connection::Connection;
 use diesel::pg::PgConnection;
 use diesel::ExpressionMethods;
+use diesel::Insertable;
 use diesel::QueryDsl;
 use diesel::RunQueryDsl;
 
 pub const COST: u32 = 10;
 
+#[macro_export]
+macro_rules! insert {
+    ($conn:expr, $table:expr, $values:expr, $returning:expr, $t:ty) => {{
+        use crate::diesel::RunQueryDsl;
+        let conn: &PgConnection = &$conn;
+        let r: Result<$t, Error> = diesel::insert_into($table)
+            .values($values)
+            .returning($returning)
+            .get_result(conn)
+            .map_err(From::from);
+        r
+    }};
+
+    ($conn:expr, $table:expr, $values:expr) => {{
+        use crate::diesel::RunQueryDsl;
+        let conn: &PgConnection = &$conn;
+        let r: Result<usize, Error> = diesel::insert_into($table)
+            .values($values)
+            .execute(conn)
+            .map_err(From::from);
+        r
+    }};
+
+    (entries <= $conn:expr, $values:expr) => {
+        insert!(
+            $conn,
+            schema::entries::table,
+            $values,
+            schema::entries::id,
+            i64
+        )
+    };
+
+    (entry_data <= $conn:expr, $values:expr) => {
+        insert!($conn, schema::entry_data::table, $values)
+    };
+}
+
 pub fn create_config(conn: &PgConnection, config: &models::Config) -> Result<(), Error> {
     use crate::schema::config;
 
@@ -164,7 +203,21 @@ pub fn insert_data(conn: &PgConnection, data: &models::RawData) -> Result<usize, Error> {
     Ok(rows)
 }
 
-pub fn get_raw_data(conn: &PgConnection, key: &models::RawDataKey) -> Result<models::RawData, Error> {
+pub fn insert_entry_data(
+    conn: &PgConnection,
+    entry_data: &models::EntryData,
+) -> Result<usize, Error> {
+    use crate::schema::entry_data;
+    let rows = diesel::insert_into(entry_data::table)
+        .values(entry_data)
+        .execute(conn)?;
+    Ok(rows)
+}
+
+pub fn get_raw_data(
+    conn: &PgConnection,
+    key: &models::RawDataKey,
+) -> Result<models::RawData, Error> {
     use crate::schema::raw_data;
     let data = raw_data::table
         .find((key.data_type, key.id))
@@ -175,9 +228,7 @@ pub fn get_raw_data(conn: &PgConnection, key: &models::RawDataKey) -> Result<models::RawData, Error> {
 pub fn get_raw_data_keys(conn: &PgConnection) -> Result<Vec<models::RawDataKey>, Error> {
     use crate::schema::raw_data;
     let rows = raw_data::table
-        .select((raw_data::data_type,
-            raw_data::id,
-            raw_data::username))
+        .select((raw_data::data_type, raw_data::id, raw_data::username))
         .get_results::<models::RawDataKey>(conn)?;
     Ok(rows)
 }
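
The insert! macro has two base arms: the five-argument form runs insert_into(...).values(...).returning(...).get_result(conn) and yields one typed value, while the three-argument form runs execute() and yields the affected row count. The entries <= and entry_data <= arms pin those shapes to the new tables. A minimal sketch of a caller (a hypothetical helper, not part of the patch; it mirrors the transaction added to src/importer.rs below and assumes that module's imports, including crate::schema and serde_json's json! macro):

    // Hypothetical helper, not in the patch: create an entry for one raw
    // Strava row and link the two inside a single transaction.
    fn record_run_entry(conn: &PgConnection, data: &models::RawData) -> Result<i64, Error> {
        conn.transaction::<i64, Error, _>(|| {
            let entry = models::NewEntry {
                username: data.username.as_str(),
                entry_type: "run",
                timestamp: None,
                payload: json!({}),
            };
            // entries <= arm: returns the generated bigserial id as an i64.
            let id = insert!(entries <= conn, &entry)?;

            // entry_data <= arm: plain execute(), recording provenance.
            let link = models::EntryData {
                username: data.username.clone(),
                entry_type: "run".to_string(),
                entry_id: id,
                data_type: data.data_type,
                data_id: data.id,
            };
            insert!(entry_data <= conn, &link)?;
            Ok(id)
        })
    }

Running both inserts in one transaction keeps entries and entry_data consistent: if the link insert fails, the new entry row is rolled back as well. Note that src/lib.rs below also adds #[macro_use] to pub mod db, so modules declared after it see the macro without importing it.
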
diff --git a/src/importer.rs b/src/importer.rs
index c831ca9..f039d03 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -2,6 +2,9 @@ use chrono::Utc;
 use diesel::PgConnection;
 use serde::Deserialize;
 use serde::Serialize;
+use serde_json::to_value;
+use serde_json::Value;
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::Mutex;
 use std::sync::RwLock;
@@ -9,13 +12,12 @@ use std::thread;
 use std::time::Duration;
 use std::time::Instant;
 use threadpool::ThreadPool;
-use serde_json::to_value;
-use serde_json::Value;
 
-use crate::diesel::Connection;
 use crate::db;
+use crate::diesel::Connection;
 use crate::error::Error;
 use crate::models;
+use crate::schema;
 use crate::strava;
 
 pub const WORKERS: usize = 10;
@@ -58,7 +60,7 @@ fn run_periodically(shared: Arc>, pe
 
 fn handle_one_task(
     shared: Arc>,
-    task: models::Task
+    task: models::Task,
 ) -> Result<(), Error> {
     let command = serde_json::from_value(task.payload.clone())?;
 
@@ -67,7 +69,7 @@ fn handle_one_task(
         Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?,
         Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?,
     }
-    
+
     Ok(())
 }
 
@@ -86,18 +88,18 @@ fn handle_tasks(shared: Arc>) {
             Err(Error::NotFound) => {
                 info!("No more tasks");
                 done = true;
-            },
+            }
             Err(e) => {
                 error!("Failed to get task: {:?}", e);
                 done = true;
-            },
-            Ok(t) => {
-                match handle_one_task(shared.clone(), t) {
-                    Err(e) => error!("Task failed: {:?}", e),
-                    _ => ()
-                }
-            }
+            }
+            Ok(t) => match handle_one_task(shared.clone(), t) {
+                Err(e) => error!("Task failed: {:?}", e),
+                _ => (),
+            },
         }
+
+        thread::sleep(Duration::from_secs(10));
     }
 }
 
@@ -135,13 +137,15 @@ fn to_run(data: &Value) -> Result<Value, Error> {
     info!("to_run");
     macro_rules! get {
         ($id:ident str $e:expr) => {
-            $id[$e].as_str().ok_or_else(
-                || Error::UnexpectedJson($id.clone()))
+            $id[$e]
+                .as_str()
+                .ok_or_else(|| Error::UnexpectedJson($id.clone()))
         };
 
         ($id:ident f64 $e:expr) => {
-            $id[$e].as_f64().ok_or_else(
-                || Error::UnexpectedJson($id.clone()))
+            $id[$e]
+                .as_f64()
+                .ok_or_else(|| Error::UnexpectedJson($id.clone()))
         };
     };
 
@@ -154,46 +158,65 @@ fn to_run(data: &Value) -> Result<Value, Error> {
 
 fn process_strava_activity(
     shared: Arc>,
-    data: models::RawData) -> Result<Value, Error> {
-
+    data: models::RawData,
+    mut task: models::Task,
+) -> Result<(), Error> {
     let json_error = || Error::UnexpectedJson(data.payload.clone());
 
-    let dtype = data.payload["type"].as_str().ok_or_else(json_error)?;
+    let strava_type = data.payload["type"].as_str().ok_or_else(json_error)?;
 
-    let payload = match dtype {
-        "Run" => {
-            to_run(&data.payload)?
-        },
-        &_ => Err(Error::InternalError)?,
-    };
+    let entry_type = match strava_type {
+        "Run" => Ok("run".to_string()),
+        &_ => Err(Error::NotFound),
+    }?;
 
-    Ok(payload)
+    let conn = &shared.conn.lock().unwrap();
+    conn.transaction::<(), Error, _>(|| {
+        let entry = models::NewEntry {
+            username: data.username.as_str(),
+            entry_type: entry_type.as_str(),
+            timestamp: None,
+            payload: json!({}),
+        };
+
+        let id = insert!(entries <= conn, &entry)?;
+
+        let entry_data = models::EntryData {
+            username: entry.username.to_string(),
+            entry_type: entry.entry_type.to_string(),
+            entry_id: id,
+            data_type: data.data_type,
+            data_id: data.id,
+        };
+        insert!(entry_data <= conn, &entry_data)?;
+
+        task.state = models::TaskState::SUCCESSFUL;
+        db::update_task(conn, task)?;
+        Ok(())
+    })?;
+
+    Ok(())
 }
 
 fn process_raw_data(
     shared: Arc>,
     key: &models::RawDataKey,
-    mut task: models::Task) -> Result<(), Error> {
-
+    mut task: models::Task,
+) -> Result<(), Error> {
     let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?;
     println!("Process raw data: {:#?}", data);
 
-    let payload = match data.data_type {
-        models::DataType::StravaActivity => {
-            process_strava_activity(shared.clone(), data)?
-        }
+    match data.data_type {
+        models::DataType::StravaActivity => process_strava_activity(shared.clone(), data, task)?,
     };
 
-    info!("Process finished. Payload: {:#?}", payload);
-    unimplemented!();
-
-    task.state = models::TaskState::SUCCESSFUL;
-    db::update_task(&shared.conn.lock().unwrap(), task)?;
+    info!("Process finished.");
     Ok(())
 }
 
 fn process_all_raw_data(
     shared: Arc>,
-    mut task: models::Task) -> Result<(), Error> {
+    mut task: models::Task,
+) -> Result<(), Error> {
     let now = Utc::now();
     let conn = &shared.conn.lock().unwrap();
 
@@ -209,7 +232,8 @@ fn process_all_raw_data(
                 state: models::TaskState::NEW,
                 username: key.username.as_str(),
                 payload: &to_value(Command::ProcessRawData(key.clone()))?,
-            })?;
+            },
+        )?;
     }
     task.state = models::TaskState::SUCCESSFUL;
     db::update_task(conn, task)?;
@@ -241,24 +265,21 @@ fn import_strava_user(
     let result = strava.get("/athlete/activities", &token.access_token, &params[..])?;
     let json_error = || Error::UnexpectedJson(result.clone());
 
-    let result = result
-        .as_array()
-        .ok_or_else(json_error)?;
+    let result = result.as_array().ok_or_else(json_error)?;
 
     for activity in result {
         let id = activity["id"].as_i64().ok_or_else(json_error)?;
-        info!(
-            "activity id: {} start: {}",
-            id, activity["start_date"]
-        );
-
-        db::insert_data(&shared.conn.lock().unwrap(),
-            &models::RawData {
-                data_type: models::DataType::StravaActivity,
-                id: id,
-                username: username.to_string(),
-                payload: activity.clone(),
-            })?;
+        info!("activity id: {} start: {}", id, activity["start_date"]);
+
+        db::insert_data(
+            &shared.conn.lock().unwrap(),
+            &models::RawData {
+                data_type: models::DataType::StravaActivity,
+                id: id,
+                username: username.to_string(),
+                payload: activity.clone(),
+            },
+        )?;
     }
 
     if result.len() < per_page {
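
A task's payload is just a Command serialized with serde_json, and handle_one_task deserializes it back before dispatching, so the enum and the key types it carries round-trip through the tasks table's jsonb column. An illustrative round trip (not part of the patch; the id and username are made-up, and the RawDataKey field names follow the columns selected in get_raw_data_keys):

    // Hypothetical example, not in the patch: serialize a command the way
    // process_all_raw_data does, then decode it as handle_one_task would.
    fn command_roundtrip_example() -> Result<(), Error> {
        let key = models::RawDataKey {
            data_type: models::DataType::StravaActivity,
            id: 42,                       // made-up Strava activity id
            username: "demo".to_string(), // made-up user
        };
        let payload = to_value(Command::ProcessRawData(key))?;

        // This is what handle_one_task does with task.payload.
        let command: Command = serde_json::from_value(payload)?;
        match command {
            Command::ProcessRawData(k) => info!("would process raw data for {}", k.username),
            _ => (),
        }
        Ok(())
    }
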
diff --git a/src/lib.rs b/src/lib.rs
index 25b56e9..80cef09 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,6 +16,7 @@ extern crate diesel;
 extern crate log;
 extern crate fern;
 
+#[macro_use]
 pub mod db;
 pub mod error;
 pub mod importer;
diff --git a/src/main.rs b/src/main.rs
index c745816..8d6eef8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,15 +4,15 @@ extern crate fern;
 extern crate log;
 extern crate clap;
 
+use chrono::Utc;
 use clap::App;
 use clap::Arg;
 use clap::SubCommand;
 use diesel::connection::Connection;
 use diesel::pg::PgConnection;
+use pjournal::db;
 use pjournal::importer;
 use pjournal::models;
-use pjournal::db;
-use chrono::Utc;
 use serde_json::to_value;
 
 fn setup_logger() -> Result<(), fern::InitError> {
@@ -119,8 +119,7 @@ fn main() {
                 .arg(Arg::with_name("PASSWORD").required(true).index(2)),
         )
         .subcommand(
-            SubCommand::with_name("process_all_data")
-                .about("create a ProcessAllRawData task")
+            SubCommand::with_name("process_all_data").about("create a ProcessAllRawData task"),
         )
         .get_matches();
 
@@ -152,12 +151,15 @@ fn main() {
     } else if let Some(_matches) = matches.subcommand_matches("process_all_data") {
         let command = importer::Command::ProcessAllRawData;
         db::insert_task(
-            &conn, &models::NewTask {
+            &conn,
+            &models::NewTask {
                 start_at: Utc::now(),
                 state: models::TaskState::NEW,
                 username: "system",
                 payload: &to_value(command).unwrap(),
-            }).expect("insert");
+            },
+        )
+        .expect("insert");
     } else {
         info!("Start server");
         pjournal::server::start(conn, db_url, base_url);
diff --git a/src/models.rs b/src/models.rs
index c88f938..15eafd7 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -1,6 +1,7 @@
+use crate::schema::config;
 use crate::schema::entries;
+use crate::schema::entry_data;
 use crate::schema::raw_data;
-use crate::schema::config;
 use crate::schema::strava_tokens;
 use crate::schema::tasks;
 use crate::schema::users;
@@ -13,11 +14,11 @@ use diesel::serialize;
 use diesel::serialize::Output;
 use diesel::serialize::ToSql;
 use diesel::sql_types;
+use serde::Deserialize;
+use serde::Serialize;
 use serde_json::Value;
 use std::fmt;
 use std::io::Write;
-use serde::Deserialize;
-use serde::Serialize;
 
 #[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow)]
 #[sql_type = "sql_types::Text"]
@@ -168,3 +169,13 @@ pub struct Entry {
     pub timestamp: Option<DateTime<Utc>>,
     pub payload: Value,
 }
+
+#[derive(Queryable, Insertable, Debug, Serialize, Deserialize, Clone)]
+#[table_name = "entry_data"]
+pub struct EntryData {
+    pub username: String,
+    pub entry_type: String,
+    pub entry_id: i64,
+    pub data_type: DataType,
+    pub data_id: i64,
+}
diff --git a/src/schema.rs b/src/schema.rs
index 2303714..23690df 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -17,6 +17,16 @@
     }
 }
 
+table! {
+    entry_data (username, entry_type, entry_id, data_type, data_id) {
+        username -> Varchar,
+        entry_type -> Varchar,
+        entry_id -> Int8,
+        data_type -> Varchar,
+        data_id -> Int8,
+    }
+}
+
 table! {
     raw_data (data_type, id) {
         data_type -> Varchar,
@@ -60,6 +70,7 @@ joinable!(tasks -> users (username));
 allow_tables_to_appear_in_same_query!(
     config,
     entries,
+    entry_data,
     raw_data,
     strava_tokens,
     tasks,
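
With the table! definition and the Queryable/Insertable derives in place, the link rows can be read back with ordinary diesel queries; the composite primary key is simply filtered column by column. An illustrative helper (not part of the patch; it assumes the same diesel imports as src/db.rs):

    // Hypothetical helper, not in the patch: fetch the provenance rows for
    // one entry, i.e. which raw_data rows it was derived from.
    pub fn get_entry_data(
        conn: &PgConnection,
        username: &str,
        entry_type: &str,
        entry_id: i64,
    ) -> Result<Vec<models::EntryData>, Error> {
        use crate::schema::entry_data;
        let rows = entry_data::table
            .filter(entry_data::username.eq(username))
            .filter(entry_data::entry_type.eq(entry_type))
            .filter(entry_data::entry_id.eq(entry_id))
            .get_results::<models::EntryData>(conn)?;
        Ok(rows)
    }
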
--
cgit v1.2.3