summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-04 19:58:15 -0500
committerKjetil Orbekk <kjetil.orbekk@gmail.com>2020-02-04 19:58:15 -0500
commit97b82520aacacbe600c8918b26b5b29b8d47d4d1 (patch)
treec130abd56069c0b2a429d590dfb6ec7b54c68585
parentca69f19d0492ae420ba54a158676d246b0307be0 (diff)
Add entry table and link with raw data
-rw-r--r--migrations/2020-02-04-150559_entry/down.sql1
-rw-r--r--migrations/2020-02-04-150559_entry/up.sql9
-rw-r--r--migrations/2020-02-04-223649_entry_data/down.sql1
-rw-r--r--migrations/2020-02-04-223649_entry_data/up.sql15
-rw-r--r--src/db.rs59
-rw-r--r--src/importer.rs129
-rw-r--r--src/lib.rs1
-rw-r--r--src/main.rs14
-rw-r--r--src/models.rs17
-rw-r--r--src/schema.rs11
10 files changed, 190 insertions, 67 deletions
diff --git a/migrations/2020-02-04-150559_entry/down.sql b/migrations/2020-02-04-150559_entry/down.sql
new file mode 100644
index 0000000..ca787a3
--- /dev/null
+++ b/migrations/2020-02-04-150559_entry/down.sql
@@ -0,0 +1 @@
+drop table entries;
diff --git a/migrations/2020-02-04-150559_entry/up.sql b/migrations/2020-02-04-150559_entry/up.sql
new file mode 100644
index 0000000..122dec7
--- /dev/null
+++ b/migrations/2020-02-04-150559_entry/up.sql
@@ -0,0 +1,9 @@
+create table entries (
+ username varchar not null references users(username),
+ entry_type varchar(16) not null,
+ id bigserial not null,
+ timestamp timestamptz,
+ payload jsonb not null,
+
+ primary key(username, entry_type, id)
+);
diff --git a/migrations/2020-02-04-223649_entry_data/down.sql b/migrations/2020-02-04-223649_entry_data/down.sql
new file mode 100644
index 0000000..a10c419
--- /dev/null
+++ b/migrations/2020-02-04-223649_entry_data/down.sql
@@ -0,0 +1 @@
+drop table entry_data;
diff --git a/migrations/2020-02-04-223649_entry_data/up.sql b/migrations/2020-02-04-223649_entry_data/up.sql
new file mode 100644
index 0000000..dde9288
--- /dev/null
+++ b/migrations/2020-02-04-223649_entry_data/up.sql
@@ -0,0 +1,15 @@
+create table entry_data (
+ username varchar not null,
+ entry_type varchar(16) not null,
+ entry_id bigint not null,
+ data_type varchar(8) not null,
+ data_id bigint not null,
+
+primary key(username, entry_type, entry_id, data_type, data_id),
+
+foreign key (username, entry_type, entry_id)
+references entries(username, entry_type, id),
+
+foreign key (data_type, data_id)
+references raw_data(data_type, id)
+);
diff --git a/src/db.rs b/src/db.rs
index 24cf98d..5865231 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -6,11 +6,50 @@ use chrono::Utc;
use diesel::connection::Connection;
use diesel::pg::PgConnection;
use diesel::ExpressionMethods;
+use diesel::Insertable;
use diesel::QueryDsl;
use diesel::RunQueryDsl;
pub const COST: u32 = 10;
+#[macro_export]
+macro_rules! insert {
+ ($conn:expr, $table:expr, $values:expr, $returning:expr, $t:ty) => {{
+ use crate::diesel::RunQueryDsl;
+ let conn: &PgConnection = &$conn;
+ let r: Result<$t, Error> = diesel::insert_into($table)
+ .values($values)
+ .returning($returning)
+ .get_result(conn)
+ .map_err(From::from);
+ r
+ }};
+
+ ($conn:expr, $table:expr, $values:expr) => {{
+ use crate::diesel::RunQueryDsl;
+ let conn: &PgConnection = &$conn;
+ let r: Result<usize, Error> = diesel::insert_into($table)
+ .values($values)
+ .execute(conn)
+ .map_err(From::from);
+ r
+ }};
+
+ (entries <= $conn:expr, $values:expr) => {
+ insert!(
+ $conn,
+ schema::entries::table,
+ $values,
+ schema::entries::id,
+ i64
+ )
+ };
+
+ (entry_data <= $conn:expr, $values:expr) => {
+ insert!($conn, schema::entry_data::table, $values)
+ };
+}
+
pub fn create_config(conn: &PgConnection, config: &models::Config) -> Result<(), Error> {
use crate::schema::config;
@@ -164,7 +203,21 @@ pub fn insert_data(conn: &PgConnection, data: &models::RawData) -> Result<usize,
Ok(rows)
}
-pub fn get_raw_data(conn: &PgConnection, key: &models::RawDataKey) -> Result<models::RawData, Error> {
+pub fn insert_entry_data(
+ conn: &PgConnection,
+ entry_data: &models::EntryData,
+) -> Result<usize, Error> {
+ use crate::schema::entry_data;
+ let rows = diesel::insert_into(entry_data::table)
+ .values(entry_data)
+ .execute(conn)?;
+ Ok(rows)
+}
+
+pub fn get_raw_data(
+ conn: &PgConnection,
+ key: &models::RawDataKey,
+) -> Result<models::RawData, Error> {
use crate::schema::raw_data;
let data = raw_data::table
.find((key.data_type, key.id))
@@ -175,9 +228,7 @@ pub fn get_raw_data(conn: &PgConnection, key: &models::RawDataKey) -> Result<mod
pub fn get_raw_data_keys(conn: &PgConnection) -> Result<Vec<models::RawDataKey>, Error> {
use crate::schema::raw_data;
let rows = raw_data::table
- .select((raw_data::data_type,
- raw_data::id,
- raw_data::username))
+ .select((raw_data::data_type, raw_data::id, raw_data::username))
.get_results::<models::RawDataKey>(conn)?;
Ok(rows)
}
diff --git a/src/importer.rs b/src/importer.rs
index c831ca9..f039d03 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -2,6 +2,9 @@ use chrono::Utc;
use diesel::PgConnection;
use serde::Deserialize;
use serde::Serialize;
+use serde_json::to_value;
+use serde_json::Value;
+use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
@@ -9,13 +12,12 @@ use std::thread;
use std::time::Duration;
use std::time::Instant;
use threadpool::ThreadPool;
-use serde_json::to_value;
-use serde_json::Value;
-use crate::diesel::Connection;
use crate::db;
+use crate::diesel::Connection;
use crate::error::Error;
use crate::models;
+use crate::schema;
use crate::strava;
pub const WORKERS: usize = 10;
@@ -58,7 +60,7 @@ fn run_periodically<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>, pe
fn handle_one_task<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
- task: models::Task
+ task: models::Task,
) -> Result<(), Error> {
let command = serde_json::from_value(task.payload.clone())?;
@@ -67,7 +69,7 @@ fn handle_one_task<S: strava::StravaApi>(
Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?,
Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?,
}
-
+
Ok(())
}
@@ -86,18 +88,18 @@ fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) {
Err(Error::NotFound) => {
info!("No more tasks");
done = true;
- },
+ }
Err(e) => {
error!("Failed to get task: {:?}", e);
done = true;
- },
- Ok(t) => {
- match handle_one_task(shared.clone(), t) {
- Err(e) => error!("Task failed: {:?}", e),
- _ => ()
- }
}
+ Ok(t) => match handle_one_task(shared.clone(), t) {
+ Err(e) => error!("Task failed: {:?}", e),
+ _ => (),
+ },
}
+
+ thread::sleep(Duration::from_secs(10));
}
}
@@ -135,13 +137,15 @@ fn to_run(data: &Value) -> Result<Value, Error> {
info!("to_run");
macro_rules! get {
($id:ident str $e:expr) => {
- $id[$e].as_str().ok_or_else(
- || Error::UnexpectedJson($id.clone()))
+ $id[$e]
+ .as_str()
+ .ok_or_else(|| Error::UnexpectedJson($id.clone()))
};
($id:ident f64 $e:expr) => {
- $id[$e].as_f64().ok_or_else(
- || Error::UnexpectedJson($id.clone()))
+ $id[$e]
+ .as_f64()
+ .ok_or_else(|| Error::UnexpectedJson($id.clone()))
};
};
@@ -154,46 +158,65 @@ fn to_run(data: &Value) -> Result<Value, Error> {
fn process_strava_activity<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
- data: models::RawData) -> Result<Value, Error> {
-
+ data: models::RawData,
+ mut task: models::Task,
+) -> Result<(), Error> {
let json_error = || Error::UnexpectedJson(data.payload.clone());
- let dtype = data.payload["type"].as_str().ok_or_else(json_error)?;
+ let strava_type = data.payload["type"].as_str().ok_or_else(json_error)?;
- let payload = match dtype {
- "Run" => {
- to_run(&data.payload)?
- },
- &_ => Err(Error::InternalError)?,
- };
+ let entry_type = match strava_type {
+ "Run" => Ok("run".to_string()),
+ &_ => Err(Error::NotFound),
+ }?;
- Ok(payload)
+ let conn = &shared.conn.lock().unwrap();
+ conn.transaction::<(), Error, _>(|| {
+ let entry = models::NewEntry {
+ username: data.username.as_str(),
+ entry_type: entry_type.as_str(),
+ timestamp: None,
+ payload: json!({}),
+ };
+
+ let id = insert!(entries <= conn, &entry)?;
+
+ let entry_data = models::EntryData {
+ username: entry.username.to_string(),
+ entry_type: entry.entry_type.to_string(),
+ entry_id: id,
+ data_type: data.data_type,
+ data_id: data.id,
+ };
+ insert!(entry_data <= conn, &entry_data)?;
+
+ task.state = models::TaskState::SUCCESSFUL;
+ db::update_task(conn, task)?;
+ Ok(())
+ })?;
+
+ Ok(())
}
fn process_raw_data<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
key: &models::RawDataKey,
- mut task: models::Task) -> Result<(), Error> {
-
+ mut task: models::Task,
+) -> Result<(), Error> {
let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?;
println!("Process raw data: {:#?}", data);
- let payload = match data.data_type {
- models::DataType::StravaActivity => {
- process_strava_activity(shared.clone(), data)?
- }
+ match data.data_type {
+ models::DataType::StravaActivity => process_strava_activity(shared.clone(), data, task)?,
};
- info!("Process finished. Payload: {:#?}", payload);
- unimplemented!();
-
- task.state = models::TaskState::SUCCESSFUL;
- db::update_task(&shared.conn.lock().unwrap(), task)?;
+ info!("Process finished.");
Ok(())
}
fn process_all_raw_data<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
- mut task: models::Task) -> Result<(), Error> {
+ mut task: models::Task,
+) -> Result<(), Error> {
let now = Utc::now();
let conn = &shared.conn.lock().unwrap();
@@ -209,7 +232,8 @@ fn process_all_raw_data<S: strava::StravaApi>(
state: models::TaskState::NEW,
username: key.username.as_str(),
payload: &to_value(Command::ProcessRawData(key.clone()))?,
- })?;
+ },
+ )?;
}
task.state = models::TaskState::SUCCESSFUL;
db::update_task(conn, task)?;
@@ -241,24 +265,21 @@ fn import_strava_user<S: strava::StravaApi>(
let result = strava.get("/athlete/activities", &token.access_token, &params[..])?;
let json_error = || Error::UnexpectedJson(result.clone());
- let result = result
- .as_array()
- .ok_or_else(json_error)?;
+ let result = result.as_array().ok_or_else(json_error)?;
for activity in result {
let id = activity["id"].as_i64().ok_or_else(json_error)?;
- info!(
- "activity id: {} start: {}",
- id, activity["start_date"]
- );
-
- db::insert_data(&shared.conn.lock().unwrap(),
- &models::RawData {
- data_type: models::DataType::StravaActivity,
- id: id,
- username: username.to_string(),
- payload: activity.clone(),
- })?;
+ info!("activity id: {} start: {}", id, activity["start_date"]);
+
+ db::insert_data(
+ &shared.conn.lock().unwrap(),
+ &models::RawData {
+ data_type: models::DataType::StravaActivity,
+ id: id,
+ username: username.to_string(),
+ payload: activity.clone(),
+ },
+ )?;
}
if result.len() < per_page {
diff --git a/src/lib.rs b/src/lib.rs
index 25b56e9..80cef09 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,6 +16,7 @@ extern crate diesel;
extern crate log;
extern crate fern;
+#[macro_use]
pub mod db;
pub mod error;
pub mod importer;
diff --git a/src/main.rs b/src/main.rs
index c745816..8d6eef8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,15 +4,15 @@ extern crate fern;
extern crate log;
extern crate clap;
+use chrono::Utc;
use clap::App;
use clap::Arg;
use clap::SubCommand;
use diesel::connection::Connection;
use diesel::pg::PgConnection;
+use pjournal::db;
use pjournal::importer;
use pjournal::models;
-use pjournal::db;
-use chrono::Utc;
use serde_json::to_value;
fn setup_logger() -> Result<(), fern::InitError> {
@@ -119,8 +119,7 @@ fn main() {
.arg(Arg::with_name("PASSWORD").required(true).index(2)),
)
.subcommand(
- SubCommand::with_name("process_all_data")
- .about("create a ProcessAllRawData task")
+ SubCommand::with_name("process_all_data").about("create a ProcessAllRawData task"),
)
.get_matches();
@@ -152,12 +151,15 @@ fn main() {
} else if let Some(_matches) = matches.subcommand_matches("process_all_data") {
let command = importer::Command::ProcessAllRawData;
db::insert_task(
- &conn, &models::NewTask {
+ &conn,
+ &models::NewTask {
start_at: Utc::now(),
state: models::TaskState::NEW,
username: "system",
payload: &to_value(command).unwrap(),
- }).expect("insert");
+ },
+ )
+ .expect("insert");
} else {
info!("Start server");
pjournal::server::start(conn, db_url, base_url);
diff --git a/src/models.rs b/src/models.rs
index c88f938..15eafd7 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -1,6 +1,7 @@
+use crate::schema::config;
use crate::schema::entries;
+use crate::schema::entry_data;
use crate::schema::raw_data;
-use crate::schema::config;
use crate::schema::strava_tokens;
use crate::schema::tasks;
use crate::schema::users;
@@ -13,11 +14,11 @@ use diesel::serialize;
use diesel::serialize::Output;
use diesel::serialize::ToSql;
use diesel::sql_types;
+use serde::Deserialize;
+use serde::Serialize;
use serde_json::Value;
use std::fmt;
use std::io::Write;
-use serde::Deserialize;
-use serde::Serialize;
#[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow)]
#[sql_type = "sql_types::Text"]
@@ -168,3 +169,13 @@ pub struct Entry {
pub timestamp: Option<DateTime<Utc>>,
pub payload: Value,
}
+
+#[derive(Queryable, Insertable, Debug, Serialize, Deserialize, Clone)]
+#[table_name = "entry_data"]
+pub struct EntryData {
+ pub username: String,
+ pub entry_type: String,
+ pub entry_id: i64,
+ pub data_type: DataType,
+ pub data_id: i64,
+}
diff --git a/src/schema.rs b/src/schema.rs
index 2303714..23690df 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -18,6 +18,16 @@ table! {
}
table! {
+ entry_data (username, entry_type, entry_id, data_type, data_id) {
+ username -> Varchar,
+ entry_type -> Varchar,
+ entry_id -> Int8,
+ data_type -> Varchar,
+ data_id -> Int8,
+ }
+}
+
+table! {
raw_data (data_type, id) {
data_type -> Varchar,
id -> Int8,
@@ -60,6 +70,7 @@ joinable!(tasks -> users (username));
allow_tables_to_appear_in_same_query!(
config,
entries,
+ entry_data,
raw_data,
strava_tokens,
tasks,