diff options
| author | mat <github@matdoes.dev> | 2022-12-11 00:15:37 -0600 |
|---|---|---|
| committer | mat <github@matdoes.dev> | 2022-12-11 00:15:37 -0600 |
| commit | 37b9f10b3bcc74b48df2c6843a5286a7d41e2414 (patch) | |
| tree | 6a995f7ad3f3309e57c276e10dc7e655dae9c5bb /azalea/src | |
| parent | 2d6737b247b74e964fd734d5ab4828a3193c296f (diff) | |
| download | azalea-drasl-37b9f10b3bcc74b48df2c6843a5286a7d41e2414.tar.xz | |
make entities have a reference to WeakWorlds instead
... and other exciting bug fixes
Diffstat (limited to 'azalea/src')
| -rw-r--r-- | azalea/src/lib.rs | 1 | ||||
| -rw-r--r-- | azalea/src/pathfinder/mod.rs | 10 | ||||
| -rw-r--r-- | azalea/src/pathfinder/moves.rs | 51 | ||||
| -rw-r--r-- | azalea/src/swarm/chat.rs | 245 |
4 files changed, 221 insertions, 86 deletions
diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs index 93f4825f..49506a17 100644 --- a/azalea/src/lib.rs +++ b/azalea/src/lib.rs @@ -85,6 +85,7 @@ pub mod prelude; mod start; mod swarm; +pub use azalea_block::*; pub use azalea_client::*; pub use azalea_core::{BlockPos, Vec3}; pub use start::{start, Options}; diff --git a/azalea/src/pathfinder/mod.rs b/azalea/src/pathfinder/mod.rs index 8a9d7540..a1619c41 100644 --- a/azalea/src/pathfinder/mod.rs +++ b/azalea/src/pathfinder/mod.rs @@ -6,7 +6,7 @@ use crate::{Client, Event}; use async_trait::async_trait; use azalea_core::{BlockPos, CardinalDirection}; use azalea_world::entity::EntityData; -use log::debug; +use log::{debug, error}; use mtdstarlite::Edge; pub use mtdstarlite::MTDStarLite; use parking_lot::Mutex; @@ -80,7 +80,7 @@ impl Trait for azalea_client::Client { let successors = |node: &Node| { let mut edges = Vec::new(); - let world = self.world.read(); + let world = &self.world.read().shared; for possible_move in possible_moves.iter() { edges.push(Edge { target: possible_move.next_node(node), @@ -111,7 +111,11 @@ impl Trait for azalea_client::Client { .expect("Pathfinder plugin not installed!") .clone(); // convert the Option<Vec<Node>> to a VecDeque<Node> - *state.path.lock() = p.expect("no path").into_iter().collect(); + if let Some(p) = p { + *state.path.lock() = p.into_iter().collect(); + } else { + error!("no path found"); + } } } diff --git a/azalea/src/pathfinder/moves.rs b/azalea/src/pathfinder/moves.rs index ac2137d3..ccf8ba1a 100644 --- a/azalea/src/pathfinder/moves.rs +++ b/azalea/src/pathfinder/moves.rs @@ -1,10 +1,10 @@ use super::{Node, VerticalVel}; use azalea_core::{BlockPos, CardinalDirection}; use azalea_physics::collision::{self, BlockWithShape}; -use azalea_world::World; +use azalea_world::WeakWorld; /// whether this block is passable -fn is_block_passable(pos: &BlockPos, world: &World) -> bool { +fn is_block_passable(pos: &BlockPos, world: &WeakWorld) -> bool { if let Some(block) = world.get_block_state(pos) { block.shape() == &collision::empty_shape() } else { @@ -13,7 +13,7 @@ fn is_block_passable(pos: &BlockPos, world: &World) -> bool { } /// whether this block has a solid hitbox (i.e. we can stand on it) -fn is_block_solid(pos: &BlockPos, world: &World) -> bool { +fn is_block_solid(pos: &BlockPos, world: &WeakWorld) -> bool { if let Some(block) = world.get_block_state(pos) { block.shape() == &collision::block_shape() } else { @@ -22,14 +22,14 @@ fn is_block_solid(pos: &BlockPos, world: &World) -> bool { } /// Whether this block and the block above are passable -fn is_passable(pos: &BlockPos, world: &World) -> bool { +fn is_passable(pos: &BlockPos, world: &WeakWorld) -> bool { is_block_passable(pos, world) && is_block_passable(&pos.up(1), world) } /// Whether we can stand in this position. Checks if the block below is solid, /// and that the two blocks above that are passable. -fn is_standable(pos: &BlockPos, world: &World) -> bool { +fn is_standable(pos: &BlockPos, world: &WeakWorld) -> bool { is_block_solid(&pos.down(1), world) && is_passable(pos, world) } @@ -37,7 +37,7 @@ const JUMP_COST: f32 = 0.5; const WALK_ONE_BLOCK_COST: f32 = 1.0; pub trait Move { - fn cost(&self, world: &World, node: &Node) -> f32; + fn cost(&self, world: &WeakWorld, node: &Node) -> f32; /// Returns by how much the entity's position should be changed when this /// move is executed. fn offset(&self) -> BlockPos; @@ -51,7 +51,7 @@ pub trait Move { pub struct ForwardMove(pub CardinalDirection); impl Move for ForwardMove { - fn cost(&self, world: &World, node: &Node) -> f32 { + fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { if is_standable(&(node.pos + self.offset()), world) && node.vertical_vel == VerticalVel::None { @@ -67,7 +67,7 @@ impl Move for ForwardMove { pub struct AscendMove(pub CardinalDirection); impl Move for AscendMove { - fn cost(&self, world: &World, node: &Node) -> f32 { + fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { if node.vertical_vel == VerticalVel::None && is_block_passable(&node.pos.up(2), world) && is_standable(&(node.pos + self.offset()), world) @@ -89,7 +89,7 @@ impl Move for AscendMove { } pub struct DescendMove(pub CardinalDirection); impl Move for DescendMove { - fn cost(&self, world: &World, node: &Node) -> f32 { + fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { // check whether 3 blocks vertically forward are passable if node.vertical_vel == VerticalVel::None && is_standable(&(node.pos + self.offset()), world) @@ -112,7 +112,7 @@ impl Move for DescendMove { } pub struct DiagonalMove(pub CardinalDirection); impl Move for DiagonalMove { - fn cost(&self, world: &World, node: &Node) -> f32 { + fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { if node.vertical_vel != VerticalVel::None { return f32::INFINITY; } @@ -151,37 +151,46 @@ mod tests { use super::*; use azalea_block::BlockState; use azalea_core::ChunkPos; - use azalea_world::Chunk; + use azalea_world::{Chunk, PartialWorld}; #[test] fn test_is_passable() { - let mut world = World::default(); + let mut world = PartialWorld::default(); world .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default())) .unwrap(); world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone); world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air); - assert_eq!(is_block_passable(&BlockPos::new(0, 0, 0), &world), false); - assert_eq!(is_block_passable(&BlockPos::new(0, 1, 0), &world), true); + assert_eq!( + is_block_passable(&BlockPos::new(0, 0, 0), &world.shared), + false + ); + assert_eq!( + is_block_passable(&BlockPos::new(0, 1, 0), &world.shared), + true + ); } #[test] fn test_is_solid() { - let mut world = World::default(); + let mut world = PartialWorld::default(); world .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default())) .unwrap(); world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone); world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air); - assert_eq!(is_block_solid(&BlockPos::new(0, 0, 0), &world), true); - assert_eq!(is_block_solid(&BlockPos::new(0, 1, 0), &world), false); + assert_eq!(is_block_solid(&BlockPos::new(0, 0, 0), &world.shared), true); + assert_eq!( + is_block_solid(&BlockPos::new(0, 1, 0), &world.shared), + false + ); } #[test] fn test_is_standable() { - let mut world = World::default(); + let mut world = PartialWorld::default(); world .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default())) .unwrap(); @@ -190,8 +199,8 @@ mod tests { world.set_block_state(&BlockPos::new(0, 2, 0), BlockState::Air); world.set_block_state(&BlockPos::new(0, 3, 0), BlockState::Air); - assert!(is_standable(&BlockPos::new(0, 1, 0), &world)); - assert!(!is_standable(&BlockPos::new(0, 0, 0), &world)); - assert!(!is_standable(&BlockPos::new(0, 2, 0), &world)); + assert!(is_standable(&BlockPos::new(0, 1, 0), &world.shared)); + assert!(!is_standable(&BlockPos::new(0, 0, 0), &world.shared)); + assert!(!is_standable(&BlockPos::new(0, 2, 0), &world.shared)); } } diff --git a/azalea/src/swarm/chat.rs b/azalea/src/swarm/chat.rs index 6c51ba33..4582c59e 100644 --- a/azalea/src/swarm/chat.rs +++ b/azalea/src/swarm/chat.rs @@ -18,12 +18,12 @@ use async_trait::async_trait; use azalea_client::{ChatPacket, Client, Event}; use parking_lot::Mutex; use std::{collections::VecDeque, sync::Arc}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::broadcast::{Receiver, Sender}; #[derive(Clone)] pub struct Plugin { pub swarm_state: SwarmState, - pub tx: UnboundedSender<ChatPacket>, + pub tx: Sender<ChatPacket>, } impl crate::Plugin for Plugin { @@ -31,7 +31,7 @@ impl crate::Plugin for Plugin { fn build(&self) -> State { State { - farthest_chat_index: Arc::new(Mutex::new(0)), + chat_index: Arc::new(Mutex::new(0)), swarm_state: self.swarm_state.clone(), tx: self.tx.clone(), } @@ -40,8 +40,8 @@ impl crate::Plugin for Plugin { #[derive(Clone)] pub struct State { - pub farthest_chat_index: Arc<Mutex<usize>>, - pub tx: UnboundedSender<ChatPacket>, + pub chat_index: Arc<Mutex<usize>>, + pub tx: Sender<ChatPacket>, pub swarm_state: SwarmState, } @@ -49,7 +49,49 @@ pub struct State { pub struct SwarmState { pub chat_queue: Arc<Mutex<VecDeque<ChatPacket>>>, pub chat_min_index: Arc<Mutex<usize>>, - pub rx: Arc<tokio::sync::Mutex<UnboundedReceiver<ChatPacket>>>, + pub rx: Arc<tokio::sync::Mutex<Receiver<ChatPacket>>>, +} + +impl State { + pub fn handle_chat(&self, message: ChatPacket) { + // When a bot receives a chat messages, it looks into the queue to find the + // earliest instance of the message content that's after the bot's chat index. + // If it finds it, then its personal index is simply updated. Otherwise, fire + // the event and add to the queue. + + let mut chat_queue = self.swarm_state.chat_queue.lock(); + let chat_min_index = self.swarm_state.chat_min_index.lock(); + let mut chat_index = self.chat_index.lock(); + + if *chat_min_index > *chat_index { + // if this happens it's because this bot just logged in, so + // ignore it and let another bot handle it + println!("chat_min_index ({chat_min_index}) > chat_index ({chat_index})"); + *chat_index = *chat_min_index; + return; + } + let actual_vec_index = *chat_index - *chat_min_index; + + // go through the queue and find the first message that's after the bot's index + let mut found = false; + for (i, past_message) in chat_queue.iter().enumerate().skip(actual_vec_index) { + if past_message == &message { + // found the message, update the index + *chat_index = i + *chat_min_index + 1; + found = true; + break; + } + } + + if !found { + // didn't find the message, so fire the swarm event and add to the queue + self.tx + .send(message.clone()) + .expect("failed to send chat message to swarm"); + chat_queue.push_back(message); + *chat_index = chat_queue.len() + *chat_min_index; + } + } } #[async_trait] @@ -57,46 +99,17 @@ impl crate::PluginState for State { async fn handle(self: Box<Self>, event: Event, _bot: Client) { // we're allowed to access Plugin::swarm_state since it's shared for every bot if let Event::Chat(m) = event { - // When a bot receives a chat messages, it looks into the queue to find the - // earliest instance of the message content that's after the bot's chat index. - // If it finds it, then its personal index is simply updated. Otherwise, fire - // the event and add to the queue. - - let mut chat_queue = self.swarm_state.chat_queue.lock(); - let chat_min_index = self.swarm_state.chat_min_index.lock(); - let mut farthest_chat_index = self.farthest_chat_index.lock(); - - let actual_vec_index = *farthest_chat_index - *chat_min_index; - - // go through the queue and find the first message that's after the bot's index - let mut found = false; - for (i, msg) in chat_queue.iter().enumerate().skip(actual_vec_index) { - if msg == &m { - // found the message, update the index - *farthest_chat_index = i + *chat_min_index + 1; - found = true; - break; - } - } - - if !found { - // didn't find the message, so fire the swarm event and add to the queue - self.tx - .send(m.clone()) - .expect("failed to send chat message to swarm"); - chat_queue.push_back(m); - *farthest_chat_index = chat_queue.len() - 1 + *chat_min_index; - } + self.handle_chat(m); } } } impl SwarmState { - pub fn new<S>(swarm: Swarm<S>) -> (Self, UnboundedSender<ChatPacket>) + pub fn new<S>(swarm: Swarm<S>) -> (Self, Sender<ChatPacket>) where S: Send + Sync + Clone + 'static, { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = tokio::sync::broadcast::channel(1); let swarm_state = SwarmState { chat_queue: Arc::new(Mutex::new(VecDeque::new())), @@ -114,35 +127,143 @@ impl SwarmState { // it should never be locked unless we reused the same plugin for two swarms // (bad) let mut rx = self.rx.lock().await; - while let Some(m) = rx.recv().await { + while let Ok(m) = rx.recv().await { swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap(); + let bot_states = swarm + .bot_datas + .lock() + .iter() + .map(|(bot, _)| { + bot.plugins + .get::<State>() + .expect("Chat plugin not installed") + .clone() + }) + .collect::<Vec<_>>(); + self.handle_new_chat_message(&bot_states); + } + } +} - // To make sure the queue doesn't grow too large, we keep a `chat_min_index` - // in Swarm that's set to the smallest index of all the bots, and we remove all - // messages from the queue that are before that index. - - let chat_min_index = *self.chat_min_index.lock(); - let mut new_chat_min_index = usize::MAX; - for (bot, _) in swarm.bot_datas.lock().iter() { - let this_farthest_chat_index = *bot - .plugins - .get::<State>() - .expect("Chat plugin not installed") - .farthest_chat_index - .lock(); - if this_farthest_chat_index < new_chat_min_index { - new_chat_min_index = this_farthest_chat_index; - } - } +impl SwarmState { + pub fn handle_new_chat_message(&self, bot_states: &[State]) { + // To make sure the queue doesn't grow too large, we keep a `chat_min_index` + // in Swarm that's set to the smallest index of all the bots, and we remove all + // messages from the queue that are before that index. - let mut chat_queue = self.chat_queue.lock(); - // remove all messages from the queue that are before the min index - for _ in 0..(new_chat_min_index - chat_min_index) { - chat_queue.pop_front(); + let chat_min_index = *self.chat_min_index.lock(); + let mut new_chat_min_index = usize::MAX; + for bot_state in bot_states { + let this_chat_index = *bot_state.chat_index.lock(); + if this_chat_index < new_chat_min_index { + new_chat_min_index = this_chat_index; } + } - // update the min index - *self.chat_min_index.lock() = new_chat_min_index; + let mut chat_queue = self.chat_queue.lock(); + if chat_min_index > new_chat_min_index { + println!( + "chat_min_index ({chat_min_index}) > new_chat_min_index ({new_chat_min_index})" + ); + return; + } + // remove all messages from the queue that are before the min index + for _ in 0..(new_chat_min_index - chat_min_index) { + chat_queue.pop_front(); } + + // update the min index + *self.chat_min_index.lock() = new_chat_min_index; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_swarm_chat() { + let (tx, mut rx) = tokio::sync::broadcast::channel(1); + let swarm_state = SwarmState { + chat_queue: Arc::new(Mutex::new(VecDeque::new())), + chat_min_index: Arc::new(Mutex::new(0)), + rx: Arc::new(tokio::sync::Mutex::new(rx)), + }; + let mut bot_states = vec![]; + let bot0 = State { + swarm_state: swarm_state.clone(), + chat_index: Arc::new(Mutex::new(0)), + tx: tx.clone(), + }; + let bot1 = State { + swarm_state: swarm_state.clone(), + chat_index: Arc::new(Mutex::new(0)), + tx: tx.clone(), + }; + bot_states.push(bot0.clone()); + bot_states.push(bot1.clone()); + + bot0.handle_chat(ChatPacket::new("a")); + // the swarm should get the event immediately after the bot gets it + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("a")) + ); + assert_eq!(*bot0.chat_index.lock(), 1); + // and a second bot sending the event shouldn't do anything + bot1.handle_chat(ChatPacket::new("a")); + assert!(swarm_state.rx.lock().await.try_recv().is_err()); + assert_eq!(*bot1.chat_index.lock(), 1); + + // but if the first one gets it again, it should sent it again + bot0.handle_chat(ChatPacket::new("a")); + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("a")) + ); + + // alright and now the second bot got a different chat message and it should be + // sent + bot1.handle_chat(ChatPacket::new("b")); + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("b")) + ); + } + + #[tokio::test] + async fn test_new_bot() { + let (tx, mut rx) = tokio::sync::broadcast::channel(1); + let swarm_state = SwarmState { + chat_queue: Arc::new(Mutex::new(VecDeque::new())), + chat_min_index: Arc::new(Mutex::new(0)), + rx: Arc::new(tokio::sync::Mutex::new(rx)), + }; + let mut bot_states = vec![]; + let bot0 = State { + swarm_state: swarm_state.clone(), + chat_index: Arc::new(Mutex::new(0)), + tx: tx.clone(), + }; + bot_states.push(bot0.clone()); + + // bot0 gets a chat message + bot0.handle_chat(ChatPacket::new("a")); + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("a")) + ); + // now a second bot joined and got a different chat message + let bot1 = State { + swarm_state: swarm_state.clone(), + chat_index: Arc::new(Mutex::new(0)), + tx: tx.clone(), + }; + bot_states.push(bot1.clone()); + bot1.handle_chat(ChatPacket::new("b")); + assert_eq!( + swarm_state.rx.lock().await.try_recv(), + Ok(ChatPacket::new("b")) + ); } } |
