aboutsummaryrefslogtreecommitdiff
path: root/azalea/src
diff options
context:
space:
mode:
Diffstat (limited to 'azalea/src')
-rw-r--r--azalea/src/bot.rs131
-rw-r--r--azalea/src/lib.rs209
-rw-r--r--azalea/src/pathfinder/mod.rs341
-rw-r--r--azalea/src/pathfinder/moves.rs136
-rw-r--r--azalea/src/pathfinder/mtdstarlite.rs8
-rw-r--r--azalea/src/prelude.rs6
-rw-r--r--azalea/src/start.rs136
-rw-r--r--azalea/src/swarm/chat.rs344
-rw-r--r--azalea/src/swarm/events.rs45
-rw-r--r--azalea/src/swarm/mod.rs614
-rw-r--r--azalea/src/swarm/plugins.rs135
11 files changed, 1105 insertions, 1000 deletions
diff --git a/azalea/src/bot.rs b/azalea/src/bot.rs
index 0674c692..510449d3 100644
--- a/azalea/src/bot.rs
+++ b/azalea/src/bot.rs
@@ -1,56 +1,115 @@
-use crate::{Client, Event};
-use async_trait::async_trait;
use azalea_core::Vec3;
-use parking_lot::Mutex;
-use std::{f64::consts::PI, sync::Arc};
+use azalea_ecs::{
+ app::{App, Plugin, PluginGroup, PluginGroupBuilder},
+ component::Component,
+ entity::Entity,
+ event::EventReader,
+ query::{With, Without},
+ schedule::IntoSystemDescriptor,
+ system::{Commands, Query},
+ AppTickExt,
+};
+use azalea_world::{
+ entity::{metadata::Player, set_rotation, Jumping, Physics, Position},
+ Local,
+};
+use std::f64::consts::PI;
-#[derive(Clone, Default)]
-pub struct Plugin;
-impl crate::Plugin for Plugin {
- type State = State;
+use crate::pathfinder::PathfinderPlugin;
- fn build(&self) -> State {
- State::default()
+#[derive(Clone, Default)]
+pub struct BotPlugin;
+impl Plugin for BotPlugin {
+ fn build(&self, app: &mut App) {
+ app.add_event::<LookAtEvent>()
+ .add_event::<JumpEvent>()
+ .add_system(insert_bot.before("deduplicate_entities"))
+ .add_system(look_at_listener)
+ .add_system(jump_listener.label("jump_listener").before("ai_step"))
+ .add_tick_system(stop_jumping.after("ai_step"));
}
}
-#[derive(Default, Clone)]
-pub struct State {
- jumping_once: Arc<Mutex<bool>>,
+/// Component for all bots.
+#[derive(Default, Component)]
+pub struct Bot {
+ jumping_once: bool,
+}
+
+/// Insert the [`Bot`] component for any local players that don't have it.
+#[allow(clippy::type_complexity)]
+fn insert_bot(
+ mut commands: Commands,
+ mut query: Query<Entity, (Without<Bot>, With<Local>, With<Player>)>,
+) {
+ for entity in &mut query {
+ commands.entity(entity).insert(Bot::default());
+ }
}
-#[async_trait]
-impl crate::PluginState for State {
- async fn handle(self: Box<Self>, event: Event, mut bot: Client) {
- if let Event::Tick = event {
- if *self.jumping_once.lock() && bot.jumping() {
- *self.jumping_once.lock() = false;
- bot.set_jumping(false);
- }
+fn stop_jumping(mut query: Query<(&mut Jumping, &mut Bot)>) {
+ for (mut jumping, mut bot) in &mut query {
+ if bot.jumping_once && **jumping {
+ bot.jumping_once = false;
+ **jumping = false;
}
}
}
-pub trait BotTrait {
+pub trait BotClientExt {
fn jump(&mut self);
- fn look_at(&mut self, pos: &Vec3);
+ fn look_at(&mut self, pos: Vec3);
}
-impl BotTrait for azalea_client::Client {
+impl BotClientExt for azalea_client::Client {
/// Queue a jump for the next tick.
fn jump(&mut self) {
- self.set_jumping(true);
- let state = self.plugins.get::<State>().unwrap().clone();
- *state.jumping_once.lock() = true;
+ let mut ecs = self.ecs.lock();
+ ecs.send_event(JumpEvent(self.entity));
}
/// Turn the bot's head to look at the coordinate in the world.
- fn look_at(&mut self, pos: &Vec3) {
- let (y_rot, x_rot) = direction_looking_at(self.entity().pos(), pos);
- self.set_rotation(y_rot, x_rot);
+ fn look_at(&mut self, position: Vec3) {
+ let mut ecs = self.ecs.lock();
+ ecs.send_event(LookAtEvent {
+ entity: self.entity,
+ position,
+ });
+ }
+}
+
+/// Event to jump once.
+pub struct JumpEvent(pub Entity);
+
+fn jump_listener(mut query: Query<(&mut Jumping, &mut Bot)>, mut events: EventReader<JumpEvent>) {
+ for event in events.iter() {
+ if let Ok((mut jumping, mut bot)) = query.get_mut(event.0) {
+ **jumping = true;
+ bot.jumping_once = true;
+ }
}
}
+/// Make an entity look towards a certain position in the world.
+pub struct LookAtEvent {
+ pub entity: Entity,
+ /// The position we want the entity to be looking at.
+ pub position: Vec3,
+}
+fn look_at_listener(
+ mut events: EventReader<LookAtEvent>,
+ mut query: Query<(&Position, &mut Physics)>,
+) {
+ for event in events.iter() {
+ if let Ok((position, mut physics)) = query.get_mut(event.entity) {
+ let (y_rot, x_rot) = direction_looking_at(position, &event.position);
+ set_rotation(&mut physics, y_rot, x_rot);
+ }
+ }
+}
+
+/// Return the (`y_rot`, `x_rot`) that would make a client at `current` be
+/// looking at `target`.
fn direction_looking_at(current: &Vec3, target: &Vec3) -> (f32, f32) {
// borrowed from mineflayer's Bot.lookAt because i didn't want to do math
let delta = target - current;
@@ -59,3 +118,15 @@ fn direction_looking_at(current: &Vec3, target: &Vec3) -> (f32, f32) {
let x_rot = f64::atan2(delta.y, ground_distance) * -(180.0 / PI);
(y_rot as f32, x_rot as f32)
}
+
+/// A [`PluginGroup`] for the plugins that add extra bot functionality to the
+/// client.
+pub struct DefaultBotPlugins;
+
+impl PluginGroup for DefaultBotPlugins {
+ fn build(self) -> PluginGroupBuilder {
+ PluginGroupBuilder::start::<Self>()
+ .add(BotPlugin)
+ .add(PathfinderPlugin)
+ }
+}
diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs
index 4ceb5b2e..026a35f5 100644
--- a/azalea/src/lib.rs
+++ b/azalea/src/lib.rs
@@ -1,94 +1,139 @@
-//! Azalea is a framework for creating Minecraft bots.
-//!
-//! Internally, it's just a wrapper over [`azalea_client`], adding useful
-//! functions for making bots. Because of this, lots of the documentation will
-//! refer to `azalea_client`. You can just replace these with `azalea` in your
-//! code, since everything from azalea_client is re-exported in azalea.
-//!
-//! # Installation
-//!
-//! First, install Rust nightly with `rustup install nightly` and `rustup
-//! default nightly`.
-//!
-//! Then, add one of the following lines to your Cargo.toml:
-//!
-//! Latest bleeding-edge version:
-//! `azalea = { git="https://github.com/mat-1/azalea" }`\
-//! Latest "stable" release:
-//! `azalea = "0.5.0"`
-//!
-//! ## Optimization
-//!
-//! For faster compile times, make a `.cargo/config.toml` file in your project
-//! and copy
-//! [this file](https://github.com/mat-1/azalea/blob/main/.cargo/config.toml)
-//! into it.
-//!
-//! For faster performance in debug mode, add
-//! ```toml
-//! [profile.dev]
-//! opt-level = 1
-//! [profile.dev.package."*"]
-//! opt-level = 3
-//! ```
-//! to your Cargo.toml. You may have to install the LLD linker.
-//!
-//! # Examples
-//!
-//! ```rust,no_run
-//! //! A bot that logs chat messages sent in the server to the console.
-//!
-//! use azalea::prelude::*;
-//! use parking_lot::Mutex;
-//! use std::sync::Arc;
-//!
-//! #[tokio::main]
-//! async fn main() {
-//! let account = Account::offline("bot");
-//! // or Account::microsoft("example@example.com").await.unwrap();
-//!
-//! azalea::start(azalea::Options {
-//! account,
-//! address: "localhost",
-//! state: State::default(),
-//! plugins: plugins![],
-//! handle,
-//! })
-//! .await
-//! .unwrap();
-//! }
-//!
-//! #[derive(Default, Clone)]
-//! pub struct State {}
-//!
-//! async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> {
-//! match event {
-//! Event::Chat(m) => {
-//! println!("{}", m.message().to_ansi());
-//! }
-//! _ => {}
-//! }
-//!
-//! Ok(())
-//! }
-//! ```
-//!
-//! [`azalea_client`]: https://docs.rs/azalea-client
-
-#![feature(trait_upcasting)]
+#![doc = include_str!("../README.md")]
#![feature(async_closure)]
-#![allow(incomplete_features)]
mod bot;
pub mod pathfinder;
pub mod prelude;
-mod start;
mod swarm;
-pub use azalea_block::*;
+pub use azalea_block as blocks;
pub use azalea_client::*;
pub use azalea_core::{BlockPos, Vec3};
-pub use start::{start, Options};
+use azalea_ecs::{
+ app::{App, Plugin},
+ component::Component,
+};
+pub use azalea_protocol as protocol;
+pub use azalea_registry::EntityKind;
+pub use azalea_world::{entity, World};
+use bot::DefaultBotPlugins;
+use ecs::app::PluginGroup;
+use futures::Future;
+use protocol::{
+ resolver::{self, ResolverError},
+ ServerAddress,
+};
pub use swarm::*;
+use thiserror::Error;
+use tokio::sync::mpsc;
pub type HandleFn<Fut, S> = fn(Client, Event, S) -> Fut;
+
+#[derive(Error, Debug)]
+pub enum StartError {
+ #[error("Invalid address")]
+ InvalidAddress,
+ #[error(transparent)]
+ ResolveAddress(#[from] ResolverError),
+ #[error("Join error: {0}")]
+ Join(#[from] azalea_client::JoinError),
+}
+
+pub struct ClientBuilder<S, Fut>
+where
+ S: Default + Send + Sync + Clone + 'static,
+ Fut: Future<Output = Result<(), anyhow::Error>>,
+{
+ app: App,
+ /// The function that's called every time a bot receives an [`Event`].
+ handler: Option<HandleFn<Fut, S>>,
+ state: S,
+}
+impl<S, Fut> ClientBuilder<S, Fut>
+where
+ S: Default + Send + Sync + Clone + Component + 'static,
+ Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+{
+ /// Start building a client that can join the world.
+ #[must_use]
+ pub fn new() -> Self {
+ Self {
+ // we create the app here so plugins can add onto it.
+ // the schedules won't run until [`Self::start`] is called.
+ app: init_ecs_app(),
+
+ handler: None,
+ state: S::default(),
+ }
+ .add_plugins(DefaultBotPlugins)
+ }
+ /// Set the function that's called every time a bot receives an [`Event`].
+ /// This is the way to handle normal per-bot events.
+ ///
+ /// You can only have one client handler, calling this again will replace
+ /// the old client handler function (you can have a client handler and swarm
+ /// handler separately though).
+ #[must_use]
+ pub fn set_handler(mut self, handler: HandleFn<Fut, S>) -> Self {
+ self.handler = Some(handler);
+ self
+ }
+ /// Add a plugin to the client.
+ #[must_use]
+ pub fn add_plugin<T: Plugin>(mut self, plugin: T) -> Self {
+ self.app.add_plugin(plugin);
+ self
+ }
+ /// Add a group of plugins to the client.
+ #[must_use]
+ pub fn add_plugins<T: PluginGroup>(mut self, plugin_group: T) -> Self {
+ self.app.add_plugins(plugin_group);
+ self
+ }
+
+ /// Build this `ClientBuilder` into an actual [`Client`] and join the given
+ /// server.
+ ///
+ /// The `address` argument can be a `&str`, [`ServerAddress`], or anything
+ /// that implements `TryInto<ServerAddress>`.
+ ///
+ /// [`ServerAddress`]: azalea_protocol::ServerAddress
+ pub async fn start(
+ self,
+ account: Account,
+ address: impl TryInto<ServerAddress>,
+ ) -> Result<(), StartError> {
+ let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?;
+ let resolved_address = resolver::resolve_address(&address).await?;
+
+ // An event that causes the schedule to run. This is only used internally.
+ let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
+ let ecs_lock = start_ecs(self.app, run_schedule_receiver, run_schedule_sender.clone());
+
+ let (bot, mut rx) = Client::start_client(
+ ecs_lock,
+ &account,
+ &address,
+ &resolved_address,
+ run_schedule_sender,
+ )
+ .await?;
+
+ while let Some(event) = rx.recv().await {
+ if let Some(handler) = self.handler {
+ tokio::spawn((handler)(bot.clone(), event.clone(), self.state.clone()));
+ }
+ }
+
+ Ok(())
+ }
+}
+impl<S, Fut> Default for ClientBuilder<S, Fut>
+where
+ S: Default + Send + Sync + Clone + Component + 'static,
+ Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+{
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/azalea/src/pathfinder/mod.rs b/azalea/src/pathfinder/mod.rs
index de62e9a7..5246290d 100644
--- a/azalea/src/pathfinder/mod.rs
+++ b/azalea/src/pathfinder/mod.rs
@@ -1,147 +1,255 @@
mod moves;
mod mtdstarlite;
-use crate::{prelude::*, SprintDirection, WalkDirection};
-use crate::{Client, Event};
-use async_trait::async_trait;
+use crate::bot::{JumpEvent, LookAtEvent};
+use crate::{SprintDirection, WalkDirection};
+
+use azalea_client::{StartSprintEvent, StartWalkEvent};
use azalea_core::{BlockPos, CardinalDirection};
-use azalea_world::entity::EntityData;
+use azalea_ecs::{
+ app::{App, Plugin},
+ component::Component,
+ entity::Entity,
+ event::{EventReader, EventWriter},
+ query::{With, Without},
+ schedule::IntoSystemDescriptor,
+ system::{Commands, Query, Res},
+ AppTickExt,
+};
+use azalea_world::entity::metadata::Player;
+use azalea_world::Local;
+use azalea_world::{
+ entity::{Physics, Position, WorldName},
+ WorldContainer,
+};
+use bevy_tasks::{AsyncComputeTaskPool, Task};
+use futures_lite::future;
use log::{debug, error};
use mtdstarlite::Edge;
pub use mtdstarlite::MTDStarLite;
-use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
#[derive(Clone, Default)]
-pub struct Plugin;
-impl crate::Plugin for Plugin {
- type State = State;
-
- fn build(&self) -> State {
- State::default()
+pub struct PathfinderPlugin;
+impl Plugin for PathfinderPlugin {
+ fn build(&self, app: &mut App) {
+ app.add_event::<GotoEvent>()
+ .add_event::<PathFoundEvent>()
+ .add_tick_system(tick_execute_path.before("walk_listener"))
+ .add_system(goto_listener)
+ .add_system(add_default_pathfinder.after("deduplicate_entities"))
+ .add_system(handle_tasks)
+ .add_system(path_found_listener);
}
}
-#[derive(Default, Clone)]
-pub struct State {
- // pathfinder: Option<MTDStarLite<Node, f32>>,
- pub path: Arc<Mutex<VecDeque<Node>>>,
+/// A component that makes this entity able to pathfind.
+#[derive(Component, Default)]
+pub struct Pathfinder {
+ pub path: VecDeque<Node>,
+}
+#[allow(clippy::type_complexity)]
+fn add_default_pathfinder(
+ mut commands: Commands,
+ mut query: Query<Entity, (Without<Pathfinder>, With<Local>, With<Player>)>,
+) {
+ for entity in &mut query {
+ commands.entity(entity).insert(Pathfinder::default());
+ }
}
-#[async_trait]
-impl crate::PluginState for State {
- async fn handle(self: Box<Self>, event: Event, mut bot: Client) {
- if let Event::Tick = event {
- let mut path = self.path.lock();
+pub trait PathfinderClientExt {
+ fn goto(&self, goal: impl Goal + Send + Sync + 'static);
+}
- if !path.is_empty() {
- tick_execute_path(&mut bot, &mut path);
- }
- }
+impl PathfinderClientExt for azalea_client::Client {
+ fn goto(&self, goal: impl Goal + Send + Sync + 'static) {
+ self.ecs.lock().send_event(GotoEvent {
+ entity: self.entity,
+ goal: Arc::new(goal),
+ });
}
}
-
-pub trait Trait {
- fn goto(&self, goal: impl Goal);
+pub struct GotoEvent {
+ pub entity: Entity,
+ pub goal: Arc<dyn Goal + Send + Sync>,
}
+pub struct PathFoundEvent {
+ pub entity: Entity,
+ pub path: VecDeque<Node>,
+}
+
+#[derive(Component)]
+pub struct ComputePath(Task<Option<PathFoundEvent>>);
-impl Trait for azalea_client::Client {
- fn goto(&self, goal: impl Goal) {
+fn goto_listener(
+ mut commands: Commands,
+ mut events: EventReader<GotoEvent>,
+ mut query: Query<(&Position, &WorldName)>,
+ world_container: Res<WorldContainer>,
+) {
+ let thread_pool = AsyncComputeTaskPool::get();
+
+ for event in events.iter() {
+ let (position, world_name) = query
+ .get_mut(event.entity)
+ .expect("Called goto on an entity that's not in the world");
let start = Node {
- pos: BlockPos::from(self.entity().pos()),
+ pos: BlockPos::from(position),
vertical_vel: VerticalVel::None,
};
- let end = goal.goal_node();
- debug!("start: {start:?}, end: {end:?}");
-
- let possible_moves: Vec<&dyn moves::Move> = vec![
- &moves::ForwardMove(CardinalDirection::North),
- &moves::ForwardMove(CardinalDirection::East),
- &moves::ForwardMove(CardinalDirection::South),
- &moves::ForwardMove(CardinalDirection::West),
- //
- &moves::AscendMove(CardinalDirection::North),
- &moves::AscendMove(CardinalDirection::East),
- &moves::AscendMove(CardinalDirection::South),
- &moves::AscendMove(CardinalDirection::West),
- //
- &moves::DescendMove(CardinalDirection::North),
- &moves::DescendMove(CardinalDirection::East),
- &moves::DescendMove(CardinalDirection::South),
- &moves::DescendMove(CardinalDirection::West),
- //
- &moves::DiagonalMove(CardinalDirection::North),
- &moves::DiagonalMove(CardinalDirection::East),
- &moves::DiagonalMove(CardinalDirection::South),
- &moves::DiagonalMove(CardinalDirection::West),
- ];
-
- let successors = |node: &Node| {
- let mut edges = Vec::new();
-
- let world = &self.world.read().shared;
- for possible_move in possible_moves.iter() {
- edges.push(Edge {
- target: possible_move.next_node(node),
- cost: possible_move.cost(world, node),
- });
+
+ let world_lock = world_container
+ .get(world_name)
+ .expect("Entity tried to pathfind but the entity isn't in a valid world");
+ let end = event.goal.goal_node();
+
+ let goal = event.goal.clone();
+ let entity = event.entity;
+
+ let task = thread_pool.spawn(async move {
+ debug!("start: {start:?}, end: {end:?}");
+
+ let possible_moves: Vec<&dyn moves::Move> = vec![
+ &moves::ForwardMove(CardinalDirection::North),
+ &moves::ForwardMove(CardinalDirection::East),
+ &moves::ForwardMove(CardinalDirection::South),
+ &moves::ForwardMove(CardinalDirection::West),
+ //
+ &moves::AscendMove(CardinalDirection::North),
+ &moves::AscendMove(CardinalDirection::East),
+ &moves::AscendMove(CardinalDirection::South),
+ &moves::AscendMove(CardinalDirection::West),
+ //
+ &moves::DescendMove(CardinalDirection::North),
+ &moves::DescendMove(CardinalDirection::East),
+ &moves::DescendMove(CardinalDirection::South),
+ &moves::DescendMove(CardinalDirection::West),
+ //
+ &moves::DiagonalMove(CardinalDirection::North),
+ &moves::DiagonalMove(CardinalDirection::East),
+ &moves::DiagonalMove(CardinalDirection::South),
+ &moves::DiagonalMove(CardinalDirection::West),
+ ];
+
+ let successors = |node: &Node| {
+ let mut edges = Vec::new();
+
+ let world = world_lock.read();
+ for possible_move in &possible_moves {
+ edges.push(Edge {
+ target: possible_move.next_node(node),
+ cost: possible_move.cost(&world, node),
+ });
+ }
+ edges
+ };
+
+ let mut pf = MTDStarLite::new(
+ start,
+ end,
+ |n| goal.heuristic(n),
+ successors,
+ successors,
+ |n| goal.success(n),
+ );
+
+ let start_time = std::time::Instant::now();
+ let p = pf.find_path();
+ let end_time = std::time::Instant::now();
+ debug!("path: {p:?}");
+ debug!("time: {:?}", end_time - start_time);
+
+ // convert the Option<Vec<Node>> to a VecDeque<Node>
+ if let Some(p) = p {
+ let path = p.into_iter().collect::<VecDeque<_>>();
+ // commands.entity(event.entity).insert(Pathfinder { path: p });
+ Some(PathFoundEvent { entity, path })
+ } else {
+ error!("no path found");
+ None
}
- edges
- };
+ });
- let mut pf = MTDStarLite::new(
- start,
- end,
- |n| goal.heuristic(n),
- successors,
- successors,
- |n| goal.success(n),
- );
-
- let start = std::time::Instant::now();
- let p = pf.find_path();
- let end = std::time::Instant::now();
- debug!("path: {p:?}");
- debug!("time: {:?}", end - start);
-
- let state = self
- .plugins
- .get::<State>()
- .expect("Pathfinder plugin not installed!")
- .clone();
- // convert the Option<Vec<Node>> to a VecDeque<Node>
- if let Some(p) = p {
- *state.path.lock() = p.into_iter().collect();
- } else {
- error!("no path found");
+ commands.spawn(ComputePath(task));
+ }
+}
+
+// poll the tasks and send the PathFoundEvent if they're done
+fn handle_tasks(
+ mut commands: Commands,
+ mut transform_tasks: Query<(Entity, &mut ComputePath)>,
+ mut path_found_events: EventWriter<PathFoundEvent>,
+) {
+ for (entity, mut task) in &mut transform_tasks {
+ if let Some(optional_path_found_event) = future::block_on(future::poll_once(&mut task.0)) {
+ if let Some(path_found_event) = optional_path_found_event {
+ path_found_events.send(path_found_event);
+ }
+
+ // Task is complete, so remove task component from entity
+ commands.entity(entity).remove::<ComputePath>();
}
}
}
-fn tick_execute_path(bot: &mut Client, path: &mut VecDeque<Node>) {
- let target = if let Some(target) = path.front() {
- target
- } else {
- return;
- };
- let center = target.pos.center();
- // println!("going to {center:?} (at {pos:?})", pos = bot.entity().pos());
- bot.look_at(&center);
- bot.sprint(SprintDirection::Forward);
- // check if we should jump
- if target.pos.y > bot.entity().pos().y.floor() as i32 {
- bot.jump();
+// set the path for the target entity when we get the PathFoundEvent
+fn path_found_listener(mut events: EventReader<PathFoundEvent>, mut query: Query<&mut Pathfinder>) {
+ for event in events.iter() {
+ let mut pathfinder = query
+ .get_mut(event.entity)
+ .expect("Path found for an entity that doesn't have a pathfinder");
+ pathfinder.path = event.path.clone();
}
+}
- if target.is_reached(&bot.entity()) {
- // println!("ok target {target:?} reached");
- path.pop_front();
- if path.is_empty() {
- bot.walk(WalkDirection::None);
+fn tick_execute_path(
+ mut query: Query<(Entity, &mut Pathfinder, &Position, &Physics)>,
+ mut look_at_events: EventWriter<LookAtEvent>,
+ mut sprint_events: EventWriter<StartSprintEvent>,
+ mut walk_events: EventWriter<StartWalkEvent>,
+ mut jump_events: EventWriter<JumpEvent>,
+) {
+ for (entity, mut pathfinder, position, physics) in &mut query {
+ loop {
+ let Some(target) = pathfinder.path.front() else {
+ break;
+ };
+ let center = target.pos.center();
+ // println!("going to {center:?} (at {pos:?})", pos = bot.entity().pos());
+ look_at_events.send(LookAtEvent {
+ entity,
+ position: center,
+ });
+ debug!(
+ "tick: pathfinder {entity:?}; going to {:?}; currently at {position:?}",
+ target.pos
+ );
+ sprint_events.send(StartSprintEvent {
+ entity,
+ direction: SprintDirection::Forward,
+ });
+ // check if we should jump
+ if target.pos.y > position.y.floor() as i32 {
+ jump_events.send(JumpEvent(entity));
+ }
+
+ if target.is_reached(position, physics) {
+ // println!("reached target");
+ pathfinder.path.pop_front();
+ if pathfinder.path.is_empty() {
+ // println!("reached goal");
+ walk_events.send(StartWalkEvent {
+ entity,
+ direction: WalkDirection::None,
+ });
+ }
+ // tick again, maybe we already reached the next node!
+ } else {
+ break;
+ }
}
- // tick again, maybe we already reached the next node!
- tick_execute_path(bot, path);
}
}
@@ -172,7 +280,8 @@ pub trait Goal {
impl Node {
/// Returns whether the entity is at the node and should start going to the
/// next node.
- pub fn is_reached(&self, entity: &EntityData) -> bool {
+ #[must_use]
+ pub fn is_reached(&self, position: &Position, physics: &Physics) -> bool {
// println!(
// "entity.delta.y: {} {:?}=={:?}, self.vertical_vel={:?}",
// entity.delta.y,
@@ -180,11 +289,11 @@ impl Node {
// self.pos,
// self.vertical_vel
// );
- BlockPos::from(entity.pos()) == self.pos
+ BlockPos::from(position) == self.pos
&& match self.vertical_vel {
- VerticalVel::NoneMidair => entity.delta.y > -0.1 && entity.delta.y < 0.1,
- VerticalVel::None => entity.on_ground,
- VerticalVel::FallingLittle => entity.delta.y < -0.1,
+ VerticalVel::NoneMidair => physics.delta.y > -0.1 && physics.delta.y < 0.1,
+ VerticalVel::None => physics.on_ground,
+ VerticalVel::FallingLittle => physics.delta.y < -0.1,
}
}
}
diff --git a/azalea/src/pathfinder/moves.rs b/azalea/src/pathfinder/moves.rs
index ccf8ba1a..9bb5c7c1 100644
--- a/azalea/src/pathfinder/moves.rs
+++ b/azalea/src/pathfinder/moves.rs
@@ -1,11 +1,11 @@
use super::{Node, VerticalVel};
use azalea_core::{BlockPos, CardinalDirection};
use azalea_physics::collision::{self, BlockWithShape};
-use azalea_world::WeakWorld;
+use azalea_world::World;
/// whether this block is passable
-fn is_block_passable(pos: &BlockPos, world: &WeakWorld) -> bool {
- if let Some(block) = world.get_block_state(pos) {
+fn is_block_passable(pos: &BlockPos, world: &World) -> bool {
+ if let Some(block) = world.chunks.get_block_state(pos) {
block.shape() == &collision::empty_shape()
} else {
false
@@ -13,8 +13,8 @@ fn is_block_passable(pos: &BlockPos, world: &WeakWorld) -> bool {
}
/// whether this block has a solid hitbox (i.e. we can stand on it)
-fn is_block_solid(pos: &BlockPos, world: &WeakWorld) -> bool {
- if let Some(block) = world.get_block_state(pos) {
+fn is_block_solid(pos: &BlockPos, world: &World) -> bool {
+ if let Some(block) = world.chunks.get_block_state(pos) {
block.shape() == &collision::block_shape()
} else {
false
@@ -22,22 +22,22 @@ fn is_block_solid(pos: &BlockPos, world: &WeakWorld) -> bool {
}
/// Whether this block and the block above are passable
-fn is_passable(pos: &BlockPos, world: &WeakWorld) -> bool {
+fn is_passable(pos: &BlockPos, world: &World) -> bool {
is_block_passable(pos, world) && is_block_passable(&pos.up(1), world)
}
/// Whether we can stand in this position. Checks if the block below is solid,
/// and that the two blocks above that are passable.
-fn is_standable(pos: &BlockPos, world: &WeakWorld) -> bool {
+fn is_standable(pos: &BlockPos, world: &World) -> bool {
is_block_solid(&pos.down(1), world) && is_passable(pos, world)
}
const JUMP_COST: f32 = 0.5;
const WALK_ONE_BLOCK_COST: f32 = 1.0;
-pub trait Move {
- fn cost(&self, world: &WeakWorld, node: &Node) -> f32;
+pub trait Move: Send + Sync {
+ fn cost(&self, world: &World, node: &Node) -> f32;
/// Returns by how much the entity's position should be changed when this
/// move is executed.
fn offset(&self) -> BlockPos;
@@ -51,7 +51,7 @@ pub trait Move {
pub struct ForwardMove(pub CardinalDirection);
impl Move for ForwardMove {
- fn cost(&self, world: &WeakWorld, node: &Node) -> f32 {
+ fn cost(&self, world: &World, node: &Node) -> f32 {
if is_standable(&(node.pos + self.offset()), world)
&& node.vertical_vel == VerticalVel::None
{
@@ -67,7 +67,7 @@ impl Move for ForwardMove {
pub struct AscendMove(pub CardinalDirection);
impl Move for AscendMove {
- fn cost(&self, world: &WeakWorld, node: &Node) -> f32 {
+ fn cost(&self, world: &World, node: &Node) -> f32 {
if node.vertical_vel == VerticalVel::None
&& is_block_passable(&node.pos.up(2), world)
&& is_standable(&(node.pos + self.offset()), world)
@@ -89,7 +89,7 @@ impl Move for AscendMove {
}
pub struct DescendMove(pub CardinalDirection);
impl Move for DescendMove {
- fn cost(&self, world: &WeakWorld, node: &Node) -> f32 {
+ fn cost(&self, world: &World, node: &Node) -> f32 {
// check whether 3 blocks vertically forward are passable
if node.vertical_vel == VerticalVel::None
&& is_standable(&(node.pos + self.offset()), world)
@@ -112,7 +112,7 @@ impl Move for DescendMove {
}
pub struct DiagonalMove(pub CardinalDirection);
impl Move for DiagonalMove {
- fn cost(&self, world: &WeakWorld, node: &Node) -> f32 {
+ fn cost(&self, world: &World, node: &Node) -> f32 {
if node.vertical_vel != VerticalVel::None {
return f32::INFINITY;
}
@@ -151,56 +151,92 @@ mod tests {
use super::*;
use azalea_block::BlockState;
use azalea_core::ChunkPos;
- use azalea_world::{Chunk, PartialWorld};
+ use azalea_world::{Chunk, ChunkStorage, PartialWorld};
#[test]
fn test_is_passable() {
- let mut world = PartialWorld::default();
- world
- .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default()))
- .unwrap();
- world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone);
- world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air);
-
- assert_eq!(
- is_block_passable(&BlockPos::new(0, 0, 0), &world.shared),
- false
+ let mut partial_world = PartialWorld::default();
+ let mut chunk_storage = ChunkStorage::default();
+
+ partial_world.chunks.set(
+ &ChunkPos { x: 0, z: 0 },
+ Some(Chunk::default()),
+ &mut chunk_storage,
+ );
+ partial_world.chunks.set_block_state(
+ &BlockPos::new(0, 0, 0),
+ BlockState::Stone,
+ &mut chunk_storage,
);
- assert_eq!(
- is_block_passable(&BlockPos::new(0, 1, 0), &world.shared),
- true
+ partial_world.chunks.set_block_state(
+ &BlockPos::new(0, 1, 0),
+ BlockState::Air,
+ &mut chunk_storage,
);
+
+ let world = chunk_storage.into();
+ assert_eq!(is_block_passable(&BlockPos::new(0, 0, 0), &world), false);
+ assert_eq!(is_block_passable(&BlockPos::new(0, 1, 0), &world), true);
}
#[test]
fn test_is_solid() {
- let mut world = PartialWorld::default();
- world
- .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default()))
- .unwrap();
- world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone);
- world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air);
-
- assert_eq!(is_block_solid(&BlockPos::new(0, 0, 0), &world.shared), true);
- assert_eq!(
- is_block_solid(&BlockPos::new(0, 1, 0), &world.shared),
- false
+ let mut partial_world = PartialWorld::default();
+ let mut chunk_storage = ChunkStorage::default();
+ partial_world.chunks.set(
+ &ChunkPos { x: 0, z: 0 },
+ Some(Chunk::default()),
+ &mut chunk_storage,
);
+ partial_world.chunks.set_block_state(
+ &BlockPos::new(0, 0, 0),
+ BlockState::Stone,
+ &mut chunk_storage,
+ );
+ partial_world.chunks.set_block_state(
+ &BlockPos::new(0, 1, 0),
+ BlockState::Air,
+ &mut chunk_storage,
+ );
+
+ let world = chunk_storage.into();
+ assert_eq!(is_block_solid(&BlockPos::new(0, 0, 0), &world), true);
+ assert_eq!(is_block_solid(&BlockPos::new(0, 1, 0), &world), false);
}
#[test]
fn test_is_standable() {
- let mut world = PartialWorld::default();
- world
- .set_chunk(&ChunkPos { x: 0, z: 0 }, Some(Chunk::default()))
- .unwrap();
- world.set_block_state(&BlockPos::new(0, 0, 0), BlockState::Stone);
- world.set_block_state(&BlockPos::new(0, 1, 0), BlockState::Air);
- world.set_block_state(&BlockPos::new(0, 2, 0), BlockState::Air);
- world.set_block_state(&BlockPos::new(0, 3, 0), BlockState::Air);
-
- assert!(is_standable(&BlockPos::new(0, 1, 0), &world.shared));
- assert!(!is_standable(&BlockPos::new(0, 0, 0), &world.shared));
- assert!(!is_standable(&BlockPos::new(0, 2, 0), &world.shared));
+ let mut partial_world = PartialWorld::default();
+ let mut chunk_storage = ChunkStorage::default();
+ partial_world.chunks.set(
+ &ChunkPos { x: 0, z: 0 },
+ Some(Chunk::default()),
+ &mut chunk_storage,
+ );
+ partial_world.chunks.set_block_state(
+ &BlockPos::new(0, 0, 0),
+ BlockState::Stone,
+ &mut chunk_storage,
+ );
+ partial_world.chunks.set_block_state(
+ &BlockPos::new(0, 1, 0),
+ BlockState::Air,
+ &mut chunk_storage,
+ );
+ partial_world.chunks.set_block_state(
+ &BlockPos::new(0, 2, 0),
+ BlockState::Air,
+ &mut chunk_storage,
+ );
+ partial_world.chunks.set_block_state(
+ &BlockPos::new(0, 3, 0),
+ BlockState::Air,
+ &mut chunk_storage,
+ );
+
+ let world = chunk_storage.into();
+ assert!(is_standable(&BlockPos::new(0, 1, 0), &world));
+ assert!(!is_standable(&BlockPos::new(0, 0, 0), &world));
+ assert!(!is_standable(&BlockPos::new(0, 2, 0), &world));
}
}
diff --git a/azalea/src/pathfinder/mtdstarlite.rs b/azalea/src/pathfinder/mtdstarlite.rs
index 50a467df..ce463279 100644
--- a/azalea/src/pathfinder/mtdstarlite.rs
+++ b/azalea/src/pathfinder/mtdstarlite.rs
@@ -3,9 +3,9 @@
//!
//! Future optimization attempt ideas:
//! - Use a different priority queue (e.g. fibonacci heap)
-//! - Use FxHash instead of the default hasher
+//! - Use `FxHash` instead of the default hasher
//! - Have `par` be a raw pointer
-//! - Try borrowing vs copying the Node in several places (like state_mut)
+//! - Try borrowing vs copying the Node in several places (like `state_mut`)
//! - Store edge costs in their own map
use priority_queue::DoublePriorityQueue;
@@ -260,9 +260,7 @@ impl<
// identify a path from sstart to sgoal using the parent pointers
let mut target = self.state(&self.goal).par;
while !(Some(self.start) == target) {
- let this_target = if let Some(this_target) = target {
- this_target
- } else {
+ let Some(this_target) = target else {
break;
};
// hunter follows path from start to goal;
diff --git a/azalea/src/prelude.rs b/azalea/src/prelude.rs
index bedf724f..3d8cc13e 100644
--- a/azalea/src/prelude.rs
+++ b/azalea/src/prelude.rs
@@ -1,7 +1,7 @@
//! The Azalea prelude. Things that are necessary for a bare-bones bot are
//! re-exported here.
-pub use crate::bot::BotTrait;
-pub use crate::pathfinder::Trait;
-pub use crate::{plugins, swarm_plugins, Plugin};
+pub use crate::bot::BotClientExt;
+pub use crate::pathfinder::PathfinderClientExt;
pub use azalea_client::{Account, Client, Event};
+pub use azalea_ecs::component::Component;
diff --git a/azalea/src/start.rs b/azalea/src/start.rs
deleted file mode 100644
index e2374fb8..00000000
--- a/azalea/src/start.rs
+++ /dev/null
@@ -1,136 +0,0 @@
-use crate::{bot, pathfinder, HandleFn};
-use azalea_client::{Account, Client, Plugins};
-use azalea_protocol::ServerAddress;
-use std::{future::Future, sync::Arc};
-use thiserror::Error;
-
-/// A helper macro that generates a [`Plugins`] struct from a list of objects
-/// that implement [`Plugin`].
-///
-/// ```rust,no_run
-/// plugins![azalea_pathfinder::Plugin];
-/// ```
-///
-/// [`Plugin`]: crate::Plugin
-#[macro_export]
-macro_rules! plugins {
- ($($plugin:expr),*) => {
- {
- let mut plugins = azalea::Plugins::new();
- $(
- plugins.add($plugin);
- )*
- plugins
- }
- };
-}
-
-/// The options that are passed to [`azalea::start`].
-///
-/// [`azalea::start`]: crate::start()
-pub struct Options<S, A, Fut>
-where
- A: TryInto<ServerAddress>,
- Fut: Future<Output = Result<(), anyhow::Error>>,
-{
- /// The address of the server that we're connecting to. This can be a
- /// `&str`, [`ServerAddress`], or anything that implements
- /// `TryInto<ServerAddress>`.
- ///
- /// [`ServerAddress`]: azalea_protocol::ServerAddress
- pub address: A,
- /// The account that's going to join the server.
- pub account: Account,
- /// The plugins that are going to be used. Plugins are external crates that
- /// add extra functionality to Azalea. You should use the [`plugins`] macro
- /// for this field.
- ///
- /// ```rust,no_run
- /// plugins![azalea_pathfinder::Plugin]
- /// ```
- pub plugins: Plugins,
- /// A struct that contains the data that you want your bot to remember
- /// across events.
- ///
- /// # Examples
- ///
- /// ```rust
- /// use parking_lot::Mutex;
- /// use std::sync::Arc;
- ///
- /// #[derive(Default, Clone)]
- /// struct State {
- /// farming: Arc<Mutex<bool>>,
- /// }
- /// ```
- pub state: S,
- /// The function that's called whenever we get an event.
- ///
- /// # Examples
- ///
- /// ```rust
- /// use azalea::prelude::*;
- ///
- /// async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> {
- /// Ok(())
- /// }
- /// ```
- pub handle: HandleFn<Fut, S>,
-}
-
-#[derive(Error, Debug)]
-pub enum StartError {
- #[error("Invalid address")]
- InvalidAddress,
- #[error("Join error: {0}")]
- Join(#[from] azalea_client::JoinError),
-}
-
-/// Join a server and start handling events. This function will run forever
-/// until it gets disconnected from the server.
-///
-/// # Examples
-///
-/// ```rust,no_run
-/// let error = azalea::start(azalea::Options {
-/// account,
-/// address: "localhost",
-/// state: State::default(),
-/// plugins: plugins![azalea_pathfinder::Plugin],
-/// handle,
-/// }).await;
-/// ```
-pub async fn start<
- S: Send + Sync + Clone + 'static,
- A: Send + TryInto<ServerAddress>,
- Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
->(
- options: Options<S, A, Fut>,
-) -> Result<(), StartError> {
- let address = match options.address.try_into() {
- Ok(address) => address,
- Err(_) => return Err(StartError::InvalidAddress),
- };
-
- let (mut bot, mut rx) = Client::join(&options.account, address).await?;
-
- let mut plugins = options.plugins;
- // DEFAULT PLUGINS
- plugins.add(bot::Plugin);
- plugins.add(pathfinder::Plugin);
-
- bot.plugins = Arc::new(plugins.build());
-
- let state = options.state;
-
- while let Some(event) = rx.recv().await {
- let cloned_plugins = (*bot.plugins).clone();
- for plugin in cloned_plugins.into_iter() {
- tokio::spawn(plugin.handle(event.clone(), bot.clone()));
- }
-
- tokio::spawn((options.handle)(bot.clone(), event.clone(), state.clone()));
- }
-
- Ok(())
-}
diff --git a/azalea/src/swarm/chat.rs b/azalea/src/swarm/chat.rs
index 4582c59e..a55e9bf6 100644
--- a/azalea/src/swarm/chat.rs
+++ b/azalea/src/swarm/chat.rs
@@ -1,4 +1,4 @@
-//! Implements SwarmEvent::Chat
+//! Implements `SwarmEvent::Chat`.
// How the chat event works (to avoid firing the event multiple times):
// ---
@@ -13,71 +13,81 @@
// in Swarm that's set to the smallest index of all the bots, and we remove all
// messages from the queue that are before that index.
+use azalea_client::{packet_handling::ChatReceivedEvent, ChatPacket};
+use azalea_ecs::{
+ app::{App, Plugin},
+ component::Component,
+ event::{EventReader, EventWriter},
+ schedule::IntoSystemDescriptor,
+ system::{Commands, Query, Res, ResMut, Resource},
+};
+use std::collections::VecDeque;
+
use crate::{Swarm, SwarmEvent};
-use async_trait::async_trait;
-use azalea_client::{ChatPacket, Client, Event};
-use parking_lot::Mutex;
-use std::{collections::VecDeque, sync::Arc};
-use tokio::sync::broadcast::{Receiver, Sender};
#[derive(Clone)]
-pub struct Plugin {
- pub swarm_state: SwarmState,
- pub tx: Sender<ChatPacket>,
-}
-
-impl crate::Plugin for Plugin {
- type State = State;
-
- fn build(&self) -> State {
- State {
- chat_index: Arc::new(Mutex::new(0)),
- swarm_state: self.swarm_state.clone(),
- tx: self.tx.clone(),
- }
+pub struct SwarmChatPlugin;
+impl Plugin for SwarmChatPlugin {
+ fn build(&self, app: &mut App) {
+ app.add_event::<NewChatMessageEvent>()
+ .add_system(chat_listener.label("chat_listener"))
+ .add_system(update_min_index_and_shrink_queue.after("chat_listener"))
+ .insert_resource(GlobalChatState {
+ chat_queue: VecDeque::new(),
+ chat_min_index: 0,
+ });
}
}
-#[derive(Clone)]
-pub struct State {
- pub chat_index: Arc<Mutex<usize>>,
- pub tx: Sender<ChatPacket>,
- pub swarm_state: SwarmState,
+#[derive(Component, Debug)]
+pub struct ClientChatState {
+ pub chat_index: usize,
}
-#[derive(Clone)]
-pub struct SwarmState {
- pub chat_queue: Arc<Mutex<VecDeque<ChatPacket>>>,
- pub chat_min_index: Arc<Mutex<usize>>,
- pub rx: Arc<tokio::sync::Mutex<Receiver<ChatPacket>>>,
+/// A chat message that no other bots have seen yet was received by a bot.
+#[derive(Debug)]
+pub struct NewChatMessageEvent(ChatPacket);
+
+#[derive(Resource)]
+pub struct GlobalChatState {
+ pub chat_queue: VecDeque<ChatPacket>,
+ pub chat_min_index: usize,
}
-impl State {
- pub fn handle_chat(&self, message: ChatPacket) {
+fn chat_listener(
+ mut commands: Commands,
+ mut query: Query<&mut ClientChatState>,
+ mut events: EventReader<ChatReceivedEvent>,
+ mut global_chat_state: ResMut<GlobalChatState>,
+ mut new_chat_messages_events: EventWriter<NewChatMessageEvent>,
+) {
+ for event in events.iter() {
+ let mut client_chat_state = query.get_mut(event.entity);
+ let mut client_chat_index = if let Ok(ref client_chat_state) = client_chat_state {
+ client_chat_state.chat_index
+ } else {
+ // if the client has no chat state, we default to this and insert it at the end
+ global_chat_state.chat_min_index
+ };
+
// When a bot receives a chat messages, it looks into the queue to find the
// earliest instance of the message content that's after the bot's chat index.
// If it finds it, then its personal index is simply updated. Otherwise, fire
// the event and add to the queue.
- let mut chat_queue = self.swarm_state.chat_queue.lock();
- let chat_min_index = self.swarm_state.chat_min_index.lock();
- let mut chat_index = self.chat_index.lock();
-
- if *chat_min_index > *chat_index {
- // if this happens it's because this bot just logged in, so
- // ignore it and let another bot handle it
- println!("chat_min_index ({chat_min_index}) > chat_index ({chat_index})");
- *chat_index = *chat_min_index;
- return;
- }
- let actual_vec_index = *chat_index - *chat_min_index;
+ let actual_vec_index = client_chat_index - global_chat_state.chat_min_index;
// go through the queue and find the first message that's after the bot's index
let mut found = false;
- for (i, past_message) in chat_queue.iter().enumerate().skip(actual_vec_index) {
- if past_message == &message {
+ for (i, past_message) in global_chat_state
+ .chat_queue
+ .iter()
+ .enumerate()
+ .skip(actual_vec_index)
+ {
+ if past_message == &event.packet {
// found the message, update the index
- *chat_index = i + *chat_min_index + 1;
+ client_chat_index = i + global_chat_state.chat_min_index + 1;
found = true;
break;
}
@@ -85,185 +95,157 @@ impl State {
if !found {
// didn't find the message, so fire the swarm event and add to the queue
- self.tx
- .send(message.clone())
- .expect("failed to send chat message to swarm");
- chat_queue.push_back(message);
- *chat_index = chat_queue.len() + *chat_min_index;
+ new_chat_messages_events.send(NewChatMessageEvent(event.packet.clone()));
+ global_chat_state.chat_queue.push_back(event.packet.clone());
+ client_chat_index =
+ global_chat_state.chat_queue.len() + global_chat_state.chat_min_index;
}
- }
-}
-
-#[async_trait]
-impl crate::PluginState for State {
- async fn handle(self: Box<Self>, event: Event, _bot: Client) {
- // we're allowed to access Plugin::swarm_state since it's shared for every bot
- if let Event::Chat(m) = event {
- self.handle_chat(m);
+ if let Ok(ref mut client_chat_state) = client_chat_state {
+ client_chat_state.chat_index = client_chat_index;
+ } else {
+ commands.entity(event.entity).insert(ClientChatState {
+ chat_index: client_chat_index,
+ });
}
}
}
-impl SwarmState {
- pub fn new<S>(swarm: Swarm<S>) -> (Self, Sender<ChatPacket>)
- where
- S: Send + Sync + Clone + 'static,
- {
- let (tx, rx) = tokio::sync::broadcast::channel(1);
-
- let swarm_state = SwarmState {
- chat_queue: Arc::new(Mutex::new(VecDeque::new())),
- chat_min_index: Arc::new(Mutex::new(0)),
- rx: Arc::new(tokio::sync::Mutex::new(rx)),
- };
- tokio::spawn(swarm_state.clone().start(swarm));
-
- (swarm_state, tx)
- }
- async fn start<S>(self, swarm: Swarm<S>)
- where
- S: Send + Sync + Clone + 'static,
- {
- // it should never be locked unless we reused the same plugin for two swarms
- // (bad)
- let mut rx = self.rx.lock().await;
- while let Ok(m) = rx.recv().await {
- swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap();
- let bot_states = swarm
- .bot_datas
- .lock()
- .iter()
- .map(|(bot, _)| {
- bot.plugins
- .get::<State>()
- .expect("Chat plugin not installed")
- .clone()
- })
- .collect::<Vec<_>>();
- self.handle_new_chat_message(&bot_states);
+fn update_min_index_and_shrink_queue(
+ query: Query<&ClientChatState>,
+ mut global_chat_state: ResMut<GlobalChatState>,
+ mut events: EventReader<NewChatMessageEvent>,
+ swarm: Option<Res<Swarm>>,
+) {
+ for event in events.iter() {
+ if let Some(swarm) = &swarm {
+ // it should also work if Swarm isn't present (so the tests don't need it)
+ swarm
+ .swarm_tx
+ .send(SwarmEvent::Chat(event.0.clone()))
+ .unwrap();
}
- }
-}
-
-impl SwarmState {
- pub fn handle_new_chat_message(&self, bot_states: &[State]) {
// To make sure the queue doesn't grow too large, we keep a `chat_min_index`
// in Swarm that's set to the smallest index of all the bots, and we remove all
// messages from the queue that are before that index.
- let chat_min_index = *self.chat_min_index.lock();
- let mut new_chat_min_index = usize::MAX;
- for bot_state in bot_states {
- let this_chat_index = *bot_state.chat_index.lock();
+ let mut new_chat_min_index = global_chat_state.chat_min_index;
+ for client_chat_state in query.iter() {
+ let this_chat_index = client_chat_state.chat_index;
if this_chat_index < new_chat_min_index {
new_chat_min_index = this_chat_index;
}
}
- let mut chat_queue = self.chat_queue.lock();
- if chat_min_index > new_chat_min_index {
- println!(
- "chat_min_index ({chat_min_index}) > new_chat_min_index ({new_chat_min_index})"
- );
+ if global_chat_state.chat_min_index > new_chat_min_index {
return;
}
// remove all messages from the queue that are before the min index
- for _ in 0..(new_chat_min_index - chat_min_index) {
- chat_queue.pop_front();
+ for _ in 0..(new_chat_min_index - global_chat_state.chat_min_index) {
+ global_chat_state.chat_queue.pop_front();
}
// update the min index
- *self.chat_min_index.lock() = new_chat_min_index;
+ global_chat_state.chat_min_index = new_chat_min_index;
}
}
#[cfg(test)]
mod tests {
+ use azalea_ecs::{ecs::Ecs, event::Events, system::SystemState};
+
use super::*;
+ fn make_test_app() -> App {
+ let mut app = App::new();
+ // we add the events like this instead of with .add_event so we can have our own
+ // event mangement in drain_events
+ app.init_resource::<Events<ChatReceivedEvent>>()
+ .init_resource::<Events<NewChatMessageEvent>>()
+ .add_system(chat_listener.label("chat_listener"))
+ .add_system(update_min_index_and_shrink_queue.after("chat_listener"))
+ .insert_resource(GlobalChatState {
+ chat_queue: VecDeque::new(),
+ chat_min_index: 0,
+ });
+ app
+ }
+
+ fn drain_events(ecs: &mut Ecs) -> Vec<ChatPacket> {
+ let mut system_state: SystemState<ResMut<Events<NewChatMessageEvent>>> =
+ SystemState::new(ecs);
+ let mut events = system_state.get_mut(ecs);
+
+ events.drain().map(|e| e.0.clone()).collect::<Vec<_>>()
+ }
+
#[tokio::test]
async fn test_swarm_chat() {
- let (tx, mut rx) = tokio::sync::broadcast::channel(1);
- let swarm_state = SwarmState {
- chat_queue: Arc::new(Mutex::new(VecDeque::new())),
- chat_min_index: Arc::new(Mutex::new(0)),
- rx: Arc::new(tokio::sync::Mutex::new(rx)),
- };
- let mut bot_states = vec![];
- let bot0 = State {
- swarm_state: swarm_state.clone(),
- chat_index: Arc::new(Mutex::new(0)),
- tx: tx.clone(),
- };
- let bot1 = State {
- swarm_state: swarm_state.clone(),
- chat_index: Arc::new(Mutex::new(0)),
- tx: tx.clone(),
- };
- bot_states.push(bot0.clone());
- bot_states.push(bot1.clone());
+ let mut app = make_test_app();
+
+ let bot0 = app.world.spawn_empty().id();
+ let bot1 = app.world.spawn_empty().id();
+
+ app.world.send_event(ChatReceivedEvent {
+ entity: bot0,
+ packet: ChatPacket::new("a"),
+ });
+ app.update();
- bot0.handle_chat(ChatPacket::new("a"));
// the swarm should get the event immediately after the bot gets it
+ assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("a")]);
assert_eq!(
- swarm_state.rx.lock().await.try_recv(),
- Ok(ChatPacket::new("a"))
+ app.world.get::<ClientChatState>(bot0).unwrap().chat_index,
+ 1
);
- assert_eq!(*bot0.chat_index.lock(), 1);
// and a second bot sending the event shouldn't do anything
- bot1.handle_chat(ChatPacket::new("a"));
- assert!(swarm_state.rx.lock().await.try_recv().is_err());
- assert_eq!(*bot1.chat_index.lock(), 1);
-
- // but if the first one gets it again, it should sent it again
- bot0.handle_chat(ChatPacket::new("a"));
+ app.world.send_event(ChatReceivedEvent {
+ entity: bot1,
+ packet: ChatPacket::new("a"),
+ });
+ app.update();
+ assert_eq!(drain_events(&mut app.world), vec![]);
assert_eq!(
- swarm_state.rx.lock().await.try_recv(),
- Ok(ChatPacket::new("a"))
+ app.world.get::<ClientChatState>(bot1).unwrap().chat_index,
+ 1
);
+ // but if the first one gets it again, it should sent it again
+ app.world.send_event(ChatReceivedEvent {
+ entity: bot0,
+ packet: ChatPacket::new("a"),
+ });
+ app.update();
+ assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("a")]);
+
// alright and now the second bot got a different chat message and it should be
// sent
- bot1.handle_chat(ChatPacket::new("b"));
- assert_eq!(
- swarm_state.rx.lock().await.try_recv(),
- Ok(ChatPacket::new("b"))
- );
+ app.world.send_event(ChatReceivedEvent {
+ entity: bot1,
+ packet: ChatPacket::new("b"),
+ });
+ app.update();
+ assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("b")]);
}
#[tokio::test]
async fn test_new_bot() {
- let (tx, mut rx) = tokio::sync::broadcast::channel(1);
- let swarm_state = SwarmState {
- chat_queue: Arc::new(Mutex::new(VecDeque::new())),
- chat_min_index: Arc::new(Mutex::new(0)),
- rx: Arc::new(tokio::sync::Mutex::new(rx)),
- };
- let mut bot_states = vec![];
- let bot0 = State {
- swarm_state: swarm_state.clone(),
- chat_index: Arc::new(Mutex::new(0)),
- tx: tx.clone(),
- };
- bot_states.push(bot0.clone());
+ let mut app = make_test_app();
+
+ let bot0 = app.world.spawn_empty().id();
// bot0 gets a chat message
- bot0.handle_chat(ChatPacket::new("a"));
- assert_eq!(
- swarm_state.rx.lock().await.try_recv(),
- Ok(ChatPacket::new("a"))
- );
- // now a second bot joined and got a different chat message
- let bot1 = State {
- swarm_state: swarm_state.clone(),
- chat_index: Arc::new(Mutex::new(0)),
- tx: tx.clone(),
- };
- bot_states.push(bot1.clone());
- bot1.handle_chat(ChatPacket::new("b"));
- assert_eq!(
- swarm_state.rx.lock().await.try_recv(),
- Ok(ChatPacket::new("b"))
- );
+ app.world.send_event(ChatReceivedEvent {
+ entity: bot0,
+ packet: ChatPacket::new("a"),
+ });
+ app.update();
+ assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("a")]);
+ let bot1 = app.world.spawn_empty().id();
+ app.world.send_event(ChatReceivedEvent {
+ entity: bot1,
+ packet: ChatPacket::new("b"),
+ });
+ app.update();
+ assert_eq!(drain_events(&mut app.world), vec![ChatPacket::new("b")]);
}
}
diff --git a/azalea/src/swarm/events.rs b/azalea/src/swarm/events.rs
new file mode 100644
index 00000000..81d8c731
--- /dev/null
+++ b/azalea/src/swarm/events.rs
@@ -0,0 +1,45 @@
+use azalea_client::LocalPlayer;
+use azalea_ecs::{
+ app::{App, Plugin},
+ event::EventWriter,
+ query::With,
+ system::{Query, ResMut, Resource},
+};
+use azalea_world::entity::MinecraftEntityId;
+use derive_more::{Deref, DerefMut};
+
+pub struct SwarmPlugin;
+impl Plugin for SwarmPlugin {
+ fn build(&self, app: &mut App) {
+ app.add_event::<SwarmReadyEvent>()
+ .add_system(check_ready)
+ .init_resource::<IsSwarmReady>();
+ }
+}
+
+/// All the bots from the swarm are now in the world.
+pub struct SwarmReadyEvent;
+
+#[derive(Default, Resource, Deref, DerefMut)]
+struct IsSwarmReady(bool);
+
+fn check_ready(
+ query: Query<Option<&MinecraftEntityId>, With<LocalPlayer>>,
+ mut is_swarm_ready: ResMut<IsSwarmReady>,
+ mut ready_events: EventWriter<SwarmReadyEvent>,
+) {
+ // if we already know the swarm is ready, do nothing
+ if **is_swarm_ready {
+ return;
+ }
+ // if all the players are in the world, we're ready
+ for entity_id in query.iter() {
+ if entity_id.is_none() {
+ return;
+ }
+ }
+
+ // all the players are in the world, so we're ready
+ **is_swarm_ready = true;
+ ready_events.send(SwarmReadyEvent);
+}
diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs
index 6fc3e40c..9c16bc01 100644
--- a/azalea/src/swarm/mod.rs
+++ b/azalea/src/swarm/mod.rs
@@ -1,41 +1,29 @@
-/// Swarms are a way to conveniently control many bots.
-mod chat;
-mod plugins;
+//! Swarms are a way to conveniently control many bots.
-pub use self::plugins::*;
-use crate::{bot, HandleFn};
-use azalea_client::{Account, ChatPacket, Client, Event, JoinError, Plugins};
+mod chat;
+mod events;
+
+use crate::{bot::DefaultBotPlugins, HandleFn};
+use azalea_client::{init_ecs_app, start_ecs, Account, ChatPacket, Client, Event, JoinError};
+use azalea_ecs::{
+ app::{App, Plugin, PluginGroup, PluginGroupBuilder},
+ component::Component,
+ ecs::Ecs,
+ entity::Entity,
+ system::Resource,
+};
use azalea_protocol::{
- connect::{Connection, ConnectionError},
+ connect::ConnectionError,
resolver::{self, ResolverError},
ServerAddress,
};
-use azalea_world::WeakWorldContainer;
+use azalea_world::WorldContainer;
use futures::future::join_all;
use log::error;
use parking_lot::{Mutex, RwLock};
-use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
+use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration};
use thiserror::Error;
-use tokio::sync::mpsc::{self, UnboundedSender};
-
-/// A helper macro that generates a [`SwarmPlugins`] struct from a list of
-/// objects that implement [`SwarmPlugin`].
-///
-/// ```rust,no_run
-/// swarm_plugins![azalea_pathfinder::Plugin];
-/// ```
-#[macro_export]
-macro_rules! swarm_plugins {
- ($($plugin:expr),*) => {
- {
- let mut plugins = azalea::SwarmPlugins::new();
- $(
- plugins.add($plugin);
- )*
- plugins
- }
- };
-}
+use tokio::sync::mpsc;
/// A swarm is a way to conveniently control many bots at once, while also
/// being able to control bots at an individual level when desired.
@@ -46,82 +34,295 @@ macro_rules! swarm_plugins {
/// It's used to make the [`Swarm::add`] function work.
///
/// [`azalea::start_swarm`]: fn.start_swarm.html
-#[derive(Clone)]
-pub struct Swarm<S> {
- bot_datas: Arc<Mutex<Vec<(Client, S)>>>,
+#[derive(Clone, Resource)]
+pub struct Swarm {
+ pub ecs_lock: Arc<Mutex<Ecs>>,
+ bots: Arc<Mutex<HashMap<Entity, Client>>>,
+
+ // bot_datas: Arc<Mutex<Vec<(Client, S)>>>,
resolved_address: SocketAddr,
address: ServerAddress,
- pub worlds: Arc<RwLock<WeakWorldContainer>>,
- /// Plugins that are set for new bots
- plugins: Plugins,
+ pub world_container: Arc<RwLock<WorldContainer>>,
- bots_tx: UnboundedSender<(Option<Event>, (Client, S))>,
- swarm_tx: UnboundedSender<SwarmEvent>,
-}
+ bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
+ swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
-/// An event about something that doesn't have to do with a single bot.
-#[derive(Clone, Debug)]
-pub enum SwarmEvent {
- /// All the bots in the swarm have successfully joined the server.
- Login,
- /// The swarm was created. This is only fired once, and it's guaranteed to
- /// be the first event to fire.
- Init,
- /// A bot got disconnected from the server.
- ///
- /// You can implement an auto-reconnect by calling [`Swarm::add`]
- /// with the account from this event.
- Disconnect(Account),
- /// At least one bot received a chat message.
- Chat(ChatPacket),
+ run_schedule_sender: mpsc::Sender<()>,
}
-pub type SwarmHandleFn<Fut, S, SS> = fn(Swarm<S>, SwarmEvent, SS) -> Fut;
-
-/// The options that are passed to [`azalea::start_swarm`].
-///
-/// [`azalea::start_swarm`]: crate::start_swarm()
-pub struct SwarmOptions<S, SS, A, Fut, SwarmFut>
+/// Create a new [`Swarm`].
+pub struct SwarmBuilder<S, SS, Fut, SwarmFut>
where
- A: TryInto<ServerAddress>,
+ S: Default + Send + Sync + Clone + 'static,
+ SS: Default + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<(), anyhow::Error>>,
SwarmFut: Future<Output = Result<(), anyhow::Error>>,
{
- /// The address of the server that we're connecting to. This can be a
- /// `&str`, [`ServerAddress`], or anything that implements
- /// `TryInto<ServerAddress>`.
- ///
- /// [`ServerAddress`]: azalea_protocol::ServerAddress
- pub address: A,
+ app: App,
/// The accounts that are going to join the server.
- pub accounts: Vec<Account>,
- /// The plugins that are going to be used for all the bots.
- ///
- /// You can usually leave this as `plugins![]`.
- pub plugins: Plugins,
- /// The plugins that are going to be used for the swarm.
- ///
- /// You can usually leave this as `swarm_plugins![]`.
- pub swarm_plugins: SwarmPlugins<S>,
+ accounts: Vec<Account>,
/// The individual bot states. This must be the same length as `accounts`,
/// since each bot gets one state.
- pub states: Vec<S>,
+ states: Vec<S>,
/// The state for the overall swarm.
- pub swarm_state: SS,
+ swarm_state: SS,
/// The function that's called every time a bot receives an [`Event`].
- pub handle: HandleFn<Fut, S>,
+ handler: Option<HandleFn<Fut, S>>,
/// The function that's called every time the swarm receives a
/// [`SwarmEvent`].
- pub swarm_handle: SwarmHandleFn<SwarmFut, S, SS>,
+ swarm_handler: Option<SwarmHandleFn<SwarmFut, SS>>,
/// How long we should wait between each bot joining the server. Set to
/// None to have every bot connect at the same time. None is different than
/// a duration of 0, since if a duration is present the bots will wait for
/// the previous one to be ready.
- pub join_delay: Option<std::time::Duration>,
+ join_delay: Option<std::time::Duration>,
+}
+impl<S, SS, Fut, SwarmFut> SwarmBuilder<S, SS, Fut, SwarmFut>
+where
+ Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+ SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+ S: Default + Send + Sync + Clone + Component + 'static,
+ SS: Default + Send + Sync + Clone + Component + 'static,
+{
+ /// Start creating the swarm.
+ #[must_use]
+ pub fn new() -> Self {
+ Self {
+ // we create the app here so plugins can add onto it.
+ // the schedules won't run until [`Self::start`] is called.
+ app: init_ecs_app(),
+
+ accounts: Vec::new(),
+ states: Vec::new(),
+ swarm_state: SS::default(),
+ handler: None,
+ swarm_handler: None,
+ join_delay: None,
+ }
+ .add_plugins(DefaultSwarmPlugins)
+ .add_plugins(DefaultBotPlugins)
+ }
+
+ /// Add a vec of [`Account`]s to the swarm.
+ ///
+ /// Use [`Self::add_account`] to only add one account. If you want the
+ /// clients to have different default states, add them one at a time with
+ /// [`Self::add_account_with_state`].
+ #[must_use]
+ pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self {
+ for account in accounts {
+ self = self.add_account(account);
+ }
+ self
+ }
+ /// Add a single new [`Account`] to the swarm. Use [`Self::add_accounts`] to
+ /// add multiple accounts at a time.
+ ///
+ /// This will make the state for this client be the default, use
+ /// [`Self::add_account_with_state`] to avoid that.
+ #[must_use]
+ pub fn add_account(self, account: Account) -> Self {
+ self.add_account_with_state(account, S::default())
+ }
+ /// Add an account with a custom initial state. Use just
+ /// [`Self::add_account`] to use the Default implementation for the state.
+ #[must_use]
+ pub fn add_account_with_state(mut self, account: Account, state: S) -> Self {
+ self.accounts.push(account);
+ self.states.push(state);
+ self
+ }
+
+ /// Set the function that's called every time a bot receives an [`Event`].
+ /// This is the way to handle normal per-bot events.
+ ///
+ /// You can only have one client handler, calling this again will replace
+ /// the old client handler function (you can have a client handler and swarm
+ /// handler separately though).
+ #[must_use]
+ pub fn set_handler(mut self, handler: HandleFn<Fut, S>) -> Self {
+ self.handler = Some(handler);
+ self
+ }
+ /// Set the function that's called every time the swarm receives a
+ /// [`SwarmEvent`]. This is the way to handle global swarm events.
+ ///
+ /// You can only have one swarm handler, calling this again will replace
+ /// the old swarm handler function (you can have a client handler and swarm
+ /// handler separately though).
+ #[must_use]
+ pub fn set_swarm_handler(mut self, handler: SwarmHandleFn<SwarmFut, SS>) -> Self {
+ self.swarm_handler = Some(handler);
+ self
+ }
+
+ /// Add a plugin to the swarm.
+ #[must_use]
+ pub fn add_plugin<T: Plugin>(mut self, plugin: T) -> Self {
+ self.app.add_plugin(plugin);
+ self
+ }
+ /// Add a group of plugins to the swarm.
+ #[must_use]
+ pub fn add_plugins<T: PluginGroup>(mut self, plugin_group: T) -> Self {
+ self.app.add_plugins(plugin_group);
+ self
+ }
+
+ /// Set how long we should wait between each bot joining the server.
+ ///
+ /// By default, every bot will connect at the same time. If you set this
+ /// field, however, the bots will wait for the previous one to have
+ /// connected and *then* they'll wait the given duration.
+ #[must_use]
+ pub fn join_delay(mut self, delay: std::time::Duration) -> Self {
+ self.join_delay = Some(delay);
+ self
+ }
+
+ /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
+ /// server.
+ ///
+ /// The `address` argument can be a `&str`, [`ServerAddress`], or anything
+ /// that implements `TryInto<ServerAddress>`.
+ ///
+ /// [`ServerAddress`]: azalea_protocol::ServerAddress
+ pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<(), SwarmStartError> {
+ assert_eq!(
+ self.accounts.len(),
+ self.states.len(),
+ "There must be exactly one state per bot."
+ );
+
+ // convert the TryInto<ServerAddress> into a ServerAddress
+ let address: ServerAddress = match address.try_into() {
+ Ok(address) => address,
+ Err(_) => return Err(SwarmStartError::InvalidAddress),
+ };
+
+ // resolve the address
+ let resolved_address = resolver::resolve_address(&address).await?;
+
+ let world_container = Arc::new(RwLock::new(WorldContainer::default()));
+
+ // we can't modify the swarm plugins after this
+ let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
+ let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
+
+ let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
+ let ecs_lock = start_ecs(self.app, run_schedule_receiver, run_schedule_sender.clone());
+
+ let swarm = Swarm {
+ ecs_lock: ecs_lock.clone(),
+ bots: Arc::new(Mutex::new(HashMap::new())),
+
+ resolved_address,
+ address,
+ world_container,
+
+ bots_tx,
+
+ swarm_tx: swarm_tx.clone(),
+
+ run_schedule_sender,
+ };
+ ecs_lock.lock().insert_resource(swarm.clone());
+
+ // SwarmBuilder (self) isn't Send so we have to take all the things we need out
+ // of it
+ let mut swarm_clone = swarm.clone();
+ let join_delay = self.join_delay;
+ let accounts = self.accounts.clone();
+ let states = self.states.clone();
+
+ let join_task = tokio::spawn(async move {
+ if let Some(join_delay) = join_delay {
+ // if there's a join delay, then join one by one
+ for (account, state) in accounts.iter().zip(states) {
+ swarm_clone
+ .add_with_exponential_backoff(account, state.clone())
+ .await;
+ tokio::time::sleep(join_delay).await;
+ }
+ } else {
+ // otherwise, join all at once
+ let swarm_borrow = &swarm_clone;
+ join_all(accounts.iter().zip(states).map(
+ async move |(account, state)| -> Result<(), JoinError> {
+ swarm_borrow
+ .clone()
+ .add_with_exponential_backoff(account, state.clone())
+ .await;
+ Ok(())
+ },
+ ))
+ .await;
+ }
+ });
+
+ let swarm_state = self.swarm_state;
+
+ // Watch swarm_rx and send those events to the swarm_handle.
+ let swarm_clone = swarm.clone();
+ tokio::spawn(async move {
+ while let Some(event) = swarm_rx.recv().await {
+ if let Some(swarm_handler) = self.swarm_handler {
+ tokio::spawn((swarm_handler)(
+ swarm_clone.clone(),
+ event,
+ swarm_state.clone(),
+ ));
+ }
+ }
+ });
+
+ // bot events
+ while let Some((Some(event), bot)) = bots_rx.recv().await {
+ if let Some(handler) = self.handler {
+ let state = bot.component::<S>();
+ tokio::spawn((handler)(bot, event, state));
+ }
+ }
+
+ join_task.abort();
+
+ Ok(())
+ }
+}
+
+impl<S, SS, Fut, SwarmFut> Default for SwarmBuilder<S, SS, Fut, SwarmFut>
+where
+ Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+ SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+ S: Default + Send + Sync + Clone + Component + 'static,
+ SS: Default + Send + Sync + Clone + Component + 'static,
+{
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// An event about something that doesn't have to do with a single bot.
+#[derive(Clone, Debug)]
+pub enum SwarmEvent {
+ /// All the bots in the swarm have successfully joined the server.
+ Login,
+ /// The swarm was created. This is only fired once, and it's guaranteed to
+ /// be the first event to fire.
+ Init,
+ /// A bot got disconnected from the server.
+ ///
+ /// You can implement an auto-reconnect by calling [`Swarm::add`]
+ /// with the account from this event.
+ Disconnect(Account),
+ /// At least one bot received a chat message.
+ Chat(ChatPacket),
}
+pub type SwarmHandleFn<Fut, SS> = fn(Swarm, SwarmEvent, SS) -> Fut;
+
#[derive(Error, Debug)]
pub enum SwarmStartError {
#[error("Invalid address")]
@@ -154,7 +355,7 @@ pub enum SwarmStartError {
/// let mut states = Vec::new();
///
/// for i in 0..10 {
-/// accounts.push(Account::offline(&format!("bot{}", i)));
+/// accounts.push(Account::offline(&format!("bot{i}")));
/// states.push(State::default());
/// }
///
@@ -204,193 +405,69 @@ pub enum SwarmStartError {
/// }
/// Ok(())
/// }
-pub async fn start_swarm<
- S: Send + Sync + Clone + 'static,
- SS: Send + Sync + Clone + 'static,
- A: Send + TryInto<ServerAddress>,
- Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
- SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
->(
- options: SwarmOptions<S, SS, A, Fut, SwarmFut>,
-) -> Result<(), SwarmStartError> {
- assert_eq!(
- options.accounts.len(),
- options.states.len(),
- "There must be exactly one state per bot."
- );
-
- // convert the TryInto<ServerAddress> into a ServerAddress
- let address: ServerAddress = match options.address.try_into() {
- Ok(address) => address,
- Err(_) => return Err(SwarmStartError::InvalidAddress),
- };
-
- // resolve the address
- let resolved_address = resolver::resolve_address(&address).await?;
-
- let world_container = Arc::new(RwLock::new(WeakWorldContainer::default()));
-
- let mut plugins = options.plugins;
- let swarm_plugins = options.swarm_plugins;
-
- // DEFAULT CLIENT PLUGINS
- plugins.add(bot::Plugin);
- plugins.add(crate::pathfinder::Plugin);
- // DEFAULT SWARM PLUGINS
-
- // we can't modify the swarm plugins after this
- let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
- let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
-
- let mut swarm = Swarm {
- bot_datas: Arc::new(Mutex::new(Vec::new())),
-
- resolved_address,
- address,
- worlds: world_container,
- plugins,
-
- bots_tx,
-
- swarm_tx: swarm_tx.clone(),
- };
-
- {
- // the chat plugin is hacky and needs the swarm to be passed like this
- let (chat_swarm_state, chat_tx) = chat::SwarmState::new(swarm.clone());
- swarm.plugins.add(chat::Plugin {
- swarm_state: chat_swarm_state,
- tx: chat_tx,
- });
- }
-
- let swarm_plugins = swarm_plugins.build();
-
- let mut swarm_clone = swarm.clone();
- let join_task = tokio::spawn(async move {
- if let Some(join_delay) = options.join_delay {
- // if there's a join delay, then join one by one
- for (account, state) in options.accounts.iter().zip(options.states) {
- swarm_clone
- .add_with_exponential_backoff(account, state.clone())
- .await;
- tokio::time::sleep(join_delay).await;
- }
- } else {
- let swarm_borrow = &swarm_clone;
- join_all(options.accounts.iter().zip(options.states).map(
- async move |(account, state)| -> Result<(), JoinError> {
- swarm_borrow
- .clone()
- .add_with_exponential_backoff(account, state.clone())
- .await;
- Ok(())
- },
- ))
- .await;
- }
- });
-
- let swarm_state = options.swarm_state;
- let mut internal_state = InternalSwarmState::default();
-
- // Watch swarm_rx and send those events to the plugins and swarm_handle.
- let swarm_clone = swarm.clone();
- let swarm_plugins_clone = swarm_plugins.clone();
- tokio::spawn(async move {
- while let Some(event) = swarm_rx.recv().await {
- for plugin in swarm_plugins_clone.clone().into_iter() {
- tokio::spawn(plugin.handle(event.clone(), swarm_clone.clone()));
- }
- tokio::spawn((options.swarm_handle)(
- swarm_clone.clone(),
- event,
- swarm_state.clone(),
- ));
- }
- });
-
- // bot events
- while let Some((Some(event), (bot, state))) = bots_rx.recv().await {
- // bot event handling
- let cloned_plugins = (*bot.plugins).clone();
- for plugin in cloned_plugins.into_iter() {
- tokio::spawn(plugin.handle(event.clone(), bot.clone()));
- }
-
- // swarm event handling
- // remove this #[allow] when more checks are added
- #[allow(clippy::single_match)]
- match &event {
- Event::Login => {
- internal_state.bots_joined += 1;
- if internal_state.bots_joined == swarm.bot_datas.lock().len() {
- swarm_tx.send(SwarmEvent::Login).unwrap();
- }
- }
- _ => {}
- }
-
- tokio::spawn((options.handle)(bot, event, state));
- }
-
- join_task.abort();
-
- Ok(())
-}
-
-impl<S> Swarm<S>
-where
- S: Send + Sync + Clone + 'static,
-{
+// pub async fn start_swarm<
+// S: Send + Sync + Clone + 'static,
+// SS: Send + Sync + Clone + 'static,
+// A: Send + TryInto<ServerAddress>,
+// Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+// SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+// >(
+// options: SwarmOptions<S, SS, A, Fut, SwarmFut>,
+// ) -> Result<(), SwarmStartError> {
+// }
+
+impl Swarm {
/// Add a new account to the swarm. You can remove it later by calling
/// [`Client::disconnect`].
- pub async fn add(&mut self, account: &Account, state: S) -> Result<Client, JoinError> {
- let conn = Connection::new(&self.resolved_address).await?;
- let (conn, game_profile) = Client::handshake(conn, account, &self.address.clone()).await?;
-
+ ///
+ /// # Errors
+ ///
+ /// Returns an `Err` if the bot could not do a handshake successfully.
+ pub async fn add<S: Component + Clone>(
+ &mut self,
+ account: &Account,
+ state: S,
+ ) -> Result<Client, JoinError> {
// tx is moved to the bot so it can send us events
// rx is used to receive events from the bot
- let (tx, mut rx) = mpsc::channel(1);
- let mut bot = Client::new(game_profile, conn, Some(self.worlds.clone()));
- tx.send(Event::Init).await.expect("Failed to send event");
- bot.start_tasks(tx);
+ // An event that causes the schedule to run. This is only used internally.
+ // let (run_schedule_sender, run_schedule_receiver) = mpsc::unbounded_channel();
+ // let ecs_lock = start_ecs(run_schedule_receiver, run_schedule_sender.clone());
+ let (bot, mut rx) = Client::start_client(
+ self.ecs_lock.clone(),
+ account,
+ &self.address,
+ &self.resolved_address,
+ self.run_schedule_sender.clone(),
+ )
+ .await?;
+ // add the state to the client
+ {
+ let mut ecs = self.ecs_lock.lock();
+ ecs.entity_mut(bot.entity).insert(state);
+ }
- bot.plugins = Arc::new(self.plugins.clone().build());
+ self.bots.lock().insert(bot.entity, bot.clone());
+ let cloned_bots = self.bots.clone();
let cloned_bots_tx = self.bots_tx.clone();
let cloned_bot = bot.clone();
- let cloned_state = state.clone();
let owned_account = account.clone();
- let bot_datas = self.bot_datas.clone();
let swarm_tx = self.swarm_tx.clone();
- // send the init event immediately so it's the first thing we get
- swarm_tx.send(SwarmEvent::Init).unwrap();
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
// we can't handle events here (since we can't copy the handler),
// they're handled above in start_swarm
- if let Err(e) =
- cloned_bots_tx.send((Some(event), (cloned_bot.clone(), cloned_state.clone())))
- {
+ if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) {
error!("Error sending event to swarm: {e}");
}
}
- // the bot disconnected, so we remove it from the swarm
- let mut bot_datas = bot_datas.lock();
- let index = bot_datas
- .iter()
- .position(|(b, _)| b.profile.uuid == cloned_bot.profile.uuid)
- .expect("bot disconnected but not found in swarm");
- bot_datas.remove(index);
-
+ cloned_bots.lock().remove(&bot.entity);
swarm_tx
.send(SwarmEvent::Disconnect(owned_account))
.unwrap();
});
- self.bot_datas.lock().push((bot.clone(), state.clone()));
-
Ok(bot)
}
@@ -399,7 +476,11 @@ where
///
/// Exponential backoff means if it fails joining it will initially wait 10
/// seconds, then 20, then 40, up to 2 minutes.
- pub async fn add_with_exponential_backoff(&mut self, account: &Account, state: S) -> Client {
+ pub async fn add_with_exponential_backoff<S: Component + Clone>(
+ &mut self,
+ account: &Account,
+ state: S,
+ ) -> Client {
let mut disconnects = 0;
loop {
match self.add(account, state.clone()).await {
@@ -409,7 +490,7 @@ where
let delay = (Duration::from_secs(5) * 2u32.pow(disconnects))
.min(Duration::from_secs(120));
let username = account.username.clone();
- error!("Error joining {username}: {e}. Waiting {delay:?} and trying again.");
+ error!("Error joining as {username}: {e}. Waiting {delay:?} and trying again.");
tokio::time::sleep(delay).await;
}
}
@@ -417,33 +498,42 @@ where
}
}
-impl<S> IntoIterator for Swarm<S>
-where
- S: Send + Sync + Clone + 'static,
-{
- type Item = (Client, S);
+impl IntoIterator for Swarm {
+ type Item = Client;
type IntoIter = std::vec::IntoIter<Self::Item>;
- /// Iterate over the bots and their states in this swarm.
+ /// Iterate over the bots in this swarm.
///
/// ```rust,no_run
- /// for (bot, state) in swarm {
+ /// for bot in swarm {
+ /// let state = bot.component::<State>();
/// // ...
/// }
/// ```
fn into_iter(self) -> Self::IntoIter {
- self.bot_datas.lock().clone().into_iter()
+ self.bots
+ .lock()
+ .clone()
+ .into_values()
+ .collect::<Vec<_>>()
+ .into_iter()
}
}
-#[derive(Default)]
-struct InternalSwarmState {
- /// The number of bots connected to the server
- pub bots_joined: usize,
-}
-
impl From<ConnectionError> for SwarmStartError {
fn from(e: ConnectionError) -> Self {
SwarmStartError::from(JoinError::from(e))
}
}
+
+/// This plugin group will add all the default plugins necessary for swarms to
+/// work.
+pub struct DefaultSwarmPlugins;
+
+impl PluginGroup for DefaultSwarmPlugins {
+ fn build(self) -> PluginGroupBuilder {
+ PluginGroupBuilder::start::<Self>()
+ .add(chat::SwarmChatPlugin)
+ .add(events::SwarmPlugin)
+ }
+}
diff --git a/azalea/src/swarm/plugins.rs b/azalea/src/swarm/plugins.rs
deleted file mode 100644
index f92d40e6..00000000
--- a/azalea/src/swarm/plugins.rs
+++ /dev/null
@@ -1,135 +0,0 @@
-use crate::{Swarm, SwarmEvent};
-use async_trait::async_trait;
-use nohash_hasher::NoHashHasher;
-use std::{
- any::{Any, TypeId},
- collections::HashMap,
- hash::BuildHasherDefault,
-};
-
-type U64Hasher = BuildHasherDefault<NoHashHasher<u64>>;
-
-// kind of based on https://docs.rs/http/latest/src/http/extensions.rs.html
-/// A map of plugin ids to [`SwarmPlugin`] trait objects. The client stores
-/// this so we can keep the state for our [`Swarm`] plugins.
-///
-/// If you're using azalea, you should generate this from the `swarm_plugins!`
-/// macro.
-#[derive(Clone, Default)]
-pub struct SwarmPlugins<S> {
- map: Option<HashMap<TypeId, Box<dyn SwarmPlugin<S>>, U64Hasher>>,
-}
-
-#[derive(Clone)]
-pub struct SwarmPluginStates<S> {
- map: Option<HashMap<TypeId, Box<dyn SwarmPluginState<S>>, U64Hasher>>,
-}
-
-impl<S> SwarmPluginStates<S> {
- pub fn get<T: SwarmPluginState<S>>(&self) -> Option<&T> {
- self.map
- .as_ref()
- .and_then(|map| map.get(&TypeId::of::<T>()))
- .and_then(|boxed| (boxed.as_ref() as &dyn Any).downcast_ref::<T>())
- }
-}
-
-impl<S> SwarmPlugins<S>
-where
- S: 'static,
-{
- /// Create a new empty set of plugins.
- pub fn new() -> Self {
- Self { map: None }
- }
-
- /// Add a new plugin to this set.
- pub fn add<T: SwarmPlugin<S>>(&mut self, plugin: T) {
- if self.map.is_none() {
- self.map = Some(HashMap::with_hasher(BuildHasherDefault::default()));
- }
- self.map
- .as_mut()
- .unwrap()
- .insert(TypeId::of::<T>(), Box::new(plugin));
- }
-
- /// Build our plugin states from this set of plugins. Note that if you're
- /// using `azalea` you'll probably never need to use this as it's called
- /// for you.
- pub fn build(self) -> SwarmPluginStates<S> {
- if self.map.is_none() {
- return SwarmPluginStates { map: None };
- }
- let mut map = HashMap::with_hasher(BuildHasherDefault::default());
- for (id, plugin) in self.map.unwrap().into_iter() {
- map.insert(id, plugin.build());
- }
- SwarmPluginStates { map: Some(map) }
- }
-}
-
-impl<S> IntoIterator for SwarmPluginStates<S> {
- type Item = Box<dyn SwarmPluginState<S>>;
- type IntoIter = std::vec::IntoIter<Self::Item>;
-
- /// Iterate over the plugin states.
- fn into_iter(self) -> Self::IntoIter {
- self.map
- .map(|map| map.into_values().collect::<Vec<_>>())
- .unwrap_or_default()
- .into_iter()
- }
-}
-
-/// A `SwarmPluginState` keeps the current state of a plugin for a client. All
-/// the fields must be atomic. Unique `SwarmPluginState`s are built from
-/// [`SwarmPlugin`]s.
-#[async_trait]
-pub trait SwarmPluginState<S>: Send + Sync + SwarmPluginStateClone<S> + Any + 'static {
- async fn handle(self: Box<Self>, event: SwarmEvent, swarm: Swarm<S>);
-}
-
-/// Swarm plugins can keep their own personal state ([`SwarmPluginState`]),
-/// listen to [`SwarmEvent`]s, and add new functions to [`Swarm`].
-pub trait SwarmPlugin<S>: Send + Sync + SwarmPluginClone<S> + Any + 'static {
- fn build(&self) -> Box<dyn SwarmPluginState<S>>;
-}
-
-/// An internal trait that allows SwarmPluginState to be cloned.
-#[doc(hidden)]
-pub trait SwarmPluginStateClone<S> {
- fn clone_box(&self) -> Box<dyn SwarmPluginState<S>>;
-}
-impl<T, S> SwarmPluginStateClone<S> for T
-where
- T: 'static + SwarmPluginState<S> + Clone,
-{
- fn clone_box(&self) -> Box<dyn SwarmPluginState<S>> {
- Box::new(self.clone())
- }
-}
-impl<S> Clone for Box<dyn SwarmPluginState<S>> {
- fn clone(&self) -> Self {
- self.clone_box()
- }
-}
-
-/// An internal trait that allows SwarmPlugin to be cloned.
-#[doc(hidden)]
-pub trait SwarmPluginClone<S> {
- fn clone_box(&self) -> Box<dyn SwarmPlugin<S>>;
-}
-impl<T, S> SwarmPluginClone<S> for T
-where
- T: 'static + SwarmPlugin<S> + Clone,
-{
- fn clone_box(&self) -> Box<dyn SwarmPlugin<S>> {
- Box::new(self.clone())
- }
-}
-impl<S> Clone for Box<dyn SwarmPlugin<S>> {
- fn clone(&self) -> Self {
- self.clone_box()
- }
-}