From 6d0a4d03705b96b252a6b29d3b8c188b9c903b89 Mon Sep 17 00:00:00 2001 From: Kjetil Orbekk Date: Mon, 3 Feb 2020 22:55:36 -0500 Subject: Refactor importer to store tasks in postgresql --- src/db.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) (limited to 'src/db.rs') diff --git a/src/db.rs b/src/db.rs index 20123bf..f3a261c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -6,6 +6,9 @@ use diesel::pg::PgConnection; use diesel::ExpressionMethods; use diesel::QueryDsl; use diesel::RunQueryDsl; +use std::time::Duration; +use chrono::DateTime; +use chrono::Utc; pub const COST: u32 = 10; @@ -98,3 +101,62 @@ pub fn get_strava_token( .get_result::(conn)?; Ok(token) } + +pub fn insert_task( + conn: &PgConnection, + task: &models::NewTask) -> Result { + use crate::schema::tasks; + let id = diesel::insert_into(tasks::table) + .values(task) + .returning(tasks::id) + .get_result(conn)?; + Ok(id) +} + +fn update_task_inner(conn: &PgConnection, task: &models::Task) + -> Result { + use crate::schema::tasks; + + diesel::delete(tasks::table.filter(tasks::columns::id.eq(task.id))) + .execute(conn)?; + + let new_id = insert_task(conn, &models::NewTask { + start_at: task.start_at, + state: task.state, + username: &task.username, + payload: &task.payload, + })?; + + let new_task = tasks::table.find(new_id) + .get_result::(conn)?; + + Ok(new_task) +} + +fn update_task(conn: &PgConnection, task: &models::Task) -> Result { + conn.transaction(|| { + update_task_inner(conn, task) + }) +} + +pub fn take_task( + conn: &PgConnection, + state: models::TaskState, + start_before: DateTime, + eta: DateTime) + -> Result { + use crate::schema::tasks; + + conn.transaction(|| { + let mut task = tasks::table + .filter(tasks::state.eq(state)) + .filter(tasks::start_at.lt(start_before)) + .order(tasks::start_at.asc()) + .first::(conn)?; + + task.start_at = eta; + let task = update_task_inner(conn, &task)?; + + Ok(task) + }) +} -- cgit v1.2.3