summary refs log tree commit diff
diff options
context:
space:
mode:
author Kjetil Orbekk <kjetil.orbekk@gmail.com> 2020-02-04 09:07:12 -0500
committer Kjetil Orbekk <kjetil.orbekk@gmail.com> 2020-02-04 09:07:12 -0500
commit ffc459d5bcca732474fd770d97f7bbd55223ca9a (patch)
tree 22af9a7a8456f6d5e90d3b8d547d423138dd676f
parent fa919608641021561c620897f7aed9789d4790b5 (diff)
Add command to batch process all raw data
-rw-r--r-- src/db.rs 24
-rw-r--r-- src/importer.rs 84
-rw-r--r-- src/main.rs 25
-rw-r--r-- src/models.rs 14
-rw-r--r-- src/schema.rs 2
5 files changed, 123 insertions, 26 deletions
diff --git a/src/db.rs b/src/db.rs
index 5135b91..24cf98d 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -110,7 +110,7 @@ pub fn insert_task(conn: &PgConnection, task: &models::NewTask) -> Result<i64, E
Ok(id)
}
-fn update_task_inner(conn: &PgConnection, task: &models::Task) -> Result<models::Task, Error> {
+fn update_task_inner(conn: &PgConnection, task: models::Task) -> Result<models::Task, Error> {
use crate::schema::tasks;
diesel::delete(tasks::table.filter(tasks::columns::id.eq(task.id))).execute(conn)?;
@@ -130,7 +130,7 @@ fn update_task_inner(conn: &PgConnection, task: &models::Task) -> Result<models:
Ok(new_task)
}
-pub fn update_task(conn: &PgConnection, task: &models::Task) -> Result<models::Task, Error> {
+pub fn update_task(conn: &PgConnection, task: models::Task) -> Result<models::Task, Error> {
conn.transaction(|| update_task_inner(conn, task))
}
@@ -150,7 +150,7 @@ pub fn take_task(
.first::<models::Task>(conn)?;
task.start_at = eta;
- let task = update_task_inner(conn, &task)?;
+ let task = update_task_inner(conn, task)?;
Ok(task)
})
@@ -163,3 +163,21 @@ pub fn insert_data(conn: &PgConnection, data: &models::RawData) -> Result<usize,
.execute(conn)?;
Ok(rows)
}
+
+pub fn get_raw_data(conn: &PgConnection, key: &models::RawDataKey) -> Result<models::RawData, Error> {
+ use crate::schema::raw_data;
+ let data = raw_data::table
+ .find((key.data_type, key.id))
+ .get_result::<models::RawData>(conn)?;
+ Ok(data)
+}
+
+pub fn get_raw_data_keys(conn: &PgConnection) -> Result<Vec<models::RawDataKey>, Error> {
+ use crate::schema::raw_data;
+ let rows = raw_data::table
+ .select((raw_data::data_type,
+ raw_data::id,
+ raw_data::username))
+ .get_results::<models::RawDataKey>(conn)?;
+ Ok(rows)
+}
diff --git a/src/importer.rs b/src/importer.rs
index 5f5790a..b1c1b18 100644
--- a/src/importer.rs
+++ b/src/importer.rs
@@ -9,7 +9,9 @@ use std::thread;
use std::time::Duration;
use std::time::Instant;
use threadpool::ThreadPool;
+use serde_json::to_value;
+use crate::diesel::Connection;
use crate::db;
use crate::error::Error;
use crate::models;
@@ -21,6 +23,8 @@ pub const EMPTY_PARAMS: &[(&str, &str)] = &[];
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
ImportStravaUser { username: String },
+ ProcessAllRawData,
+ ProcessRawData(models::RawDataKey),
}
pub struct ImporterSharedData<StravaApi: strava::StravaApi + 'static> {
@@ -53,39 +57,46 @@ fn run_periodically<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>, pe
fn handle_one_task<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
-) -> Result<models::Task, Error> {
- let task = {
- let conn = shared.conn.lock().unwrap();
- let now = Utc::now();
- let eta = now + chrono::Duration::seconds(5);
-
- db::take_task(&conn, models::TaskState::NEW, now, eta)?
- };
-
+ task: models::Task
+) -> Result<(), Error> {
let command = serde_json::from_value(task.payload.clone())?;
match command {
Command::ImportStravaUser { username } => import_strava_user(shared, username.as_str())?,
+ Command::ProcessAllRawData => process_all_raw_data(shared, task.clone())?,
+ Command::ProcessRawData(ref d) => process_raw_data(shared, d, task.clone())?,
}
-
- Ok(task)
+
+ Ok(())
}
fn handle_tasks<S: strava::StravaApi>(shared: Arc<ImporterSharedData<S>>) {
let mut done = false;
while !done {
- match handle_one_task(shared.clone()) {
+ let task = (|| {
+ let conn = shared.conn.lock().unwrap();
+ let now = Utc::now();
+ let eta = now + chrono::Duration::seconds(5);
+
+ db::take_task(&conn, models::TaskState::NEW, now, eta)
+ })();
+
+ match task {
Err(Error::NotFound) => {
info!("No more tasks");
done = true;
- }
+ },
Err(e) => {
- error!("Error handling task: {}", e);
- }
+ error!("Failed to get task: {:?}", e);
+ done = true;
+ },
Ok(t) => {
- info!("Successfully handled task: {:?}", t);
+ match handle_one_task(shared.clone(), t) {
+ Err(e) => error!("Task failed: {:?}", e),
+ _ => ()
+ }
}
- };
+ }
}
}
@@ -119,6 +130,45 @@ impl<StravaApi: strava::StravaApi> Importer<StravaApi> {
}
}
+fn process_raw_data<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ key: &models::RawDataKey,
+ mut task: models::Task) -> Result<(), Error> {
+
+ let data = db::get_raw_data(&shared.conn.lock().unwrap(), key)?;
+ println!("Process raw data: {:#?}", data);
+
+ unimplemented!();
+}
+
+fn process_all_raw_data<S: strava::StravaApi>(
+ shared: Arc<ImporterSharedData<S>>,
+ mut task: models::Task) -> Result<(), Error> {
+ let now = Utc::now();
+ let conn = &shared.conn.lock().unwrap();
+
+ let mut tasks = 0;
+ conn.transaction::<(), Error, _>(|| {
+ let keys = db::get_raw_data_keys(conn)?;
+ tasks = keys.len();
+ for key in keys {
+ db::insert_task(
+ conn,
+ &models::NewTask {
+ start_at: now,
+ state: models::TaskState::NEW,
+ username: key.username.as_str(),
+ payload: &to_value(Command::ProcessRawData(key.clone()))?,
+ })?;
+ }
+ task.state = models::TaskState::SUCCESSFUL;
+ db::update_task(conn, task)?;
+ Ok(())
+ })?;
+ info!("process_all_raw_data: Added {} tasks", tasks);
+ Ok(())
+}
+
fn import_strava_user<S: strava::StravaApi>(
shared: Arc<ImporterSharedData<S>>,
username: &str,
diff --git a/src/main.rs b/src/main.rs
index 235c71f..c745816 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,11 +3,17 @@ extern crate fern;
#[macro_use]
extern crate log;
extern crate clap;
+
use clap::App;
use clap::Arg;
use clap::SubCommand;
use diesel::connection::Connection;
use diesel::pg::PgConnection;
+use pjournal::importer;
+use pjournal::models;
+use pjournal::db;
+use chrono::Utc;
+use serde_json::to_value;
fn setup_logger() -> Result<(), fern::InitError> {
use fern::colors::{Color, ColoredLevelConfig};
@@ -112,6 +118,10 @@ fn main() {
.arg(Arg::with_name("USERNAME").required(true).index(1))
.arg(Arg::with_name("PASSWORD").required(true).index(2)),
)
+ .subcommand(
+ SubCommand::with_name("process_all_data")
+ .about("create a ProcessAllRawData task")
+ )
.get_matches();
setup_logger().expect("logger");
@@ -124,7 +134,7 @@ fn main() {
let conn = PgConnection::establish(db_url).unwrap();
if let Some(matches) = matches.subcommand_matches("init") {
- let config = pjournal::models::Config {
+ let config = models::Config {
strava_client_id: matches.value_of("strava_client_id").unwrap().to_string(),
strava_client_secret: matches
.value_of("strava_client_secret")
@@ -134,11 +144,20 @@ fn main() {
singleton: true,
};
- pjournal::db::create_config(&conn, &config).unwrap();
+ db::create_config(&conn, &config).unwrap();
} else if let Some(matches) = matches.subcommand_matches("adduser") {
let user = matches.value_of("USERNAME").unwrap();
let password = matches.value_of("PASSWORD").unwrap();
- pjournal::db::adduser(&conn, user, password).unwrap();
+ db::adduser(&conn, user, password).unwrap();
+ } else if let Some(_matches) = matches.subcommand_matches("process_all_data") {
+ let command = importer::Command::ProcessAllRawData;
+ db::insert_task(
+ &conn, &models::NewTask {
+ start_at: Utc::now(),
+ state: models::TaskState::NEW,
+ username: "system",
+ payload: &to_value(command).unwrap(),
+ }).expect("insert");
} else {
info!("Start server");
pjournal::server::start(conn, db_url, base_url);
diff --git a/src/models.rs b/src/models.rs
index cd8ddf1..50e1877 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -15,6 +15,8 @@ use diesel::sql_types;
use serde_json::Value;
use std::fmt;
use std::io::Write;
+use serde::Deserialize;
+use serde::Serialize;
#[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow)]
#[sql_type = "sql_types::Text"]
@@ -106,7 +108,7 @@ pub struct StravaToken {
pub expires_at: DateTime<Utc>,
}
-#[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow)]
+#[derive(PartialEq, Debug, Clone, Copy, AsExpression, FromSqlRow, Serialize, Deserialize)]
#[sql_type = "sql_types::Text"]
pub enum DataType {
StravaActivity = 0,
@@ -131,7 +133,15 @@ impl FromSql<sql_types::Text, Pg> for DataType {
}
}
-#[derive(Insertable, Queryable)]
+#[derive(Insertable, Queryable, Debug, Serialize, Deserialize, Clone)]
+#[table_name = "raw_data"]
+pub struct RawDataKey {
+ pub data_type: DataType,
+ pub id: i64,
+ pub username: String,
+}
+
+#[derive(Insertable, Queryable, Debug, Serialize, Deserialize, Clone)]
#[table_name = "raw_data"]
pub struct RawData {
pub data_type: DataType,
diff --git a/src/schema.rs b/src/schema.rs
index df20c4f..1584424 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -12,7 +12,7 @@ table! {
data_type -> Varchar,
id -> Int8,
username -> Varchar,
- payload -> Nullable<Jsonb>,
+ payload -> Jsonb,
}
}