aboutsummaryrefslogtreecommitdiff
path: root/azalea/src
diff options
context:
space:
mode:
authormat <27899617+mat-1@users.noreply.github.com>2022-11-27 16:25:07 -0600
committerGitHub <noreply@github.com>2022-11-27 16:25:07 -0600
commit631ed63dbdc7167df4de02a55b5c2ef1cea909e9 (patch)
tree104e567c332f2aeb30ea6acefef8c73f9b2f158b /azalea/src
parent962b9fcaae917c7e5bef718469fba31f6ff7c3cb (diff)
downloadazalea-drasl-631ed63dbdc7167df4de02a55b5c2ef1cea909e9.tar.xz
Swarm (#36)
* make azalea-pathfinder dir * start writing d* lite impl * more work on d* lite * work more on implementing d* lite * full d* lite impl * updated edges * add next() function * add NoPathError * why does dstar lite not work * fix d* lite implementation * make the test actually check the coords * replace while loop with if statement * fix clippy complaints * make W only have to be PartialOrd * fix PartialOrd issues * implement mtd* lite * add a test to mtd* lite * remove normal d* lite * make heuristic only take in one arg * add `success` function * Update README.md * evil black magic to make .entity not need dimension * start adding moves * slightly improve the vec3/position situation new macro that implements all the useful functions * moves stuff * make it compile * update deps in az-pathfinder * make it compile again * more pathfinding stuff * add Bot::look_at * replace EntityMut and EntityRef with just Entity * block pos pathfinding stuff * rename movedirection to walkdirection * execute path every tick * advance path * change az-pf version * make azalea_client keep plugin state * fix Plugins::get * why does it think there is air * start debugging incorrect air * update some From methods to use rem_euclid * start adding swarm * fix deadlock i still don't understand why it was happening but the solution was to keep the Client::player lock for shorter so it didn't overlap with the Client::dimension lock * make lookat actually work probably * fix going too fast * Update main.rs * make a thing immutable * direction_looking_at * fix rotations * import swarm in an example * fix stuff from merge * remove azalea_pathfinder import * delete azalea-pathfinder crate already in azalea::pathfinder module * swarms * start working on shared dimensions * Shared worlds work * start adding Swarm::add_account * add_account works * change "client" to "bot" in some places * Fix issues from merge * Update world.rs * add SwarmEvent::Disconnect(Account) * almost add SwarmEvent::Chat and new plugin system it panics rn * make plugins have to provide the State associated type * improve comments * make fn build slightly cleaner * fix SwarmEvent::Chat * change a println in bot/main.rs * Client::shutdown -> disconnect * polish fix clippy warnings + improve some docs a bit * fix shared worlds* *there's a bug that entities and bots will have their positions exaggerated because the relative movement packet is applied for every entity once per bot * i am being trolled by rust for some reason some stuff is really slow for literally no reason and it makes no sense i am going insane * make world an RwLock again * remove debug messages * fix skipping event ticks unfortunately now sending events is `.send().await?` instead of just `.send()` * fix deadlock + warnings * turns out my floor_mod impl was wrong and i32::rem_euclid has the correct behavior LOL * still errors with lots of bots * make swarm iter & fix new chunks not loading * improve docs * start fixing tests * fix all the tests except the examples i don't know how to exclude them from the tests * improve docs some more
Diffstat (limited to 'azalea/src')
-rw-r--r--[-rwxr-xr-x]azalea/src/bot.rs37
-rw-r--r--[-rwxr-xr-x]azalea/src/lib.rs149
-rw-r--r--azalea/src/pathfinder/mod.rs34
-rw-r--r--[-rwxr-xr-x]azalea/src/prelude.rs2
-rw-r--r--azalea/src/start.rs136
-rw-r--r--azalea/src/swarm/chat.rs147
-rw-r--r--azalea/src/swarm/mod.rs447
-rw-r--r--azalea/src/swarm/plugins.rs134
8 files changed, 913 insertions, 173 deletions
diff --git a/azalea/src/bot.rs b/azalea/src/bot.rs
index 0becaa62..0674c692 100755..100644
--- a/azalea/src/bot.rs
+++ b/azalea/src/bot.rs
@@ -4,9 +4,14 @@ use azalea_core::Vec3;
use parking_lot::Mutex;
use std::{f64::consts::PI, sync::Arc};
-#[derive(Default, Clone)]
-pub struct Plugin {
- pub state: State,
+#[derive(Clone, Default)]
+pub struct Plugin;
+impl crate::Plugin for Plugin {
+ type State = State;
+
+ fn build(&self) -> State {
+ State::default()
+ }
}
#[derive(Default, Clone)]
@@ -14,6 +19,18 @@ pub struct State {
jumping_once: Arc<Mutex<bool>>,
}
+#[async_trait]
+impl crate::PluginState for State {
+ async fn handle(self: Box<Self>, event: Event, mut bot: Client) {
+ if let Event::Tick = event {
+ if *self.jumping_once.lock() && bot.jumping() {
+ *self.jumping_once.lock() = false;
+ bot.set_jumping(false);
+ }
+ }
+ }
+}
+
pub trait BotTrait {
fn jump(&mut self);
fn look_at(&mut self, pos: &Vec3);
@@ -23,7 +40,7 @@ impl BotTrait for azalea_client::Client {
/// Queue a jump for the next tick.
fn jump(&mut self) {
self.set_jumping(true);
- let state = self.plugins.get::<Plugin>().unwrap().state.clone();
+ let state = self.plugins.get::<State>().unwrap().clone();
*state.jumping_once.lock() = true;
}
@@ -34,18 +51,6 @@ impl BotTrait for azalea_client::Client {
}
}
-#[async_trait]
-impl crate::Plugin for Plugin {
- async fn handle(self: Box<Self>, event: Event, mut bot: Client) {
- if let Event::Tick = event {
- if *self.state.jumping_once.lock() && bot.jumping() {
- *self.state.jumping_once.lock() = false;
- bot.set_jumping(false);
- }
- }
- }
-}
-
fn direction_looking_at(current: &Vec3, target: &Vec3) -> (f32, f32) {
// borrowed from mineflayer's Bot.lookAt because i didn't want to do math
let delta = target - current;
diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs
index 89754409..7c9c660b 100755..100644
--- a/azalea/src/lib.rs
+++ b/azalea/src/lib.rs
@@ -75,152 +75,19 @@
//!
//! [`azalea_client`]: https://crates.io/crates/azalea-client
+#![feature(trait_upcasting)]
+#![feature(async_closure)]
+#![allow(incomplete_features)]
+
mod bot;
pub mod pathfinder;
pub mod prelude;
+mod start;
+mod swarm;
pub use azalea_client::*;
pub use azalea_core::{BlockPos, Vec3};
-use azalea_protocol::ServerAddress;
-use std::{future::Future, sync::Arc};
-use thiserror::Error;
+pub use start::{start, Options};
+pub use swarm::*;
pub type HandleFn<Fut, S> = fn(Client, Event, S) -> Fut;
-
-/// The options that are passed to [`azalea::start`].
-///
-/// [`azalea::start`]: fn.start.html
-pub struct Options<S, A, Fut>
-where
- A: TryInto<ServerAddress>,
- Fut: Future<Output = Result<(), anyhow::Error>>,
-{
- /// The address of the server that we're connecting to. This can be a
- /// `&str`, [`ServerAddress`], or anything that implements
- /// `TryInto<ServerAddress>`.
- ///
- /// [`ServerAddress`]: azalea_protocol::ServerAddress
- pub address: A,
- /// The account that's going to join the server.
- pub account: Account,
- /// The plugins that are going to be used. Plugins are external crates that
- /// add extra functionality to Azalea. You should use the [`plugins`] macro
- /// for this field.
- ///
- /// ```rust,no_run
- /// plugins![azalea_pathfinder::Plugin::default()]
- /// ```
- pub plugins: Plugins,
- /// A struct that contains the data that you want your bot to remember
- /// across events.
- ///
- /// # Examples
- ///
- /// ```rust
- /// use parking_lot::Mutex;
- /// use std::sync::Arc;
- ///
- /// #[derive(Default, Clone)]
- /// struct State {
- /// farming: Arc<Mutex<bool>>,
- /// }
- /// ```
- pub state: S,
- /// The function that's called whenever we get an event.
- ///
- /// # Examples
- ///
- /// ```rust
- /// use azalea::prelude::*;
- ///
- /// async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> {
- /// Ok(())
- /// }
- /// ```
- pub handle: HandleFn<Fut, S>,
-}
-
-#[derive(Error, Debug)]
-pub enum Error {
- #[error("Invalid address")]
- InvalidAddress,
- #[error("Join error: {0}")]
- Join(#[from] azalea_client::JoinError),
-}
-
-/// Join a server and start handling events. This function will run forever until
-/// it gets disconnected from the server.
-///
-/// # Examples
-///
-/// ```rust,no_run
-/// let error = azalea::start(azalea::Options {
-/// account,
-/// address: "localhost",
-/// state: State::default(),
-/// plugins: plugins![azalea_pathfinder::Plugin::default()],
-/// handle,
-/// }).await;
-/// ```
-pub async fn start<
- S: Send + Sync + Clone + 'static,
- A: Send + TryInto<ServerAddress>,
- Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
->(
- options: Options<S, A, Fut>,
-) -> Result<(), Error> {
- let address = match options.address.try_into() {
- Ok(address) => address,
- Err(_) => return Err(Error::InvalidAddress),
- };
-
- let (mut bot, mut rx) = Client::join(&options.account, address).await?;
-
- let mut plugins = options.plugins;
- plugins.add(bot::Plugin::default());
- plugins.add(pathfinder::Plugin::default());
- bot.plugins = Arc::new(plugins);
-
- let state = options.state;
-
- while let Some(event) = rx.recv().await {
- let cloned_plugins = (*bot.plugins).clone();
- for plugin in cloned_plugins.into_iter() {
- tokio::spawn(plugin.handle(event.clone(), bot.clone()));
- }
-
- tokio::spawn(bot::Plugin::handle(
- Box::new(bot.plugins.get::<bot::Plugin>().unwrap().clone()),
- event.clone(),
- bot.clone(),
- ));
- tokio::spawn(pathfinder::Plugin::handle(
- Box::new(bot.plugins.get::<pathfinder::Plugin>().unwrap().clone()),
- event.clone(),
- bot.clone(),
- ));
-
- tokio::spawn((options.handle)(bot.clone(), event.clone(), state.clone()));
- }
-
- Ok(())
-}
-
-/// A helper macro that generates a [`Plugins`] struct from a list of objects
-/// that implement [`Plugin`].
-///
-/// ```rust,no_run
-/// plugins![azalea_pathfinder::Plugin::default()];
-/// ```
-#[macro_export]
-macro_rules! plugins {
- ($($plugin:expr),*) => {
- {
- let mut plugins = azalea::Plugins::new();
- $(
- plugins.add($plugin);
- )*
- plugins
- }
- };
-}
diff --git a/azalea/src/pathfinder/mod.rs b/azalea/src/pathfinder/mod.rs
index f119c645..8a9d7540 100644
--- a/azalea/src/pathfinder/mod.rs
+++ b/azalea/src/pathfinder/mod.rs
@@ -13,9 +13,14 @@ use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
-#[derive(Default, Clone)]
-pub struct Plugin {
- pub state: State,
+#[derive(Clone, Default)]
+pub struct Plugin;
+impl crate::Plugin for Plugin {
+ type State = State;
+
+ fn build(&self) -> State {
+ State::default()
+ }
}
#[derive(Default, Clone)]
@@ -25,10 +30,10 @@ pub struct State {
}
#[async_trait]
-impl crate::Plugin for Plugin {
+impl crate::PluginState for State {
async fn handle(self: Box<Self>, event: Event, mut bot: Client) {
if let Event::Tick = event {
- let mut path = self.state.path.lock();
+ let mut path = self.path.lock();
if !path.is_empty() {
tick_execute_path(&mut bot, &mut path);
@@ -102,9 +107,8 @@ impl Trait for azalea_client::Client {
let state = self
.plugins
- .get::<Plugin>()
+ .get::<State>()
.expect("Pathfinder plugin not installed!")
- .state
.clone();
// convert the Option<Vec<Node>> to a VecDeque<Node>
*state.path.lock() = p.expect("no path").into_iter().collect();
@@ -127,7 +131,7 @@ fn tick_execute_path(bot: &mut Client, path: &mut VecDeque<Node>) {
}
if target.is_reached(&bot.entity()) {
- println!("ok target {target:?} reached");
+ // println!("ok target {target:?} reached");
path.pop_front();
if path.is_empty() {
bot.walk(WalkDirection::None);
@@ -165,13 +169,13 @@ impl Node {
/// Returns whether the entity is at the node and should start going to the
/// next node.
pub fn is_reached(&self, entity: &EntityData) -> bool {
- println!(
- "entity.delta.y: {} {:?}=={:?}, self.vertical_vel={:?}",
- entity.delta.y,
- BlockPos::from(entity.pos()),
- self.pos,
- self.vertical_vel
- );
+ // println!(
+ // "entity.delta.y: {} {:?}=={:?}, self.vertical_vel={:?}",
+ // entity.delta.y,
+ // BlockPos::from(entity.pos()),
+ // self.pos,
+ // self.vertical_vel
+ // );
BlockPos::from(entity.pos()) == self.pos
&& match self.vertical_vel {
VerticalVel::NoneMidair => entity.delta.y > -0.1 && entity.delta.y < 0.1,
diff --git a/azalea/src/prelude.rs b/azalea/src/prelude.rs
index 9fa1ac1a..30205e59 100755..100644
--- a/azalea/src/prelude.rs
+++ b/azalea/src/prelude.rs
@@ -2,5 +2,5 @@
pub use crate::bot::BotTrait;
pub use crate::pathfinder::Trait;
-pub use crate::plugins;
+pub use crate::{plugins, swarm_plugins, Plugin};
pub use azalea_client::{Account, Client, Event};
diff --git a/azalea/src/start.rs b/azalea/src/start.rs
new file mode 100644
index 00000000..c7d79261
--- /dev/null
+++ b/azalea/src/start.rs
@@ -0,0 +1,136 @@
+use crate::{bot, pathfinder, HandleFn};
+use azalea_client::{Account, Client, Plugins};
+use azalea_protocol::ServerAddress;
+use std::{future::Future, sync::Arc};
+use thiserror::Error;
+
+/// A helper macro that generates a [`Plugins`] struct from a list of objects
+/// that implement [`Plugin`].
+///
+/// ```rust,no_run
+/// plugins![azalea_pathfinder::Plugin];
+/// ```
+///
+/// [`Plugin`]: crate::Plugin
+#[macro_export]
+macro_rules! plugins {
+ ($($plugin:expr),*) => {
+ {
+ let mut plugins = azalea::Plugins::new();
+ $(
+ plugins.add($plugin);
+ )*
+ plugins
+ }
+ };
+}
+
+/// The options that are passed to [`azalea::start`].
+///
+/// [`azalea::start`]: crate::start()
+pub struct Options<S, A, Fut>
+where
+ A: TryInto<ServerAddress>,
+ Fut: Future<Output = Result<(), anyhow::Error>>,
+{
+ /// The address of the server that we're connecting to. This can be a
+ /// `&str`, [`ServerAddress`], or anything that implements
+ /// `TryInto<ServerAddress>`.
+ ///
+ /// [`ServerAddress`]: azalea_protocol::ServerAddress
+ pub address: A,
+ /// The account that's going to join the server.
+ pub account: Account,
+ /// The plugins that are going to be used. Plugins are external crates that
+ /// add extra functionality to Azalea. You should use the [`plugins`] macro
+ /// for this field.
+ ///
+ /// ```rust,no_run
+ /// plugins![azalea_pathfinder::Plugin]
+ /// ```
+ pub plugins: Plugins,
+ /// A struct that contains the data that you want your bot to remember
+ /// across events.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use parking_lot::Mutex;
+ /// use std::sync::Arc;
+ ///
+ /// #[derive(Default, Clone)]
+ /// struct State {
+ /// farming: Arc<Mutex<bool>>,
+ /// }
+ /// ```
+ pub state: S,
+ /// The function that's called whenever we get an event.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use azalea::prelude::*;
+ ///
+ /// async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> {
+ /// Ok(())
+ /// }
+ /// ```
+ pub handle: HandleFn<Fut, S>,
+}
+
+#[derive(Error, Debug)]
+pub enum StartError {
+ #[error("Invalid address")]
+ InvalidAddress,
+ #[error("Join error: {0}")]
+ Join(#[from] azalea_client::JoinError),
+}
+
+/// Join a server and start handling events. This function will run forever until
+/// it gets disconnected from the server.
+///
+/// # Examples
+///
+/// ```rust,no_run
+/// let error = azalea::start(azalea::Options {
+/// account,
+/// address: "localhost",
+/// state: State::default(),
+/// plugins: plugins![azalea_pathfinder::Plugin],
+/// handle,
+/// }).await;
+/// ```
+pub async fn start<
+ S: Send + Sync + Clone + 'static,
+ A: Send + TryInto<ServerAddress>,
+ Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+>(
+ options: Options<S, A, Fut>,
+) -> Result<(), StartError> {
+ let address = match options.address.try_into() {
+ Ok(address) => address,
+ Err(_) => return Err(StartError::InvalidAddress),
+ };
+
+ let (mut bot, mut rx) = Client::join(&options.account, address).await?;
+
+ let mut plugins = options.plugins;
+ // DEFAULT PLUGINS
+ plugins.add(bot::Plugin);
+ plugins.add(pathfinder::Plugin);
+
+ bot.plugins = Arc::new(plugins.build());
+
+ let state = options.state;
+
+ while let Some(event) = rx.recv().await {
+ let cloned_plugins = (*bot.plugins).clone();
+ for plugin in cloned_plugins.into_iter() {
+ tokio::spawn(plugin.handle(event.clone(), bot.clone()));
+ }
+
+ tokio::spawn((options.handle)(bot.clone(), event.clone(), state.clone()));
+ }
+
+ Ok(())
+}
diff --git a/azalea/src/swarm/chat.rs b/azalea/src/swarm/chat.rs
new file mode 100644
index 00000000..a39632f5
--- /dev/null
+++ b/azalea/src/swarm/chat.rs
@@ -0,0 +1,147 @@
+//! Implements SwarmEvent::Chat
+
+// How the chat event works (to avoid firing the event multiple times):
+// ---
+// There's a shared queue of all the chat messages
+// Each bot contains an index of the farthest message we've seen
+// When a bot receives a chat messages, it looks into the queue to find the
+// earliest instance of the message content that's after the bot's chat index.
+// If it finds it, then its personal index is simply updated. Otherwise, fire
+// the event and add to the queue.
+//
+// To make sure the queue doesn't grow too large, we keep a `chat_min_index`
+// in Swarm that's set to the smallest index of all the bots, and we remove all
+// messages from the queue that are before that index.
+
+use crate::{Swarm, SwarmEvent};
+use async_trait::async_trait;
+use azalea_client::{ChatPacket, Client, Event};
+use parking_lot::Mutex;
+use std::{collections::VecDeque, sync::Arc};
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+
+#[derive(Clone)]
+pub struct Plugin {
+ pub swarm_state: SwarmState,
+ pub tx: UnboundedSender<ChatPacket>,
+}
+
+impl crate::Plugin for Plugin {
+ type State = State;
+
+ fn build(&self) -> State {
+ State {
+ farthest_chat_index: Arc::new(Mutex::new(0)),
+ swarm_state: self.swarm_state.clone(),
+ tx: self.tx.clone(),
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct State {
+ pub farthest_chat_index: Arc<Mutex<usize>>,
+ pub tx: UnboundedSender<ChatPacket>,
+ pub swarm_state: SwarmState,
+}
+
+#[derive(Clone)]
+pub struct SwarmState {
+ pub chat_queue: Arc<Mutex<VecDeque<ChatPacket>>>,
+ pub chat_min_index: Arc<Mutex<usize>>,
+ pub rx: Arc<tokio::sync::Mutex<UnboundedReceiver<ChatPacket>>>,
+}
+
+#[async_trait]
+impl crate::PluginState for State {
+ async fn handle(self: Box<Self>, event: Event, _bot: Client) {
+ // we're allowed to access Plugin::swarm_state since it's shared for every bot
+ if let Event::Chat(m) = event {
+ // When a bot receives a chat messages, it looks into the queue to find the
+ // earliest instance of the message content that's after the bot's chat index.
+ // If it finds it, then its personal index is simply updated. Otherwise, fire
+ // the event and add to the queue.
+
+ let mut chat_queue = self.swarm_state.chat_queue.lock();
+ let chat_min_index = self.swarm_state.chat_min_index.lock();
+ let mut farthest_chat_index = self.farthest_chat_index.lock();
+
+ let actual_vec_index = *farthest_chat_index - *chat_min_index;
+
+ // go through the queue and find the first message that's after the bot's index
+ let mut found = false;
+ for (i, msg) in chat_queue.iter().enumerate().skip(actual_vec_index) {
+ if msg == &m {
+ // found the message, update the index
+ *farthest_chat_index = i + *chat_min_index + 1;
+ found = true;
+ break;
+ }
+ }
+
+ if !found {
+ // didn't find the message, so fire the swarm event and add to the queue
+ self.tx
+ .send(m.clone())
+ .expect("failed to send chat message to swarm");
+ chat_queue.push_back(m);
+ *farthest_chat_index = chat_queue.len() - 1 + *chat_min_index;
+ }
+ }
+ }
+}
+
+impl SwarmState {
+ pub fn new<S>(swarm: Swarm<S>) -> (Self, UnboundedSender<ChatPacket>)
+ where
+ S: Send + Sync + Clone + 'static,
+ {
+ let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+
+ let swarm_state = SwarmState {
+ chat_queue: Arc::new(Mutex::new(VecDeque::new())),
+ chat_min_index: Arc::new(Mutex::new(0)),
+ rx: Arc::new(tokio::sync::Mutex::new(rx)),
+ };
+ tokio::spawn(swarm_state.clone().start(swarm));
+
+ (swarm_state, tx)
+ }
+ async fn start<S>(self, swarm: Swarm<S>)
+ where
+ S: Send + Sync + Clone + 'static,
+ {
+ // it should never be locked unless we reused the same plugin for two swarms (bad)
+ let mut rx = self.rx.lock().await;
+ while let Some(m) = rx.recv().await {
+ swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap();
+
+ // To make sure the queue doesn't grow too large, we keep a `chat_min_index`
+ // in Swarm that's set to the smallest index of all the bots, and we remove all
+ // messages from the queue that are before that index.
+
+ let chat_min_index = *self.chat_min_index.lock();
+ let mut new_chat_min_index = usize::MAX;
+ for (bot, _) in swarm.bot_datas.lock().iter() {
+ let this_farthest_chat_index = *bot
+ .plugins
+ .get::<State>()
+ .expect("Chat plugin not installed")
+ .farthest_chat_index
+ .lock();
+ if this_farthest_chat_index < new_chat_min_index {
+ new_chat_min_index = this_farthest_chat_index;
+ }
+ }
+
+ let mut chat_queue = self.chat_queue.lock();
+ // remove all messages from the queue that are before the min index
+ for _ in 0..(new_chat_min_index - chat_min_index) {
+ chat_queue.pop_front();
+ }
+
+ // update the min index
+ *self.chat_min_index.lock() = new_chat_min_index;
+ }
+ }
+}
diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs
new file mode 100644
index 00000000..c45014d2
--- /dev/null
+++ b/azalea/src/swarm/mod.rs
@@ -0,0 +1,447 @@
+/// Swarms are a way to conveniently control many bots.
+mod chat;
+mod plugins;
+
+pub use self::plugins::*;
+use crate::{bot, HandleFn};
+use azalea_client::{Account, ChatPacket, Client, Event, JoinError, Plugins};
+use azalea_protocol::{
+ connect::{Connection, ConnectionError},
+ resolver::{self, ResolverError},
+ ServerAddress,
+};
+use azalea_world::WeakWorldContainer;
+use futures::future::join_all;
+use log::error;
+use parking_lot::{Mutex, RwLock};
+use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
+use thiserror::Error;
+use tokio::sync::mpsc::{self, UnboundedSender};
+
+/// A helper macro that generates a [`SwarmPlugins`] struct from a list of objects
+/// that implement [`SwarmPlugin`].
+///
+/// ```rust,no_run
+/// swarm_plugins![azalea_pathfinder::Plugin];
+/// ```
+#[macro_export]
+macro_rules! swarm_plugins {
+ ($($plugin:expr),*) => {
+ {
+ let mut plugins = azalea::SwarmPlugins::new();
+ $(
+ plugins.add($plugin);
+ )*
+ plugins
+ }
+ };
+}
+
+/// A swarm is a way to conveniently control many bots at once, while also
+/// being able to control bots at an individual level when desired.
+///
+/// Swarms are created from the [`azalea::start_swarm`] function.
+///
+/// The `S` type parameter is the type of the state for individual bots.
+/// It's used to make the [`Swarm::add`] function work.
+///
+/// [`azalea::start_swarm`]: fn.start_swarm.html
+#[derive(Clone)]
+pub struct Swarm<S> {
+ bot_datas: Arc<Mutex<Vec<(Client, S)>>>,
+
+ resolved_address: SocketAddr,
+ address: ServerAddress,
+ pub worlds: Arc<RwLock<WeakWorldContainer>>,
+ /// Plugins that are set for new bots
+ plugins: Plugins,
+
+ bots_tx: UnboundedSender<(Option<Event>, (Client, S))>,
+ swarm_tx: UnboundedSender<SwarmEvent>,
+}
+
+/// An event about something that doesn't have to do with a single bot.
+#[derive(Clone, Debug)]
+pub enum SwarmEvent {
+ /// All the bots in the swarm have successfully joined the server.
+ Login,
+ /// The swarm was created. This is only fired once, and it's guaranteed to
+ /// be the first event to fire.
+ Init,
+ /// A bot got disconnected from the server.
+ ///
+ /// You can implement an auto-reconnect by calling [`Swarm::add`]
+ /// with the account from this event.
+ Disconnect(Account),
+ /// At least one bot received a chat message.
+ Chat(ChatPacket),
+}
+
+pub type SwarmHandleFn<Fut, S, SS> = fn(Swarm<S>, SwarmEvent, SS) -> Fut;
+
+/// The options that are passed to [`azalea::start_swarm`].
+///
+/// [`azalea::start_swarm`]: crate::start_swarm()
+pub struct SwarmOptions<S, SS, A, Fut, SwarmFut>
+where
+ A: TryInto<ServerAddress>,
+ Fut: Future<Output = Result<(), anyhow::Error>>,
+ SwarmFut: Future<Output = Result<(), anyhow::Error>>,
+{
+ /// The address of the server that we're connecting to. This can be a
+ /// `&str`, [`ServerAddress`], or anything that implements
+ /// `TryInto<ServerAddress>`.
+ ///
+ /// [`ServerAddress`]: azalea_protocol::ServerAddress
+ pub address: A,
+ /// The accounts that are going to join the server.
+ pub accounts: Vec<Account>,
+ /// The plugins that are going to be used for all the bots.
+ ///
+ /// You can usually leave this as `plugins![]`.
+ pub plugins: Plugins,
+ /// The plugins that are going to be used for the swarm.
+ ///
+ /// You can usually leave this as `swarm_plugins![]`.
+ pub swarm_plugins: SwarmPlugins<S>,
+ /// The individual bot states. This must be the same length as `accounts`,
+ /// since each bot gets one state.
+ pub states: Vec<S>,
+ /// The state for the overall swarm.
+ pub swarm_state: SS,
+ /// The function that's called every time a bot receives an [`Event`].
+ pub handle: HandleFn<Fut, S>,
+ /// The function that's called every time the swarm receives a [`SwarmEvent`].
+ pub swarm_handle: SwarmHandleFn<SwarmFut, S, SS>,
+
+ /// How long we should wait between each bot joining the server. Set to
+ /// None to have every bot connect at the same time. None is different than
+ /// a duration of 0, since if a duration is present the bots will wait for
+ /// the previous one to be ready.
+ pub join_delay: Option<std::time::Duration>,
+}
+
+#[derive(Error, Debug)]
+pub enum SwarmStartError {
+ #[error("Invalid address")]
+ InvalidAddress,
+ #[error(transparent)]
+ ResolveAddress(#[from] ResolverError),
+ #[error("Join error: {0}")]
+ Join(#[from] azalea_client::JoinError),
+}
+
+/// Make a bot [`Swarm`].
+///
+/// [`Swarm`]: struct.Swarm.html
+///
+/// # Examples
+/// ```rust,no_run
+/// use azalea::{prelude::*, Swarm, SwarmEvent};
+/// use azalea::{Account, Client, Event};
+/// use std::time::Duration;
+///
+/// #[derive(Default, Clone)]
+/// struct State {}
+///
+/// #[derive(Default, Clone)]
+/// struct SwarmState {}
+///
+/// #[tokio::main]
+/// async fn main() -> anyhow::Result<()> {
+/// let mut accounts = Vec::new();
+/// let mut states = Vec::new();
+///
+/// for i in 0..10 {
+/// accounts.push(Account::offline(&format!("bot{}", i)));
+/// states.push(State::default());
+/// }
+///
+/// loop {
+/// let e = azalea::start_swarm(azalea::SwarmOptions {
+/// accounts: accounts.clone(),
+/// address: "localhost",
+///
+/// states: states.clone(),
+/// swarm_state: SwarmState::default(),
+///
+/// plugins: plugins![],
+/// swarm_plugins: swarm_plugins![],
+///
+/// handle,
+/// swarm_handle,
+///
+/// join_delay: Some(Duration::from_millis(1000)),
+/// })
+/// .await;
+/// println!("{e:?}");
+/// }
+/// }
+///
+/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
+/// match &event {
+/// _ => {}
+/// }
+/// Ok(())
+/// }
+///
+/// async fn swarm_handle(
+/// mut swarm: Swarm<State>,
+/// event: SwarmEvent,
+/// _state: SwarmState,
+/// ) -> anyhow::Result<()> {
+/// match &event {
+/// SwarmEvent::Disconnect(account) => {
+/// // automatically reconnect after 5 seconds
+/// tokio::time::sleep(Duration::from_secs(5)).await;
+/// swarm.add(account, State::default()).await?;
+/// }
+/// SwarmEvent::Chat(m) => {
+/// println!("{}", m.message().to_ansi(None));
+/// }
+/// _ => {}
+/// }
+/// Ok(())
+/// }
+pub async fn start_swarm<
+ S: Send + Sync + Clone + 'static,
+ SS: Send + Sync + Clone + 'static,
+ A: Send + TryInto<ServerAddress>,
+ Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+ SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
+>(
+ options: SwarmOptions<S, SS, A, Fut, SwarmFut>,
+) -> Result<(), SwarmStartError> {
+ assert_eq!(
+ options.accounts.len(),
+ options.states.len(),
+ "There must be exactly one state per bot."
+ );
+
+ // convert the TryInto<ServerAddress> into a ServerAddress
+ let address: ServerAddress = match options.address.try_into() {
+ Ok(address) => address,
+ Err(_) => return Err(SwarmStartError::InvalidAddress),
+ };
+
+ // resolve the address
+ let resolved_address = resolver::resolve_address(&address).await?;
+
+ let world_container = Arc::new(RwLock::new(WeakWorldContainer::default()));
+
+ let mut plugins = options.plugins;
+ let swarm_plugins = options.swarm_plugins;
+
+ // DEFAULT CLIENT PLUGINS
+ plugins.add(bot::Plugin);
+ plugins.add(crate::pathfinder::Plugin);
+ // DEFAULT SWARM PLUGINS
+
+ // we can't modify the swarm plugins after this
+ let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
+ let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
+
+ let mut swarm = Swarm {
+ bot_datas: Arc::new(Mutex::new(Vec::new())),
+
+ resolved_address,
+ address,
+ worlds: world_container,
+ plugins,
+
+ bots_tx,
+
+ swarm_tx: swarm_tx.clone(),
+ };
+
+ {
+ // the chat plugin is hacky and needs the swarm to be passed like this
+ let (chat_swarm_state, chat_tx) = chat::SwarmState::new(swarm.clone());
+ swarm.plugins.add(chat::Plugin {
+ swarm_state: chat_swarm_state,
+ tx: chat_tx,
+ });
+ }
+
+ let swarm_plugins = swarm_plugins.build();
+
+ let mut swarm_clone = swarm.clone();
+ let join_task = tokio::spawn(async move {
+ if let Some(join_delay) = options.join_delay {
+ // if there's a join delay, then join one by one
+ for (account, state) in options.accounts.iter().zip(options.states) {
+ swarm_clone
+ .add_with_exponential_backoff(account, state.clone())
+ .await;
+ tokio::time::sleep(join_delay).await;
+ }
+ } else {
+ let swarm_borrow = &swarm_clone;
+ join_all(options.accounts.iter().zip(options.states).map(
+ async move |(account, state)| -> Result<(), JoinError> {
+ swarm_borrow
+ .clone()
+ .add_with_exponential_backoff(account, state.clone())
+ .await;
+ Ok(())
+ },
+ ))
+ .await;
+ }
+ });
+
+ let swarm_state = options.swarm_state;
+ let mut internal_state = InternalSwarmState::default();
+
+ // Watch swarm_rx and send those events to the plugins and swarm_handle.
+ let swarm_clone = swarm.clone();
+ let swarm_plugins_clone = swarm_plugins.clone();
+ tokio::spawn(async move {
+ while let Some(event) = swarm_rx.recv().await {
+ for plugin in swarm_plugins_clone.clone().into_iter() {
+ tokio::spawn(plugin.handle(event.clone(), swarm_clone.clone()));
+ }
+ tokio::spawn((options.swarm_handle)(
+ swarm_clone.clone(),
+ event,
+ swarm_state.clone(),
+ ));
+ }
+ });
+
+ // bot events
+ while let Some((Some(event), (bot, state))) = bots_rx.recv().await {
+ // bot event handling
+ let cloned_plugins = (*bot.plugins).clone();
+ for plugin in cloned_plugins.into_iter() {
+ tokio::spawn(plugin.handle(event.clone(), bot.clone()));
+ }
+
+ // swarm event handling
+ // remove this #[allow] when more checks are added
+ #[allow(clippy::single_match)]
+ match &event {
+ Event::Login => {
+ internal_state.bots_joined += 1;
+ if internal_state.bots_joined == swarm.bot_datas.lock().len() {
+ swarm_tx.send(SwarmEvent::Login).unwrap();
+ }
+ }
+ _ => {}
+ }
+
+ tokio::spawn((options.handle)(bot, event, state));
+ }
+
+ join_task.abort();
+
+ Ok(())
+}
+
+impl<S> Swarm<S>
+where
+ S: Send + Sync + Clone + 'static,
+{
+ /// Add a new account to the swarm. You can remove it later by calling [`Client::disconnect`].
+ pub async fn add(&mut self, account: &Account, state: S) -> Result<Client, JoinError> {
+ let conn = Connection::new(&self.resolved_address).await?;
+ let (conn, game_profile) = Client::handshake(conn, account, &self.address.clone()).await?;
+
+ // tx is moved to the bot so it can send us events
+ // rx is used to receive events from the bot
+ let (tx, mut rx) = mpsc::channel(1);
+ let mut bot = Client::new(game_profile, conn, Some(self.worlds.clone()));
+ tx.send(Event::Init).await.expect("Failed to send event");
+ bot.start_tasks(tx);
+
+ bot.plugins = Arc::new(self.plugins.clone().build());
+
+ let cloned_bots_tx = self.bots_tx.clone();
+ let cloned_bot = bot.clone();
+ let cloned_state = state.clone();
+ let owned_account = account.clone();
+ let bot_datas = self.bot_datas.clone();
+ let swarm_tx = self.swarm_tx.clone();
+ // send the init event immediately so it's the first thing we get
+ swarm_tx.send(SwarmEvent::Init).unwrap();
+ tokio::spawn(async move {
+ while let Some(event) = rx.recv().await {
+ // we can't handle events here (since we can't copy the handler),
+ // they're handled above in start_swarm
+ if let Err(e) =
+ cloned_bots_tx.send((Some(event), (cloned_bot.clone(), cloned_state.clone())))
+ {
+ error!("Error sending event to swarm: {e}");
+ }
+ }
+ // the bot disconnected, so we remove it from the swarm
+ let mut bot_datas = bot_datas.lock();
+ let index = bot_datas
+ .iter()
+ .position(|(b, _)| b.profile.uuid == cloned_bot.profile.uuid)
+ .expect("bot disconnected but not found in swarm");
+ bot_datas.remove(index);
+
+ swarm_tx
+ .send(SwarmEvent::Disconnect(owned_account))
+ .unwrap();
+ });
+
+ self.bot_datas.lock().push((bot.clone(), state.clone()));
+
+ Ok(bot)
+ }
+
+ /// Add a new account to the swarm, retrying if it couldn't join. This will
+ /// run forever until the bot joins or the task is aborted.
+ ///
+ /// Exponential backoff means if it fails joining it will initially wait 10
+ /// seconds, then 20, then 40, up to 2 minutes.
+ pub async fn add_with_exponential_backoff(&mut self, account: &Account, state: S) -> Client {
+ let mut disconnects = 0;
+ loop {
+ match self.add(account, state.clone()).await {
+ Ok(bot) => return bot,
+ Err(e) => {
+ disconnects += 1;
+ let delay = (Duration::from_secs(5) * 2u32.pow(disconnects))
+ .min(Duration::from_secs(120));
+ let username = account.username.clone();
+ error!("Error joining {username}: {e}. Waiting {delay:?} and trying again.");
+ tokio::time::sleep(delay).await;
+ }
+ }
+ }
+ }
+}
+
+impl<S> IntoIterator for Swarm<S>
+where
+ S: Send + Sync + Clone + 'static,
+{
+ type Item = (Client, S);
+ type IntoIter = std::vec::IntoIter<Self::Item>;
+
+ /// Iterate over the bots and their states in this swarm.
+ ///
+ /// ```rust,no_run
+ /// for (bot, state) in swarm {
+ /// // ...
+ /// }
+ /// ```
+ fn into_iter(self) -> Self::IntoIter {
+ self.bot_datas.lock().clone().into_iter()
+ }
+}
+
+#[derive(Default)]
+struct InternalSwarmState {
+ /// The number of bots connected to the server
+ pub bots_joined: usize,
+}
+
+impl From<ConnectionError> for SwarmStartError {
+ fn from(e: ConnectionError) -> Self {
+ SwarmStartError::from(JoinError::from(e))
+ }
+}
diff --git a/azalea/src/swarm/plugins.rs b/azalea/src/swarm/plugins.rs
new file mode 100644
index 00000000..0c7cf2ae
--- /dev/null
+++ b/azalea/src/swarm/plugins.rs
@@ -0,0 +1,134 @@
+use crate::{Swarm, SwarmEvent};
+use async_trait::async_trait;
+use nohash_hasher::NoHashHasher;
+use std::{
+ any::{Any, TypeId},
+ collections::HashMap,
+ hash::BuildHasherDefault,
+};
+
+type U64Hasher = BuildHasherDefault<NoHashHasher<u64>>;
+
+// kind of based on https://docs.rs/http/latest/src/http/extensions.rs.html
+/// A map of plugin ids to [`SwarmPlugin`] trait objects. The client stores
+/// this so we can keep the state for our [`Swarm`] plugins.
+///
+/// If you're using azalea, you should generate this from the `swarm_plugins!` macro.
+#[derive(Clone, Default)]
+pub struct SwarmPlugins<S> {
+ map: Option<HashMap<TypeId, Box<dyn SwarmPlugin<S>>, U64Hasher>>,
+}
+
+#[derive(Clone)]
+pub struct SwarmPluginStates<S> {
+ map: Option<HashMap<TypeId, Box<dyn SwarmPluginState<S>>, U64Hasher>>,
+}
+
+impl<S> SwarmPluginStates<S> {
+ pub fn get<T: SwarmPluginState<S>>(&self) -> Option<&T> {
+ self.map
+ .as_ref()
+ .and_then(|map| map.get(&TypeId::of::<T>()))
+ .and_then(|boxed| (boxed.as_ref() as &dyn Any).downcast_ref::<T>())
+ }
+}
+
+impl<S> SwarmPlugins<S>
+where
+ S: 'static,
+{
+ /// Create a new empty set of plugins.
+ pub fn new() -> Self {
+ Self { map: None }
+ }
+
+ /// Add a new plugin to this set.
+ pub fn add<T: SwarmPlugin<S>>(&mut self, plugin: T) {
+ if self.map.is_none() {
+ self.map = Some(HashMap::with_hasher(BuildHasherDefault::default()));
+ }
+ self.map
+ .as_mut()
+ .unwrap()
+ .insert(TypeId::of::<T>(), Box::new(plugin));
+ }
+
+ /// Build our plugin states from this set of plugins. Note that if you're
+ /// using `azalea` you'll probably never need to use this as it's called
+ /// for you.
+ pub fn build(self) -> SwarmPluginStates<S> {
+ if self.map.is_none() {
+ return SwarmPluginStates { map: None };
+ }
+ let mut map = HashMap::with_hasher(BuildHasherDefault::default());
+ for (id, plugin) in self.map.unwrap().into_iter() {
+ map.insert(id, plugin.build());
+ }
+ SwarmPluginStates { map: Some(map) }
+ }
+}
+
+impl<S> IntoIterator for SwarmPluginStates<S> {
+ type Item = Box<dyn SwarmPluginState<S>>;
+ type IntoIter = std::vec::IntoIter<Self::Item>;
+
+ /// Iterate over the plugin states.
+ fn into_iter(self) -> Self::IntoIter {
+ self.map
+ .map(|map| map.into_values().collect::<Vec<_>>())
+ .unwrap_or_default()
+ .into_iter()
+ }
+}
+
+/// A `SwarmPluginState` keeps the current state of a plugin for a client. All
+/// the fields must be atomic. Unique `SwarmPluginState`s are built from
+/// [`SwarmPlugin`]s.
+#[async_trait]
+pub trait SwarmPluginState<S>: Send + Sync + SwarmPluginStateClone<S> + Any + 'static {
+ async fn handle(self: Box<Self>, event: SwarmEvent, swarm: Swarm<S>);
+}
+
+/// Swarm plugins can keep their own personal state ([`SwarmPluginState`]),
+/// listen to [`SwarmEvent`]s, and add new functions to [`Swarm`].
+pub trait SwarmPlugin<S>: Send + Sync + SwarmPluginClone<S> + Any + 'static {
+ fn build(&self) -> Box<dyn SwarmPluginState<S>>;
+}
+
+/// An internal trait that allows SwarmPluginState to be cloned.
+#[doc(hidden)]
+pub trait SwarmPluginStateClone<S> {
+ fn clone_box(&self) -> Box<dyn SwarmPluginState<S>>;
+}
+impl<T, S> SwarmPluginStateClone<S> for T
+where
+ T: 'static + SwarmPluginState<S> + Clone,
+{
+ fn clone_box(&self) -> Box<dyn SwarmPluginState<S>> {
+ Box::new(self.clone())
+ }
+}
+impl<S> Clone for Box<dyn SwarmPluginState<S>> {
+ fn clone(&self) -> Self {
+ self.clone_box()
+ }
+}
+
+/// An internal trait that allows SwarmPlugin to be cloned.
+#[doc(hidden)]
+pub trait SwarmPluginClone<S> {
+ fn clone_box(&self) -> Box<dyn SwarmPlugin<S>>;
+}
+impl<T, S> SwarmPluginClone<S> for T
+where
+ T: 'static + SwarmPlugin<S> + Clone,
+{
+ fn clone_box(&self) -> Box<dyn SwarmPlugin<S>> {
+ Box::new(self.clone())
+ }
+}
+impl<S> Clone for Box<dyn SwarmPlugin<S>> {
+ fn clone(&self) -> Self {
+ self.clone_box()
+ }
+}