aboutsummaryrefslogtreecommitdiff
path: root/azalea/src/swarm
diff options
context:
space:
mode:
authormat <github@matdoes.dev>2022-12-11 00:15:37 -0600
committermat <github@matdoes.dev>2022-12-11 00:15:37 -0600
commit37b9f10b3bcc74b48df2c6843a5286a7d41e2414 (patch)
tree6a995f7ad3f3309e57c276e10dc7e655dae9c5bb /azalea/src/swarm
parent2d6737b247b74e964fd734d5ab4828a3193c296f (diff)
downloadazalea-drasl-37b9f10b3bcc74b48df2c6843a5286a7d41e2414.tar.xz
make entities have a reference to WeakWorlds instead
... and other exciting bug fixes
Diffstat (limited to 'azalea/src/swarm')
-rw-r--r--azalea/src/swarm/chat.rs245
1 files changed, 183 insertions, 62 deletions
diff --git a/azalea/src/swarm/chat.rs b/azalea/src/swarm/chat.rs
index 6c51ba33..4582c59e 100644
--- a/azalea/src/swarm/chat.rs
+++ b/azalea/src/swarm/chat.rs
@@ -18,12 +18,12 @@ use async_trait::async_trait;
use azalea_client::{ChatPacket, Client, Event};
use parking_lot::Mutex;
use std::{collections::VecDeque, sync::Arc};
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+use tokio::sync::broadcast::{Receiver, Sender};
#[derive(Clone)]
pub struct Plugin {
pub swarm_state: SwarmState,
- pub tx: UnboundedSender<ChatPacket>,
+ pub tx: Sender<ChatPacket>,
}
impl crate::Plugin for Plugin {
@@ -31,7 +31,7 @@ impl crate::Plugin for Plugin {
fn build(&self) -> State {
State {
- farthest_chat_index: Arc::new(Mutex::new(0)),
+ chat_index: Arc::new(Mutex::new(0)),
swarm_state: self.swarm_state.clone(),
tx: self.tx.clone(),
}
@@ -40,8 +40,8 @@ impl crate::Plugin for Plugin {
#[derive(Clone)]
pub struct State {
- pub farthest_chat_index: Arc<Mutex<usize>>,
- pub tx: UnboundedSender<ChatPacket>,
+ pub chat_index: Arc<Mutex<usize>>,
+ pub tx: Sender<ChatPacket>,
pub swarm_state: SwarmState,
}
@@ -49,7 +49,49 @@ pub struct State {
pub struct SwarmState {
pub chat_queue: Arc<Mutex<VecDeque<ChatPacket>>>,
pub chat_min_index: Arc<Mutex<usize>>,
- pub rx: Arc<tokio::sync::Mutex<UnboundedReceiver<ChatPacket>>>,
+ pub rx: Arc<tokio::sync::Mutex<Receiver<ChatPacket>>>,
+}
+
+impl State {
+ pub fn handle_chat(&self, message: ChatPacket) {
+ // When a bot receives a chat messages, it looks into the queue to find the
+ // earliest instance of the message content that's after the bot's chat index.
+ // If it finds it, then its personal index is simply updated. Otherwise, fire
+ // the event and add to the queue.
+
+ let mut chat_queue = self.swarm_state.chat_queue.lock();
+ let chat_min_index = self.swarm_state.chat_min_index.lock();
+ let mut chat_index = self.chat_index.lock();
+
+ if *chat_min_index > *chat_index {
+ // if this happens it's because this bot just logged in, so
+ // ignore it and let another bot handle it
+ println!("chat_min_index ({chat_min_index}) > chat_index ({chat_index})");
+ *chat_index = *chat_min_index;
+ return;
+ }
+ let actual_vec_index = *chat_index - *chat_min_index;
+
+ // go through the queue and find the first message that's after the bot's index
+ let mut found = false;
+ for (i, past_message) in chat_queue.iter().enumerate().skip(actual_vec_index) {
+ if past_message == &message {
+ // found the message, update the index
+ *chat_index = i + *chat_min_index + 1;
+ found = true;
+ break;
+ }
+ }
+
+ if !found {
+ // didn't find the message, so fire the swarm event and add to the queue
+ self.tx
+ .send(message.clone())
+ .expect("failed to send chat message to swarm");
+ chat_queue.push_back(message);
+ *chat_index = chat_queue.len() + *chat_min_index;
+ }
+ }
}
#[async_trait]
@@ -57,46 +99,17 @@ impl crate::PluginState for State {
async fn handle(self: Box<Self>, event: Event, _bot: Client) {
// we're allowed to access Plugin::swarm_state since it's shared for every bot
if let Event::Chat(m) = event {
- // When a bot receives a chat messages, it looks into the queue to find the
- // earliest instance of the message content that's after the bot's chat index.
- // If it finds it, then its personal index is simply updated. Otherwise, fire
- // the event and add to the queue.
-
- let mut chat_queue = self.swarm_state.chat_queue.lock();
- let chat_min_index = self.swarm_state.chat_min_index.lock();
- let mut farthest_chat_index = self.farthest_chat_index.lock();
-
- let actual_vec_index = *farthest_chat_index - *chat_min_index;
-
- // go through the queue and find the first message that's after the bot's index
- let mut found = false;
- for (i, msg) in chat_queue.iter().enumerate().skip(actual_vec_index) {
- if msg == &m {
- // found the message, update the index
- *farthest_chat_index = i + *chat_min_index + 1;
- found = true;
- break;
- }
- }
-
- if !found {
- // didn't find the message, so fire the swarm event and add to the queue
- self.tx
- .send(m.clone())
- .expect("failed to send chat message to swarm");
- chat_queue.push_back(m);
- *farthest_chat_index = chat_queue.len() - 1 + *chat_min_index;
- }
+ self.handle_chat(m);
}
}
}
impl SwarmState {
- pub fn new<S>(swarm: Swarm<S>) -> (Self, UnboundedSender<ChatPacket>)
+ pub fn new<S>(swarm: Swarm<S>) -> (Self, Sender<ChatPacket>)
where
S: Send + Sync + Clone + 'static,
{
- let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+ let (tx, rx) = tokio::sync::broadcast::channel(1);
let swarm_state = SwarmState {
chat_queue: Arc::new(Mutex::new(VecDeque::new())),
@@ -114,35 +127,143 @@ impl SwarmState {
// it should never be locked unless we reused the same plugin for two swarms
// (bad)
let mut rx = self.rx.lock().await;
- while let Some(m) = rx.recv().await {
+ while let Ok(m) = rx.recv().await {
swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap();
+ let bot_states = swarm
+ .bot_datas
+ .lock()
+ .iter()
+ .map(|(bot, _)| {
+ bot.plugins
+ .get::<State>()
+ .expect("Chat plugin not installed")
+ .clone()
+ })
+ .collect::<Vec<_>>();
+ self.handle_new_chat_message(&bot_states);
+ }
+ }
+}
- // To make sure the queue doesn't grow too large, we keep a `chat_min_index`
- // in Swarm that's set to the smallest index of all the bots, and we remove all
- // messages from the queue that are before that index.
-
- let chat_min_index = *self.chat_min_index.lock();
- let mut new_chat_min_index = usize::MAX;
- for (bot, _) in swarm.bot_datas.lock().iter() {
- let this_farthest_chat_index = *bot
- .plugins
- .get::<State>()
- .expect("Chat plugin not installed")
- .farthest_chat_index
- .lock();
- if this_farthest_chat_index < new_chat_min_index {
- new_chat_min_index = this_farthest_chat_index;
- }
- }
+impl SwarmState {
+ pub fn handle_new_chat_message(&self, bot_states: &[State]) {
+ // To make sure the queue doesn't grow too large, we keep a `chat_min_index`
+ // in Swarm that's set to the smallest index of all the bots, and we remove all
+ // messages from the queue that are before that index.
- let mut chat_queue = self.chat_queue.lock();
- // remove all messages from the queue that are before the min index
- for _ in 0..(new_chat_min_index - chat_min_index) {
- chat_queue.pop_front();
+ let chat_min_index = *self.chat_min_index.lock();
+ let mut new_chat_min_index = usize::MAX;
+ for bot_state in bot_states {
+ let this_chat_index = *bot_state.chat_index.lock();
+ if this_chat_index < new_chat_min_index {
+ new_chat_min_index = this_chat_index;
}
+ }
- // update the min index
- *self.chat_min_index.lock() = new_chat_min_index;
+ let mut chat_queue = self.chat_queue.lock();
+ if chat_min_index > new_chat_min_index {
+ println!(
+ "chat_min_index ({chat_min_index}) > new_chat_min_index ({new_chat_min_index})"
+ );
+ return;
+ }
+ // remove all messages from the queue that are before the min index
+ for _ in 0..(new_chat_min_index - chat_min_index) {
+ chat_queue.pop_front();
}
+
+ // update the min index
+ *self.chat_min_index.lock() = new_chat_min_index;
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ async fn test_swarm_chat() {
+ let (tx, mut rx) = tokio::sync::broadcast::channel(1);
+ let swarm_state = SwarmState {
+ chat_queue: Arc::new(Mutex::new(VecDeque::new())),
+ chat_min_index: Arc::new(Mutex::new(0)),
+ rx: Arc::new(tokio::sync::Mutex::new(rx)),
+ };
+ let mut bot_states = vec![];
+ let bot0 = State {
+ swarm_state: swarm_state.clone(),
+ chat_index: Arc::new(Mutex::new(0)),
+ tx: tx.clone(),
+ };
+ let bot1 = State {
+ swarm_state: swarm_state.clone(),
+ chat_index: Arc::new(Mutex::new(0)),
+ tx: tx.clone(),
+ };
+ bot_states.push(bot0.clone());
+ bot_states.push(bot1.clone());
+
+ bot0.handle_chat(ChatPacket::new("a"));
+ // the swarm should get the event immediately after the bot gets it
+ assert_eq!(
+ swarm_state.rx.lock().await.try_recv(),
+ Ok(ChatPacket::new("a"))
+ );
+ assert_eq!(*bot0.chat_index.lock(), 1);
+ // and a second bot sending the event shouldn't do anything
+ bot1.handle_chat(ChatPacket::new("a"));
+ assert!(swarm_state.rx.lock().await.try_recv().is_err());
+ assert_eq!(*bot1.chat_index.lock(), 1);
+
+ // but if the first one gets it again, it should sent it again
+ bot0.handle_chat(ChatPacket::new("a"));
+ assert_eq!(
+ swarm_state.rx.lock().await.try_recv(),
+ Ok(ChatPacket::new("a"))
+ );
+
+ // alright and now the second bot got a different chat message and it should be
+ // sent
+ bot1.handle_chat(ChatPacket::new("b"));
+ assert_eq!(
+ swarm_state.rx.lock().await.try_recv(),
+ Ok(ChatPacket::new("b"))
+ );
+ }
+
+ #[tokio::test]
+ async fn test_new_bot() {
+ let (tx, mut rx) = tokio::sync::broadcast::channel(1);
+ let swarm_state = SwarmState {
+ chat_queue: Arc::new(Mutex::new(VecDeque::new())),
+ chat_min_index: Arc::new(Mutex::new(0)),
+ rx: Arc::new(tokio::sync::Mutex::new(rx)),
+ };
+ let mut bot_states = vec![];
+ let bot0 = State {
+ swarm_state: swarm_state.clone(),
+ chat_index: Arc::new(Mutex::new(0)),
+ tx: tx.clone(),
+ };
+ bot_states.push(bot0.clone());
+
+ // bot0 gets a chat message
+ bot0.handle_chat(ChatPacket::new("a"));
+ assert_eq!(
+ swarm_state.rx.lock().await.try_recv(),
+ Ok(ChatPacket::new("a"))
+ );
+ // now a second bot joined and got a different chat message
+ let bot1 = State {
+ swarm_state: swarm_state.clone(),
+ chat_index: Arc::new(Mutex::new(0)),
+ tx: tx.clone(),
+ };
+ bot_states.push(bot1.clone());
+ bot1.handle_chat(ChatPacket::new("b"));
+ assert_eq!(
+ swarm_state.rx.lock().await.try_recv(),
+ Ok(ChatPacket::new("b"))
+ );
}
}