From 8fa1b37bb705371bf5dee574f1f136019d3db9d1 Mon Sep 17 00:00:00 2001 From: Kjetil Orbekk Date: Tue, 15 Nov 2022 14:14:02 -0500 Subject: Implement DbJournal --- Cargo.lock | 1 + server/Cargo.toml | 2 +- server/migrations/20221008120534_init.down.sql | 2 +- server/migrations/20221008120534_init.up.sql | 8 +++--- server/src/main.rs | 6 +++- server/src/play.rs | 39 ++++++++++++++++++++++++++ 6 files changed, 51 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df63e24..b852fd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1944,6 +1944,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", + "serde_json", "sha2", "sqlx-core", "sqlx-rt", diff --git a/server/Cargo.toml b/server/Cargo.toml index 228f1e6..041eb4e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -21,7 +21,7 @@ uuid = { version = "1.1.2", features = ["serde", "fast-rng", "v4"] } tower-cookies = "0.7.0" tower = { version = "0.4.13", features = ["full"] } urlencoding = "2.1.2" -sqlx = { version = "0.6", features = ["runtime-tokio-native-tls", "postgres", "uuid", "chrono"] } +sqlx = { version = "0.6", features = ["runtime-tokio-native-tls", "postgres", "uuid", "chrono", "json"] } anyhow = "1.0.65" chrono = { version = "0.4.22", features = ["serde"] } thiserror = "1.0.37" diff --git a/server/migrations/20221008120534_init.down.sql b/server/migrations/20221008120534_init.down.sql index 0c9bbed..50ba511 100644 --- a/server/migrations/20221008120534_init.down.sql +++ b/server/migrations/20221008120534_init.down.sql @@ -2,7 +2,7 @@ begin; drop table if exists sessions; drop table if exists table_players; -drop table if exists table_journal; +drop table if exists object_journal; drop table if exists active_tables; drop table if exists players; drop type if exists player_position; diff --git a/server/migrations/20221008120534_init.up.sql b/server/migrations/20221008120534_init.up.sql index e4b9eb1..db9fd3c 100644 --- a/server/migrations/20221008120534_init.up.sql +++ b/server/migrations/20221008120534_init.up.sql @@ -16,12 +16,12 @@ create table active_tables ( id uuid primary key not null ); -create table table_journal ( - table_id uuid references active_tables(id) not null, +create table object_journal ( + id uuid not null, seq bigint not null, - payload jsonb + payload jsonb not null ); -create unique index journal_entry on table_journal (table_id, seq); +create unique index journal_entry on object_journal (id, seq); create type player_position as enum ('west', 'north', 'east', 'south'); diff --git a/server/src/main.rs b/server/src/main.rs index 9b5adb7..6056dcd 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,4 +1,5 @@ use std::{collections::HashMap, env, str::FromStr, sync::Arc}; +use serde_json::json; use uuid::Uuid; use auth::AuthenticatedSession; @@ -18,7 +19,7 @@ mod auth; mod error; mod server; mod play; -use crate::error::BridgeError; +use crate::{error::BridgeError, play::{DbJournal, Journal}}; use crate::{ auth::{Authenticator, SessionId}, server::ServerContext, @@ -44,6 +45,9 @@ async fn main() { .await .expect("db connection"); + let mut jnl = DbJournal::new(db_pool.clone(), Uuid::new_v4()); + jnl.append(0, json!("starting server")).await.expect("new object"); + info!("Running db migrations"); sqlx::migrate!().run(&db_pool).await.expect("db migration"); diff --git a/server/src/play.rs b/server/src/play.rs index 8fe1872..242bdb8 100644 --- a/server/src/play.rs +++ b/server/src/play.rs @@ -1,4 +1,6 @@ use async_trait::async_trait; +use sqlx::{PgPool, query}; +use uuid::Uuid; use crate::error::BridgeError; @@ -11,6 +13,43 @@ pub trait Journal { async fn replay(&mut self, seq: i64) -> Result, BridgeError>; } +pub struct DbJournal { + db: PgPool, + id: Uuid, +} + +impl DbJournal { + pub fn new(db: PgPool, id: Uuid) -> Self { + Self { db, id } + } +} + +#[async_trait] +impl Journal for DbJournal { + async fn append(&mut self,seq:i64,payload:serde_json::Value) -> Result<(),BridgeError> { + query!( + r#" + insert into object_journal (id, seq, payload) + values ($1, $2, $3) + "#, + self.id, seq, payload, + ).execute(&self.db).await?; + Ok(()) + } + + async fn replay(&mut self,seq:i64) -> Result,BridgeError> { + let results = query!( + r#" + select payload from object_journal + where id = $1 and seq >= $2 + order by seq + "#, + self.id, seq + ).fetch_all(&self.db).await?; + Ok(results.into_iter().map(|v| v.payload).collect()) + } +} + pub struct Table {} #[cfg(test)] -- cgit v1.2.3