diff options
| author | mat <27899617+mat-1@users.noreply.github.com> | 2023-02-04 19:32:27 -0600 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-02-04 19:32:27 -0600 |
| commit | a5672815ccef520b433363ac622dbb6d6af60c91 (patch) | |
| tree | f9bb1b41876d81423ac3f188f4d368e6d362eed1 /azalea/src | |
| parent | 7c7446ab1e467c29f86e9bfba260741fc469389a (diff) | |
| download | azalea-drasl-a5672815ccef520b433363ac622dbb6d6af60c91.tar.xz | |
Use an ECS (#52)
* add EntityData::kind
* start making metadata use hecs
* make entity codegen generate ecs stuff
* fix registry codegen
* get rid of worldhaver
it's not even used
* add bevy_ecs to deps
* rename Component to FormattedText
also start making the metadata use bevy_ecs but bevy_ecs doesn't let you query on Bundles so it's annoying
* generate metadata.rs correctly for bevy_ecs
* start switching more entity stuff to use ecs
* more ecs stuff for entity storage
* ok well it compiles but
it definitely doesn't work
* random fixes
* change a bunch of entity things to use the components
* some ecs stuff in az-client
* packet handler uses the ecs now
and other fun changes
i still need to make ticking use the ecs but that's tricker, i'm considering using bevy_ecs systems for those
bevy_ecs systems can't be async but the only async things in ticking is just sending packets which can just be done as a tokio task so that's not a big deal
* start converting some functions in az-client into systems
committing because i'm about to try something that might go horribly wrong
* start splitting client
i'm probably gonna change it so azalea entity ids are separate from minecraft entity ids next (so stuff like player ids can be consistent and we don't have to wait for the login packet)
* separate minecraft entity ids from azalea entity ids + more ecs stuff
i guess i'm using bevy_app now too huh
it's necessary for plugins and it lets us control the tick rate anyways so it's fine i think
i'm still not 100% sure how packet handling that interacts with the world will work, but i think if i can sneak the ecs world into there it'll be fine. Can't put packet handling in the schedule because that'd make it tick-bound, which it's not (technically it'd still work but it'd be wrong and anticheats might realize).
* packet handling
now it runs the schedule only when we get a tick or packet :smile:
also i systemified some more functions and did other random fixes so az-world and az-physics compile
making azalea-client use the ecs is almost done! all the hard parts are done now i hope, i just have to finish writing all the code so it actually works
* start figuring out how functions in Client will work
generally just lifetimes being annoying but i think i can get it all to work
* make writing packets work synchronously*
* huh az-client compiles
* start fixing stuff
* start fixing some packets
* make packet handler work
i still haven't actually tested any of this yet lol but in theory it should all work
i'll probably either actually test az-client and fix all the remaining issues or update the azalea crate next
ok also one thing that i'm not particularly happy with is how the packet handlers are doing ugly queries like
```rs
let local_player = ecs
.query::<&LocalPlayer>()
.get_mut(ecs, player_entity)
.unwrap();
```
i think the right way to solve it would be by putting every packet handler in its own system but i haven't come up with a way to make that not be really annoying yet
* fix warnings
* ok what if i just have a bunch of queries and a single packet handler system
* simple example for azalea-client
* :bug:
* maybe fix deadlock idk
can't test it rn lmao
* make physicsstate its own component
* use the default plugins
* azalea compiles lol
* use systemstate for packet handler
* fix entities
basically moved some stuff from being in the world to just being components
* physics (ticking) works
* try to add a .entity_by function
still doesn't work because i want to make the predicate magic
* try to make entity_by work
well it does work but i couldn't figure out how to make it look not terrible. Will hopefully change in the future
* everything compiles
* start converting swarm to use builder
* continue switching swarm to builder and fix stuff
* make swarm use builder
still have to fix some stuff and make client use builder
* fix death event
* client builder
* fix some warnings
* document plugins a bit
* start trying to fix tests
* azalea-ecs
* azalea-ecs stuff compiles
* az-physics tests pass :tada:
* fix all the tests
* clippy on azalea-ecs-macros
* remove now-unnecessary trait_upcasting feature
* fix some clippy::pedantic warnings lol
* why did cargo fmt not remove the trailing spaces
* FIX ALL THE THINGS
* when i said 'all' i meant non-swarm bugs
* start adding task pool
* fix entity deduplication
* fix pathfinder not stopping
* fix some more random bugs
* fix panic that sometimes happens in swarms
* make pathfinder run in task
* fix some tests
* fix doctests and clippy
* deadlock
* fix systems running in wrong order
* fix non-swarm bots
Diffstat (limited to 'azalea/src')
| -rw-r--r-- | azalea/src/bot.rs | 131 | ||||
| -rw-r--r-- | azalea/src/lib.rs | 209 | ||||
| -rw-r--r-- | azalea/src/pathfinder/mod.rs | 341 | ||||
| -rw-r--r-- | azalea/src/pathfinder/moves.rs | 136 | ||||
| -rw-r--r-- | azalea/src/pathfinder/mtdstarlite.rs | 8 | ||||
| -rw-r--r-- | azalea/src/prelude.rs | 6 | ||||
| -rw-r--r-- | azalea/src/start.rs | 136 | ||||
| -rw-r--r-- | azalea/src/swarm/chat.rs | 344 | ||||
| -rw-r--r-- | azalea/src/swarm/events.rs | 45 | ||||
| -rw-r--r-- | azalea/src/swarm/mod.rs | 614 | ||||
| -rw-r--r-- | azalea/src/swarm/plugins.rs | 135 |
11 files changed, 1105 insertions, 1000 deletions
diff --git a/azalea/src/bot.rs b/azalea/src/bot.rs index 0674c692..510449d3 100644 --- a/azalea/src/bot.rs +++ b/azalea/src/bot.rs @@ -1,56 +1,115 @@ -use crate::{Client, Event}; -use async_trait::async_trait; use azalea_core::Vec3; -use parking_lot::Mutex; -use std::{f64::consts::PI, sync::Arc}; +use azalea_ecs::{ + app::{App, Plugin, PluginGroup, PluginGroupBuilder}, + component::Component, + entity::Entity, + event::EventReader, + query::{With, Without}, + schedule::IntoSystemDescriptor, + system::{Commands, Query}, + AppTickExt, +}; +use azalea_world::{ + entity::{metadata::Player, set_rotation, Jumping, Physics, Position}, + Local, +}; +use std::f64::consts::PI; -#[derive(Clone, Default)] -pub struct Plugin; -impl crate::Plugin for Plugin { - type State = State; +use crate::pathfinder::PathfinderPlugin; - fn build(&self) -> State { - State::default() +#[derive(Clone, Default)] +pub struct BotPlugin; +impl Plugin for BotPlugin { + fn build(&self, app: &mut App) { + app.add_event::<LookAtEvent>() + .add_event::<JumpEvent>() + .add_system(insert_bot.before("deduplicate_entities")) + .add_system(look_at_listener) + .add_system(jump_listener.label("jump_listener").before("ai_step")) + .add_tick_system(stop_jumping.after("ai_step")); } } -#[derive(Default, Clone)] -pub struct State { - jumping_once: Arc<Mutex<bool>>, +/// Component for all bots. +#[derive(Default, Component)] +pub struct Bot { + jumping_once: bool, +} + +/// Insert the [`Bot`] component for any local players that don't have it. +#[allow(clippy::type_complexity)] +fn insert_bot( + mut commands: Commands, + mut query: Query<Entity, (Without<Bot>, With<Local>, With<Player>)>, +) { + for entity in &mut query { + commands.entity(entity).insert(Bot::default()); + } } -#[async_trait] -impl crate::PluginState for State { - async fn handle(self: Box<Self>, event: Event, mut bot: Client) { - if let Event::Tick = event { - if *self.jumping_once.lock() && bot.jumping() { - *self.jumping_once.lock() = false; - bot.set_jumping(false); - } +fn stop_jumping(mut query: Query<(&mut Jumping, &mut Bot)>) { + for (mut jumping, mut bot) in &mut query { + if bot.jumping_once && **jumping { + bot.jumping_once = false; + **jumping = false; } } } -pub trait BotTrait { +pub trait BotClientExt { fn jump(&mut self); - fn look_at(&mut self, pos: &Vec3); + fn look_at(&mut self, pos: Vec3); } -impl BotTrait for azalea_client::Client { +impl BotClientExt for azalea_client::Client { /// Queue a jump for the next tick. fn jump(&mut self) { - self.set_jumping(true); - let state = self.plugins.get::<State>().unwrap().clone(); - *state.jumping_once.lock() = true; + let mut ecs = self.ecs.lock(); + ecs.send_event(JumpEvent(self.entity)); } /// Turn the bot's head to look at the coordinate in the world. - fn look_at(&mut self, pos: &Vec3) { - let (y_rot, x_rot) = direction_looking_at(self.entity().pos(), pos); - self.set_rotation(y_rot, x_rot); + fn look_at(&mut self, position: Vec3) { + let mut ecs = self.ecs.lock(); + ecs.send_event(LookAtEvent { + entity: self.entity, + position, + }); + } +} + +/// Event to jump once. +pub struct JumpEvent(pub Entity); + +fn jump_listener(mut query: Query<(&mut Jumping, &mut Bot)>, mut events: EventReader<JumpEvent>) { + for event in events.iter() { + if let Ok((mut jumping, mut bot)) = query.get_mut(event.0) { + **jumping = true; + bot.jumping_once = true; + } } } +/// Make an entity look towards a certain position in the world. +pub struct LookAtEvent { + pub entity: Entity, + /// The position we want the entity to be looking at. + pub position: Vec3, +} +fn look_at_listener( + mut events: EventReader<LookAtEvent>, + mut query: Query<(&Position, &mut Physics)>, +) { + for event in events.iter() { + if let Ok((position, mut physics)) = query.get_mut(event.entity) { + let (y_rot, x_rot) = direction_looking_at(position, &event.position); + set_rotation(&mut physics, y_rot, x_rot); + } + } +} + +/// Return the (`y_rot`, `x_rot`) that would make a client at `current` be +/// looking at `target`. fn direction_looking_at(current: &Vec3, target: &Vec3) -> (f32, f32) { // borrowed from mineflayer's Bot.lookAt because i didn't want to do math let delta = target - current; @@ -59,3 +118,15 @@ fn direction_looking_at(current: &Vec3, target: &Vec3) -> (f32, f32) { let x_rot = f64::atan2(delta.y, ground_distance) * -(180.0 / PI); (y_rot as f32, x_rot as f32) } + +/// A [`PluginGroup`] for the plugins that add extra bot functionality to the +/// client. +pub struct DefaultBotPlugins; + +impl PluginGroup for DefaultBotPlugins { + fn build(self) -> PluginGroupBuilder { + PluginGroupBuilder::start::<Self>() + .add(BotPlugin) + .add(PathfinderPlugin) + } +} diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs index 4ceb5b2e..026a35f5 100644 --- a/azalea/src/lib.rs +++ b/azalea/src/lib.rs @@ -1,94 +1,139 @@ -//! Azalea is a framework for creating Minecraft bots. -//! -//! Internally, it's just a wrapper over [`azalea_client`], adding useful -//! functions for making bots. Because of this, lots of the documentation will -//! refer to `azalea_client`. You can just replace these with `azalea` in your -//! code, since everything from azalea_client is re-exported in azalea. -//! -//! # Installation -//! -//! First, install Rust nightly with `rustup install nightly` and `rustup -//! default nightly`. -//! -//! Then, add one of the following lines to your Cargo.toml: -//! -//! Latest bleeding-edge version: -//! `azalea = { git="https://github.com/mat-1/azalea" }`\ -//! Latest "stable" release: -//! `azalea = "0.5.0"` -//! -//! ## Optimization -//! -//! For faster compile times, make a `.cargo/config.toml` file in your project -//! and copy -//! [this file](https://github.com/mat-1/azalea/blob/main/.cargo/config.toml) -//! into it. -//! -//! For faster performance in debug mode, add -//! ```toml -//! [profile.dev] -//! opt-level = 1 -//! [profile.dev.package."*"] -//! opt-level = 3 -//! ``` -//! to your Cargo.toml. You may have to install the LLD linker. -//! -//! # Examples -//! -//! ```rust,no_run -//! //! A bot that logs chat messages sent in the server to the console. -//! -//! use azalea::prelude::*; -//! use parking_lot::Mutex; -//! use std::sync::Arc; -//! -//! #[tokio::main] -//! async fn main() { -//! let account = Account::offline("bot"); -//! // or Account::microsoft("example@example.com").await.unwrap(); -//! -//! azalea::start(azalea::Options { -//! account, -//! address: "localhost", -//! state: State::default(), -//! plugins: plugins![], -//! handle, -//! }) -//! .await -//! .unwrap(); -//! } -//! -//! #[derive(Default, Clone)] -//! pub struct State {} -//! -//! async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { -//! match event { -//! Event::Chat(m) => { -//! println!("{}", m.message().to_ansi()); -//! } -//! _ => {} -//! } -//! -//! Ok(()) -//! } -//! ``` -//! -//! [`azalea_client`]: https://docs.rs/azalea-client - -#![feature(trait_upcasting)] +#![doc = include_str!("../README.md")] #![feature(async_closure)] -#![allow(incomplete_features)] mod bot; pub mod pathfinder; pub mod prelude; -mod start; mod swarm; -pub use azalea_block::*; +pub use azalea_block as blocks; pub use azalea_client::*; pub use azalea_core::{BlockPos, Vec3}; -pub use start::{start, Options}; +use azalea_ecs::{ + app::{App, Plugin}, + component::Component, +}; +pub use azalea_protocol as protocol; +pub use azalea_registry::EntityKind; +pub use azalea_world::{entity, World}; +use bot::DefaultBotPlugins; +use ecs::app::PluginGroup; +use futures::Future; +use protocol::{ + resolver::{self, ResolverError}, + ServerAddress, +}; pub use swarm::*; +use thiserror::Error; +use tokio::sync::mpsc; pub type HandleFn<Fut, S> = fn(Client, Event, S) -> Fut; + +#[derive(Error, Debug)] +pub enum StartError { + #[error("Invalid address")] + InvalidAddress, + #[error(transparent)] + ResolveAddress(#[from] ResolverError), + #[error("Join error: {0}")] + Join(#[from] azalea_client::JoinError), +} + +pub struct ClientBuilder<S, Fut> +where + S: Default + Send + Sync + Clone + 'static, + Fut: Future<Output = Result<(), anyhow::Error>>, +{ + app: App, + /// The function that's called every time a bot receives an [`Event`]. + handler: Option<HandleFn<Fut, S>>, + state: S, +} +impl<S, Fut> ClientBuilder<S, Fut> +where + S: Default + Send + Sync + Clone + Component + 'static, + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +{ + /// Start building a client that can join the world. + #[must_use] + pub fn new() -> Self { + Self { + // we create the app here so plugins can add onto it. + // the schedules won't run until [`Self::start`] is called. + app: init_ecs_app(), + + handler: None, + state: S::default(), + } + .add_plugins(DefaultBotPlugins) + } + /// Set the function that's called every time a bot receives an [`Event`]. + /// This is the way to handle normal per-bot events. + /// + /// You can only have one client handler, calling this again will replace + /// the old client handler function (you can have a client handler and swarm + /// handler separately though). + #[must_use] + pub fn set_handler(mut self, handler: HandleFn<Fut, S>) -> Self { + self.handler = Some(handler); + self + } + /// Add a plugin to the client. + #[must_use] + pub fn add_plugin<T: Plugin>(mut self, plugin: T) -> Self { + self.app.add_plugin(plugin); + self + } + /// Add a group of plugins to the client. + #[must_use] + pub fn add_plugins<T: PluginGroup>(mut self, plugin_group: T) -> Self { + self.app.add_plugins(plugin_group); + self + } + + /// Build this `ClientBuilder` into an actual [`Client`] and join the given + /// server. + /// + /// The `address` argument can be a `&str`, [`ServerAddress`], or anything + /// that implements `TryInto<ServerAddress>`. + /// + /// [`ServerAddress`]: azalea_protocol::ServerAddress + pub async fn start( + self, + account: Account, + address: impl TryInto<ServerAddress>, + ) -> Result<(), StartError> { + let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?; + let resolved_address = resolver::resolve_address(&address).await?; + + // An event that causes the schedule to run. This is only used internally. + let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1); + let ecs_lock = start_ecs(self.app, run_schedule_receiver, run_schedule_sender.clone()); + + let (bot, mut rx) = Client::start_client( + ecs_lock, + &account, + &address, + &resolved_address, + run_schedule_sender, + ) + .await?; + + while let Some(event) = rx.recv().await { + if let Some(handler) = self.handler { + tokio::spawn((handler)(bot.clone(), event.clone(), self.state.clone())); + } + } + + Ok(()) + } +} +impl<S, Fut> Default for ClientBuilder<S, Fut> +where + S: Default + Send + Sync + Clone + Component + 'static, + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/azalea/src/pathfinder/mod.rs b/azalea/src/pathfinder/mod.rs index de62e9a7..5246290d 100644 --- a/azalea/src/pathfinder/mod.rs +++ b/azalea/src/pathfinder/mod.rs @@ -1,147 +1,255 @@ mod moves; mod mtdstarlite; -use crate::{prelude::*, SprintDirection, WalkDirection}; -use crate::{Client, Event}; -use async_trait::async_trait; +use crate::bot::{JumpEvent, LookAtEvent}; +use crate::{SprintDirection, WalkDirection}; + +use azalea_client::{StartSprintEvent, StartWalkEvent}; use azalea_core::{BlockPos, CardinalDirection}; -use azalea_world::entity::EntityData; +use azalea_ecs::{ + app::{App, Plugin}, + component::Component, + entity::Entity, + event::{EventReader, EventWriter}, + query::{With, Without}, + schedule::IntoSystemDescriptor, + system::{Commands, Query, Res}, + AppTickExt, +}; +use azalea_world::entity::metadata::Player; +use azalea_world::Local; +use azalea_world::{ + entity::{Physics, Position, WorldName}, + WorldContainer, +}; +use bevy_tasks::{AsyncComputeTaskPool, Task}; +use futures_lite::future; use log::{debug, error}; use mtdstarlite::Edge; pub use mtdstarlite::MTDStarLite; -use parking_lot::Mutex; use std::collections::VecDeque; use std::sync::Arc; #[derive(Clone, Default)] -pub struct Plugin; -impl crate::Plugin for Plugin { - type State = State; - - fn build(&self) -> State { - State::default() +pub struct PathfinderPlugin; +impl Plugin for PathfinderPlugin { + fn build(&self, app: &mut App) { + app.add_event::<GotoEvent>() + .add_event::<PathFoundEvent>() + .add_tick_system(tick_execute_path.before("walk_listener")) + .add_system(goto_listener) + .add_system(add_default_pathfinder.after("deduplicate_entities")) + .add_system(handle_tasks) + .add_system(path_found_listener); } } -#[derive(Default, Clone)] -pub struct State { - // pathfinder: Option<MTDStarLite<Node, f32>>, - pub path: Arc<Mutex<VecDeque<Node>>>, +/// A component that makes this entity able to pathfind. +#[derive(Component, Default)] +pub struct Pathfinder { + pub path: VecDeque<Node>, +} +#[allow(clippy::type_complexity)] +fn add_default_pathfinder( + mut commands: Commands, + mut query: Query<Entity, (Without<Pathfinder>, With<Local>, With<Player>)>, +) { + for entity in &mut query { + commands.entity(entity).insert(Pathfinder::default()); + } } -#[async_trait] -impl crate::PluginState for State { - async fn handle(self: Box<Self>, event: Event, mut bot: Client) { - if let Event::Tick = event { - let mut path = self.path.lock(); +pub trait PathfinderClientExt { + fn goto(&self, goal: impl Goal + Send + Sync + 'static); +} - if !path.is_empty() { - tick_execute_path(&mut bot, &mut path); - } - } +impl PathfinderClientExt for azalea_client::Client { + fn goto(&self, goal: impl Goal + Send + Sync + 'static) { + self.ecs.lock().send_event(GotoEvent { + entity: self.entity, + goal: Arc::new(goal), + }); } } - -pub trait Trait { - fn goto(&self, goal: impl Goal); +pub struct GotoEvent { + pub entity: Entity, + pub goal: Arc<dyn Goal + Send + Sync>, } +pub struct PathFoundEvent { + pub entity: Entity, + pub path: VecDeque<Node>, +} + +#[derive(Component)] +pub struct ComputePath(Task<Option<PathFoundEvent>>); -impl Trait for azalea_client::Client { - fn goto(&self, goal: impl Goal) { +fn goto_listener( + mut commands: Commands, + mut events: EventReader<GotoEvent>, + mut query: Query<(&Position, &WorldName)>, + world_container: Res<WorldContainer>, +) { + let thread_pool = AsyncComputeTaskPool::get(); + + for event in events.iter() { + let (position, world_name) = query + .get_mut(event.entity) + .expect("Called goto on an entity that's not in the world"); let start = Node { - pos: BlockPos::from(self.entity().pos()), + pos: BlockPos::from(position), vertical_vel: VerticalVel::None, }; - let end = goal.goal_node(); - debug!("start: {start:?}, end: {end:?}"); - - let possible_moves: Vec<&dyn moves::Move> = vec![ - &moves::ForwardMove(CardinalDirection::North), - &moves::ForwardMove(CardinalDirection::East), - &moves::ForwardMove(CardinalDirection::South), - &moves::ForwardMove(CardinalDirection::West), - // - &moves::AscendMove(CardinalDirection::North), - &moves::AscendMove(CardinalDirection::East), - &moves::AscendMove(CardinalDirection::South), - &moves::AscendMove(CardinalDirection::West), - // - &moves::DescendMove(CardinalDirection::North), - &moves::DescendMove(CardinalDirection::East), - &moves::DescendMove(CardinalDirection::South), - &moves::DescendMove(CardinalDirection::West), - // - &moves::DiagonalMove(CardinalDirection::North), - &moves::DiagonalMove(CardinalDirection::East), - &moves::DiagonalMove(CardinalDirection::South), - &moves::DiagonalMove(CardinalDirection::West), - ]; - - let successors = |node: &Node| { - let mut edges = Vec::new(); - - let world = &self.world.read().shared; - for possible_move in possible_moves.iter() { - edges.push(Edge { - target: possible_move.next_node(node), - cost: possible_move.cost(world, node), - }); + + let world_lock = world_container + .get(world_name) + .expect("Entity tried to pathfind but the entity isn't in a valid world"); + let end = event.goal.goal_node(); + + let goal = event.goal.clone(); + let entity = event.entity; + + let task = thread_pool.spawn(async move { + debug!("start: {start:?}, end: {end:?}"); + + let possible_moves: Vec<&dyn moves::Move> = vec![ + &moves::ForwardMove(CardinalDirection::North), + &moves::ForwardMove(CardinalDirection::East), + &moves::ForwardMove(CardinalDirection::South), + &moves::ForwardMove(CardinalDirection::West), + // + &moves::AscendMove(CardinalDirection::North), + &moves::AscendMove(CardinalDirection::East), + &moves::AscendMove(CardinalDirection::South), + &moves::AscendMove(CardinalDirection::West), + // + &moves::DescendMove(CardinalDirection::North), + &moves::DescendMove(CardinalDirection::East), + &moves::DescendMove(CardinalDirection::South), + &moves::DescendMove(CardinalDirection::West), + // + &moves::DiagonalMove(CardinalDirection::North), + &moves::DiagonalMove(CardinalDirection::East), + &moves::DiagonalMove(CardinalDirection::South), + &moves::DiagonalMove(CardinalDirection::West), + ]; + + let successors = |node: &Node| { + let mut edges = Vec::new(); + + let world = world_lock.read(); + for possible_move in &possible_moves { + edges.push(Edge { + target: possible_move.next_node(node), + cost: possible_move.cost(&world, node), + }); + } + edges + }; + + let mut pf = MTDStarLite::new( + start, + end, + |n| goal.heuristic(n), + successors, + successors, + |n| goal.success(n), + ); + + let start_time = std::time::Instant::now(); + let p = pf.find_path(); + let end_time = std::time::Instant::now(); + debug!("path: {p:?}"); + debug!("time: {:?}", end_time - start_time); + + // convert the Option<Vec<Node>> to a VecDeque<Node> + if let Some(p) = p { + let path = p.into_iter().collect::<VecDeque<_>>(); + // commands.entity(event.entity).insert(Pathfinder { path: p }); + Some(PathFoundEvent { entity, path }) + } else { + error!("no path found"); + None } - edges - }; + }); - let mut pf = MTDStarLite::new( - start, - end, - |n| goal.heuristic(n), - successors, - successors, - |n| goal.success(n), - ); - - let start = std::time::Instant::now(); - let p = pf.find_path(); - let end = std::time::Instant::now(); - debug!("path: {p:?}"); - debug!("time: {:?}", end - start); - - let state = self - .plugins - .get::<State>() - .expect("Pathfinder plugin not installed!") - .clone(); - // convert the Option<Vec<Node>> to a VecDeque<Node> - if let Some(p) = p { - *state.path.lock() = p.into_iter().collect(); - } else { - error!("no path found"); + commands.spawn(ComputePath(task)); + } +} + +// poll the tasks and send the PathFoundEvent if they're done +fn handle_tasks( + mut commands: Commands, + mut transform_tasks: Query<(Entity, &mut ComputePath)>, + mut path_found_events: EventWriter<PathFoundEvent>, +) { + for (entity, mut task) in &mut transform_tasks { + if let Some(optional_path_found_event) = future::block_on(future::poll_once(&mut task.0)) { + if let Some(path_found_event) = optional_path_found_event { + path_found_events.send(path_found_event); + } + + // Task is complete, so remove task component from entity + commands.entity(entity).remove::<ComputePath>(); } } } -fn tick_execute_path(bot: &mut Client, path: &mut VecDeque<Node>) { - let target = if let Some(target) = path.front() { - target - } else { - return; - }; - let center = target.pos.center(); - // println!("going to {center:?} (at {pos:?})", pos = bot.entity().pos()); - bot.look_at(¢er); - bot.sprint(SprintDirection::Forward); - // check if we should jump - if target.pos.y > bot.entity().pos().y.floor() as i32 { - bot.jump(); +// set the path for the target entity when we get the PathFoundEvent +fn path_found_listener(mut events: EventReader<PathFoundEvent>, mut query: Query<&mut Pathfinder>) { + for event in events.iter() { + let mut pathfinder = query + .get_mut(event.entity) + .expect("Path found for an entity that doesn't have a pathfinder"); + pathfinder.path = event.path.clone(); } +} - if target.is_reached(&bot.entity()) { - // println!("ok target {target:?} reached"); - path.pop_front(); - if path.is_empty() { - bot.walk(WalkDirection::None); +fn tick_execute_path( + mut query: Query<(Entity, &mut Pathfinder, &Position, &Physics)>, + mut look_at_events: EventWriter<LookAtEvent>, + mut sprint_events: EventWriter<StartSprintEvent>, + mut walk_events: EventWriter<StartWalkEvent>, + mut jump_events: EventWriter<JumpEvent>, +) { + for (entity, mut pathfinder, position, physics) in &mut query { + loop { + let Some(target) = pathfinder.path.front() else { + break; + }; + let center = target.pos.center(); + // println!("going to {center:?} (at {pos:?})", pos = bot.entity().pos()); + look_at_events.send(LookAtEvent { + entity, + position: center, + }); + debug!( + "tick: pathfinder {entity:?}; going to {:?}; currently at {position:?}", + target.pos + ); + sprint_events.send(StartSprintEvent { + entity, + direction: SprintDirection::Forward, + }); + // check if we should jump + if target.pos.y > position.y.floor() as i32 { + jump_events.send(JumpEvent(entity)); + } + + if target.is_reached(position, physics) { + // println!("reached target"); + pathfinder.path.pop_front(); + if pathfinder.path.is_empty() { + // println!("reached goal"); + walk_events.send(StartWalkEvent { + entity, + direction: WalkDirection::None, + }); + } + // tick again, maybe we already reached the next node! + } else { + break; + } } - // tick again, maybe we already reached the next node! - tick_execute_path(bot, path); } } @@ -172,7 +280,8 @@ pub trait Goal { impl Node { /// Returns whether the entity is at the node and should start going to the /// next node. - pub fn is_reached(&self, entity: &EntityData) -> bool { + #[must_use] + pub fn is_reached(&self, position: &Position, physics: &Physics) -> bool { // println!( // "entity.delta.y: {} {:?}=={:?}, self.vertical_vel={:?}", // entity.delta.y, @@ -180,11 +289,11 @@ impl Node { // self.pos, // self.vertical_vel // ); - BlockPos::from(entity.pos()) == self.pos + BlockPos::from(position) == self.pos && match self.vertical_vel { - VerticalVel::NoneMidair => entity.delta.y > -0.1 && entity.delta.y < 0.1, - VerticalVel::None => entity.on_ground, - VerticalVel::FallingLittle => entity.delta.y < -0.1, + VerticalVel::NoneMidair => physics.delta.y > -0.1 && physics.delta.y < 0.1, + VerticalVel::None => physics.on_ground, + VerticalVel::FallingLittle => physics.delta.y < -0.1, } } } diff --git a/azalea/src/pathfinder/moves.rs b/azalea/src/pathfinder/moves.rs index ccf8ba1a..9bb5c7c1 100644 --- a/azalea/src/pathfinder/moves.rs +++ b/azalea/src/pathfinder/moves.rs @@ -1,11 +1,11 @@ use super::{Node, VerticalVel}; use azalea_core::{BlockPos, CardinalDirection}; use azalea_physics::collision::{self, BlockWithShape}; -use azalea_world::WeakWorld; +use azalea_world::World; /// whether this block is passable -fn is_block_passable(pos: &BlockPos, world: &WeakWorld) -> bool { - if let Some(block) = world.get_block_state(pos) { +fn is_block_passable(pos: &BlockPos, world: &World) -> bool { + if let Some(block) = world.chunks.get_block_state(pos) { block.shape() == &collision::empty_shape() } else { false @@ -13,8 +13,8 @@ fn is_block_passable(pos: &BlockPos, world: &WeakWorld) -> bool { } /// whether this block has a solid hitbox (i.e. we can stand on it) -fn is_block_solid(pos: &BlockPos, world: &WeakWorld) -> bool { - if let Some(block) = world.get_block_state(pos) { +fn is_block_solid(pos: &BlockPos, world: &World) -> bool { + if let Some(block) = world.chunks.get_block_state(pos) { block.shape() == &collision::block_shape() } else { false @@ -22,22 +22,22 @@ fn is_block_solid(pos: &BlockPos, world: &WeakWorld) -> bool { } /// Whether this block and the block above are passable -fn is_passable(pos: &BlockPos, world: &WeakWorld) -> bool { +fn is_passable(pos: &BlockPos, world: &World) -> bool { is_block_passable(pos, world) && is_block_passable(&pos.up(1), world) } /// Whether we can stand in this position. Checks if the block below is solid, /// and that the two blocks above that are passable. -fn is_standable(pos: &BlockPos, world: &WeakWorld) -> bool { +fn is_standable(pos: &BlockPos, world: &World) -> bool { is_block_solid(&pos.down(1), world) && is_passable(pos, world) } const JUMP_COST: f32 = 0.5; const WALK_ONE_BLOCK_COST: f32 = 1.0; -pub trait Move { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32; +pub trait Move: Send + Sync { + fn cost(&self, world: &World, node: &Node) -> f32; /// Returns by how much the entity's position should be changed when this /// move is executed. fn offset(&self) -> BlockPos; @@ -51,7 +51,7 @@ pub trait Move { pub struct ForwardMove(pub CardinalDirection); impl Move for ForwardMove { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { + fn cost(&self, world: &World, node: &Node) -> f32 { if is_standable(&(node.pos + self.offset()), world) && node.vertical_vel == VerticalVel::None { @@ -67,7 +67,7 @@ impl Move for ForwardMove { pub struct AscendMove(pub CardinalDirection); impl Move for AscendMove { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { + fn cost(&self, world: &World, node: &Node) -> f32 { if node.vertical_vel == VerticalVel::None && is_block_passable(&node.pos.up(2), world) && is_standable(&(node.pos + self.offset()), world) @@ -89,7 +89,7 @@ impl Move for AscendMove { } pub struct DescendMove(pub CardinalDirection); impl Move for DescendMove { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { + fn cost(&self, world: &World, node: &Node) -> f32 { // check whether 3 blocks vertically forward are passable if node.vertical_vel == VerticalVel::None && is_standable(&(node.pos + self.offset()), world) @@ -112,7 +112,7 @@ impl Move for DescendMove { } pub struct DiagonalMove(pub CardinalDirection); impl Move for DiagonalMove { - fn cost(&self, world: &WeakWorld, node: &Node) -> f32 { + fn cost(&self, world: &World, node: &Node) -> f32 { if node.vertical_vel != VerticalVel::None { return f32::INFINITY; } @@ -151,56 +151,92 @@ mod tests { use super::*; use azalea_block::BlockState; use azalea_core::ChunkPos; - use azalea_world::{Chunk, PartialWorld}; + use azalea_world::{Chunk, ChunkStorage, PartialWorld}; #[test] fn test_is_passable() { - let mut world = PartialWorld::default(); - world - .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default())) - .unwrap(); - world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone); - world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air); - - assert_eq!( - is_block_passable(&BlockPos::new(0, 0, 0), &world.shared), - false + let mut partial_world = PartialWorld::default(); + let mut chunk_storage = ChunkStorage::default(); + + partial_world.chunks.set( + &ChunkPos { x: 0, z: 0 }, + Some(Chunk::default()), + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 0, 0), + BlockState::Stone, + &mut chunk_storage, ); - assert_eq!( - is_block_passable(&BlockPos::new(0, 1, 0), &world.shared), - true + partial_world.chunks.set_block_state( + &BlockPos::new(0, 1, 0), + BlockState::Air, + &mut chunk_storage, ); + + let world = chunk_storage.into(); + assert_eq!(is_block_passable(&BlockPos::new(0, 0, 0), &world), false); + assert_eq!(is_block_passable(&BlockPos::new(0, 1, 0), &world), true); } #[test] fn test_is_solid() { - let mut world = PartialWorld::default(); - world - .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default())) - .unwrap(); - world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone); - world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air); - - assert_eq!(is_block_solid(&BlockPos::new(0, 0, 0), &world.shared), true); - assert_eq!( - is_block_solid(&BlockPos::new(0, 1, 0), &world.shared), - false + let mut partial_world = PartialWorld::default(); + let mut chunk_storage = ChunkStorage::default(); + partial_world.chunks.set( + &ChunkPos { x: 0, z: 0 }, + Some(Chunk::default()), + &mut chunk_storage, ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 0, 0), + BlockState::Stone, + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 1, 0), + BlockState::Air, + &mut chunk_storage, + ); + + let world = chunk_storage.into(); + assert_eq!(is_block_solid(&BlockPos::new(0, 0, 0), &world), true); + assert_eq!(is_block_solid(&BlockPos::new(0, 1, 0), &world), false); } #[test] fn test_is_standable() { - let mut world = PartialWorld::default(); - world - .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default())) - .unwrap(); - world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone); - world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air); - world.set_block_state(&BlockPos::new(0, 2, 0), BlockState::Air); - world.set_block_state(&BlockPos::new(0, 3, 0), BlockState::Air); - - assert!(is_standable(&BlockPos::new(0, 1, 0), &world.shared)); - assert!(!is_standable(&BlockPos::new(0, 0, 0), &world.shared)); - assert!(!is_standable(&BlockPos::new(0, 2, 0), &world.shared)); + let mut partial_world = PartialWorld::default(); + let mut chunk_storage = ChunkStorage::default(); + partial_world.chunks.set( + &ChunkPos { x: 0, z: 0 }, + Some(Chunk::default()), + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 0, 0), + BlockState::Stone, + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 1, 0), + BlockState::Air, + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 2, 0), + BlockState::Air, + &mut chunk_storage, + ); + partial_world.chunks.set_block_state( + &BlockPos::new(0, 3, 0), + BlockState::Air, + &mut chunk_storage, + ); + + let world = chunk_storage.into(); + assert!(is_standable(&BlockPos::new(0, 1, 0), &world)); + assert!(!is_standable(&BlockPos::new(0, 0, 0), &world)); + assert!(!is_standable(&BlockPos::new(0, 2, 0), &world)); } } diff --git a/azalea/src/pathfinder/mtdstarlite.rs b/azalea/src/pathfinder/mtdstarlite.rs index 50a467df..ce463279 100644 --- a/azalea/src/pathfinder/mtdstarlite.rs +++ b/azalea/src/pathfinder/mtdstarlite.rs @@ -3,9 +3,9 @@ //! //! Future optimization attempt ideas: //! - Use a different priority queue (e.g. fibonacci heap) -//! - Use FxHash instead of the default hasher +//! - Use `FxHash` instead of the default hasher //! - Have `par` be a raw pointer -//! - Try borrowing vs copying the Node in several places (like state_mut) +//! - Try borrowing vs copying the Node in several places (like `state_mut`) //! - Store edge costs in their own map use priority_queue::DoublePriorityQueue; @@ -260,9 +260,7 @@ impl< // identify a path from sstart to sgoal using the parent pointers let mut target = self.state(&self.goal).par; while !(Some(self.start) == target) { - let this_target = if let Some(this_target) = target { - this_target - } else { + let Some(this_target) = target else { break; }; // hunter follows path from start to goal; diff --git a/azalea/src/prelude.rs b/azalea/src/prelude.rs index bedf724f..3d8cc13e 100644 --- a/azalea/src/prelude.rs +++ b/azalea/src/prelude.rs @@ -1,7 +1,7 @@ //! The Azalea prelude. Things that are necessary for a bare-bones bot are //! re-exported here. -pub use crate::bot::BotTrait; -pub use crate::pathfinder::Trait; -pub use crate::{plugins, swarm_plugins, Plugin}; +pub use crate::bot::BotClientExt; +pub use crate::pathfinder::PathfinderClientExt; pub use azalea_client::{Account, Client, Event}; +pub use azalea_ecs::component::Component; diff --git a/azalea/src/start.rs b/azalea/src/start.rs deleted file mode 100644 index e2374fb8..00000000 --- a/azalea/src/start.rs +++ /dev/null @@ -1,136 +0,0 @@ -use crate::{bot, pathfinder, HandleFn}; -use azalea_client::{Account, Client, Plugins}; -use azalea_protocol::ServerAddress; -use std::{future::Future, sync::Arc}; -use thiserror::Error; - -/// A helper macro that generates a [`Plugins`] struct from a list of objects -/// that implement [`Plugin`]. -/// -/// ```rust,no_run -/// plugins![azalea_pathfinder::Plugin]; -/// ``` -/// -/// [`Plugin`]: crate::Plugin -#[macro_export] -macro_rules! plugins { - ($($plugin:expr),*) => { - { - let mut plugins = azalea::Plugins::new(); - $( - plugins.add($plugin); - )* - plugins - } - }; -} - -/// The options that are passed to [`azalea::start`]. -/// -/// [`azalea::start`]: crate::start() -pub struct Options<S, A, Fut> -where - A: TryInto<ServerAddress>, - Fut: Future<Output = Result<(), anyhow::Error>>, -{ - /// The address of the server that we're connecting to. This can be a - /// `&str`, [`ServerAddress`], or anything that implements - /// `TryInto<ServerAddress>`. - /// - /// [`ServerAddress`]: azalea_protocol::ServerAddress - pub address: A, - /// The account that's going to join the server. - pub account: Account, - /// The plugins that are going to be used. Plugins are external crates that - /// add extra functionality to Azalea. You should use the [`plugins`] macro - /// for this field. - /// - /// ```rust,no_run - /// plugins![azalea_pathfinder::Plugin] - /// ``` - pub plugins: Plugins, - /// A struct that contains the data that you want your bot to remember - /// across events. - /// - /// # Examples - /// - /// ```rust - /// use parking_lot::Mutex; - /// use std::sync::Arc; - /// - /// #[derive(Default, Clone)] - /// struct State { - /// farming: Arc<Mutex<bool>>, - /// } - /// ``` - pub state: S, - /// The function that's called whenever we get an event. - /// - /// # Examples - /// - /// ```rust - /// use azalea::prelude::*; - /// - /// async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { - /// Ok(()) - /// } - /// ``` - pub handle: HandleFn<Fut, S>, -} - -#[derive(Error, Debug)] -pub enum StartError { - #[error("Invalid address")] - InvalidAddress, - #[error("Join error: {0}")] - Join(#[from] azalea_client::JoinError), -} - -/// Join a server and start handling events. This function will run forever -/// until it gets disconnected from the server. -/// -/// # Examples -/// -/// ```rust,no_run -/// let error = azalea::start(azalea::Options { -/// account, -/// address: "localhost", -/// state: State::default(), -/// plugins: plugins![azalea_pathfinder::Plugin], -/// handle, -/// }).await; -/// ``` -pub async fn start< - S: Send + Sync + Clone + 'static, - A: Send + TryInto<ServerAddress>, - Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, ->( - options: Options<S, A, Fut>, -) -> Result<(), StartError> { - let address = match options.address.try_into() { - Ok(address) => address, - Err(_) => return Err(StartError::InvalidAddress), - }; - - let (mut bot, mut rx) = Client::join(&options.account, address).await?; - - let mut plugins = options.plugins; - // DEFAULT PLUGINS - plugins.add(bot::Plugin); - plugins.add(pathfinder::Plugin); - - bot.plugins = Arc::new(plugins.build()); - - let state = options.state; - - while let Some(event) = rx.recv().await { - let cloned_plugins = (*bot.plugins).clone(); - for plugin in cloned_plugins.into_iter() { - tokio::spawn(plugin.handle(event.clone(), bot.clone())); - } - - tokio::spawn((options.handle)(bot.clone(), event.clone(), state.clone())); - } - - Ok(()) -} diff --git a/azalea/src/swarm/chat.rs b/azalea/src/swarm/chat.rs index 4582c59e..a55e9bf6 100644 --- a/azalea/src/swarm/chat.rs +++ b/azalea/src/swarm/chat.rs @@ -1,4 +1,4 @@ -//! Implements SwarmEvent::Chat +//! Implements `SwarmEvent::Chat`. // How the chat event works (to avoid firing the event multiple times): // --- @@ -13,71 +13,81 @@ // in Swarm that's set to the smallest index of all the bots, and we remove all // messages from the queue that are before that index. +use azalea_client::{packet_handling::ChatReceivedEvent, ChatPacket}; +use azalea_ecs::{ + app::{App, Plugin}, + component::Component, + event::{EventReader, EventWriter}, + schedule::IntoSystemDescriptor, + system::{Commands, Query, Res, ResMut, Resource}, +}; +use std::collections::VecDeque; + use crate::{Swarm, SwarmEvent}; -use async_trait::async_trait; -use azalea_client::{ChatPacket, Client, Event}; -use parking_lot::Mutex; -use std::{collections::VecDeque, sync::Arc}; -use tokio::sync::broadcast::{Receiver, Sender}; #[derive(Clone)] -pub struct Plugin { - pub swarm_state: SwarmState, - pub tx: Sender<ChatPacket>, -} - -impl crate::Plugin for Plugin { - type State = State; - - fn build(&self) -> State { - State { - chat_index: Arc::new(Mutex::new(0)), - swarm_state: self.swarm_state.clone(), - tx: self.tx.clone(), - } +pub struct SwarmChatPlugin; +impl Plugin for SwarmChatPlugin { + fn build(&self, app: &mut App) { + app.add_event::<NewChatMessageEvent>() + .add_system(chat_listener.label("chat_listener")) + .add_system(update_min_index_and_shrink_queue.after("chat_listener")) + .insert_resource(GlobalChatState { + chat_queue: VecDeque::new(), + chat_min_index: 0, + }); } } -#[derive(Clone)] -pub struct State { - pub chat_index: Arc<Mutex<usize>>, - pub tx: Sender<ChatPacket>, - pub swarm_state: SwarmState, +#[derive(Component, Debug)] +pub struct ClientChatState { + pub chat_index: usize, } -#[derive(Clone)] -pub struct SwarmState { - pub chat_queue: Arc<Mutex<VecDeque<ChatPacket>>>, - pub chat_min_index: Arc<Mutex<usize>>, - pub rx: Arc<tokio::sync::Mutex<Receiver<ChatPacket>>>, +/// A chat message that no other bots have seen yet was received by a bot. +#[derive(Debug)] +pub struct NewChatMessageEvent(ChatPacket); + +#[derive(Resource)] +pub struct GlobalChatState { + pub chat_queue: VecDeque<ChatPacket>, + pub chat_min_index: usize, } -impl State { - pub fn handle_chat(&self, message: ChatPacket) { +fn chat_listener( + mut commands: Commands, + mut query: Query<&mut ClientChatState>, + mut events: EventReader<ChatReceivedEvent>, + mut global_chat_state: ResMut<GlobalChatState>, + mut new_chat_messages_events: EventWriter<NewChatMessageEvent>, +) { + for event in events.iter() { + let mut client_chat_state = query.get_mut(event.entity); + let mut client_chat_index = if let Ok(ref client_chat_state) = client_chat_state { + client_chat_state.chat_index + } else { + // if the client has no chat state, we default to this and insert it at the end + global_chat_state.chat_min_index + }; + // When a bot receives a chat messages, it looks into the queue to find the // earliest instance of the message content that's after the bot's chat index. // If it finds it, then its personal index is simply updated. Otherwise, fire // the event and add to the queue. - let mut chat_queue = self.swarm_state.chat_queue.lock(); - let chat_min_index = self.swarm_state.chat_min_index.lock(); - let mut chat_index = self.chat_index.lock(); - - if *chat_min_index > *chat_index { - // if this happens it's because this bot just logged in, so - // ignore it and let another bot handle it - println!("chat_min_index ({chat_min_index}) > chat_index ({chat_index})"); - *chat_index = *chat_min_index; - return; - } - let actual_vec_index = *chat_index - *chat_min_index; + let actual_vec_index = client_chat_index - global_chat_state.chat_min_index; // go through the queue and find the first message that's after the bot's index let mut found = false; - for (i, past_message) in chat_queue.iter().enumerate().skip(actual_vec_index) { - if past_message == &message { + for (i, past_message) in global_chat_state + .chat_queue + .iter() + .enumerate() + .skip(actual_vec_index) + { + if past_message == &event.packet { // found the message, update the index - *chat_index = i + *chat_min_index + 1; + client_chat_index = i + global_chat_state.chat_min_index + 1; found = true; break; } @@ -85,185 +95,157 @@ impl State { if !found { // didn't find the message, so fire the swarm event and add to the queue - self.tx - .send(message.clone()) - .expect("failed to send chat message to swarm"); - chat_queue.push_back(message); - *chat_index = chat_queue.len() + *chat_min_index; + new_chat_messages_events.send(NewChatMessageEvent(event.packet.clone())); + global_chat_state.chat_queue.push_back(event.packet.clone()); + client_chat_index = + global_chat_state.chat_queue.len() + global_chat_state.chat_min_index; } - } -} - -#[async_trait] -impl crate::PluginState for State { - async fn handle(self: Box<Self>, event: Event, _bot: Client) { - // we're allowed to access Plugin::swarm_state since it's shared for every bot - if let Event::Chat(m) = event { - self.handle_chat(m); + if let Ok(ref mut client_chat_state) = client_chat_state { + client_chat_state.chat_index = client_chat_index; + } else { + commands.entity(event.entity).insert(ClientChatState { + chat_index: client_chat_index, + }); } } } -impl SwarmState { - pub fn new<S>(swarm: Swarm<S>) -> (Self, Sender<ChatPacket>) - where - S: Send + Sync + Clone + 'static, - { - let (tx, rx) = tokio::sync::broadcast::channel(1); - - let swarm_state = SwarmState { - chat_queue: Arc::new(Mutex::new(VecDeque::new())), - chat_min_index: Arc::new(Mutex::new(0)), - rx: Arc::new(tokio::sync::Mutex::new(rx)), - }; - tokio::spawn(swarm_state.clone().start(swarm)); - - (swarm_state, tx) - } - async fn start<S>(self, swarm: Swarm<S>) - where - S: Send + Sync + Clone + 'static, - { - // it should never be locked unless we reused the same plugin for two swarms - // (bad) - let mut rx = self.rx.lock().await; - while let Ok(m) = rx.recv().await { - swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap(); - let bot_states = swarm - .bot_datas - .lock() - .iter() - .map(|(bot, _)| { - bot.plugins - .get::<State>() - .expect("Chat plugin not installed") - .clone() - }) - .collect::<Vec<_>>(); - self.handle_new_chat_message(&bot_states); +fn update_min_index_and_shrink_queue( + query: Query<&ClientChatState>, + mut global_chat_state: ResMut<GlobalChatState>, + mut events: EventReader<NewChatMessageEvent>, + swarm: Option<Res<Swarm>>, +) { + for event in events.iter() { + if let Some(swarm) = &swarm { + // it should also work if Swarm isn't present (so the tests don't need it) + swarm + .swarm_tx + .send(SwarmEvent::Chat(event.0.clone())) + .unwrap(); } - } -} - -impl SwarmState { - pub fn handle_new_chat_message(&self, bot_states: &[State]) { // To make sure the queue doesn't grow too large, we keep a `chat_min_index` // in Swarm that's set to the smallest index of all the bots, and we remove all // messages from the queue that are before that index. - let chat_min_index = *self.chat_min_index.lock(); - let mut new_chat_min_index = usize::MAX; - for bot_state in bot_states { - let this_chat_index = *bot_state.chat_index.lock(); + let mut new_chat_min_index = global_chat_state.chat_min_index; + for client_chat_state in query.iter() { + let this_chat_index = client_chat_state.chat_index; if this_chat_index < new_chat_min_index { new_chat_min_index = this_chat_index; } } - let mut chat_queue = self.chat_queue.lock(); - if chat_min_index > new_chat_min_index { - println!( - "chat_min_index ({chat_min_index}) > new_chat_min_index ({new_chat_min_index})" - ); + if global_chat_state.chat_min_index > new_chat_min_index { return; } // remove all messages from the queue that are before the min index - for _ in 0..(new_chat_min_index - chat_min_index) { - chat_queue.pop_front(); + for _ in 0..(new_chat_min_index - global_chat_state.chat_min_index) { + global_chat_state.chat_queue.pop_front(); } // update the min index - *self.chat_min_index.lock() = new_chat_min_index; + global_chat_state.chat_min_index = new_chat_min_index; } } #[cfg(test)] mod tests { + use azalea_ecs::{ecs::Ecs, event::Events, system::SystemState}; + use super::*; + fn make_test_app() -> App { + let mut app = App::new(); + // we add the events like this instead of with .add_event so we can have our own + // event mangement in drain_events + app.init_resource::<Events<ChatReceivedEvent>>() + .init_resource::<Events<NewChatMessageEvent>>() + .add_system(chat_listener.label("chat_listener")) + .add_system(update_min_index_and_shrink_queue.after("chat_listener")) + .insert_resource(GlobalChatState { + chat_queue: VecDeque::new(), + chat_min_index: 0, + }); + app + } + + fn drain_events(ecs: &mut Ecs) -> Vec<ChatPacket> { + let mut system_state: SystemState<ResMut<Events<NewChatMessageEvent>>> = + SystemState::new(ecs); + let mut events = system_state.get_mut(ecs); + + events.drain().map(|e| e.0.clone()).collect::<Vec<_>>() + } + #[tokio::test] async fn test_swarm_chat() { - let (tx, mut rx) = tokio::sync::broadcast::channel(1); - let swarm_state = SwarmState { - chat_queue: Arc::new(Mutex::new(VecDeque::new())), - chat_min_index: Arc::new(Mutex::new(0)), - rx: Arc::new(tokio::sync::Mutex::new(rx)), - }; - let mut bot_states = vec![]; - let bot0 = State { - swarm_state: swarm_state.clone(), - chat_index: Arc::new(Mutex::new(0)), - tx: tx.clone(), - }; - let bot1 = State { - swarm_state: swarm_state.clone(), - chat_index: Arc::new(Mutex::new(0)), - tx: tx.clone(), - }; - bot_states.push(bot0.clone()); - bot_states.push(bot1.clone()); + let mut app = make_test_app(); + + let bot0 = app.world.spawn_empty().id(); + let bot1 = app.world.spawn_empty().id(); + + app.world.send_event(ChatReceivedEvent { + entity: bot0, + packet: ChatPacket::new("a"), + }); + app.update(); - bot0.handle_chat(ChatPacket::new("a")); // the swarm should get the event immediately after the bot gets it + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("a")]); assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("a")) + app.world.get::<ClientChatState>(bot0).unwrap().chat_index, + 1 ); - assert_eq!(*bot0.chat_index.lock(), 1); // and a second bot sending the event shouldn't do anything - bot1.handle_chat(ChatPacket::new("a")); - assert!(swarm_state.rx.lock().await.try_recv().is_err()); - assert_eq!(*bot1.chat_index.lock(), 1); - - // but if the first one gets it again, it should sent it again - bot0.handle_chat(ChatPacket::new("a")); + app.world.send_event(ChatReceivedEvent { + entity: bot1, + packet: ChatPacket::new("a"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![]); assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("a")) + app.world.get::<ClientChatState>(bot1).unwrap().chat_index, + 1 ); + // but if the first one gets it again, it should sent it again + app.world.send_event(ChatReceivedEvent { + entity: bot0, + packet: ChatPacket::new("a"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("a")]); + // alright and now the second bot got a different chat message and it should be // sent - bot1.handle_chat(ChatPacket::new("b")); - assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("b")) - ); + app.world.send_event(ChatReceivedEvent { + entity: bot1, + packet: ChatPacket::new("b"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("b")]); } #[tokio::test] async fn test_new_bot() { - let (tx, mut rx) = tokio::sync::broadcast::channel(1); - let swarm_state = SwarmState { - chat_queue: Arc::new(Mutex::new(VecDeque::new())), - chat_min_index: Arc::new(Mutex::new(0)), - rx: Arc::new(tokio::sync::Mutex::new(rx)), - }; - let mut bot_states = vec![]; - let bot0 = State { - swarm_state: swarm_state.clone(), - chat_index: Arc::new(Mutex::new(0)), - tx: tx.clone(), - }; - bot_states.push(bot0.clone()); + let mut app = make_test_app(); + + let bot0 = app.world.spawn_empty().id(); // bot0 gets a chat message - bot0.handle_chat(ChatPacket::new("a")); - assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("a")) - ); - // now a second bot joined and got a different chat message - let bot1 = State { - swarm_state: swarm_state.clone(), - chat_index: Arc::new(Mutex::new(0)), - tx: tx.clone(), - }; - bot_states.push(bot1.clone()); - bot1.handle_chat(ChatPacket::new("b")); - assert_eq!( - swarm_state.rx.lock().await.try_recv(), - Ok(ChatPacket::new("b")) - ); + app.world.send_event(ChatReceivedEvent { + entity: bot0, + packet: ChatPacket::new("a"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("a")]); + let bot1 = app.world.spawn_empty().id(); + app.world.send_event(ChatReceivedEvent { + entity: bot1, + packet: ChatPacket::new("b"), + }); + app.update(); + assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("b")]); } } diff --git a/azalea/src/swarm/events.rs b/azalea/src/swarm/events.rs new file mode 100644 index 00000000..81d8c731 --- /dev/null +++ b/azalea/src/swarm/events.rs @@ -0,0 +1,45 @@ +use azalea_client::LocalPlayer; +use azalea_ecs::{ + app::{App, Plugin}, + event::EventWriter, + query::With, + system::{Query, ResMut, Resource}, +}; +use azalea_world::entity::MinecraftEntityId; +use derive_more::{Deref, DerefMut}; + +pub struct SwarmPlugin; +impl Plugin for SwarmPlugin { + fn build(&self, app: &mut App) { + app.add_event::<SwarmReadyEvent>() + .add_system(check_ready) + .init_resource::<IsSwarmReady>(); + } +} + +/// All the bots from the swarm are now in the world. +pub struct SwarmReadyEvent; + +#[derive(Default, Resource, Deref, DerefMut)] +struct IsSwarmReady(bool); + +fn check_ready( + query: Query<Option<&MinecraftEntityId>, With<LocalPlayer>>, + mut is_swarm_ready: ResMut<IsSwarmReady>, + mut ready_events: EventWriter<SwarmReadyEvent>, +) { + // if we already know the swarm is ready, do nothing + if **is_swarm_ready { + return; + } + // if all the players are in the world, we're ready + for entity_id in query.iter() { + if entity_id.is_none() { + return; + } + } + + // all the players are in the world, so we're ready + **is_swarm_ready = true; + ready_events.send(SwarmReadyEvent); +} diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index 6fc3e40c..9c16bc01 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -1,41 +1,29 @@ -/// Swarms are a way to conveniently control many bots. -mod chat; -mod plugins; +//! Swarms are a way to conveniently control many bots. -pub use self::plugins::*; -use crate::{bot, HandleFn}; -use azalea_client::{Account, ChatPacket, Client, Event, JoinError, Plugins}; +mod chat; +mod events; + +use crate::{bot::DefaultBotPlugins, HandleFn}; +use azalea_client::{init_ecs_app, start_ecs, Account, ChatPacket, Client, Event, JoinError}; +use azalea_ecs::{ + app::{App, Plugin, PluginGroup, PluginGroupBuilder}, + component::Component, + ecs::Ecs, + entity::Entity, + system::Resource, +}; use azalea_protocol::{ - connect::{Connection, ConnectionError}, + connect::ConnectionError, resolver::{self, ResolverError}, ServerAddress, }; -use azalea_world::WeakWorldContainer; +use azalea_world::WorldContainer; use futures::future::join_all; use log::error; use parking_lot::{Mutex, RwLock}; -use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; +use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration}; use thiserror::Error; -use tokio::sync::mpsc::{self, UnboundedSender}; - -/// A helper macro that generates a [`SwarmPlugins`] struct from a list of -/// objects that implement [`SwarmPlugin`]. -/// -/// ```rust,no_run -/// swarm_plugins![azalea_pathfinder::Plugin]; -/// ``` -#[macro_export] -macro_rules! swarm_plugins { - ($($plugin:expr),*) => { - { - let mut plugins = azalea::SwarmPlugins::new(); - $( - plugins.add($plugin); - )* - plugins - } - }; -} +use tokio::sync::mpsc; /// A swarm is a way to conveniently control many bots at once, while also /// being able to control bots at an individual level when desired. @@ -46,82 +34,295 @@ macro_rules! swarm_plugins { /// It's used to make the [`Swarm::add`] function work. /// /// [`azalea::start_swarm`]: fn.start_swarm.html -#[derive(Clone)] -pub struct Swarm<S> { - bot_datas: Arc<Mutex<Vec<(Client, S)>>>, +#[derive(Clone, Resource)] +pub struct Swarm { + pub ecs_lock: Arc<Mutex<Ecs>>, + bots: Arc<Mutex<HashMap<Entity, Client>>>, + + // bot_datas: Arc<Mutex<Vec<(Client, S)>>>, resolved_address: SocketAddr, address: ServerAddress, - pub worlds: Arc<RwLock<WeakWorldContainer>>, - /// Plugins that are set for new bots - plugins: Plugins, + pub world_container: Arc<RwLock<WorldContainer>>, - bots_tx: UnboundedSender<(Option<Event>, (Client, S))>, - swarm_tx: UnboundedSender<SwarmEvent>, -} + bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>, + swarm_tx: mpsc::UnboundedSender<SwarmEvent>, -/// An event about something that doesn't have to do with a single bot. -#[derive(Clone, Debug)] -pub enum SwarmEvent { - /// All the bots in the swarm have successfully joined the server. - Login, - /// The swarm was created. This is only fired once, and it's guaranteed to - /// be the first event to fire. - Init, - /// A bot got disconnected from the server. - /// - /// You can implement an auto-reconnect by calling [`Swarm::add`] - /// with the account from this event. - Disconnect(Account), - /// At least one bot received a chat message. - Chat(ChatPacket), + run_schedule_sender: mpsc::Sender<()>, } -pub type SwarmHandleFn<Fut, S, SS> = fn(Swarm<S>, SwarmEvent, SS) -> Fut; - -/// The options that are passed to [`azalea::start_swarm`]. -/// -/// [`azalea::start_swarm`]: crate::start_swarm() -pub struct SwarmOptions<S, SS, A, Fut, SwarmFut> +/// Create a new [`Swarm`]. +pub struct SwarmBuilder<S, SS, Fut, SwarmFut> where - A: TryInto<ServerAddress>, + S: Default + Send + Sync + Clone + 'static, + SS: Default + Send + Sync + Clone + 'static, Fut: Future<Output = Result<(), anyhow::Error>>, SwarmFut: Future<Output = Result<(), anyhow::Error>>, { - /// The address of the server that we're connecting to. This can be a - /// `&str`, [`ServerAddress`], or anything that implements - /// `TryInto<ServerAddress>`. - /// - /// [`ServerAddress`]: azalea_protocol::ServerAddress - pub address: A, + app: App, /// The accounts that are going to join the server. - pub accounts: Vec<Account>, - /// The plugins that are going to be used for all the bots. - /// - /// You can usually leave this as `plugins![]`. - pub plugins: Plugins, - /// The plugins that are going to be used for the swarm. - /// - /// You can usually leave this as `swarm_plugins![]`. - pub swarm_plugins: SwarmPlugins<S>, + accounts: Vec<Account>, /// The individual bot states. This must be the same length as `accounts`, /// since each bot gets one state. - pub states: Vec<S>, + states: Vec<S>, /// The state for the overall swarm. - pub swarm_state: SS, + swarm_state: SS, /// The function that's called every time a bot receives an [`Event`]. - pub handle: HandleFn<Fut, S>, + handler: Option<HandleFn<Fut, S>>, /// The function that's called every time the swarm receives a /// [`SwarmEvent`]. - pub swarm_handle: SwarmHandleFn<SwarmFut, S, SS>, + swarm_handler: Option<SwarmHandleFn<SwarmFut, SS>>, /// How long we should wait between each bot joining the server. Set to /// None to have every bot connect at the same time. None is different than /// a duration of 0, since if a duration is present the bots will wait for /// the previous one to be ready. - pub join_delay: Option<std::time::Duration>, + join_delay: Option<std::time::Duration>, +} +impl<S, SS, Fut, SwarmFut> SwarmBuilder<S, SS, Fut, SwarmFut> +where + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, + SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, + S: Default + Send + Sync + Clone + Component + 'static, + SS: Default + Send + Sync + Clone + Component + 'static, +{ + /// Start creating the swarm. + #[must_use] + pub fn new() -> Self { + Self { + // we create the app here so plugins can add onto it. + // the schedules won't run until [`Self::start`] is called. + app: init_ecs_app(), + + accounts: Vec::new(), + states: Vec::new(), + swarm_state: SS::default(), + handler: None, + swarm_handler: None, + join_delay: None, + } + .add_plugins(DefaultSwarmPlugins) + .add_plugins(DefaultBotPlugins) + } + + /// Add a vec of [`Account`]s to the swarm. + /// + /// Use [`Self::add_account`] to only add one account. If you want the + /// clients to have different default states, add them one at a time with + /// [`Self::add_account_with_state`]. + #[must_use] + pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self { + for account in accounts { + self = self.add_account(account); + } + self + } + /// Add a single new [`Account`] to the swarm. Use [`Self::add_accounts`] to + /// add multiple accounts at a time. + /// + /// This will make the state for this client be the default, use + /// [`Self::add_account_with_state`] to avoid that. + #[must_use] + pub fn add_account(self, account: Account) -> Self { + self.add_account_with_state(account, S::default()) + } + /// Add an account with a custom initial state. Use just + /// [`Self::add_account`] to use the Default implementation for the state. + #[must_use] + pub fn add_account_with_state(mut self, account: Account, state: S) -> Self { + self.accounts.push(account); + self.states.push(state); + self + } + + /// Set the function that's called every time a bot receives an [`Event`]. + /// This is the way to handle normal per-bot events. + /// + /// You can only have one client handler, calling this again will replace + /// the old client handler function (you can have a client handler and swarm + /// handler separately though). + #[must_use] + pub fn set_handler(mut self, handler: HandleFn<Fut, S>) -> Self { + self.handler = Some(handler); + self + } + /// Set the function that's called every time the swarm receives a + /// [`SwarmEvent`]. This is the way to handle global swarm events. + /// + /// You can only have one swarm handler, calling this again will replace + /// the old swarm handler function (you can have a client handler and swarm + /// handler separately though). + #[must_use] + pub fn set_swarm_handler(mut self, handler: SwarmHandleFn<SwarmFut, SS>) -> Self { + self.swarm_handler = Some(handler); + self + } + + /// Add a plugin to the swarm. + #[must_use] + pub fn add_plugin<T: Plugin>(mut self, plugin: T) -> Self { + self.app.add_plugin(plugin); + self + } + /// Add a group of plugins to the swarm. + #[must_use] + pub fn add_plugins<T: PluginGroup>(mut self, plugin_group: T) -> Self { + self.app.add_plugins(plugin_group); + self + } + + /// Set how long we should wait between each bot joining the server. + /// + /// By default, every bot will connect at the same time. If you set this + /// field, however, the bots will wait for the previous one to have + /// connected and *then* they'll wait the given duration. + #[must_use] + pub fn join_delay(mut self, delay: std::time::Duration) -> Self { + self.join_delay = Some(delay); + self + } + + /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given + /// server. + /// + /// The `address` argument can be a `&str`, [`ServerAddress`], or anything + /// that implements `TryInto<ServerAddress>`. + /// + /// [`ServerAddress`]: azalea_protocol::ServerAddress + pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<(), SwarmStartError> { + assert_eq!( + self.accounts.len(), + self.states.len(), + "There must be exactly one state per bot." + ); + + // convert the TryInto<ServerAddress> into a ServerAddress + let address: ServerAddress = match address.try_into() { + Ok(address) => address, + Err(_) => return Err(SwarmStartError::InvalidAddress), + }; + + // resolve the address + let resolved_address = resolver::resolve_address(&address).await?; + + let world_container = Arc::new(RwLock::new(WorldContainer::default())); + + // we can't modify the swarm plugins after this + let (bots_tx, mut bots_rx) = mpsc::unbounded_channel(); + let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel(); + + let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1); + let ecs_lock = start_ecs(self.app, run_schedule_receiver, run_schedule_sender.clone()); + + let swarm = Swarm { + ecs_lock: ecs_lock.clone(), + bots: Arc::new(Mutex::new(HashMap::new())), + + resolved_address, + address, + world_container, + + bots_tx, + + swarm_tx: swarm_tx.clone(), + + run_schedule_sender, + }; + ecs_lock.lock().insert_resource(swarm.clone()); + + // SwarmBuilder (self) isn't Send so we have to take all the things we need out + // of it + let mut swarm_clone = swarm.clone(); + let join_delay = self.join_delay; + let accounts = self.accounts.clone(); + let states = self.states.clone(); + + let join_task = tokio::spawn(async move { + if let Some(join_delay) = join_delay { + // if there's a join delay, then join one by one + for (account, state) in accounts.iter().zip(states) { + swarm_clone + .add_with_exponential_backoff(account, state.clone()) + .await; + tokio::time::sleep(join_delay).await; + } + } else { + // otherwise, join all at once + let swarm_borrow = &swarm_clone; + join_all(accounts.iter().zip(states).map( + async move |(account, state)| -> Result<(), JoinError> { + swarm_borrow + .clone() + .add_with_exponential_backoff(account, state.clone()) + .await; + Ok(()) + }, + )) + .await; + } + }); + + let swarm_state = self.swarm_state; + + // Watch swarm_rx and send those events to the swarm_handle. + let swarm_clone = swarm.clone(); + tokio::spawn(async move { + while let Some(event) = swarm_rx.recv().await { + if let Some(swarm_handler) = self.swarm_handler { + tokio::spawn((swarm_handler)( + swarm_clone.clone(), + event, + swarm_state.clone(), + )); + } + } + }); + + // bot events + while let Some((Some(event), bot)) = bots_rx.recv().await { + if let Some(handler) = self.handler { + let state = bot.component::<S>(); + tokio::spawn((handler)(bot, event, state)); + } + } + + join_task.abort(); + + Ok(()) + } +} + +impl<S, SS, Fut, SwarmFut> Default for SwarmBuilder<S, SS, Fut, SwarmFut> +where + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, + SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, + S: Default + Send + Sync + Clone + Component + 'static, + SS: Default + Send + Sync + Clone + Component + 'static, +{ + fn default() -> Self { + Self::new() + } +} + +/// An event about something that doesn't have to do with a single bot. +#[derive(Clone, Debug)] +pub enum SwarmEvent { + /// All the bots in the swarm have successfully joined the server. + Login, + /// The swarm was created. This is only fired once, and it's guaranteed to + /// be the first event to fire. + Init, + /// A bot got disconnected from the server. + /// + /// You can implement an auto-reconnect by calling [`Swarm::add`] + /// with the account from this event. + Disconnect(Account), + /// At least one bot received a chat message. + Chat(ChatPacket), } +pub type SwarmHandleFn<Fut, SS> = fn(Swarm, SwarmEvent, SS) -> Fut; + #[derive(Error, Debug)] pub enum SwarmStartError { #[error("Invalid address")] @@ -154,7 +355,7 @@ pub enum SwarmStartError { /// let mut states = Vec::new(); /// /// for i in 0..10 { -/// accounts.push(Account::offline(&format!("bot{}", i))); +/// accounts.push(Account::offline(&format!("bot{i}"))); /// states.push(State::default()); /// } /// @@ -204,193 +405,69 @@ pub enum SwarmStartError { /// } /// Ok(()) /// } -pub async fn start_swarm< - S: Send + Sync + Clone + 'static, - SS: Send + Sync + Clone + 'static, - A: Send + TryInto<ServerAddress>, - Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, - SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, ->( - options: SwarmOptions<S, SS, A, Fut, SwarmFut>, -) -> Result<(), SwarmStartError> { - assert_eq!( - options.accounts.len(), - options.states.len(), - "There must be exactly one state per bot." - ); - - // convert the TryInto<ServerAddress> into a ServerAddress - let address: ServerAddress = match options.address.try_into() { - Ok(address) => address, - Err(_) => return Err(SwarmStartError::InvalidAddress), - }; - - // resolve the address - let resolved_address = resolver::resolve_address(&address).await?; - - let world_container = Arc::new(RwLock::new(WeakWorldContainer::default())); - - let mut plugins = options.plugins; - let swarm_plugins = options.swarm_plugins; - - // DEFAULT CLIENT PLUGINS - plugins.add(bot::Plugin); - plugins.add(crate::pathfinder::Plugin); - // DEFAULT SWARM PLUGINS - - // we can't modify the swarm plugins after this - let (bots_tx, mut bots_rx) = mpsc::unbounded_channel(); - let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel(); - - let mut swarm = Swarm { - bot_datas: Arc::new(Mutex::new(Vec::new())), - - resolved_address, - address, - worlds: world_container, - plugins, - - bots_tx, - - swarm_tx: swarm_tx.clone(), - }; - - { - // the chat plugin is hacky and needs the swarm to be passed like this - let (chat_swarm_state, chat_tx) = chat::SwarmState::new(swarm.clone()); - swarm.plugins.add(chat::Plugin { - swarm_state: chat_swarm_state, - tx: chat_tx, - }); - } - - let swarm_plugins = swarm_plugins.build(); - - let mut swarm_clone = swarm.clone(); - let join_task = tokio::spawn(async move { - if let Some(join_delay) = options.join_delay { - // if there's a join delay, then join one by one - for (account, state) in options.accounts.iter().zip(options.states) { - swarm_clone - .add_with_exponential_backoff(account, state.clone()) - .await; - tokio::time::sleep(join_delay).await; - } - } else { - let swarm_borrow = &swarm_clone; - join_all(options.accounts.iter().zip(options.states).map( - async move |(account, state)| -> Result<(), JoinError> { - swarm_borrow - .clone() - .add_with_exponential_backoff(account, state.clone()) - .await; - Ok(()) - }, - )) - .await; - } - }); - - let swarm_state = options.swarm_state; - let mut internal_state = InternalSwarmState::default(); - - // Watch swarm_rx and send those events to the plugins and swarm_handle. - let swarm_clone = swarm.clone(); - let swarm_plugins_clone = swarm_plugins.clone(); - tokio::spawn(async move { - while let Some(event) = swarm_rx.recv().await { - for plugin in swarm_plugins_clone.clone().into_iter() { - tokio::spawn(plugin.handle(event.clone(), swarm_clone.clone())); - } - tokio::spawn((options.swarm_handle)( - swarm_clone.clone(), - event, - swarm_state.clone(), - )); - } - }); - - // bot events - while let Some((Some(event), (bot, state))) = bots_rx.recv().await { - // bot event handling - let cloned_plugins = (*bot.plugins).clone(); - for plugin in cloned_plugins.into_iter() { - tokio::spawn(plugin.handle(event.clone(), bot.clone())); - } - - // swarm event handling - // remove this #[allow] when more checks are added - #[allow(clippy::single_match)] - match &event { - Event::Login => { - internal_state.bots_joined += 1; - if internal_state.bots_joined == swarm.bot_datas.lock().len() { - swarm_tx.send(SwarmEvent::Login).unwrap(); - } - } - _ => {} - } - - tokio::spawn((options.handle)(bot, event, state)); - } - - join_task.abort(); - - Ok(()) -} - -impl<S> Swarm<S> -where - S: Send + Sync + Clone + 'static, -{ +// pub async fn start_swarm< +// S: Send + Sync + Clone + 'static, +// SS: Send + Sync + Clone + 'static, +// A: Send + TryInto<ServerAddress>, +// Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +// SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +// >( +// options: SwarmOptions<S, SS, A, Fut, SwarmFut>, +// ) -> Result<(), SwarmStartError> { +// } + +impl Swarm { /// Add a new account to the swarm. You can remove it later by calling /// [`Client::disconnect`]. - pub async fn add(&mut self, account: &Account, state: S) -> Result<Client, JoinError> { - let conn = Connection::new(&self.resolved_address).await?; - let (conn, game_profile) = Client::handshake(conn, account, &self.address.clone()).await?; - + /// + /// # Errors + /// + /// Returns an `Err` if the bot could not do a handshake successfully. + pub async fn add<S: Component + Clone>( + &mut self, + account: &Account, + state: S, + ) -> Result<Client, JoinError> { // tx is moved to the bot so it can send us events // rx is used to receive events from the bot - let (tx, mut rx) = mpsc::channel(1); - let mut bot = Client::new(game_profile, conn, Some(self.worlds.clone())); - tx.send(Event::Init).await.expect("Failed to send event"); - bot.start_tasks(tx); + // An event that causes the schedule to run. This is only used internally. + // let (run_schedule_sender, run_schedule_receiver) = mpsc::unbounded_channel(); + // let ecs_lock = start_ecs(run_schedule_receiver, run_schedule_sender.clone()); + let (bot, mut rx) = Client::start_client( + self.ecs_lock.clone(), + account, + &self.address, + &self.resolved_address, + self.run_schedule_sender.clone(), + ) + .await?; + // add the state to the client + { + let mut ecs = self.ecs_lock.lock(); + ecs.entity_mut(bot.entity).insert(state); + } - bot.plugins = Arc::new(self.plugins.clone().build()); + self.bots.lock().insert(bot.entity, bot.clone()); + let cloned_bots = self.bots.clone(); let cloned_bots_tx = self.bots_tx.clone(); let cloned_bot = bot.clone(); - let cloned_state = state.clone(); let owned_account = account.clone(); - let bot_datas = self.bot_datas.clone(); let swarm_tx = self.swarm_tx.clone(); - // send the init event immediately so it's the first thing we get - swarm_tx.send(SwarmEvent::Init).unwrap(); tokio::spawn(async move { while let Some(event) = rx.recv().await { // we can't handle events here (since we can't copy the handler), // they're handled above in start_swarm - if let Err(e) = - cloned_bots_tx.send((Some(event), (cloned_bot.clone(), cloned_state.clone()))) - { + if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) { error!("Error sending event to swarm: {e}"); } } - // the bot disconnected, so we remove it from the swarm - let mut bot_datas = bot_datas.lock(); - let index = bot_datas - .iter() - .position(|(b, _)| b.profile.uuid == cloned_bot.profile.uuid) - .expect("bot disconnected but not found in swarm"); - bot_datas.remove(index); - + cloned_bots.lock().remove(&bot.entity); swarm_tx .send(SwarmEvent::Disconnect(owned_account)) .unwrap(); }); - self.bot_datas.lock().push((bot.clone(), state.clone())); - Ok(bot) } @@ -399,7 +476,11 @@ where /// /// Exponential backoff means if it fails joining it will initially wait 10 /// seconds, then 20, then 40, up to 2 minutes. - pub async fn add_with_exponential_backoff(&mut self, account: &Account, state: S) -> Client { + pub async fn add_with_exponential_backoff<S: Component + Clone>( + &mut self, + account: &Account, + state: S, + ) -> Client { let mut disconnects = 0; loop { match self.add(account, state.clone()).await { @@ -409,7 +490,7 @@ where let delay = (Duration::from_secs(5) * 2u32.pow(disconnects)) .min(Duration::from_secs(120)); let username = account.username.clone(); - error!("Error joining {username}: {e}. Waiting {delay:?} and trying again."); + error!("Error joining as {username}: {e}. Waiting {delay:?} and trying again."); tokio::time::sleep(delay).await; } } @@ -417,33 +498,42 @@ where } } -impl<S> IntoIterator for Swarm<S> -where - S: Send + Sync + Clone + 'static, -{ - type Item = (Client, S); +impl IntoIterator for Swarm { + type Item = Client; type IntoIter = std::vec::IntoIter<Self::Item>; - /// Iterate over the bots and their states in this swarm. + /// Iterate over the bots in this swarm. /// /// ```rust,no_run - /// for (bot, state) in swarm { + /// for bot in swarm { + /// let state = bot.component::<State>(); /// // ... /// } /// ``` fn into_iter(self) -> Self::IntoIter { - self.bot_datas.lock().clone().into_iter() + self.bots + .lock() + .clone() + .into_values() + .collect::<Vec<_>>() + .into_iter() } } -#[derive(Default)] -struct InternalSwarmState { - /// The number of bots connected to the server - pub bots_joined: usize, -} - impl From<ConnectionError> for SwarmStartError { fn from(e: ConnectionError) -> Self { SwarmStartError::from(JoinError::from(e)) } } + +/// This plugin group will add all the default plugins necessary for swarms to +/// work. +pub struct DefaultSwarmPlugins; + +impl PluginGroup for DefaultSwarmPlugins { + fn build(self) -> PluginGroupBuilder { + PluginGroupBuilder::start::<Self>() + .add(chat::SwarmChatPlugin) + .add(events::SwarmPlugin) + } +} diff --git a/azalea/src/swarm/plugins.rs b/azalea/src/swarm/plugins.rs deleted file mode 100644 index f92d40e6..00000000 --- a/azalea/src/swarm/plugins.rs +++ /dev/null @@ -1,135 +0,0 @@ -use crate::{Swarm, SwarmEvent}; -use async_trait::async_trait; -use nohash_hasher::NoHashHasher; -use std::{ - any::{Any, TypeId}, - collections::HashMap, - hash::BuildHasherDefault, -}; - -type U64Hasher = BuildHasherDefault<NoHashHasher<u64>>; - -// kind of based on https://docs.rs/http/latest/src/http/extensions.rs.html -/// A map of plugin ids to [`SwarmPlugin`] trait objects. The client stores -/// this so we can keep the state for our [`Swarm`] plugins. -/// -/// If you're using azalea, you should generate this from the `swarm_plugins!` -/// macro. -#[derive(Clone, Default)] -pub struct SwarmPlugins<S> { - map: Option<HashMap<TypeId, Box<dyn SwarmPlugin<S>>, U64Hasher>>, -} - -#[derive(Clone)] -pub struct SwarmPluginStates<S> { - map: Option<HashMap<TypeId, Box<dyn SwarmPluginState<S>>, U64Hasher>>, -} - -impl<S> SwarmPluginStates<S> { - pub fn get<T: SwarmPluginState<S>>(&self) -> Option<&T> { - self.map - .as_ref() - .and_then(|map| map.get(&TypeId::of::<T>())) - .and_then(|boxed| (boxed.as_ref() as &dyn Any).downcast_ref::<T>()) - } -} - -impl<S> SwarmPlugins<S> -where - S: 'static, -{ - /// Create a new empty set of plugins. - pub fn new() -> Self { - Self { map: None } - } - - /// Add a new plugin to this set. - pub fn add<T: SwarmPlugin<S>>(&mut self, plugin: T) { - if self.map.is_none() { - self.map = Some(HashMap::with_hasher(BuildHasherDefault::default())); - } - self.map - .as_mut() - .unwrap() - .insert(TypeId::of::<T>(), Box::new(plugin)); - } - - /// Build our plugin states from this set of plugins. Note that if you're - /// using `azalea` you'll probably never need to use this as it's called - /// for you. - pub fn build(self) -> SwarmPluginStates<S> { - if self.map.is_none() { - return SwarmPluginStates { map: None }; - } - let mut map = HashMap::with_hasher(BuildHasherDefault::default()); - for (id, plugin) in self.map.unwrap().into_iter() { - map.insert(id, plugin.build()); - } - SwarmPluginStates { map: Some(map) } - } -} - -impl<S> IntoIterator for SwarmPluginStates<S> { - type Item = Box<dyn SwarmPluginState<S>>; - type IntoIter = std::vec::IntoIter<Self::Item>; - - /// Iterate over the plugin states. - fn into_iter(self) -> Self::IntoIter { - self.map - .map(|map| map.into_values().collect::<Vec<_>>()) - .unwrap_or_default() - .into_iter() - } -} - -/// A `SwarmPluginState` keeps the current state of a plugin for a client. All -/// the fields must be atomic. Unique `SwarmPluginState`s are built from -/// [`SwarmPlugin`]s. -#[async_trait] -pub trait SwarmPluginState<S>: Send + Sync + SwarmPluginStateClone<S> + Any + 'static { - async fn handle(self: Box<Self>, event: SwarmEvent, swarm: Swarm<S>); -} - -/// Swarm plugins can keep their own personal state ([`SwarmPluginState`]), -/// listen to [`SwarmEvent`]s, and add new functions to [`Swarm`]. -pub trait SwarmPlugin<S>: Send + Sync + SwarmPluginClone<S> + Any + 'static { - fn build(&self) -> Box<dyn SwarmPluginState<S>>; -} - -/// An internal trait that allows SwarmPluginState to be cloned. -#[doc(hidden)] -pub trait SwarmPluginStateClone<S> { - fn clone_box(&self) -> Box<dyn SwarmPluginState<S>>; -} -impl<T, S> SwarmPluginStateClone<S> for T -where - T: 'static + SwarmPluginState<S> + Clone, -{ - fn clone_box(&self) -> Box<dyn SwarmPluginState<S>> { - Box::new(self.clone()) - } -} -impl<S> Clone for Box<dyn SwarmPluginState<S>> { - fn clone(&self) -> Self { - self.clone_box() - } -} - -/// An internal trait that allows SwarmPlugin to be cloned. -#[doc(hidden)] -pub trait SwarmPluginClone<S> { - fn clone_box(&self) -> Box<dyn SwarmPlugin<S>>; -} -impl<T, S> SwarmPluginClone<S> for T -where - T: 'static + SwarmPlugin<S> + Clone, -{ - fn clone_box(&self) -> Box<dyn SwarmPlugin<S>> { - Box::new(self.clone()) - } -} -impl<S> Clone for Box<dyn SwarmPlugin<S>> { - fn clone(&self) -> Self { - self.clone_box() - } -} |
