use async_trait::async_trait; use protocol::{ actions::Bid, bot::{BiddingBot, PlayingBot}, bridge_engine::{ BiddingStatePlayerView, GameState, PlayResult, PlayStatePlayerView, TableState, }, card::Card, core::Deal, simple_bots::{AlwaysPassBiddingBot, RandomPlayingBot}, }; use rand::random; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sqlx::{query, query_as, PgPool}; use uuid::Uuid; use crate::error::BridgeError; #[async_trait] pub trait Journal { // Next sequence number to use. fn next(&self) -> i64; // Append payload to the journal at sequence number `seq`. async fn append( &mut self, seq: i64, payload: Item, ) -> Result<(), BridgeError>; // Fetch all journal entries with sequence number greater or equal to `seq`. async fn replay(&mut self, seq: i64) -> Result, BridgeError>; } pub struct DbJournal { db: PgPool, id: Uuid, seq: i64, } impl DbJournal { pub fn new(db: PgPool, id: Uuid) -> Self { Self { db, id, seq: -1 } } } #[async_trait] impl Journal for DbJournal where Item: Clone + Serialize + DeserializeOwned + Send + Sync + Unpin + 'static, { async fn append( &mut self, seq: i64, payload: Item, ) -> Result<(), BridgeError> { let result = query!( r#" insert into object_journal (id, seq, payload) values ($1, $2, $3) "#, self.id, seq, sqlx::types::Json(payload) as _, ) .execute(&self.db) .await; if let Err(sqlx::Error::Database(e)) = result { if e.constraint() == Some("journal_entry") { return Err(BridgeError::JournalConflict( format!("{}", self.id), seq, )); } } self.seq += 1; Ok(()) } async fn replay(&mut self, seq: i64) -> Result, BridgeError> { let rows = query_as!( ReplayRow::, r#" select seq, payload as "payload: _" from object_journal where id = $1 and seq >= $2 order by seq "#, self.id, seq ) .fetch_all(&self.db) .await?; let mut payloads: Vec = vec![]; for v in rows { payloads.push(v.payload.0); self.seq = v.seq; } Ok(payloads) } fn next(&self) -> i64 { self.seq + 1 } } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum TableUpdate { NewDeal { deal: Deal }, ChangeSettings(TableSettings), Bid(Bid), Play(Card), } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Default)] pub struct TableSettings { west_player: Option, nort_player: Option, east_player: Option, south_player: Option, } pub struct Table where J: Journal, { journal: J, settings: TableSettings, pub state: TableState, } impl> Table { pub async fn new(journal: J) -> Result { let mut table = Self { journal, state: Default::default(), settings: Default::default(), }; table.init().await?; Ok(table) } pub fn game_in_progress(&self) -> bool { matches!(&self.state, TableState::Game(_)) } pub fn game(&self) -> Result<&GameState, BridgeError> { match &self.state { TableState::Game(g) => Ok(g), _ => Err(BridgeError::InvalidRequest( "no game in progress".to_string(), )), } } pub fn result(&self) -> Result<&PlayResult, BridgeError> { match &self.state { TableState::Result(r) => Ok(r), _ => Err(BridgeError::InvalidRequest("no result".to_string())), } } async fn init(&mut self) -> Result<(), BridgeError> { self.insert_and_apply(TableUpdate::NewDeal { deal: random(), }) .await } pub async fn bid(&mut self, bid: Bid) -> Result<(), BridgeError> { self.insert_and_apply(TableUpdate::Bid(bid)).await } pub async fn play(&mut self, card: Card) -> Result<(), BridgeError> { self.insert_and_apply(TableUpdate::Play(card)).await } pub async fn new_deal(&mut self) -> Result<(), BridgeError> { self.insert_and_apply(TableUpdate::NewDeal { deal: random(), }) .await } async fn insert_and_apply( &mut self, update: TableUpdate, ) -> Result<(), BridgeError> { self.journal .append(self.journal.next(), update.clone()) .await?; self.apply_update(update)?; Ok(()) } fn apply_update(&mut self, update: TableUpdate) -> Result<(), BridgeError> { match update { TableUpdate::ChangeSettings(settings) => { self.settings = settings; Ok(()) } TableUpdate::Bid(bid) => { self.state = self.game()?.clone().bid(bid)?.into(); Ok(()) } TableUpdate::NewDeal { deal } => { self.state = GameState::new(deal).into(); Ok(()) } TableUpdate::Play(card) => { self.state = self.game()?.clone().play(card)?.into(); Ok(()) } } } pub async fn replay(journal: J) -> Result { let mut table = Self { journal, state: Default::default(), settings: Default::default(), }; table.replay_internal().await?; Ok(table) } async fn replay_internal(&mut self) -> Result<(), BridgeError> { let log = self.journal.replay(self.journal.next()).await?; if log.is_empty() { return Err(BridgeError::NotFound( "table journal missing".to_string(), )); } for update in log { self.apply_update(update).ok(); } Ok(()) } pub async fn new_or_replay(journal: J) -> Result { let mut table = Self { journal, state: Default::default(), settings: Default::default(), }; if let Err(BridgeError::JournalConflict(..)) = table.init().await { table.replay_internal().await?; } Ok(table) } } #[derive(Debug)] struct ReplayRow { seq: i64, payload: sqlx::types::Json, } pub async fn advance_play>( table: &mut Table, ) -> Result<(), BridgeError> { let game = table.game()?; match game { GameState::Bidding(bidding) => { let player_view = BiddingStatePlayerView::from_bidding_state( bidding, game.current_player(), ); let bot = AlwaysPassBiddingBot {}; let bid = bot.bid(&player_view).await; table.bid(bid).await?; Ok(()) } GameState::Play(game) => { let player_view = PlayStatePlayerView::from_play_state( game, game.current_player(), ); let bot = RandomPlayingBot {}; let card = bot.play(&player_view).await; table.play(card).await?; Ok(()) } } } #[cfg(test)] mod test { use protocol::{contract::{LevelAndSuit, ContractLevel}, card::Suit}; use serde_json::json; use tracing::info; use crate::tests::test_setup; use super::*; pub struct TestJournal { log: Vec>, seq: i64, } impl TestJournal { pub fn new() -> Self { Self { log: vec![], seq: -1, } } } #[async_trait] impl Journal for TestJournal { async fn append( &mut self, seq: i64, payload: Item, ) -> Result<(), BridgeError> { if seq != self.log.len() as i64 { return Err(BridgeError::UpdateConflict( self.log.len() as i64, seq, )); } self.log.push(Some(payload)); self.seq += 1; Ok(()) } async fn replay(&mut self, seq: i64) -> Result, BridgeError> { Ok(self.log[seq as usize..] .into_iter() .filter_map(|e| e.clone()) .collect()) } fn next(&self) -> i64 { self.seq + 1 } } #[tokio::test] async fn test_journal() { let mut jnl = TestJournal::new(); let seq = jnl.next(); assert_eq!(jnl.next(), 0); assert_eq!(jnl.append(seq, json!(10)).await.unwrap(), ()); assert_eq!(jnl.next(), 1); assert!(jnl.append(0, json!(0)).await.is_err()); let seq = jnl.next(); assert_eq!(jnl.append(seq, json!(20)).await.unwrap(), ()); assert_eq!(jnl.replay(1).await.unwrap(), vec!(json!(20))); } #[tokio::test] async fn test_new_table() { let t1 = Table::new(TestJournal::new()).await.unwrap(); assert!(t1.game().unwrap().is_bidding()); } // TODO: Enable this // #[tokio::test] // async fn test_replay_table() { // let t1 = Table::new(TestJournal::new()).await.unwrap(); // let game = t1.game().unwrap().clone(); // let journal = t1.journal; // let t2 = Table::replay(journal).await.unwrap(); // assert_eq!(&game, t2.game().unwrap()); // } #[tokio::test] async fn test_advance_play_once() { let mut t1 = Table::new(TestJournal::new()).await.unwrap(); let player = t1.game().unwrap().current_player(); advance_play(&mut t1).await.unwrap(); assert_ne!(player, t1.game().unwrap().current_player()); } #[tokio::test] async fn test_advance_play_until_result() { test_setup(); let mut t1 = Table::new(TestJournal::new()).await.unwrap(); assert!(t1.game().is_ok()); let raise1c = LevelAndSuit { level: ContractLevel::One, suit: Some(Suit::Club), }; // Make sure the game doesn't get passed out. t1.bid(Bid::Raise(raise1c)).await.unwrap(); while t1.game().is_ok() { info!("Game is: {:#?}", t1.game()); advance_play(&mut t1).await.unwrap(); } assert!(t1.result().is_ok()); } }