use std::mem; use async_trait::async_trait; use protocol::{ bot::BiddingBot, bridge_engine::{Bid, BiddingStatePlayerView, Deal, GameState, Player}, card::Card, play_result::MoveResult, simple_bots::AlwaysPassBiddingBot, }; use rand::random; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::json; use sqlx::{query, query_as, PgPool}; use tracing::info; use uuid::Uuid; use crate::error::BridgeError; #[async_trait] pub trait Journal { // Next sequence number to use. fn next(&self) -> i64; // Append payload to the journal at sequence number `seq`. async fn append( &mut self, seq: i64, payload: Item, ) -> Result<(), BridgeError>; // Fetch all journal entries with sequence number greater or equal to `seq`. async fn replay(&mut self, seq: i64) -> Result, BridgeError>; } pub struct DbJournal { db: PgPool, id: Uuid, seq: i64, } impl DbJournal { pub fn new(db: PgPool, id: Uuid) -> Self { Self { db, id, seq: -1 } } } #[async_trait] impl Journal for DbJournal where Item: Clone + Serialize + DeserializeOwned + Send + Sync + Unpin + 'static, { async fn append( &mut self, seq: i64, payload: Item, ) -> Result<(), BridgeError> { let result = query!( r#" insert into object_journal (id, seq, payload) values ($1, $2, $3) "#, self.id, seq, sqlx::types::Json(payload) as _, ) .execute(&self.db) .await; if let Err(sqlx::Error::Database(e)) = result { if e.constraint() == Some("journal_entry") { return Err(BridgeError::JournalConflict( format!("{}", self.id), seq, )); } } self.seq += 1; Ok(()) } async fn replay(&mut self, seq: i64) -> Result, BridgeError> { let rows = query_as!( ReplayRow::, r#" select seq, payload as "payload: _" from object_journal where id = $1 and seq >= $2 order by seq "#, self.id, seq ) .fetch_all(&self.db) .await?; let mut payloads: Vec = vec![]; for v in rows { payloads.push(v.payload.0); self.seq = v.seq; } Ok(payloads) } fn next(&self) -> i64 { self.seq + 1 } } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum TableUpdate { NewDeal(Deal, Player), ChangeSettings(TableSettings), Bid(Bid), SetState(TableState), } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Default)] pub struct TableSettings { west_player: Option, nort_player: Option, east_player: Option, south_player: Option, } pub struct Table where J: Journal, { journal: J, settings: TableSettings, state: TableState, } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum TableState { Unknown, Game(GameState), } impl Into for GameState { fn into(self) -> TableState { TableState::Game(self) } } impl> Table { pub fn game(&self) -> Option<&GameState> { match &self.state { TableState::Game(g) => Some(g), _ => None, } } } impl> Table { pub async fn new(mut journal: J) -> Result { let game = Self::init(&mut journal).await?; Ok(Table { journal, state: game.into(), settings: Default::default(), }) } async fn init(journal: &mut J) -> Result { let game = GameState::new(random(), random()); journal .append(0, TableUpdate::SetState(game.clone().into())) .await?; Ok(game) } pub async fn bid(&mut self, bid: Bid) -> Result<(), BridgeError> { let mut state = TableState::Unknown; mem::swap(&mut state, &mut self.state); let game = match state { TableState::Game(game) => game, _ => { return Err(BridgeError::InvalidRequest( "no game in progress".to_string(), )) } }; let mut state: TableState = match game.bid(bid)? { MoveResult::Stay(game) => game.into(), MoveResult::Go(_) => todo!(), }; self.journal .append(self.journal.next(), TableUpdate::SetState(state.clone())) .await?; mem::swap(&mut state, &mut self.state); Ok(()) } pub async fn play(&mut self, _card: Card) -> Result<(), BridgeError> { Ok(()) } pub async fn replay(mut journal: J) -> Result { let log = journal.replay(0).await?; if log.is_empty() { return Err(BridgeError::NotFound( "table journal missing".to_string(), )); } let mut state = TableState::Unknown; for update in log { info!("Replaying update {update:?}"); match update { TableUpdate::NewDeal(_, _) => todo!(), TableUpdate::ChangeSettings(_) => todo!(), TableUpdate::Bid(_) => todo!(), TableUpdate::SetState(s) => state = s, } } Ok(Table { journal, state: state, settings: Default::default(), }) } pub async fn new_or_replay(mut journal: J) -> Result { let game = Self::init(&mut journal).await; if let Err(BridgeError::JournalConflict(..)) = game { return Self::replay(journal).await; } Ok(Self { journal, state: game?.into(), settings: Default::default(), }) } } #[derive(Debug)] struct ReplayRow { seq: i64, payload: sqlx::types::Json, } pub async fn advance_play>( table: &mut Table, ) -> Result<(), BridgeError> { let game = table .game() .ok_or(BridgeError::InvalidRequest(format!("no game in progress")))?; match game { GameState::Bidding(bidding) => { let player_view = BiddingStatePlayerView::from_bidding_state( bidding, game.current_player(), ); let bot = AlwaysPassBiddingBot {}; let bid = bot.bid(&player_view).await; table.bid(bid).await?; Ok(()) } GameState::Play(_) => todo!(), } } #[cfg(test)] mod test { use super::*; pub struct TestJournal { log: Vec>, } impl TestJournal { pub fn new() -> Self { Self { log: vec![] } } } #[async_trait] impl Journal for TestJournal { async fn append( &mut self, seq: i64, payload: Item, ) -> Result<(), BridgeError> { if seq != self.log.len() as i64 { return Err(BridgeError::UpdateConflict( self.log.len() as i64, seq, )); } self.log.push(Some(payload)); Ok(()) } async fn replay(&mut self, seq: i64) -> Result, BridgeError> { Ok(self.log[seq as usize..] .into_iter() .filter_map(|e| e.clone()) .collect()) } fn next(&self) -> i64 { self.log.len() as i64 } } #[tokio::test] async fn test_journal() { let mut jnl = TestJournal::new(); let seq = jnl.next(); assert_eq!(jnl.next(), 0); assert_eq!(jnl.append(seq, json!(10)).await.unwrap(), ()); assert_eq!(jnl.next(), 1); assert!(jnl.append(0, json!(0)).await.is_err()); let seq = jnl.next(); assert_eq!(jnl.append(seq, json!(20)).await.unwrap(), ()); assert_eq!(jnl.replay(1).await.unwrap(), vec!(json!(20))); } #[tokio::test] async fn test_new_table() { let t1 = Table::new(TestJournal::new()).await.unwrap(); assert!(t1.game().unwrap().is_bidding()); } #[tokio::test] async fn test_replay_table() { let t1 = Table::new(TestJournal::new()).await.unwrap(); let game = t1.game().unwrap().clone(); let journal = t1.journal; let t2 = Table::replay(journal).await.unwrap(); assert_eq!(&game, t2.game().unwrap()); } #[tokio::test] async fn test_advance_play() { let mut t1 = Table::new(TestJournal::new()).await.unwrap(); let player = t1.game().unwrap().current_player(); advance_play(&mut t1).await.unwrap(); assert_ne!(player, t1.game().unwrap().current_player()); } }