diff options
Diffstat (limited to 'azalea/src')
| -rw-r--r-- | azalea/src/bot.rs | 131 | ||||
| -rw-r--r-- | azalea/src/lib.rs | 209 | ||||
| -rw-r--r-- | azalea/src/pathfinder/mod.rs | 341 | ||||
| -rw-r--r-- | azalea/src/pathfinder/moves.rs | 136 | ||||
| -rw-r--r-- | azalea/src/pathfinder/mtdstarlite.rs | 8 | ||||
| -rw-r--r-- | azalea/src/prelude.rs | 6 | ||||
| -rw-r--r-- | azalea/src/start.rs | 136 | ||||
| -rw-r--r-- | azalea/src/swarm/chat.rs | 344 | ||||
| -rw-r--r-- | azalea/src/swarm/events.rs | 45 | ||||
| -rw-r--r-- | azalea/src/swarm/mod.rs | 614 | ||||
| -rw-r--r-- | azalea/src/swarm/plugins.rs | 135 |
11 files changed, 1105 insertions, 1000 deletions
diff --git a/azalea/src/bot.rs b/azalea/src/bot.rs index 0674c692..510449d3 100644 --- a/azalea/src/bot.rs +++ b/azalea/src/bot.rs @@ -1,56 +1,115 @@ -use crate::{Client, Event}; -use async_trait::async_trait; use azalea_core::Vec3; -use parking_lot::Mutex; -use std::{f64::consts::PI, sync::Arc}; +use azalea_ecs::{ + app::{App, Plugin, PluginGroup, PluginGroupBuilder}, + component::Component, + entity::Entity, + event::EventReader, + query::{With, Without}, + schedule::IntoSystemDescriptor, + system::{Commands, Query}, + AppTickExt, +}; +use azalea_world::{ + entity::{metadata::Player, set_rotation, Jumping, Physics, Position}, + Local, +}; +use std::f64::consts::PI; -#[derive(Clone, Default)] -pub struct Plugin; -impl crate::Plugin for Plugin { - type State = State; +use crate::pathfinder::PathfinderPlugin; - fn build(&self) -> State { - State::default() +#[derive(Clone, Default)] +pub struct BotPlugin; +impl Plugin for BotPlugin { + fn build(&self, app: &mut App) { + app.add_event::<LookAtEvent>() + .add_event::<JumpEvent>() + .add_system(insert_bot.before("deduplicate_entities")) + .add_system(look_at_listener) + .add_system(jump_listener.label("jump_listener").before("ai_step")) + .add_tick_system(stop_jumping.after("ai_step")); } } -#[derive(Default, Clone)] -pub struct State { - jumping_once: Arc<Mutex<bool>>, +/// Component for all bots. +#[derive(Default, Component)] +pub struct Bot { + jumping_once: bool, +} + +/// Insert the [`Bot`] component for any local players that don't have it. +#[allow(clippy::type_complexity)] +fn insert_bot( + mut commands: Commands, + mut query: Query<Entity, (Without<Bot>, With<Local>, With<Player>)>, +) { + for entity in &mut query { + commands.entity(entity).insert(Bot::default()); + } } -#[async_trait] -impl crate::PluginState for State { - async fn handle(self: Box<Self>, event: Event, mut bot: Client) { - if let Event::Tick = event { - if *self.jumping_once.lock() && bot.jumping() { - *self.jumping_once.lock() = false; - bot.set_jumping(false); - } +fn stop_jumping(mut query: Query<(&mut Jumping, &mut Bot)>) { + for (mut jumping, mut bot) in &mut query { + if bot.jumping_once && **jumping { + bot.jumping_once = false; + **jumping = false; } } } -pub trait BotTrait { +pub trait BotClientExt { fn jump(&mut self); - fn look_at(&mut self, pos: &Vec3); + fn look_at(&mut self, pos: Vec3); } -impl BotTrait for azalea_client::Client { +impl BotClientExt for azalea_client::Client { /// Queue a jump for the next tick. fn jump(&mut self) { - self.set_jumping(true); - let state = self.plugins.get::<State>().unwrap().clone(); - *state.jumping_once.lock() = true; + let mut ecs = self.ecs.lock(); + ecs.send_event(JumpEvent(self.entity)); } /// Turn the bot's head to look at the coordinate in the world. - fn look_at(&mut self, pos: &Vec3) { - let (y_rot, x_rot) = direction_looking_at(self.entity().pos(), pos); - self.set_rotation(y_rot, x_rot); + fn look_at(&mut self, position: Vec3) { + let mut ecs = self.ecs.lock(); + ecs.send_event(LookAtEvent { + entity: self.entity, + position, + }); + } +} + +/// Event to jump once. +pub struct JumpEvent(pub Entity); + +fn jump_listener(mut query: Query<(&mut Jumping, &mut Bot)>, mut events: EventReader<JumpEvent>) { + for event in events.iter() { + if let Ok((mut jumping, mut bot)) = query.get_mut(event.0) { + **jumping = true; + bot.jumping_once = true; + } } } +/// Make an entity look towards a certain position in the world. +pub struct LookAtEvent { + pub entity: Entity, + /// The position we want the entity to be looking at. + pub position: Vec3, +} +fn look_at_listener( + mut events: EventReader<LookAtEvent>, + mut query: Query<(&Position, &mut Physics)>, +) { + for event in events.iter() { + if let Ok((position, mut physics)) = query.get_mut(event.entity) { + let (y_rot, x_rot) = direction_looking_at(position, &event.position); + set_rotation(&mut physics, y_rot, x_rot); + } + } +} + +/// Return the (`y_rot`, `x_rot`) that would make a client at `current` be +/// looking at `target`. fn direction_looking_at(current: &Vec3, target: &Vec3) -> (f32, f32) { // borrowed from mineflayer's Bot.lookAt because i didn't want to do math let delta = target - current; @@ -59,3 +118,15 @@ fn direction_looking_at(current: &Vec3, target: &Vec3) -> (f32, f32) { let x_rot = f64::atan2(delta.y, ground_distance) * -(180.0 / PI); (y_rot as f32, x_rot as f32) } + +/// A [`PluginGroup`] for the plugins that add extra bot functionality to the +/// client. +pub struct DefaultBotPlugins; + +impl PluginGroup for DefaultBotPlugins { + fn build(self) -> PluginGroupBuilder { + PluginGroupBuilder::start::<Self>() + .add(BotPlugin) + .add(PathfinderPlugin) + } +} diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs index 4ceb5b2e..026a35f5 100644 --- a/azalea/src/lib.rs +++ b/azalea/src/lib.rs @@ -1,94 +1,139 @@ -//! Azalea is a framework for creating Minecraft bots. -//! -//! Internally, it's just a wrapper over [`azalea_client`], adding useful -//! functions for making bots. Because of this, lots of the documentation will -//! refer to `azalea_client`. You can just replace these with `azalea` in your -//! code, since everything from azalea_client is re-exported in azalea. -//! -//! # Installation -//! -//! First, install Rust nightly with `rustup install nightly` and `rustup -//! default nightly`. -//! -//! Then, add one of the following lines to your Cargo.toml: -//! -//! Latest bleeding-edge version: -//! `azalea = { git="https://github.com/mat-1/azalea" }`\ -//! Latest "stable" release: -//! `azalea = "0.5.0"` -//! -//! ## Optimization -//! -//! For faster compile times, make a `.cargo/config.toml` file in your project -//! and copy -//! [this file](https://github.com/mat-1/azalea/blob/main/.cargo/config.toml) -//! into it. -//! -//! For faster performance in debug mode, add -//! ```toml -//! [profile.dev] -//! opt-level = 1 -//! [profile.dev.package."*"] -//! opt-level = 3 -//! ``` -//! to your Cargo.toml. You may have to install the LLD linker. -//! -//! # Examples -//! -//! ```rust,no_run -//! //! A bot that logs chat messages sent in the server to the console. -//! -//! use azalea::prelude::*; -//! use parking_lot::Mutex; -//! use std::sync::Arc; -//! -//! #[tokio::main] -//! async fn main() { -//! let account = Account::offline("bot"); -//! // or Account::microsoft("example@example.com").await.unwrap(); -//! -//! azalea::start(azalea::Options { -//! account, -//! address: "localhost", -//! state: State::default(), -//! plugins: plugins![], -//! handle, -//! }) -//! .await -//! .unwrap(); -//! } -//! -//! #[derive(Default, Clone)] -//! pub struct State {} -//! -//! async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { -//! match event { -//! Event::Chat(m) => { -//! println!("{}", m.message().to_ansi()); -//! } -//! _ => {} -//! } -//! -//! Ok(()) -//! } -//! ``` -//! -//! [`azalea_client`]: https://docs.rs/azalea-client - -#![feature(trait_upcasting)] +#![doc = include_str!("../README.md")] #![feature(async_closure)] -#![allow(incomplete_features)] mod bot; pub mod pathfinder; pub mod prelude; -mod start; mod swarm; -pub use azalea_block::*; +pub use azalea_block as blocks; pub use azalea_client::*; pub use azalea_core::{BlockPos, Vec3}; -pub use start::{start, Options}; +use azalea_ecs::{ + app::{App, Plugin}, + component::Component, +}; +pub use azalea_protocol as protocol; +pub use azalea_registry::EntityKind; +pub use azalea_world::{entity, World}; +use bot::DefaultBotPlugins; +use ecs::app::PluginGroup; +use futures::Future; +use protocol::{ + resolver::{self, ResolverError}, + ServerAddress, +}; pub use swarm::*; +use thiserror::Error; +use tokio::sync::mpsc; pub type HandleFn<Fut, S> = fn(Client, Event, S) -> Fut; + +#[derive(Error, Debug)] +pub enum StartError { + #[error("Invalid address")] + InvalidAddress, + #[error(transparent)] + ResolveAddress(#[from] ResolverError), + #[error("Join error: {0}")] + Join(#[from] azalea_client::JoinError), +} + +pub struct ClientBuilder<S, Fut> +where + S: Default + Send + Sync + Clone + 'static, + Fut: Future<Output = Result<(), anyhow::Error>>, +{ + app: App, + /// The function that's called every time a bot receives an [`Event`]. + handler: Option<HandleFn<Fut, S>>, + state: S, +} +impl<S, Fut> ClientBuilder<S, Fut> +where + S: Default + Send + Sync + Clone + Component + 'static, + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +{ + /// Start building a client that can join the world. + #[must_use] + pub fn new() -> Self { + Self { + // we create the app here so plugins can add onto it. + // the schedules won't run until [`Self::start`] is called. + app: init_ecs_app(), + + handler: None, + state: S::default(), + } + .add_plugins(DefaultBotPlugins) + } + /// Set the function that's called every time a bot receives an [`Event`]. + /// This is the way to handle normal per-bot events. + /// + /// You can only have one client handler, calling this again will replace + /// the old client handler function (you can have a client handler and swarm + /// handler separately though). + #[must_use] + pub fn set_handler(mut self, handler: HandleFn<Fut, S>) -> Self { + self.handler = Some(handler); + self + } + /// Add a plugin to the client. + #[must_use] + pub fn add_plugin<T: Plugin>(mut self, plugin: T) -> Self { + self.app.add_plugin(plugin); + self + } + /// Add a group of plugins to the client. + #[must_use] + pub fn add_plugins<T: PluginGroup>(mut self, plugin_group: T) -> Self { + self.app.add_plugins(plugin_group); + self + } + + /// Build this `ClientBuilder` into an actual [`Client`] and join the given + /// server. + /// + /// The `address` argument can be a `&str`, [`ServerAddress`], or anything + /// that implements `TryInto<ServerAddress>`. + /// + /// [`ServerAddress`]: azalea_protocol::ServerAddress + pub async fn start( + self, + account: Account, + address: impl TryInto<ServerAddress>, + ) -> Result<(), StartError> { + let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?; + let resolved_address = resolver::resolve_address(&address).await?; + + // An event that causes the schedule to run. This is only used internally. + let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1); + let ecs_lock = start_ecs(self.app, run_schedule_receiver, run_schedule_sender.clone()); + + let (bot, mut rx) = Client::start_client( + ecs_lock, + &account, + &address, + &resolved_address, + run_schedule_sender, + ) + .await?; + + while let Some(event) = rx.recv().await { + if let Some(handler) = self.handler { + tokio::spawn((handler)(bot.clone(), event.clone(), self.state.clone())); + } + } + + Ok(()) + } +} +impl<S, Fut> Default for ClientBuilder<S, Fut> +where + S: Default + Send + Sync + Clone + Component + 'static, + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/azalea/src/pathfinder/mod.rs b/azalea/src/pathfinder/mod.rs index de62e9a7..5246290d 100644 --- a/azalea/src/pathfinder/mod.rs +++ b/azalea/src/pathfinder/mod.rs @@ -1,147 +1,255 @@ mod moves; mod mtdstarlite; -use crate::{prelude::*, SprintDirection, WalkDirection}; -use crate::{Client, Event}; -use async_trait::async_trait; +use crate::bot::{JumpEvent, LookAtEvent}; +use crate::{SprintDirection, WalkDirection}; + +use azalea_client::{StartSprintEvent, StartWalkEvent}; use azalea_core::{BlockPos, CardinalDirection}; -use azalea_world::entity::EntityData; +use azalea_ecs::{ + app::{App, Plugin}, + component::Component, + entity::Entity, + event::{EventReader, EventWriter}, + query::{With, Without}, + schedule::IntoSystemDescriptor, + system::{Commands, Query, Res}, + AppTickExt, +}; +use azalea_world::entity::metadata::Player; +use azalea_world::Local; +use azalea_world::{ + entity::{Physics, Position, WorldName}, + WorldContainer, +}; +use bevy_tasks::{AsyncComputeTaskPool, Task}; +use futures_lite::future; use log::{debug, error}; use mtdstarlite::Edge; pub use mtdstarlite::MTDStarLite; -use parking_lot::Mutex; use std::collections::VecDeque; use std::sync::Arc; #[derive(Clone, Default)] -pub struct Plugin; -impl crate::Plugin for Plugin { - type State = State; - - fn build(&self) -> State { - State::default() +pub struct PathfinderPlugin; +impl Plugin for PathfinderPlugin { + fn build(&self, app: &mut App) { + app.add_event::<GotoEvent>() + .add_event::<PathFoundEvent>() + .add_tick_system(tick_execute_path.before("walk_listener")) + .add_system(goto_listener) + .add_system(add_default_pathfinder.after("deduplicate_entities")) + .add_system(handle_tasks) + .add_system(path_found_listener); } } -#[derive(Default, Clone)] -pub struct State { - // pathfinder: Option<MTDStarLite<Node, f32>>, - pub path: Arc<Mutex<VecDeque<Node>>>, +/// A component that makes this entity able to pathfind. +#[derive(Component, Default)] +pub struct Pathfinder { + pub path: VecDeque<Node>, +} +#[allow(clippy::type_complexity)] +fn add_default_pathfinder( + mut commands: Commands, + mut query: Query<Entity, (Without<Pathfinder>, With<Local>, With<Player>)>, +) { + for entity in &mut query { + commands.entity(entity).insert(Pathfinder::default()); + } } -#[async_trait] -impl crate::PluginState for State { - async fn handle(self: Box<Self>, event: Event, mut bot: Client) { - if let Event::Tick = event { - let mut path = self.path.lock(); +pub trait PathfinderClientExt { + fn goto(&self, goal: impl Goal + Send + Sync + 'static); +} - if !path.is_empty() { - tick_execute_path(&mut bot, &mut path); - } - } +impl PathfinderClientExt for azalea_client::Client { + fn goto(&self, goal: impl Goal + Send + Sync + 'static) { + self.ecs.lock().send_event(GotoEvent { + entity: self.entity, + goal: Arc::new(goal), + }); } } - -pub trait Trait { - fn goto(&self, goal: impl Goal); +pub struct GotoEvent { + pub entity: Entity, + pub goal: Arc<dyn Goal + Send + Sync>, } +pub struct PathFoundEvent { + pub entity: Entity, + pub path: VecDeque<Node>, +} + +#[derive(Component)] +pub struct ComputePath(Task<Option<PathFoundEvent>>); -impl Trait for azalea_client::Client { - fn goto(&self, goal: impl Goal) { +fn goto_listener( + mut commands: Commands, + mut events: EventReader<GotoEvent>, + mut query: Query<(&Position, &WorldName)>, + world_container: Res<WorldContainer>, +) { + let thread_pool = AsyncComputeTaskPool::get(); + + for event in events.iter() { + let (position, world_name) = query + .get_mut(event.entity) + .expect("Called goto on an entity that's not in the world"); let start = Node { - pos: BlockPos::from(self.entity().pos()), + pos: BlockPos::from(position), vertical_vel: VerticalVel::None, }; - let end = goal.goal_node(); - debug!("start: {start:?}, end: {end:?}"); - - let possible_moves: Vec<&dyn moves::Move> = vec![ - &moves::ForwardMove(CardinalDirection::North), - &moves::ForwardMove(CardinalDirection::East), - &moves::ForwardMove(CardinalDirection::South), - &moves::ForwardMove(CardinalDirection::West), - // - &moves::AscendMove(CardinalDirection::North), - &moves::AscendMove(CardinalDirection::East), - &moves::AscendMove(CardinalDirection::South), - &moves::AscendMove(CardinalDirection::West), - // - &moves::DescendMove(CardinalDirection::North), - &moves::DescendMove(CardinalDirection::East), - &moves::DescendMove(CardinalDirection::South), - &moves::DescendMove(CardinalDirection::West), - // - &moves::DiagonalMove(CardinalDirection::North), - &moves::DiagonalMove(CardinalDirection::East), - &moves::DiagonalMove(CardinalDirection::South), - &moves::DiagonalMove(CardinalDirection::West), - ]; - - let successors = |node: &Node| { - let mut edges = Vec::new(); - - let world = &self.world.read().shared; - for possible_move in possible_moves.iter() { - edges.push(Edge { - target: possible_move.next_node(node), - cost: possible_move.cost(world, node), - }); + + let world_lock = world_container + .get(world_name) + .expect("Entity tried to pathfind but the entity isn't in a valid world"); + let end = event.goal.goal_node(); + + let goal = event.goal.clone(); + let entity = event.entity; + + let task = thread_pool.spawn(async move { + debug!("start: {start:?}, end: {end:?}"); + + let possible_moves: Vec<&dyn moves::Move> = vec![ + &moves::ForwardMove(CardinalDirection::North), + &moves::ForwardMove(CardinalDirection::East), + &moves::ForwardMove(CardinalDirection::South), + &moves::ForwardMove(CardinalDirection::West), + // + &moves::AscendMove(CardinalDirection::North), + &moves::AscendMove(CardinalDirection::East), + &moves::AscendMove(CardinalDirection::South), + &moves::AscendMove(CardinalDirection::West), + // + &moves::DescendMove(CardinalDirection::North), + &moves::DescendMove(CardinalDirection::East), + &moves::DescendMove(CardinalDirection::South), + &moves::DescendMove(CardinalDirection::West), + // + &moves::DiagonalMove(CardinalDirection::North), + &moves::DiagonalMove(CardinalDirection::East), + &moves::DiagonalMove(CardinalDirection::South), + &moves::DiagonalMove(CardinalDirection::West), + ]; + + let successors = |node: &Node| { + let mut edges = Vec::new(); + + let world = world_lock.read(); + for possible_move in &possible_moves { + edges.push(Edge { + target: possible_move.next_node(node), + cost: possible_move.cost(&world, node), + }); + } + edges + }; + + let mut pf = MTDStarLite::new( + start, + end, + |n| goal.heuristic(n), + successors, + successors, + |n| goal.success(n), + ); + + let start_time = std::time::Instant::now(); + let p = pf.find_path(); + let end_time = std::time::Instant::now(); + debug!("path: {p:?}"); + debug!("time: {:?}", end_time - start_time); + + // convert the Option<Vec<Node>> to a VecDeque<Node> + if let Some(p) = p { + let path = p.into_iter().collect::<VecDeque<_>>(); + // commands.entity(event.entity).insert(Pathfinder { path: p }); + Some(PathFoundEvent { entity, path }) + } else { + error!("no path found"); + None } - edges - }; + }); - let mut pf = MTDStarLite::new( - start, - end, - |n| goal.heuristic(n), - successors, - successors, - |n| goal.success(n), - ); - - let start = std::time::Instant::now(); - let p = pf.find_path(); - let end = std::time::Instant::now(); - debug!("path: {p:?}"); - debug!("time: {:?}", end - start); - - let state = self - .plugins - .get::<State>() - .expect("Pathfinder plugin not installed!") - .clone(); - // convert the Option<Vec<Node>> to a VecDeque<Node> - if let Some(p) = p { - *state.path.lock() = p.into_iter().collect(); - } else { - error!("no path found"); + commands.spawn(ComputePath(task)); + } +} + +// poll the tasks and send the PathFoundEvent if they're done +fn handle_tasks( + mut commands: Commands, + mut transform_tasks: Query<(Entity, &mut ComputePath)>, + mut path_found_events: EventWriter<PathFoundEvent>, +) { + for (entity, mut task) in &mut transform_tasks { + if let Some(optional_path_found_event) = future::block_on(future::poll_once(&mut task.0)) { + if let Some(path_found_event) = optional_path_found_event { + path_found_events.send(path_found_event); + } + + // Task is complete, so remove task component from entity + commands.entity(entity).remove::<ComputePath>(); } } } -fn tick_execute_path(bot: &mut Client, path: &mut VecDeque<Node>) { - let target = if let Some(target) = path.front() { - target - } else { - return; - }; - let center = target.pos.center(); - // println!("going to {center:?} (at {pos:?})", pos = bot.entity().pos()); - bot.look_at(¢er); - bot.sprint(SprintDirection::Forward); - // check if we should jump - if target.pos.y > bot.entity().pos().y.floor() as i32 { - bot.jump(); +// set the path for the target entity when we get the PathFoundEvent +fn path_found_listener(mut events: EventReader<PathFoundEvent>, mut query: Query<&mut Pathfinder>) { + for event in events.iter() { + let mut pathfinder = query + .get_mut(event.entity) + .expect("Path found for an entity that doesn't have a pathfinder"); + pathfinder.path = event.path.clone(); } +} - if target.is_reached(&bot.entity()) { - // println!("ok target {target:?} reached"); - path.pop_front(); - if path.is_empty() { - bot.walk(WalkDirection::None); +fn tick_execute_path( + mut query: Query<(Entity, &mut Pathfinder, &Position, &Physics)>, + mut look_at_events: EventWriter<LookAtEvent>, + mut sprint_events: EventWriter<StartSprintEvent>, + mut walk_events: EventWriter<StartWalkEvent>, + mut jump_events: EventWriter<JumpEvent>, +) { + for (entity, mut pathfinder, position, physics) in &mut query { + loop { + let Some(target) = pathfinder.path.front() else { + break; + }; + let center = target.pos.center(); + // println!("going to {center:?} (at {pos:?})", pos = bot.entity().pos()); + look_at_events.send(LookAtEvent { + entity, + position: center, + }); + debug!( + "tick: pathfinder {entity:?}; going to {:?}; currently at {position:?}", + target.pos + ); + sprint_events.send(StartSprintEvent { + entity, + direction: SprintDirection::Forward, + }); + // check if we should jump + if target.pos.y > position.y.floor() as i32 { + jump_events.send(JumpEvent(entity)); + } + + if target.is_reached(position, physics) { + // println!("reached target"); + pathfinder.path.pop_front(); + if pathfinder.path.is_empty() { + // println!("reached goal"); + walk_events.send(StartWalkEvent { + entity, + direction: WalkDirection::None, + }); + } + // tick again, maybe we already reached the next node! + } else { + break; + } } - // tick again, maybe we already reached the next node! - tick_execute_path(bot, path); } } @@ -172,7 +280,8 @@ pub trait Goal { impl Node { /// Returns whether the entity is at the node and should start going to the /// next node. - pub fn is_reached(&self, entity: &EntityData) -> bool { + #[must_use] + pub fn is_reached(&self, position: &Position, physics: &Physics) -> bool { // println!( // "entity.delta.y: {} {:?}=={:?}, self.vertical_vel={:?}", // entity.delta.y, @@ -180,11 +289,11 @@ impl Node { // self.pos, // self.vertical_vel // ); - BlockPos::from(entity.pos()) == self.pos + BlockPos::from(position) == self.pos && match self.vertical_vel { - VerticalVel::NoneMidair => entity.delta.y > -0.1 && entity.delta.y < 0.1, - VerticalVel::None => entity.on_ground, - VerticalVel::FallingLittle => entity.delta.y < -0.1, + VerticalVel::NoneMidair => physics.delta.y > -0.1 && physics.delta.y < 0.1, + VerticalVel::None => physics.on_ground, + VerticalVel::FallingLittle => physics.delta.y < -0.1, } } } diff --git a/azalea/src/pathfinder/moves.rs b/azalea/src/pathfinder/moves.rs index ccf8ba1a..9bb5c7c1 100644 --- a/azalea/src/pathfinder/moves.rs +++ b/azalea/src/pathfinder/moves.rs @@ -1,11 +1,11 @@ use super::{Node, VerticalVel}; use azalea_core::{BlockPos, CardinalDirection}; use azalea_physics::collision::{self, BlockWithShape}; -use azalea_world::WeakWorld; +use azalea_world::World; /// whether this block is passable -fn is_block_passable(pos: &BlockPos, world: &WeakWorld) -> bool { - if let Some(block) = world.get_block_state(pos) { +fn is_block_passable(pos: &BlockPos, world: &World) -> bool { + if let Some(block) = world.chunks.get_block_state(pos) { block.shape() == &collision::empty_shape() } else { false @@ -13,8 +13,8 @@ fn is_block_passable(pos: &BlockPos, world: &WeakWorld) -> bool { } /// whether this block has a solid hitbox (i.e. we can stand on it) -fn is_block_solid(pos: &BlockPos, world: &WeakWorld) -> bool { - if let Some(block) = world.get_block_state(pos) { +fn is_block_solid(pos: &BlockPos, world: &World) -> bool { + if let Some(block) = world.chunks.get_block_state(pos) { block.shape() == &collision::block_shape() } else { false @@ -22,22 +22,22 @@ fn is_block_solid(pos: &BlockPos, world: &WeakWorld) -> bool { } /// Whether this block and the block above are passable -fn is_passable(pos: &BlockPos, world: &WeakWorld) -> bool { +fn is_passable(pos: &BlockPos, world: &World) -> bool { is_block_passable(pos, world) && is_block_passable(&pos.up(1), world) } /// Whether we can stand in this position. Checks if the block below is solid, /// and that the two blocks above that are passable. -fn is_standable(pos: &BlockPos, world: &WeakWorld) -> bool { +fn is_standable(pos: &BlockPos, world: &World) -> bool { is_block_solid(&pos.down(1), world) && is_passable(pos, world) } const JUMP_COST: f32 = 0.5; const WALK_ONE_BLOCK_COST: f32 = 1.0; -pub trait Move { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32; +pub trait Move: Send + Sync { + fn cost(&self, world: &World, node: &Node) -> f32; /// Returns by how much the entity's position should be changed when this /// move is executed. fn offset(&self) -> BlockPos; @@ -51,7 +51,7 @@ pub trait Move { pub struct ForwardMove(pub CardinalDirection); impl Move for ForwardMove { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { + fn cost(&self, world: &World, node: &Node) -> f32 { if is_standable(&(node.pos + self.offset()), world) && node.vertical_vel == VerticalVel::None { @@ -67,7 +67,7 @@ impl Move for ForwardMove { pub struct AscendMove(pub CardinalDirection); impl Move for AscendMove { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { + fn cost(&self, world: &World, node: &Node) -> f32 { if node.vertical_vel == VerticalVel::None && is_block_passable(&node.pos.up(2), world) && is_standable(&(node.pos + self.offset()), world) @@ -89,7 +89,7 @@ impl Move for AscendMove { } pub struct DescendMove(pub CardinalDirection); impl Move for DescendMove { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { + fn cost(&self, world: &World, node: &Node) -> f32 { // check whether 3 blocks vertically forward are passable if node.vertical_vel == VerticalVel::None && is_standable(&(node.pos + self.offset()), world) @@ -112,7 +112,7 @@ impl Move for DescendMove { } pub struct DiagonalMove(pub CardinalDirection); impl Move for DiagonalMove { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { + fn cost(&self, world: &World, node: &Node) -> f32 { if node.vertical_vel != VerticalVel::None { return f32::INFINITY; } @@ -151,56 +151,92 @@ mod tests { use super::*; use azalea_block::BlockState; use azalea_core::ChunkPos; - use azalea_world::{Chunk, PartialWorld}; + use azalea_world::{Chunk, ChunkStorage, PartialWorld}; #[test] fn test_is_passable() { - let mut world = PartialWorld::default(); - world - .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default())) - .unwrap(); - world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone); - world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air); - - assert_eq!( - is_block_passable(&BlockPos::new(0, 0, 0), &world.shared), - false + let mut partial_world = PartialWorld::default(); + let mut chunk_storage = ChunkStorage::default(); + + partial_world.chunks.set( + &ChunkPos { x: 0, z: 0 }, + Some(Chunk::default()), + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 0, 0), + BlockState::Stone, + &mut chunk_storage, ); - assert_eq!( - is_block_passable(&BlockPos::new(0, 1, 0), &world.shared), - true + partial_world.chunks.set_block_state( + &BlockPos::new(0, 1, 0), + BlockState::Air, + &mut chunk_storage, ); + + let world = chunk_storage.into(); + assert_eq!(is_block_passable(&BlockPos::new(0, 0, 0), &world), false); + assert_eq!(is_block_passable(&BlockPos::new(0, 1, 0), &world), true); } #[test] fn test_is_solid() { - let mut world = PartialWorld::default(); - world - .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default())) - .unwrap(); - world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone); - world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air); - - assert_eq!(is_block_solid(&BlockPos::new(0, 0, 0), &world.shared), true); - assert_eq!( - is_block_solid(&BlockPos::new(0, 1, 0), &world.shared), - false + let mut partial_world = PartialWorld::default(); + let mut chunk_storage = ChunkStorage::default(); + partial_world.chunks.set( + &ChunkPos { x: 0, z: 0 }, + Some(Chunk::default()), + &mut chunk_storage, ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 0, 0), + BlockState::Stone, + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 1, 0), + BlockState::Air, + &mut chunk_storage, + ); + + let world = chunk_storage.into(); + assert_eq!(is_block_solid(&BlockPos::new(0, 0, 0), &world), true); + assert_eq!(is_block_solid(&BlockPos::new(0, 1, 0), &world), false); } #[test] fn test_is_standable() { - let mut world = PartialWorld::default(); - world - .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default())) - .unwrap(); - world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone); - world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air); - world.set_block_state(&BlockPos::new(0, 2, 0), BlockState::Air); - world.set_block_state(&BlockPos::new(0, 3, 0), BlockState::Air); - - assert!(is_standable(&BlockPos::new(0, 1, 0), &world.shared)); - assert!(!is_standable(&BlockPos::new(0, 0, 0), &world.shared)); - assert!(!is_standable(&BlockPos::new(0, 2, 0), &world.shared)); + let mut partial_world = PartialWorld::default(); + let mut chunk_storage = ChunkStorage::default(); + partial_world.chunks.set( + &ChunkPos { x: 0, z: 0 }, + Some(Chunk::default()), + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 0, 0), + BlockState::Stone, + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 1, 0), + BlockState::Air, + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 2, 0), + BlockState::Air, + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 3, 0), + BlockState::Air, + &mut chunk_storage, + ); + + let world = chunk_storage.into(); + assert!(is_standable(&BlockPos::new(0, 1, 0), &world)); + assert!(!is_standable(&BlockPos::new(0, 0, 0), &world)); + assert!(!is_standable(&BlockPos::new(0, 2, 0), &world)); } } diff --git a/azalea/src/pathfinder/mtdstarlite.rs b/azalea/src/pathfinder/mtdstarlite.rs index 50a467df..ce463279 100644 --- a/azalea/src/pathfinder/mtdstarlite.rs +++ b/azalea/src/pathfinder/mtdstarlite.rs @@ -3,9 +3,9 @@ //! //! Future optimization attempt ideas: //! - Use a different priority queue (e.g. fibonacci heap) -//! - Use FxHash instead of the default hasher +//! - Use `FxHash` instead of the default hasher //! - Have `par` be a raw pointer -//! - Try borrowing vs copying the Node in several places (like state_mut) +//! - Try borrowing vs copying the Node in several places (like `state_mut`) //! - Store edge costs in their own map use priority_queue::DoublePriorityQueue; @@ -260,9 +260,7 @@ impl< // identify a path from sstart to sgoal using the parent pointers let mut target = self.state(&self.goal).par; while !(Some(self.start) == target) { - let this_target = if let Some(this_target) = target { - this_target - } else { + let Some(this_target) = target else { break; }; // hunter follows path from start to goal; diff --git a/azalea/src/prelude.rs b/azalea/src/prelude.rs index bedf724f..3d8cc13e 100644 --- a/azalea/src/prelude.rs +++ b/azalea/src/prelude.rs @@ -1,7 +1,7 @@ //! The Azalea prelude. Things that are necessary for a bare-bones bot are //! re-exported here. -pub use crate::bot::BotTrait; -pub use crate::pathfinder::Trait; -pub use crate::{plugins, swarm_plugins, Plugin}; +pub use crate::bot::BotClientExt; +pub use crate::pathfinder::PathfinderClientExt; pub use azalea_client::{Account, Client, Event}; +pub use azalea_ecs::component::Component; diff --git a/azalea/src/start.rs b/azalea/src/start.rs deleted file mode 100644 index e2374fb8..00000000 --- a/azalea/src/start.rs +++ /dev/null @@ -1,136 +0,0 @@ -use crate::{bot, pathfinder, HandleFn}; -use azalea_client::{Account, Client, Plugins}; -use azalea_protocol::ServerAddress; -use std::{future::Future, sync::Arc}; -use thiserror::Error; - -/// A helper macro that generates a [`Plugins`] struct from a list of objects -/// that implement [`Plugin`]. -/// -/// ```rust,no_run -/// plugins![azalea_pathfinder::Plugin]; -/// ``` -/// -/// [`Plugin`]: crate::Plugin -#[macro_export] -macro_rules! plugins { - ($($plugin:expr),*) => { - { - let mut plugins = azalea::Plugins::new(); - $( - plugins.add($plugin); - )* - plugins - } - }; -} - -/// The options that are passed to [`azalea::start`]. -/// -/// [`azalea::start`]: crate::start() -pub struct Options<S, A, Fut> -where - A: TryInto<ServerAddress>, - Fut: Future<Output = Result<(), anyhow::Error>>, -{ - /// The address of the server that we're connecting to. This can be a - /// `&str`, [`ServerAddress`], or anything that implements - /// `TryInto<ServerAddress>`. - /// - /// [`ServerAddress`]: azalea_protocol::ServerAddress - pub address: A, - /// The account that's going to join the server. - pub account: Account, - /// The plugins that are going to be used. Plugins are external crates that - /// add extra functionality to Azalea. You should use the [`plugins`] macro - /// for this field. - /// - /// ```rust,no_run - /// plugins![azalea_pathfinder::Plugin] - /// ``` - pub plugins: Plugins, - /// A struct that contains the data that you want your bot to remember - /// across events. - /// - /// # Examples - /// - /// ```rust - /// use parking_lot::Mutex; - /// use std::sync::Arc; - /// - /// #[derive(Default, Clone)] - /// struct State { - /// farming: Arc<Mutex<bool>>, - /// } - /// ``` - pub state: S, - /// The function that's called whenever we get an event. - /// - /// # Examples - /// - /// ```rust - /// use azalea::prelude::*; - /// - /// async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { - /// Ok(()) - /// } - /// ``` - pub handle: HandleFn<Fut, S>, -} - -#[derive(Error, Debug)] -pub enum StartError { - #[error("Invalid address")] - InvalidAddress, - #[error("Join error: {0}")] - Join(#[from] azalea_client::JoinError), -} - -/// Join a server and start handling events. This function will run forever -/// until it gets disconnected from the server. -/// -/// # Examples -/// -/// ```rust,no_run -/// let error = azalea::start(azalea::Options { -/// account, -/// address: "localhost", -/// state: State::default(), -/// plugins: plugins![azalea_pathfinder::Plugin], -/// handle, -/// }).await; -/// ``` -pub async fn start< - S: Send + Sync + Clone + 'static, - A: Send + TryInto<ServerAddress>, - Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, ->( - options: Options<S, A, Fut>, -) -> Result<(), StartError> { - let address = match options.address.try_into() { - Ok(address) => address, - Err(_) => return Err(StartError::InvalidAddress), - }; - - let (mut bot, mut rx) = Client::join(&options.account, address).await?; - - let mut plugins = options.plugins; - // DEFAULT PLUGINS - plugins.add(bot::Plugin); - plugins.add(pathfinder::Plugin); - - bot.plugins = Arc::new(plugins.build()); - - let state = options.state; - - while let Some(event) = rx.recv().await { - let cloned_plugins = (*bot.plugins).clone(); - for plugin in cloned_plugins.into_iter() { - tokio::spawn(plugin.handle(event.clone(), bot.clone())); - } - - tokio::spawn((options.handle)(bot.clone(), event.clone(), state.clone())); - } - - Ok(()) -} diff --git a/azalea/src/swarm/chat.rs b/azalea/src/swarm/chat.rs index 4582c59e..a55e9bf6 100644 --- a/azalea/src/swarm/chat.rs +++ b/azalea/src/swarm/chat.rs @@ -1,4 +1,4 @@ -//! Implements SwarmEvent::Chat +//! Implements `SwarmEvent::Chat`. // How the chat event works (to avoid firing the event multiple times): // --- @@ -13,71 +13,81 @@ // in Swarm that's set to the smallest index of all the bots, and we remove all // messages from the queue that are before that index. +use azalea_client::{packet_handling::ChatReceivedEvent, ChatPacket}; +use azalea_ecs::{ + app::{App, Plugin}, + component::Component, + event::{EventReader, EventWriter}, + schedule::IntoSystemDescriptor, + system::{Commands, Query, Res, ResMut, Resource}, +}; +use std::collections::VecDeque; + use crate::{Swarm, SwarmEvent}; -use async_trait::async_trait; -use azalea_client::{ChatPacket, Client, Event}; -use parking_lot::Mutex; -use std::{collections::VecDeque, sync::Arc}; -use tokio::sync::broadcast::{Receiver, Sender}; #[derive(Clone)] -pub struct Plugin { - pub swarm_state: SwarmState, - pub tx: Sender<ChatPacket>, -} - -impl crate::Plugin for Plugin { - type State = State; - - fn build(&self) -> State { - State { - chat_index: Arc::new(Mutex::new(0)), - swarm_state: self.swarm_state.clone(), - tx: self.tx.clone(), - } +pub struct SwarmChatPlugin; +impl Plugin for SwarmChatPlugin { + fn build(&self, app: &mut App) { + app.add_event::<NewChatMessageEvent>() + .add_system(chat_listener.label("chat_listener")) + .add_system(update_min_index_and_shrink_queue.after("chat_listener")) + .insert_resource(GlobalChatState { + chat_queue: VecDeque::new(), + chat_min_index: 0, + }); } } -#[derive(Clone)] -pub struct State { - pub chat_index: Arc<Mutex<usize>>, - pub tx: Sender<ChatPacket>, - pub swarm_state: SwarmState, +#[derive(Component, Debug)] +pub struct ClientChatState { + pub chat_index: usize, } -#[derive(Clone)] -pub struct SwarmState { - pub chat_queue: Arc<Mutex<VecDeque<ChatPacket>>>, - pub chat_min_index: Arc<Mutex<usize>>, - pub rx: Arc<tokio::sync::Mutex<Receiver<ChatPacket>>>, +/// A chat message that no other bots have seen yet was received by a bot. +#[derive(Debug)] +pub struct NewChatMessageEvent(ChatPacket); + +#[derive(Resource)] +pub struct GlobalChatState { + pub chat_queue: VecDeque<ChatPacket>, + pub chat_min_index: usize, } -impl State { - pub fn handle_chat(&self, message: ChatPacket) { +fn chat_listener( + mut commands: Commands, + mut query: Query<&mut ClientChatState>, + mut events: EventReader<ChatReceivedEvent>, + mut global_chat_state: ResMut<GlobalChatState>, + mut new_chat_messages_events: EventWriter<NewChatMessageEvent>, +) { + for event in events.iter() { + let mut client_chat_state = query.get_mut(event.entity); + let mut client_chat_index = if let Ok(ref client_chat_state) = client_chat_state { + client_chat_state.chat_index + } else { + // if the client has no chat state, we default to this and insert it at the end + global_chat_state.chat_min_index + }; + // When a bot receives a chat messages, it looks into the queue to find the // earliest instance of the message content that's after the bot's chat index. // If it finds it, then its personal index is simply updated. Otherwise, fire // the event and add to the queue. - let mut chat_queue = self.swarm_state.chat_queue.lock(); - let chat_min_index = self.swarm_state.chat_min_index.lock(); - let mut chat_index = self.chat_index.lock(); - - if *chat_min_index > *chat_index { - // if this happens it's because this bot just logged in, so - // ignore it and let another bot handle it - println!("chat_min_index ({chat_min_index}) > chat_index ({chat_index})"); - *chat_index = *chat_min_index; - return; - } - let actual_vec_index = *chat_index - *chat_min_index; + let actual_vec_index = client_chat_index - global_chat_state.chat_min_index; // go through the queue and find the first message that's after the bot's index let mut found = false; - for (i, past_message) in chat_queue.iter().enumerate().skip(actual_vec_index) { - if past_message == &message { + for (i, past_message) in global_chat_state + .chat_queue + .iter() + .enumerate() + .skip(actual_vec_index) + { + if past_message == &event.packet { // found the message, update the index - *chat_index = i + *chat_min_index + 1; + client_chat_index = i + global_chat_state.chat_min_index + 1; found = true; break; } @@ -85,185 +95,157 @@ impl State { if !found { // didn't find the message, so fire the swarm event and add to the queue - self.tx - .send(message.clone()) - .expect("failed to send chat message to swarm"); - chat_queue.push_back(message); - *chat_index = chat_queue.len() + *chat_min_index; + new_chat_messages_events.send(NewChatMessageEvent(event.packet.clone())); + global_chat_state.chat_queue.push_back(event.packet.clone()); + client_chat_index = + global_chat_state.chat_queue.len() + global_chat_state.chat_min_index; } - } -} - -#[async_trait] -impl crate::PluginState for State { - async fn handle(self: Box<Self>, event: Event, _bot: Client) { - // we're allowed to access Plugin::swarm_state since it's shared for every bot - if let Event::Chat(m) = event { - self.handle_chat(m); + if let Ok(ref mut client_chat_state) = client_chat_state { + client_chat_state.chat_index = client_chat_index; + } else { + commands.entity(event.entity).insert(ClientChatState { + chat_index: client_chat_index, + }); } } } -impl SwarmState { - pub fn new<S>(swarm: Swarm<S>) -> (Self, Sender<ChatPacket>) - where - S: Send + Sync + Clone + 'static, - { - let (tx, rx) = tokio::sync::broadcast::channel(1); - - let swarm_state = SwarmState { - chat_queue: Arc::new(Mutex::new(VecDeque::new())), - chat_min_index: Arc::new(Mutex::new(0)), - rx: Arc::new(tokio::sync::Mutex::new(rx)), - }; - tokio::spawn(swarm_state.clone().start(swarm)); - - (swarm_state, tx) - } - async fn start<S>(self, swarm: Swarm<S>) - where - S: Send + Sync + Clone + 'static, - { - // it should never be locked unless we reused the same plugin for two swarms - // (bad) - let mut rx = self.rx.lock().await; - while let Ok(m) = rx.recv().await { - swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap(); - let bot_states = swarm - .bot_datas - .lock() - .iter() - .map(|(bot, _)| { - bot.plugins - .get::<State>() - .expect("Chat plugin not installed") - .clone() - }) - .collect::<Vec<_>>(); - self.handle_new_chat_message(&bot_states); +fn update_min_index_and_shrink_queue( + query: Query<&ClientChatState>, + mut global_chat_state: ResMut<GlobalChatState>, + mut events: EventReader<NewChatMessageEvent>, + swarm: Option<Res<Swarm>>, +) { + for event in events.iter() { + if let Some(swarm) = &swarm { + // it should also work if Swarm isn't present (so the tests don't need it) + swarm + .swarm_tx + .send(SwarmEvent::Chat(event.0.clone())) + .unwrap(); } - } -} - -impl SwarmState { - pub fn handle_new_chat_message(&self, bot_states: &[State]) { // To make sure the queue doesn't grow too large, we keep a `chat_min_index` // in Swarm that's set to the smallest index of all the bots, and we remove all // messages from the queue that are before that index. - let chat_min_index = *self.chat_min_index.lock(); - let mut new_chat_min_index = usize::MAX; - for bot_state in bot_states { - let this_chat_index = *bot_state.chat_index.lock(); + let mut new_chat_min_index = global_chat_state.chat_min_index; + for client_chat_state in query.iter() { + let this_chat_index = client_chat_state.chat_index; if this_chat_index < new_chat_min_index { new_chat_min_index = this_chat_index; } } - let mut chat_queue = self.chat_queue.lock(); - if chat_min_index > new_chat_min_index { - println!( - "chat_min_index ({chat_min_index}) > new_chat_min_index ({new_chat_min_index})" - ); + if global_chat_state.chat_min_index > new_chat_min_index { return; } // remove all messages from the queue that are before the min index - for _ in 0..(new_chat_min_index - chat_min_index) { - chat_queue.pop_front(); + for _ in 0..(new_chat_min_index - global_chat_state.chat_min_index) { + global_chat_state.chat_queue.pop_front(); } // update the min index - *self.chat_min_index.lock() = new_chat_min_index; + global_chat_state.chat_min_index = new_chat_min_index; } } #[cfg(test)] mod tests { + use azalea_ecs::{ecs::Ecs, event::Events, system::SystemState}; + use super::*; + fn make_test_app() -> App { + let mut app = App::new(); + // we add the events like this instead of with .add_event so we can have our own + // event mangement in drain_events + app.init_resource::<Events<ChatReceivedEvent>>() + .init_resource::<Events<NewChatMessageEvent>>() + .add_system(chat_listener.label("chat_listener")) + .add_system(update_min_index_and_shrink_queue.after("chat_listener")) + .insert_resource(GlobalChatState { + chat_queue: VecDeque::new(), + chat_min_index: 0, + }); + app + } + + fn drain_events(ecs: &mut Ecs) -> Vec<ChatPacket> { + let mut system_state: SystemState<ResMut<Events<NewChatMessageEvent>>> = + SystemState::new(ecs); + let mut events = system_state.get_mut(ecs); + + events.drain().map(|e| e.0.clone()).collect::<Vec<_>>() + } + #[tokio::test] async fn test_swarm_chat() { - let (tx, mut rx) = tokio::sync::broadcast::channel(1); - let swarm_state = SwarmState { - chat_queue: Arc::new(Mutex::new(VecDeque::new())), - chat_min_index: Arc::new(Mutex::new(0)), - rx: Arc::new(tokio::sync::Mutex::new(rx)), - }; - let mut bot_states = vec![]; - let bot0 = State { - swarm_state: swarm_state.clone(), - chat_index: Arc::new(Mutex::new(0)), - tx: tx.clone(), - }; - let bot1 = State { - swarm_state: swarm_state.clone(), - chat_index: Arc::new(Mutex::new(0)), - tx: tx.clone(), - }; - bot_states.push(bot0.clone()); - bot_states.push(bot1.clone()); + let mut app = make_test_app(); + + let bot0 = app.world.spawn_empty().id(); + let bot1 = app.world.spawn_empty().id(); + + app.world.send_event(ChatReceivedEvent { + entity: bot0, + packet: ChatPacket::new("a"), + }); + app.update(); - bot0.handle_chat(ChatPacket::new("a")); // the swarm should get the event immediately after the bot gets it + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("a")]); assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("a")) + app.world.get::<ClientChatState>(bot0).unwrap().chat_index, + 1 ); - assert_eq!(*bot0.chat_index.lock(), 1); // and a second bot sending the event shouldn't do anything - bot1.handle_chat(ChatPacket::new("a")); - assert!(swarm_state.rx.lock().await.try_recv().is_err()); - assert_eq!(*bot1.chat_index.lock(), 1); - - // but if the first one gets it again, it should sent it again - bot0.handle_chat(ChatPacket::new("a")); + app.world.send_event(ChatReceivedEvent { + entity: bot1, + packet: ChatPacket::new("a"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![]); assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("a")) + app.world.get::<ClientChatState>(bot1).unwrap().chat_index, + 1 ); + // but if the first one gets it again, it should sent it again + app.world.send_event(ChatReceivedEvent { + entity: bot0, + packet: ChatPacket::new("a"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("a")]); + // alright and now the second bot got a different chat message and it should be // sent - bot1.handle_chat(ChatPacket::new("b")); - assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("b")) - ); + app.world.send_event(ChatReceivedEvent { + entity: bot1, + packet: ChatPacket::new("b"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("b")]); } #[tokio::test] async fn test_new_bot() { - let (tx, mut rx) = tokio::sync::broadcast::channel(1); - let swarm_state = SwarmState { - chat_queue: Arc::new(Mutex::new(VecDeque::new())), - chat_min_index: Arc::new(Mutex::new(0)), - rx: Arc::new(tokio::sync::Mutex::new(rx)), - }; - let mut bot_states = vec![]; - let bot0 = State { - swarm_state: swarm_state.clone(), - chat_index: Arc::new(Mutex::new(0)), - tx: tx.clone(), - }; - bot_states.push(bot0.clone()); + let mut app = make_test_app(); + + let bot0 = app.world.spawn_empty().id(); // bot0 gets a chat message - bot0.handle_chat(ChatPacket::new("a")); - assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("a")) - ); - // now a second bot joined and got a different chat message - let bot1 = State { - swarm_state: swarm_state.clone(), - chat_index: Arc::new(Mutex::new(0)), - tx: tx.clone(), - }; - bot_states.push(bot1.clone()); - bot1.handle_chat(ChatPacket::new("b")); - assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("b")) - ); + app.world.send_event(ChatReceivedEvent { + entity: bot0, + packet: ChatPacket::new("a"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("a")]); + let bot1 = app.world.spawn_empty().id(); + app.world.send_event(ChatReceivedEvent { + entity: bot1, + packet: ChatPacket::new("b"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("b")]); } } diff --git a/azalea/src/swarm/events.rs b/azalea/src/swarm/events.rs new file mode 100644 index 00000000..81d8c731 --- /dev/null +++ b/azalea/src/swarm/events.rs @@ -0,0 +1,45 @@ +use azalea_client::LocalPlayer; +use azalea_ecs::{ + app::{App, Plugin}, + event::EventWriter, + query::With, + system::{Query, ResMut, Resource}, +}; +use azalea_world::entity::MinecraftEntityId; +use derive_more::{Deref, DerefMut}; + +pub struct SwarmPlugin; +impl Plugin for SwarmPlugin { + fn build(&self, app: &mut App) { + app.add_event::<SwarmReadyEvent>() + .add_system(check_ready) + .init_resource::<IsSwarmReady>(); + } +} + +/// All the bots from the swarm are now in the world. +pub struct SwarmReadyEvent; + +#[derive(Default, Resource, Deref, DerefMut)] +struct IsSwarmReady(bool); + +fn check_ready( + query: Query<Option<&MinecraftEntityId>, With<LocalPlayer>>, + mut is_swarm_ready: ResMut<IsSwarmReady>, + mut ready_events: EventWriter<SwarmReadyEvent>, +) { + // if we already know the swarm is ready, do nothing + if **is_swarm_ready { + return; + } + // if all the players are in the world, we're ready + for entity_id in query.iter() { + if entity_id.is_none() { + return; + } + } + + // all the players are in the world, so we're ready + **is_swarm_ready = true; + ready_events.send(SwarmReadyEvent); +} diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index 6fc3e40c..9c16bc01 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -1,41 +1,29 @@ -/// Swarms are a way to conveniently control many bots. -mod chat; -mod plugins; +//! Swarms are a way to conveniently control many bots. -pub use self::plugins::*; -use crate::{bot, HandleFn}; -use azalea_client::{Account, ChatPacket, Client, Event, JoinError, Plugins}; +mod chat; +mod events; + +use crate::{bot::DefaultBotPlugins, HandleFn}; +use azalea_client::{init_ecs_app, start_ecs, Account, ChatPacket, Client, Event, JoinError}; +use azalea_ecs::{ + app::{App, Plugin, PluginGroup, PluginGroupBuilder}, + component::Component, + ecs::Ecs, + entity::Entity, + system::Resource, +}; use azalea_protocol::{ - connect::{Connection, ConnectionError}, + connect::ConnectionError, resolver::{self, ResolverError}, ServerAddress, }; -use azalea_world::WeakWorldContainer; +use azalea_world::WorldContainer; use futures::future::join_all; use log::error; use parking_lot::{Mutex, RwLock}; -use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; +use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration}; use thiserror::Error; -use tokio::sync::mpsc::{self, UnboundedSender}; - -/// A helper macro that generates a [`SwarmPlugins`] struct from a list of -/// objects that implement [`SwarmPlugin`]. -/// -/// ```rust,no_run -/// swarm_plugins![azalea_pathfinder::Plugin]; -/// ``` -#[macro_export] -macro_rules! swarm_plugins { - ($($plugin:expr),*) => { - { - let mut plugins = azalea::SwarmPlugins::new(); - $( - plugins.add($plugin); - )* - plugins - } - }; -} +use tokio::sync::mpsc; /// A swarm is a way to conveniently control many bots at once, while also /// being able to control bots at an individual level when desired. @@ -46,82 +34,295 @@ macro_rules! swarm_plugins { /// It's used to make the [`Swarm::add`] function work. /// /// [`azalea::start_swarm`]: fn.start_swarm.html -#[derive(Clone)] -pub struct Swarm<S> { - bot_datas: Arc<Mutex<Vec<(Client, S)>>>, +#[derive(Clone, Resource)] +pub struct Swarm { + pub ecs_lock: Arc<Mutex<Ecs>>, + bots: Arc<Mutex<HashMap<Entity, Client>>>, + + // bot_datas: Arc<Mutex<Vec<(Client, S)>>>, resolved_address: SocketAddr, address: ServerAddress, - pub worlds: Arc<RwLock<WeakWorldContainer>>, - /// Plugins that are set for new bots - plugins: Plugins, + pub world_container: Arc<RwLock<WorldContainer>>, - bots_tx: UnboundedSender<(Option<Event>, (Client, S))>, - swarm_tx: UnboundedSender<SwarmEvent>, -} + bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>, + swarm_tx: mpsc::UnboundedSender<SwarmEvent>, -/// An event about something that doesn't have to do with a single bot. -#[derive(Clone, Debug)] -pub enum SwarmEvent { - /// All the bots in the swarm have successfully joined the server. - Login, - /// The swarm was created. This is only fired once, and it's guaranteed to - /// be the first event to fire. - Init, - /// A bot got disconnected from the server. - /// - /// You can implement an auto-reconnect by calling [`Swarm::add`] - /// with the account from this event. - Disconnect(Account), - /// At least one bot received a chat message. - Chat(ChatPacket), + run_schedule_sender: mpsc::Sender<()>, } -pub type SwarmHandleFn<Fut, S, SS> = fn(Swarm<S>, SwarmEvent, SS) -> Fut; - -/// The options that are passed to [`azalea::start_swarm`]. -/// -/// [`azalea::start_swarm`]: crate::start_swarm() -pub struct SwarmOptions<S, SS, A, Fut, SwarmFut> +/// Create a new [`Swarm`]. +pub struct SwarmBuilder<S, SS, Fut, SwarmFut> where - A: TryInto<ServerAddress>, + S: Default + Send + Sync + Clone + 'static, + SS: Default + Send + Sync + Clone + 'static, Fut: Future<Output = Result<(), anyhow::Error>>, SwarmFut: Future<Output = Result<(), anyhow::Error>>, { - /// The address of the server that we're connecting to. This can be a - /// `&str`, [`ServerAddress`], or anything that implements - /// `TryInto<ServerAddress>`. - /// - /// [`ServerAddress`]: azalea_protocol::ServerAddress - pub address: A, + app: App, /// The accounts that are going to join the server. - pub accounts: Vec<Account>, - /// The plugins that are going to be used for all the bots. - /// - /// You can usually leave this as `plugins![]`. - pub plugins: Plugins, - /// The plugins that are going to be used for the swarm. - /// - /// You can usually leave this as `swarm_plugins![]`. - pub swarm_plugins: SwarmPlugins<S>, + accounts: Vec<Account>, /// The individual bot states. This must be the same length as `accounts`, /// since each bot gets one state. - pub states: Vec<S>, + states: Vec<S>, /// The state for the overall swarm. - pub swarm_state: SS, + swarm_state: SS, /// The function that's called every time a bot receives an [`Event`]. - pub handle: HandleFn<Fut, S>, + handler: Option<HandleFn<Fut, S>>, /// The function that's called every time the swarm receives a /// [`SwarmEvent`]. - pub swarm_handle: SwarmHandleFn<SwarmFut, S, SS>, + swarm_handler: Option<SwarmHandleFn<SwarmFut, SS>>, /// How long we should wait between each bot joining the server. Set to /// None to have every bot connect at the same time. None is different than /// a duration of 0, since if a duration is present the bots will wait for /// the previous one to be ready. - pub join_delay: Option<std::time::Duration>, + join_delay: Option<std::time::Duration>, +} +impl<S, SS, Fut, SwarmFut> SwarmBuilder<S, SS, Fut, SwarmFut> +where + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, + SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, + S: Default + Send + Sync + Clone + Component + 'static, + SS: Default + Send + Sync + Clone + Component + 'static, +{ + /// Start creating the swarm. + #[must_use] + pub fn new() -> Self { + Self { + // we create the app here so plugins can add onto it. + // the schedules won't run until [`Self::start`] is called. + app: init_ecs_app(), + + accounts: Vec::new(), + states: Vec::new(), + swarm_state: SS::default(), + handler: None, + swarm_handler: None, + join_delay: None, + } + .add_plugins(DefaultSwarmPlugins) + .add_plugins(DefaultBotPlugins) + } + + /// Add a vec of [`Account`]s to the swarm. + /// + /// Use [`Self::add_account`] to only add one account. If you want the + /// clients to have different default states, add them one at a time with + /// [`Self::add_account_with_state`]. + #[must_use] + pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self { + for account in accounts { + self = self.add_account(account); + } + self + } + /// Add a single new [`Account`] to the swarm. Use [`Self::add_accounts`] to + /// add multiple accounts at a time. + /// + /// This will make the state for this client be the default, use + /// [`Self::add_account_with_state`] to avoid that. + #[must_use] + pub fn add_account(self, account: Account) -> Self { + self.add_account_with_state(account, S::default()) + } + /// Add an account with a custom initial state. Use just + /// [`Self::add_account`] to use the Default implementation for the state. + #[must_use] + pub fn add_account_with_state(mut self, account: Account, state: S) -> Self { + self.accounts.push(account); + self.states.push(state); + self + } + + /// Set the function that's called every time a bot receives an [`Event`]. + /// This is the way to handle normal per-bot events. + /// + /// You can only have one client handler, calling this again will replace + /// the old client handler function (you can have a client handler and swarm + /// handler separately though). + #[must_use] + pub fn set_handler(mut self, handler: HandleFn<Fut, S>) -> Self { + self.handler = Some(handler); + self + } + /// Set the function that's called every time the swarm receives a + /// [`SwarmEvent`]. This is the way to handle global swarm events. + /// + /// You can only have one swarm handler, calling this again will replace + /// the old swarm handler function (you can have a client handler and swarm + /// handler separately though). + #[must_use] + pub fn set_swarm_handler(mut self, handler: SwarmHandleFn<SwarmFut, SS>) -> Self { + self.swarm_handler = Some(handler); + self + } + + /// Add a plugin to the swarm. + #[must_use] + pub fn add_plugin<T: Plugin>(mut self, plugin: T) -> Self { + self.app.add_plugin(plugin); + self + } + /// Add a group of plugins to the swarm. + #[must_use] + pub fn add_plugins<T: PluginGroup>(mut self, plugin_group: T) -> Self { + self.app.add_plugins(plugin_group); + self + } + + /// Set how long we should wait between each bot joining the server. + /// + /// By default, every bot will connect at the same time. If you set this + /// field, however, the bots will wait for the previous one to have + /// connected and *then* they'll wait the given duration. + #[must_use] + pub fn join_delay(mut self, delay: std::time::Duration) -> Self { + self.join_delay = Some(delay); + self + } + + /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given + /// server. + /// + /// The `address` argument can be a `&str`, [`ServerAddress`], or anything + /// that implements `TryInto<ServerAddress>`. + /// + /// [`ServerAddress`]: azalea_protocol::ServerAddress + pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<(), SwarmStartError> { + assert_eq!( + self.accounts.len(), + self.states.len(), + "There must be exactly one state per bot." + ); + + // convert the TryInto<ServerAddress> into a ServerAddress + let address: ServerAddress = match address.try_into() { + Ok(address) => address, + Err(_) => return Err(SwarmStartError::InvalidAddress), + }; + + // resolve the address + let resolved_address = resolver::resolve_address(&address).await?; + + let world_container = Arc::new(RwLock::new(WorldContainer::default())); + + // we can't modify the swarm plugins after this + let (bots_tx, mut bots_rx) = mpsc::unbounded_channel(); + let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel(); + + let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1); + let ecs_lock = start_ecs(self.app, run_schedule_receiver, run_schedule_sender.clone()); + + let swarm = Swarm { + ecs_lock: ecs_lock.clone(), + bots: Arc::new(Mutex::new(HashMap::new())), + + resolved_address, + address, + world_container, + + bots_tx, + + swarm_tx: swarm_tx.clone(), + + run_schedule_sender, + }; + ecs_lock.lock().insert_resource(swarm.clone()); + + // SwarmBuilder (self) isn't Send so we have to take all the things we need out + // of it + let mut swarm_clone = swarm.clone(); + let join_delay = self.join_delay; + let accounts = self.accounts.clone(); + let states = self.states.clone(); + + let join_task = tokio::spawn(async move { + if let Some(join_delay) = join_delay { + // if there's a join delay, then join one by one + for (account, state) in accounts.iter().zip(states) { + swarm_clone + .add_with_exponential_backoff(account, state.clone()) + .await; + tokio::time::sleep(join_delay).await; + } + } else { + // otherwise, join all at once + let swarm_borrow = &swarm_clone; + join_all(accounts.iter().zip(states).map( + async move |(account, state)| -> Result<(), JoinError> { + swarm_borrow + .clone() + .add_with_exponential_backoff(account, state.clone()) + .await; + Ok(()) + }, + )) + .await; + } + }); + + let swarm_state = self.swarm_state; + + // Watch swarm_rx and send those events to the swarm_handle. + let swarm_clone = swarm.clone(); + tokio::spawn(async move { + while let Some(event) = swarm_rx.recv().await { + if let Some(swarm_handler) = self.swarm_handler { + tokio::spawn((swarm_handler)( + swarm_clone.clone(), + event, + swarm_state.clone(), + )); + } + } + }); + + // bot events + while let Some((Some(event), bot)) = bots_rx.recv().await { + if let Some(handler) = self.handler { + let state = bot.component::<S>(); + tokio::spawn((handler)(bot, event, state)); + } + } + + join_task.abort(); + + Ok(()) + } +} + +impl<S, SS, Fut, SwarmFut> Default for SwarmBuilder<S, SS, Fut, SwarmFut> +where + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, + SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, + S: Default + Send + Sync + Clone + Component + 'static, + SS: Default + Send + Sync + Clone + Component + 'static, +{ + fn default() -> Self { + Self::new() + } +} + +/// An event about something that doesn't have to do with a single bot. +#[derive(Clone, Debug)] +pub enum SwarmEvent { + /// All the bots in the swarm have successfully joined the server. + Login, + /// The swarm was created. This is only fired once, and it's guaranteed to + /// be the first event to fire. + Init, + /// A bot got disconnected from the server. + /// + /// You can implement an auto-reconnect by calling [`Swarm::add`] + /// with the account from this event. + Disconnect(Account), + /// At least one bot received a chat message. + Chat(ChatPacket), } +pub type SwarmHandleFn<Fut, SS> = fn(Swarm, SwarmEvent, SS) -> Fut; + #[derive(Error, Debug)] pub enum SwarmStartError { #[error("Invalid address")] @@ -154,7 +355,7 @@ pub enum SwarmStartError { /// let mut states = Vec::new(); /// /// for i in 0..10 { -/// accounts.push(Account::offline(&format!("bot{}", i))); +/// accounts.push(Account::offline(&format!("bot{i}"))); /// states.push(State::default()); /// } /// @@ -204,193 +405,69 @@ pub enum SwarmStartError { /// } /// Ok(()) /// } -pub async fn start_swarm< - S: Send + Sync + Clone + 'static, - SS: Send + Sync + Clone + 'static, - A: Send + TryInto<ServerAddress>, - Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, - SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, ->( - options: SwarmOptions<S, SS, A, Fut, SwarmFut>, -) -> Result<(), SwarmStartError> { - assert_eq!( - options.accounts.len(), - options.states.len(), - "There must be exactly one state per bot." - ); - - // convert the TryInto<ServerAddress> into a ServerAddress - let address: ServerAddress = match options.address.try_into() { - Ok(address) => address, - Err(_) => return Err(SwarmStartError::InvalidAddress), - }; - - // resolve the address - let resolved_address = resolver::resolve_address(&address).await?; - - let world_container = Arc::new(RwLock::new(WeakWorldContainer::default())); - - let mut plugins = options.plugins; - let swarm_plugins = options.swarm_plugins; - - // DEFAULT CLIENT PLUGINS - plugins.add(bot::Plugin); - plugins.add(crate::pathfinder::Plugin); - // DEFAULT SWARM PLUGINS - - // we can't modify the swarm plugins after this - let (bots_tx, mut bots_rx) = mpsc::unbounded_channel(); - let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel(); - - let mut swarm = Swarm { - bot_datas: Arc::new(Mutex::new(Vec::new())), - - resolved_address, - address, - worlds: world_container, - plugins, - - bots_tx, - - swarm_tx: swarm_tx.clone(), - }; - - { - // the chat plugin is hacky and needs the swarm to be passed like this - let (chat_swarm_state, chat_tx) = chat::SwarmState::new(swarm.clone()); - swarm.plugins.add(chat::Plugin { - swarm_state: chat_swarm_state, - tx: chat_tx, - }); - } - - let swarm_plugins = swarm_plugins.build(); - - let mut swarm_clone = swarm.clone(); - let join_task = tokio::spawn(async move { - if let Some(join_delay) = options.join_delay { - // if there's a join delay, then join one by one - for (account, state) in options.accounts.iter().zip(options.states) { - swarm_clone - .add_with_exponential_backoff(account, state.clone()) - .await; - tokio::time::sleep(join_delay).await; - } - } else { - let swarm_borrow = &swarm_clone; - join_all(options.accounts.iter().zip(options.states).map( - async move |(account, state)| -> Result<(), JoinError> { - swarm_borrow - .clone() - .add_with_exponential_backoff(account, state.clone()) - .await; - Ok(()) - }, - )) - .await; - } - }); - - let swarm_state = options.swarm_state; - let mut internal_state = InternalSwarmState::default(); - - // Watch swarm_rx and send those events to the plugins and swarm_handle. - let swarm_clone = swarm.clone(); - let swarm_plugins_clone = swarm_plugins.clone(); - tokio::spawn(async move { - while let Some(event) = swarm_rx.recv().await { - for plugin in swarm_plugins_clone.clone().into_iter() { - tokio::spawn(plugin.handle(event.clone(), swarm_clone.clone())); - } - tokio::spawn((options.swarm_handle)( - swarm_clone.clone(), - event, - swarm_state.clone(), - )); - } - }); - - // bot events - while let Some((Some(event), (bot, state))) = bots_rx.recv().await { - // bot event handling - let cloned_plugins = (*bot.plugins).clone(); - for plugin in cloned_plugins.into_iter() { - tokio::spawn(plugin.handle(event.clone(), bot.clone())); - } - - // swarm event handling - // remove this #[allow] when more checks are added - #[allow(clippy::single_match)] - match &event { - Event::Login => { - internal_state.bots_joined += 1; - if internal_state.bots_joined == swarm.bot_datas.lock().len() { - swarm_tx.send(SwarmEvent::Login).unwrap(); - } - } - _ => {} - } - - tokio::spawn((options.handle)(bot, event, state)); - } - - join_task.abort(); - - Ok(()) -} - -impl<S> Swarm<S> -where - S: Send + Sync + Clone + 'static, -{ +// pub async fn start_swarm< +// S: Send + Sync + Clone + 'static, +// SS: Send + Sync + Clone + 'static, +// A: Send + TryInto<ServerAddress>, +// Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +// SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +// >( +// options: SwarmOptions<S, SS, A, Fut, SwarmFut>, +// ) -> Result<(), SwarmStartError> { +// } + +impl Swarm { /// Add a new account to the swarm. You can remove it later by calling /// [`Client::disconnect`]. - pub async fn add(&mut self, account: &Account, state: S) -> Result<Client, JoinError> { - let conn = Connection::new(&self.resolved_address).await?; - let (conn, game_profile) = Client::handshake(conn, account, &self.address.clone()).await?; - + /// + /// # Errors + /// + /// Returns an `Err` if the bot could not do a handshake successfully. + pub async fn add<S: Component + Clone>( + &mut self, + account: &Account, + state: S, + ) -> Result<Client, JoinError> { // tx is moved to the bot so it can send us events // rx is used to receive events from the bot - let (tx, mut rx) = mpsc::channel(1); - let mut bot = Client::new(game_profile, conn, Some(self.worlds.clone())); - tx.send(Event::Init).await.expect("Failed to send event"); - bot.start_tasks(tx); + // An event that causes the schedule to run. This is only used internally. + // let (run_schedule_sender, run_schedule_receiver) = mpsc::unbounded_channel(); + // let ecs_lock = start_ecs(run_schedule_receiver, run_schedule_sender.clone()); + let (bot, mut rx) = Client::start_client( + self.ecs_lock.clone(), + account, + &self.address, + &self.resolved_address, + self.run_schedule_sender.clone(), + ) + .await?; + // add the state to the client + { + let mut ecs = self.ecs_lock.lock(); + ecs.entity_mut(bot.entity).insert(state); + } - bot.plugins = Arc::new(self.plugins.clone().build()); + self.bots.lock().insert(bot.entity, bot.clone()); + let cloned_bots = self.bots.clone(); let cloned_bots_tx = self.bots_tx.clone(); let cloned_bot = bot.clone(); - let cloned_state = state.clone(); let owned_account = account.clone(); - let bot_datas = self.bot_datas.clone(); let swarm_tx = self.swarm_tx.clone(); - // send the init event immediately so it's the first thing we get - swarm_tx.send(SwarmEvent::Init).unwrap(); tokio::spawn(async move { while let Some(event) = rx.recv().await { // we can't handle events here (since we can't copy the handler), // they're handled above in start_swarm - if let Err(e) = - cloned_bots_tx.send((Some(event), (cloned_bot.clone(), cloned_state.clone()))) - { + if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) { error!("Error sending event to swarm: {e}"); } } - // the bot disconnected, so we remove it from the swarm - let mut bot_datas = bot_datas.lock(); - let index = bot_datas - .iter() - .position(|(b, _)| b.profile.uuid == cloned_bot.profile.uuid) - .expect("bot disconnected but not found in swarm"); - bot_datas.remove(index); - + cloned_bots.lock().remove(&bot.entity); swarm_tx .send(SwarmEvent::Disconnect(owned_account)) .unwrap(); }); - self.bot_datas.lock().push((bot.clone(), state.clone())); - Ok(bot) } @@ -399,7 +476,11 @@ where /// /// Exponential backoff means if it fails joining it will initially wait 10 /// seconds, then 20, then 40, up to 2 minutes. - pub async fn add_with_exponential_backoff(&mut self, account: &Account, state: S) -> Client { + pub async fn add_with_exponential_backoff<S: Component + Clone>( + &mut self, + account: &Account, + state: S, + ) -> Client { let mut disconnects = 0; loop { match self.add(account, state.clone()).await { @@ -409,7 +490,7 @@ where let delay = (Duration::from_secs(5) * 2u32.pow(disconnects)) .min(Duration::from_secs(120)); let username = account.username.clone(); - error!("Error joining {username}: {e}. Waiting {delay:?} and trying again."); + error!("Error joining as {username}: {e}. Waiting {delay:?} and trying again."); tokio::time::sleep(delay).await; } } @@ -417,33 +498,42 @@ where } } -impl<S> IntoIterator for Swarm<S> -where - S: Send + Sync + Clone + 'static, -{ - type Item = (Client, S); +impl IntoIterator for Swarm { + type Item = Client; type IntoIter = std::vec::IntoIter<Self::Item>; - /// Iterate over the bots and their states in this swarm. + /// Iterate over the bots in this swarm. /// /// ```rust,no_run - /// for (bot, state) in swarm { + /// for bot in swarm { + /// let state = bot.component::<State>(); /// // ... /// } /// ``` fn into_iter(self) -> Self::IntoIter { - self.bot_datas.lock().clone().into_iter() + self.bots + .lock() + .clone() + .into_values() + .collect::<Vec<_>>() + .into_iter() } } -#[derive(Default)] -struct InternalSwarmState { - /// The number of bots connected to the server - pub bots_joined: usize, -} - impl From<ConnectionError> for SwarmStartError { fn from(e: ConnectionError) -> Self { SwarmStartError::from(JoinError::from(e)) } } + +/// This plugin group will add all the default plugins necessary for swarms to +/// work. +pub struct DefaultSwarmPlugins; + +impl PluginGroup for DefaultSwarmPlugins { + fn build(self) -> PluginGroupBuilder { + PluginGroupBuilder::start::<Self>() + .add(chat::SwarmChatPlugin) + .add(events::SwarmPlugin) + } +} diff --git a/azalea/src/swarm/plugins.rs b/azalea/src/swarm/plugins.rs deleted file mode 100644 index f92d40e6..00000000 --- a/azalea/src/swarm/plugins.rs +++ /dev/null @@ -1,135 +0,0 @@ -use crate::{Swarm, SwarmEvent}; -use async_trait::async_trait; -use nohash_hasher::NoHashHasher; -use std::{ - any::{Any, TypeId}, - collections::HashMap, - hash::BuildHasherDefault, -}; - -type U64Hasher = BuildHasherDefault<NoHashHasher<u64>>; - -// kind of based on https://docs.rs/http/latest/src/http/extensions.rs.html -/// A map of plugin ids to [`SwarmPlugin`] trait objects. The client stores -/// this so we can keep the state for our [`Swarm`] plugins. -/// -/// If you're using azalea, you should generate this from the `swarm_plugins!` -/// macro. -#[derive(Clone, Default)] -pub struct SwarmPlugins<S> { - map: Option<HashMap<TypeId, Box<dyn SwarmPlugin<S>>, U64Hasher>>, -} - -#[derive(Clone)] -pub struct SwarmPluginStates<S> { - map: Option<HashMap<TypeId, Box<dyn SwarmPluginState<S>>, U64Hasher>>, -} - -impl<S> SwarmPluginStates<S> { - pub fn get<T: SwarmPluginState<S>>(&self) -> Option<&T> { - self.map - .as_ref() - .and_then(|map| map.get(&TypeId::of::<T>())) - .and_then(|boxed| (boxed.as_ref() as &dyn Any).downcast_ref::<T>()) - } -} - -impl<S> SwarmPlugins<S> -where - S: 'static, -{ - /// Create a new empty set of plugins. - pub fn new() -> Self { - Self { map: None } - } - - /// Add a new plugin to this set. - pub fn add<T: SwarmPlugin<S>>(&mut self, plugin: T) { - if self.map.is_none() { - self.map = Some(HashMap::with_hasher(BuildHasherDefault::default())); - } - self.map - .as_mut() - .unwrap() - .insert(TypeId::of::<T>(), Box::new(plugin)); - } - - /// Build our plugin states from this set of plugins. Note that if you're - /// using `azalea` you'll probably never need to use this as it's called - /// for you. - pub fn build(self) -> SwarmPluginStates<S> { - if self.map.is_none() { - return SwarmPluginStates { map: None }; - } - let mut map = HashMap::with_hasher(BuildHasherDefault::default()); - for (id, plugin) in self.map.unwrap().into_iter() { - map.insert(id, plugin.build()); - } - SwarmPluginStates { map: Some(map) } - } -} - -impl<S> IntoIterator for SwarmPluginStates<S> { - type Item = Box<dyn SwarmPluginState<S>>; - type IntoIter = std::vec::IntoIter<Self::Item>; - - /// Iterate over the plugin states. - fn into_iter(self) -> Self::IntoIter { - self.map - .map(|map| map.into_values().collect::<Vec<_>>()) - .unwrap_or_default() - .into_iter() - } -} - -/// A `SwarmPluginState` keeps the current state of a plugin for a client. All -/// the fields must be atomic. Unique `SwarmPluginState`s are built from -/// [`SwarmPlugin`]s. -#[async_trait] -pub trait SwarmPluginState<S>: Send + Sync + SwarmPluginStateClone<S> + Any + 'static { - async fn handle(self: Box<Self>, event: SwarmEvent, swarm: Swarm<S>); -} - -/// Swarm plugins can keep their own personal state ([`SwarmPluginState`]), -/// listen to [`SwarmEvent`]s, and add new functions to [`Swarm`]. -pub trait SwarmPlugin<S>: Send + Sync + SwarmPluginClone<S> + Any + 'static { - fn build(&self) -> Box<dyn SwarmPluginState<S>>; -} - -/// An internal trait that allows SwarmPluginState to be cloned. -#[doc(hidden)] -pub trait SwarmPluginStateClone<S> { - fn clone_box(&self) -> Box<dyn SwarmPluginState<S>>; -} -impl<T, S> SwarmPluginStateClone<S> for T -where - T: 'static + SwarmPluginState<S> + Clone, -{ - fn clone_box(&self) -> Box<dyn SwarmPluginState<S>> { - Box::new(self.clone()) - } -} -impl<S> Clone for Box<dyn SwarmPluginState<S>> { - fn clone(&self) -> Self { - self.clone_box() - } -} - -/// An internal trait that allows SwarmPlugin to be cloned. -#[doc(hidden)] -pub trait SwarmPluginClone<S> { - fn clone_box(&self) -> Box<dyn SwarmPlugin<S>>; -} -impl<T, S> SwarmPluginClone<S> for T -where - T: 'static + SwarmPlugin<S> + Clone, -{ - fn clone_box(&self) -> Box<dyn SwarmPlugin<S>> { - Box::new(self.clone()) - } -} -impl<S> Clone for Box<dyn SwarmPlugin<S>> { - fn clone(&self) -> Self { - self.clone_box() - } -} |
