diff options
| author | mat <github@matdoes.dev> | 2022-12-11 00:15:37 -0600 |
|---|---|---|
| committer | mat <github@matdoes.dev> | 2022-12-11 00:15:37 -0600 |
| commit | 37b9f10b3bcc74b48df2c6843a5286a7d41e2414 (patch) | |
| tree | 6a995f7ad3f3309e57c276e10dc7e655dae9c5bb /azalea/src/swarm | |
| parent | 2d6737b247b74e964fd734d5ab4828a3193c296f (diff) | |
| download | azalea-drasl-37b9f10b3bcc74b48df2c6843a5286a7d41e2414.tar.xz | |
make entities have a reference to WeakWorlds instead
... and other exciting bug fixes
Diffstat (limited to 'azalea/src/swarm')
| -rw-r--r-- | azalea/src/swarm/chat.rs | 245 |
1 files changed, 183 insertions, 62 deletions
diff --git a/azalea/src/swarm/chat.rs b/azalea/src/swarm/chat.rs index 6c51ba33..4582c59e 100644 --- a/azalea/src/swarm/chat.rs +++ b/azalea/src/swarm/chat.rs @@ -18,12 +18,12 @@ use async_trait::async_trait; use azalea_client::{ChatPacket, Client, Event}; use parking_lot::Mutex; use std::{collections::VecDeque, sync::Arc}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::broadcast::{Receiver, Sender}; #[derive(Clone)] pub struct Plugin { pub swarm_state: SwarmState, - pub tx: UnboundedSender<ChatPacket>, + pub tx: Sender<ChatPacket>, } impl crate::Plugin for Plugin { @@ -31,7 +31,7 @@ impl crate::Plugin for Plugin { fn build(&self) -> State { State { - farthest_chat_index: Arc::new(Mutex::new(0)), + chat_index: Arc::new(Mutex::new(0)), swarm_state: self.swarm_state.clone(), tx: self.tx.clone(), } @@ -40,8 +40,8 @@ impl crate::Plugin for Plugin { #[derive(Clone)] pub struct State { - pub farthest_chat_index: Arc<Mutex<usize>>, - pub tx: UnboundedSender<ChatPacket>, + pub chat_index: Arc<Mutex<usize>>, + pub tx: Sender<ChatPacket>, pub swarm_state: SwarmState, } @@ -49,7 +49,49 @@ pub struct State { pub struct SwarmState { pub chat_queue: Arc<Mutex<VecDeque<ChatPacket>>>, pub chat_min_index: Arc<Mutex<usize>>, - pub rx: Arc<tokio::sync::Mutex<UnboundedReceiver<ChatPacket>>>, + pub rx: Arc<tokio::sync::Mutex<Receiver<ChatPacket>>>, +} + +impl State { + pub fn handle_chat(&self, message: ChatPacket) { + // When a bot receives a chat messages, it looks into the queue to find the + // earliest instance of the message content that's after the bot's chat index. + // If it finds it, then its personal index is simply updated. Otherwise, fire + // the event and add to the queue. + + let mut chat_queue = self.swarm_state.chat_queue.lock(); + let chat_min_index = self.swarm_state.chat_min_index.lock(); + let mut chat_index = self.chat_index.lock(); + + if *chat_min_index > *chat_index { + // if this happens it's because this bot just logged in, so + // ignore it and let another bot handle it + println!("chat_min_index ({chat_min_index}) > chat_index ({chat_index})"); + *chat_index = *chat_min_index; + return; + } + let actual_vec_index = *chat_index - *chat_min_index; + + // go through the queue and find the first message that's after the bot's index + let mut found = false; + for (i, past_message) in chat_queue.iter().enumerate().skip(actual_vec_index) { + if past_message == &message { + // found the message, update the index + *chat_index = i + *chat_min_index + 1; + found = true; + break; + } + } + + if !found { + // didn't find the message, so fire the swarm event and add to the queue + self.tx + .send(message.clone()) + .expect("failed to send chat message to swarm"); + chat_queue.push_back(message); + *chat_index = chat_queue.len() + *chat_min_index; + } + } } #[async_trait] @@ -57,46 +99,17 @@ impl crate::PluginState for State { async fn handle(self: Box<Self>, event: Event, _bot: Client) { // we're allowed to access Plugin::swarm_state since it's shared for every bot if let Event::Chat(m) = event { - // When a bot receives a chat messages, it looks into the queue to find the - // earliest instance of the message content that's after the bot's chat index. - // If it finds it, then its personal index is simply updated. Otherwise, fire - // the event and add to the queue. - - let mut chat_queue = self.swarm_state.chat_queue.lock(); - let chat_min_index = self.swarm_state.chat_min_index.lock(); - let mut farthest_chat_index = self.farthest_chat_index.lock(); - - let actual_vec_index = *farthest_chat_index - *chat_min_index; - - // go through the queue and find the first message that's after the bot's index - let mut found = false; - for (i, msg) in chat_queue.iter().enumerate().skip(actual_vec_index) { - if msg == &m { - // found the message, update the index - *farthest_chat_index = i + *chat_min_index + 1; - found = true; - break; - } - } - - if !found { - // didn't find the message, so fire the swarm event and add to the queue - self.tx - .send(m.clone()) - .expect("failed to send chat message to swarm"); - chat_queue.push_back(m); - *farthest_chat_index = chat_queue.len() - 1 + *chat_min_index; - } + self.handle_chat(m); } } } impl SwarmState { - pub fn new<S>(swarm: Swarm<S>) -> (Self, UnboundedSender<ChatPacket>) + pub fn new<S>(swarm: Swarm<S>) -> (Self, Sender<ChatPacket>) where S: Send + Sync + Clone + 'static, { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = tokio::sync::broadcast::channel(1); let swarm_state = SwarmState { chat_queue: Arc::new(Mutex::new(VecDeque::new())), @@ -114,35 +127,143 @@ impl SwarmState { // it should never be locked unless we reused the same plugin for two swarms // (bad) let mut rx = self.rx.lock().await; - while let Some(m) = rx.recv().await { + while let Ok(m) = rx.recv().await { swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap(); + let bot_states = swarm + .bot_datas + .lock() + .iter() + .map(|(bot, _)| { + bot.plugins + .get::<State>() + .expect("Chat plugin not installed") + .clone() + }) + .collect::<Vec<_>>(); + self.handle_new_chat_message(&bot_states); + } + } +} - // To make sure the queue doesn't grow too large, we keep a `chat_min_index` - // in Swarm that's set to the smallest index of all the bots, and we remove all - // messages from the queue that are before that index. - - let chat_min_index = *self.chat_min_index.lock(); - let mut new_chat_min_index = usize::MAX; - for (bot, _) in swarm.bot_datas.lock().iter() { - let this_farthest_chat_index = *bot - .plugins - .get::<State>() - .expect("Chat plugin not installed") - .farthest_chat_index - .lock(); - if this_farthest_chat_index < new_chat_min_index { - new_chat_min_index = this_farthest_chat_index; - } - } +impl SwarmState { + pub fn handle_new_chat_message(&self, bot_states: &[State]) { + // To make sure the queue doesn't grow too large, we keep a `chat_min_index` + // in Swarm that's set to the smallest index of all the bots, and we remove all + // messages from the queue that are before that index. - let mut chat_queue = self.chat_queue.lock(); - // remove all messages from the queue that are before the min index - for _ in 0..(new_chat_min_index - chat_min_index) { - chat_queue.pop_front(); + let chat_min_index = *self.chat_min_index.lock(); + let mut new_chat_min_index = usize::MAX; + for bot_state in bot_states { + let this_chat_index = *bot_state.chat_index.lock(); + if this_chat_index < new_chat_min_index { + new_chat_min_index = this_chat_index; } + } - // update the min index - *self.chat_min_index.lock() = new_chat_min_index; + let mut chat_queue = self.chat_queue.lock(); + if chat_min_index > new_chat_min_index { + println!( + "chat_min_index ({chat_min_index}) > new_chat_min_index ({new_chat_min_index})" + ); + return; + } + // remove all messages from the queue that are before the min index + for _ in 0..(new_chat_min_index - chat_min_index) { + chat_queue.pop_front(); } + + // update the min index + *self.chat_min_index.lock() = new_chat_min_index; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_swarm_chat() { + let (tx, mut rx) = tokio::sync::broadcast::channel(1); + let swarm_state = SwarmState { + chat_queue: Arc::new(Mutex::new(VecDeque::new())), + chat_min_index: Arc::new(Mutex::new(0)), + rx: Arc::new(tokio::sync::Mutex::new(rx)), + }; + let mut bot_states = vec![]; + let bot0 = State { + swarm_state: swarm_state.clone(), + chat_index: Arc::new(Mutex::new(0)), + tx: tx.clone(), + }; + let bot1 = State { + swarm_state: swarm_state.clone(), + chat_index: Arc::new(Mutex::new(0)), + tx: tx.clone(), + }; + bot_states.push(bot0.clone()); + bot_states.push(bot1.clone()); + + bot0.handle_chat(ChatPacket::new("a")); + // the swarm should get the event immediately after the bot gets it + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("a")) + ); + assert_eq!(*bot0.chat_index.lock(), 1); + // and a second bot sending the event shouldn't do anything + bot1.handle_chat(ChatPacket::new("a")); + assert!(swarm_state.rx.lock().await.try_recv().is_err()); + assert_eq!(*bot1.chat_index.lock(), 1); + + // but if the first one gets it again, it should sent it again + bot0.handle_chat(ChatPacket::new("a")); + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("a")) + ); + + // alright and now the second bot got a different chat message and it should be + // sent + bot1.handle_chat(ChatPacket::new("b")); + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("b")) + ); + } + + #[tokio::test] + async fn test_new_bot() { + let (tx, mut rx) = tokio::sync::broadcast::channel(1); + let swarm_state = SwarmState { + chat_queue: Arc::new(Mutex::new(VecDeque::new())), + chat_min_index: Arc::new(Mutex::new(0)), + rx: Arc::new(tokio::sync::Mutex::new(rx)), + }; + let mut bot_states = vec![]; + let bot0 = State { + swarm_state: swarm_state.clone(), + chat_index: Arc::new(Mutex::new(0)), + tx: tx.clone(), + }; + bot_states.push(bot0.clone()); + + // bot0 gets a chat message + bot0.handle_chat(ChatPacket::new("a")); + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("a")) + ); + // now a second bot joined and got a different chat message + let bot1 = State { + swarm_state: swarm_state.clone(), + chat_index: Arc::new(Mutex::new(0)), + tx: tx.clone(), + }; + bot_states.push(bot1.clone()); + bot1.handle_chat(ChatPacket::new("b")); + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("b")) + ); } } |
