diff options
author | Kjetil Orbekk <kjetil.orbekk@gmail.com> | 2020-02-03 22:55:36 -0500 |
---|---|---|
committer | Kjetil Orbekk <kjetil.orbekk@gmail.com> | 2020-02-03 22:55:36 -0500 |
commit | 6d0a4d03705b96b252a6b29d3b8c188b9c903b89 (patch) | |
tree | b8ea3f7459ae4c9b22a976259e637cc7a3d695c7 /src/db.rs | |
parent | c459b5e85ef9b695b3c9a107b7cf7f08847c608f (diff) |
Refactor importer to store tasks in postgresql
Diffstat (limited to 'src/db.rs')
-rw-r--r-- | src/db.rs | 62 |
1 files changed, 62 insertions, 0 deletions
@@ -6,6 +6,9 @@ use diesel::pg::PgConnection; use diesel::ExpressionMethods; use diesel::QueryDsl; use diesel::RunQueryDsl; +use std::time::Duration; +use chrono::DateTime; +use chrono::Utc; pub const COST: u32 = 10; @@ -98,3 +101,62 @@ pub fn get_strava_token( .get_result::<models::StravaToken>(conn)?; Ok(token) } + +pub fn insert_task( + conn: &PgConnection, + task: &models::NewTask) -> Result<i64, Error> { + use crate::schema::tasks; + let id = diesel::insert_into(tasks::table) + .values(task) + .returning(tasks::id) + .get_result(conn)?; + Ok(id) +} + +fn update_task_inner(conn: &PgConnection, task: &models::Task) + -> Result<models::Task, Error> { + use crate::schema::tasks; + + diesel::delete(tasks::table.filter(tasks::columns::id.eq(task.id))) + .execute(conn)?; + + let new_id = insert_task(conn, &models::NewTask { + start_at: task.start_at, + state: task.state, + username: &task.username, + payload: &task.payload, + })?; + + let new_task = tasks::table.find(new_id) + .get_result::<models::Task>(conn)?; + + Ok(new_task) +} + +fn update_task(conn: &PgConnection, task: &models::Task) -> Result<models::Task, Error> { + conn.transaction(|| { + update_task_inner(conn, task) + }) +} + +pub fn take_task( + conn: &PgConnection, + state: models::TaskState, + start_before: DateTime<Utc>, + eta: DateTime<Utc>) + -> Result<models::Task, Error> { + use crate::schema::tasks; + + conn.transaction(|| { + let mut task = tasks::table + .filter(tasks::state.eq(state)) + .filter(tasks::start_at.lt(start_before)) + .order(tasks::start_at.asc()) + .first::<models::Task>(conn)?; + + task.start_at = eta; + let task = update_task_inner(conn, &task)?; + + Ok(task) + }) +} |