use async_trait::async_trait; use protocol::bridge_engine::{self, GameState, Player}; use serde_json::json; use sqlx::{query, PgPool}; use uuid::Uuid; use crate::error::BridgeError; #[async_trait] pub trait Journal { // Next sequence number to use. fn next(&self) -> i64; // Append payload to the journal at sequence number `seq`. async fn append(&mut self, seq: i64, payload: serde_json::Value) -> Result<(), BridgeError>; // Fetch all journal entries with sequence number greater or equal to `seq`. async fn replay(&mut self, seq: i64) -> Result, BridgeError>; } pub struct DbJournal { db: PgPool, id: Uuid, seq: i64 } impl DbJournal { pub fn new(db: PgPool, id: Uuid) -> Self { Self { db, id, seq: -1 } } } #[async_trait] impl Journal for DbJournal { async fn append(&mut self, seq: i64, payload: serde_json::Value) -> Result<(), BridgeError> { query!( r#" insert into object_journal (id, seq, payload) values ($1, $2, $3) "#, self.id, seq, payload, ) .execute(&self.db) .await?; Ok(()) } async fn replay(&mut self, seq: i64) -> Result, BridgeError> { let results = query!( r#" select payload from object_journal where id = $1 and seq >= $2 order by seq "#, self.id, seq ) .fetch_all(&self.db) .await?; Ok(results.into_iter().map(|v| v.payload).collect()) } fn next(&self) -> i64 { self.seq + 1 } } pub struct Table where J: Journal, { journal: J, game: GameState, } impl Table { pub fn game(&self) -> &GameState { &self.game } } impl Table { pub async fn new(mut journal: J) -> Result { let game = GameState::Bidding { dealer: Player::East, deal: bridge_engine::deal(), }; journal.append(0, json!(game)).await?; Ok(Table { journal, game }) } pub async fn replay(mut journal: J) -> Result { let games = journal.replay(0).await?; if games.is_empty() { return Err(BridgeError::NotFound("table journal missing".to_string())); } let game = serde_json::from_value(games[games.len() - 1].clone())?; Ok(Table { journal, game } ) } pub async fn new_or_replay(mut journal: J) -> Result { let games = journal.replay(0).await?; if games.is_empty() { return Self::new(journal).await; } let game = serde_json::from_value(games[games.len() - 1].clone())?; Ok(Table { journal, game } ) } } #[cfg(test)] mod test { use super::*; #[derive(Default)] pub struct TestJournal { log: Vec>, } #[async_trait] impl Journal for TestJournal { async fn append( &mut self, seq: i64, payload: serde_json::Value, ) -> Result<(), BridgeError> { if seq != self.log.len() as i64 { return Err(BridgeError::UpdateConflict(self.log.len() as i64, seq)); } self.log.push(Some(payload)); Ok(()) } async fn replay(&mut self, seq: i64) -> Result, BridgeError> { Ok(self.log[seq as usize..] .into_iter() .filter_map(|e| e.clone()) .collect()) } fn next(&self) -> i64 { self.log.len() as i64 } } #[tokio::test] async fn test_journal() { let mut jnl: TestJournal = Default::default(); let seq = jnl.next(); assert_eq!(jnl.next(), 0); assert_eq!(jnl.append(seq, json!(10)).await.unwrap(), ()); assert_eq!(jnl.next(), 1); assert!(jnl.append(0, json!(0)).await.is_err()); let seq = jnl.next(); assert_eq!(jnl.append(seq, json!(20)).await.unwrap(), ()); assert_eq!(jnl.replay(1).await.unwrap(), vec!(json!(20))); } #[tokio::test] async fn test_new_table() { let t1: Table = Table::new(Default::default()).await.unwrap(); match t1.game { GameState::Bidding { dealer, deal } => (), _ => panic!("should be Bidding"), }; } #[tokio::test] async fn test_replay_table() { let t1: Table = Table::new(Default::default()).await.unwrap(); let game = t1.game; let journal = t1.journal; let t2 = Table::replay(journal).await.unwrap(); assert_eq!(game, t2.game); } }