diff options
| author | mat <27899617+mat-1@users.noreply.github.com> | 2022-11-27 16:25:07 -0600 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-11-27 16:25:07 -0600 |
| commit | 631ed63dbdc7167df4de02a55b5c2ef1cea909e9 (patch) | |
| tree | 104e567c332f2aeb30ea6acefef8c73f9b2f158b /azalea | |
| parent | 962b9fcaae917c7e5bef718469fba31f6ff7c3cb (diff) | |
| download | azalea-drasl-631ed63dbdc7167df4de02a55b5c2ef1cea909e9.tar.xz | |
Swarm (#36)
* make azalea-pathfinder dir
* start writing d* lite impl
* more work on d* lite
* work more on implementing d* lite
* full d* lite impl
* updated edges
* add next() function
* add NoPathError
* why does dstar lite not work
* fix d* lite implementation
* make the test actually check the coords
* replace while loop with if statement
* fix clippy complaints
* make W only have to be PartialOrd
* fix PartialOrd issues
* implement mtd* lite
* add a test to mtd* lite
* remove normal d* lite
* make heuristic only take in one arg
* add `success` function
* Update README.md
* evil black magic to make .entity not need dimension
* start adding moves
* slightly improve the vec3/position situation
new macro that implements all the useful functions
* moves stuff
* make it compile
* update deps in az-pathfinder
* make it compile again
* more pathfinding stuff
* add Bot::look_at
* replace EntityMut and EntityRef with just Entity
* block pos pathfinding stuff
* rename movedirection to walkdirection
* execute path every tick
* advance path
* change az-pf version
* make azalea_client keep plugin state
* fix Plugins::get
* why does it think there is air
* start debugging incorrect air
* update some From methods to use rem_euclid
* start adding swarm
* fix deadlock
i still don't understand why it was happening but the solution was to keep the Client::player lock for shorter so it didn't overlap with the Client::dimension lock
* make lookat actually work probably
* fix going too fast
* Update main.rs
* make a thing immutable
* direction_looking_at
* fix rotations
* import swarm in an example
* fix stuff from merge
* remove azalea_pathfinder import
* delete azalea-pathfinder crate
already in azalea::pathfinder module
* swarms
* start working on shared dimensions
* Shared worlds work
* start adding Swarm::add_account
* add_account works
* change "client" to "bot" in some places
* Fix issues from merge
* Update world.rs
* add SwarmEvent::Disconnect(Account)
* almost add SwarmEvent::Chat and new plugin system
it panics rn
* make plugins have to provide the State associated type
* improve comments
* make fn build slightly cleaner
* fix SwarmEvent::Chat
* change a println in bot/main.rs
* Client::shutdown -> disconnect
* polish
fix clippy warnings + improve some docs a bit
* fix shared worlds*
*there's a bug that entities and bots will have their positions exaggerated because the relative movement packet is applied for every entity once per bot
* i am being trolled by rust
for some reason some stuff is really slow for literally no reason and it makes no sense i am going insane
* make world an RwLock again
* remove debug messages
* fix skipping event ticks
unfortunately now sending events is `.send().await?` instead of just `.send()`
* fix deadlock + warnings
* turns out my floor_mod impl was wrong
and i32::rem_euclid has the correct behavior LOL
* still errors with lots of bots
* make swarm iter & fix new chunks not loading
* improve docs
* start fixing tests
* fix all the tests
except the examples i don't know how to exclude them from the tests
* improve docs some more
Diffstat (limited to 'azalea')
| -rw-r--r--[-rwxr-xr-x] | azalea/Cargo.toml | 12 | ||||
| -rwxr-xr-x | azalea/README.md | 1 | ||||
| -rw-r--r--[-rwxr-xr-x] | azalea/examples/mine_a_chunk.rs | 29 | ||||
| -rwxr-xr-x | azalea/examples/potatobot/autoeat.rs | 2 | ||||
| -rwxr-xr-x | azalea/examples/potatobot/main.rs | 2 | ||||
| -rwxr-xr-x | azalea/examples/pvp.rs | 4 | ||||
| -rw-r--r--[-rwxr-xr-x] | azalea/src/bot.rs | 37 | ||||
| -rw-r--r--[-rwxr-xr-x] | azalea/src/lib.rs | 149 | ||||
| -rw-r--r-- | azalea/src/pathfinder/mod.rs | 34 | ||||
| -rw-r--r--[-rwxr-xr-x] | azalea/src/prelude.rs | 2 | ||||
| -rw-r--r-- | azalea/src/start.rs | 136 | ||||
| -rw-r--r-- | azalea/src/swarm/chat.rs | 147 | ||||
| -rw-r--r-- | azalea/src/swarm/mod.rs | 447 | ||||
| -rw-r--r-- | azalea/src/swarm/plugins.rs | 134 |
14 files changed, 945 insertions, 191 deletions
diff --git a/azalea/Cargo.toml b/azalea/Cargo.toml index 92a689e0..498306dc 100755..100644 --- a/azalea/Cargo.toml +++ b/azalea/Cargo.toml @@ -10,19 +10,23 @@ version = "0.4.0" [dependencies] anyhow = "^1.0.65" -async-trait = "^0.1.57" -azalea-block = { version = "0.4.0", path = "../azalea-block" } +async-trait = "0.1.58" +azalea-block = {version = "0.4.0", path = "../azalea-block"} +azalea-chat = { version = "0.4.0", path = "../azalea-chat" } azalea-client = {version = "0.4.0", path = "../azalea-client"} azalea-core = {version = "0.4.0", path = "../azalea-core"} -azalea-physics = { version = "0.4.0", path = "../azalea-physics" } +azalea-physics = {version = "0.4.0", path = "../azalea-physics"} azalea-protocol = {version = "0.4.0", path = "../azalea-protocol"} -azalea-world = { version = "0.4.0", path = "../azalea-world" } +azalea-world = {version = "0.4.0", path = "../azalea-world"} +futures = "0.3.25" log = "0.4.17" +nohash-hasher = "0.2.0" num-traits = "0.2.15" parking_lot = {version = "^0.12.1", features = ["deadlock_detection"]} priority-queue = "1.3.0" thiserror = "^1.0.37" tokio = "^1.21.2" +uuid = "1.2.2" [dev-dependencies] anyhow = "^1.0.65" diff --git a/azalea/README.md b/azalea/README.md index d9aa1574..afd2feb4 100755 --- a/azalea/README.md +++ b/azalea/README.md @@ -1,3 +1,4 @@ Azalea is a framework for creating Minecraft bots. Internally, it's just a wrapper over azalea-client, adding useful functions for making bots. + diff --git a/azalea/examples/mine_a_chunk.rs b/azalea/examples/mine_a_chunk.rs index 2e30b2c5..f9b208a2 100755..100644 --- a/azalea/examples/mine_a_chunk.rs +++ b/azalea/examples/mine_a_chunk.rs @@ -1,13 +1,16 @@ -use azalea::{Account, Accounts, Client, Event, Swarm}; +use azalea::{prelude::*, SwarmEvent}; +use azalea::{Account, Client, Event, Swarm}; use parking_lot::Mutex; use std::sync::Arc; #[tokio::main] async fn main() { - let accounts = Accounts::new(); + let mut accounts = Vec::new(); + let mut states = Vec::new(); for i in 0..10 { - accounts.add(Account::offline(&format!("bot{}", i))); + accounts.push(Account::offline(&format!("bot{}", i))); + states.push(Arc::new(Mutex::new(State::default()))); } azalea::start_swarm(azalea::SwarmOptions { @@ -15,13 +18,15 @@ async fn main() { address: "localhost", swarm_state: State::default(), - state: State::default(), + states, - swarm_plugins: plugins![azalea_pathfinder::Plugin::default()], + swarm_plugins: plugins![], plugins: plugins![], - handle: Box::new(handle), - swarm_handle: Box::new(swarm_handle), + handle, + swarm_handle, + + join_delay: None, }) .await .unwrap(); @@ -37,9 +42,13 @@ async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { Ok(()) } -async fn swarm_handle(swarm: Swarm, event: Event, state: SwarmState) -> anyhow::Result<()> { - match event { - Event::Login => { +async fn swarm_handle( + swarm: Swarm<State>, + event: SwarmEvent, + state: SwarmState, +) -> anyhow::Result<()> { + match &event { + SwarmEvent::Login => { swarm.goto(azalea::BlockPos::new(0, 70, 0)).await; // or bots.goto_goal(pathfinder::Goals::Goto(azalea::BlockPos(0, 70, 0))).await; diff --git a/azalea/examples/potatobot/autoeat.rs b/azalea/examples/potatobot/autoeat.rs index 0f0ccc6d..89934fa2 100755 --- a/azalea/examples/potatobot/autoeat.rs +++ b/azalea/examples/potatobot/autoeat.rs @@ -14,7 +14,7 @@ pub struct Plugin { pub struct State {} #[async_trait] -impl azalea::Plugin for Plugin { +impl azalea::PluginState for Plugin { async fn handle(self: Box<Self>, event: Event, bot: Client) { match event { Event::UpdateHunger => { diff --git a/azalea/examples/potatobot/main.rs b/azalea/examples/potatobot/main.rs index e585c41d..8d40c48e 100755 --- a/azalea/examples/potatobot/main.rs +++ b/azalea/examples/potatobot/main.rs @@ -15,7 +15,7 @@ async fn main() { account, address: "localhost", state: State::default(), - plugins: plugins![autoeat::Plugin::default(), pathfinder::Plugin::default(),], + plugins: plugins![autoeat::Plugin, pathfinder::Plugin], handle, }) .await diff --git a/azalea/examples/pvp.rs b/azalea/examples/pvp.rs index 87d83c6d..157ad9e2 100755 --- a/azalea/examples/pvp.rs +++ b/azalea/examples/pvp.rs @@ -15,7 +15,7 @@ async fn main() { swarm_state: State::default(), state: State::default(), - swarm_plugins: plugins![pathfinder::Plugin::default()], + swarm_plugins: plugins![pathfinder::Plugin], plugins: plugins![], handle: Box::new(handle), @@ -32,7 +32,7 @@ struct State {} struct SwarmState {} async fn handle(bot: Client, event: Event, state: State) {} -async fn swarm_handle(swarm: Swarm, event: Event, state: State) { +async fn swarm_handle<S>(swarm: Swarm<S>, event: Event, state: State) { match event { Event::Tick => { // choose an arbitrary player within render distance to target diff --git a/azalea/src/bot.rs b/azalea/src/bot.rs index 0becaa62..0674c692 100755..100644 --- a/azalea/src/bot.rs +++ b/azalea/src/bot.rs @@ -4,9 +4,14 @@ use azalea_core::Vec3; use parking_lot::Mutex; use std::{f64::consts::PI, sync::Arc}; -#[derive(Default, Clone)] -pub struct Plugin { - pub state: State, +#[derive(Clone, Default)] +pub struct Plugin; +impl crate::Plugin for Plugin { + type State = State; + + fn build(&self) -> State { + State::default() + } } #[derive(Default, Clone)] @@ -14,6 +19,18 @@ pub struct State { jumping_once: Arc<Mutex<bool>>, } +#[async_trait] +impl crate::PluginState for State { + async fn handle(self: Box<Self>, event: Event, mut bot: Client) { + if let Event::Tick = event { + if *self.jumping_once.lock() && bot.jumping() { + *self.jumping_once.lock() = false; + bot.set_jumping(false); + } + } + } +} + pub trait BotTrait { fn jump(&mut self); fn look_at(&mut self, pos: &Vec3); @@ -23,7 +40,7 @@ impl BotTrait for azalea_client::Client { /// Queue a jump for the next tick. fn jump(&mut self) { self.set_jumping(true); - let state = self.plugins.get::<Plugin>().unwrap().state.clone(); + let state = self.plugins.get::<State>().unwrap().clone(); *state.jumping_once.lock() = true; } @@ -34,18 +51,6 @@ impl BotTrait for azalea_client::Client { } } -#[async_trait] -impl crate::Plugin for Plugin { - async fn handle(self: Box<Self>, event: Event, mut bot: Client) { - if let Event::Tick = event { - if *self.state.jumping_once.lock() && bot.jumping() { - *self.state.jumping_once.lock() = false; - bot.set_jumping(false); - } - } - } -} - fn direction_looking_at(current: &Vec3, target: &Vec3) -> (f32, f32) { // borrowed from mineflayer's Bot.lookAt because i didn't want to do math let delta = target - current; diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs index 89754409..7c9c660b 100755..100644 --- a/azalea/src/lib.rs +++ b/azalea/src/lib.rs @@ -75,152 +75,19 @@ //! //! [`azalea_client`]: https://crates.io/crates/azalea-client +#![feature(trait_upcasting)] +#![feature(async_closure)] +#![allow(incomplete_features)] + mod bot; pub mod pathfinder; pub mod prelude; +mod start; +mod swarm; pub use azalea_client::*; pub use azalea_core::{BlockPos, Vec3}; -use azalea_protocol::ServerAddress; -use std::{future::Future, sync::Arc}; -use thiserror::Error; +pub use start::{start, Options}; +pub use swarm::*; pub type HandleFn<Fut, S> = fn(Client, Event, S) -> Fut; - -/// The options that are passed to [`azalea::start`]. -/// -/// [`azalea::start`]: fn.start.html -pub struct Options<S, A, Fut> -where - A: TryInto<ServerAddress>, - Fut: Future<Output = Result<(), anyhow::Error>>, -{ - /// The address of the server that we're connecting to. This can be a - /// `&str`, [`ServerAddress`], or anything that implements - /// `TryInto<ServerAddress>`. - /// - /// [`ServerAddress`]: azalea_protocol::ServerAddress - pub address: A, - /// The account that's going to join the server. - pub account: Account, - /// The plugins that are going to be used. Plugins are external crates that - /// add extra functionality to Azalea. You should use the [`plugins`] macro - /// for this field. - /// - /// ```rust,no_run - /// plugins![azalea_pathfinder::Plugin::default()] - /// ``` - pub plugins: Plugins, - /// A struct that contains the data that you want your bot to remember - /// across events. - /// - /// # Examples - /// - /// ```rust - /// use parking_lot::Mutex; - /// use std::sync::Arc; - /// - /// #[derive(Default, Clone)] - /// struct State { - /// farming: Arc<Mutex<bool>>, - /// } - /// ``` - pub state: S, - /// The function that's called whenever we get an event. - /// - /// # Examples - /// - /// ```rust - /// use azalea::prelude::*; - /// - /// async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { - /// Ok(()) - /// } - /// ``` - pub handle: HandleFn<Fut, S>, -} - -#[derive(Error, Debug)] -pub enum Error { - #[error("Invalid address")] - InvalidAddress, - #[error("Join error: {0}")] - Join(#[from] azalea_client::JoinError), -} - -/// Join a server and start handling events. This function will run forever until -/// it gets disconnected from the server. -/// -/// # Examples -/// -/// ```rust,no_run -/// let error = azalea::start(azalea::Options { -/// account, -/// address: "localhost", -/// state: State::default(), -/// plugins: plugins![azalea_pathfinder::Plugin::default()], -/// handle, -/// }).await; -/// ``` -pub async fn start< - S: Send + Sync + Clone + 'static, - A: Send + TryInto<ServerAddress>, - Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, ->( - options: Options<S, A, Fut>, -) -> Result<(), Error> { - let address = match options.address.try_into() { - Ok(address) => address, - Err(_) => return Err(Error::InvalidAddress), - }; - - let (mut bot, mut rx) = Client::join(&options.account, address).await?; - - let mut plugins = options.plugins; - plugins.add(bot::Plugin::default()); - plugins.add(pathfinder::Plugin::default()); - bot.plugins = Arc::new(plugins); - - let state = options.state; - - while let Some(event) = rx.recv().await { - let cloned_plugins = (*bot.plugins).clone(); - for plugin in cloned_plugins.into_iter() { - tokio::spawn(plugin.handle(event.clone(), bot.clone())); - } - - tokio::spawn(bot::Plugin::handle( - Box::new(bot.plugins.get::<bot::Plugin>().unwrap().clone()), - event.clone(), - bot.clone(), - )); - tokio::spawn(pathfinder::Plugin::handle( - Box::new(bot.plugins.get::<pathfinder::Plugin>().unwrap().clone()), - event.clone(), - bot.clone(), - )); - - tokio::spawn((options.handle)(bot.clone(), event.clone(), state.clone())); - } - - Ok(()) -} - -/// A helper macro that generates a [`Plugins`] struct from a list of objects -/// that implement [`Plugin`]. -/// -/// ```rust,no_run -/// plugins![azalea_pathfinder::Plugin::default()]; -/// ``` -#[macro_export] -macro_rules! plugins { - ($($plugin:expr),*) => { - { - let mut plugins = azalea::Plugins::new(); - $( - plugins.add($plugin); - )* - plugins - } - }; -} diff --git a/azalea/src/pathfinder/mod.rs b/azalea/src/pathfinder/mod.rs index f119c645..8a9d7540 100644 --- a/azalea/src/pathfinder/mod.rs +++ b/azalea/src/pathfinder/mod.rs @@ -13,9 +13,14 @@ use parking_lot::Mutex; use std::collections::VecDeque; use std::sync::Arc; -#[derive(Default, Clone)] -pub struct Plugin { - pub state: State, +#[derive(Clone, Default)] +pub struct Plugin; +impl crate::Plugin for Plugin { + type State = State; + + fn build(&self) -> State { + State::default() + } } #[derive(Default, Clone)] @@ -25,10 +30,10 @@ pub struct State { } #[async_trait] -impl crate::Plugin for Plugin { +impl crate::PluginState for State { async fn handle(self: Box<Self>, event: Event, mut bot: Client) { if let Event::Tick = event { - let mut path = self.state.path.lock(); + let mut path = self.path.lock(); if !path.is_empty() { tick_execute_path(&mut bot, &mut path); @@ -102,9 +107,8 @@ impl Trait for azalea_client::Client { let state = self .plugins - .get::<Plugin>() + .get::<State>() .expect("Pathfinder plugin not installed!") - .state .clone(); // convert the Option<Vec<Node>> to a VecDeque<Node> *state.path.lock() = p.expect("no path").into_iter().collect(); @@ -127,7 +131,7 @@ fn tick_execute_path(bot: &mut Client, path: &mut VecDeque<Node>) { } if target.is_reached(&bot.entity()) { - println!("ok target {target:?} reached"); + // println!("ok target {target:?} reached"); path.pop_front(); if path.is_empty() { bot.walk(WalkDirection::None); @@ -165,13 +169,13 @@ impl Node { /// Returns whether the entity is at the node and should start going to the /// next node. pub fn is_reached(&self, entity: &EntityData) -> bool { - println!( - "entity.delta.y: {} {:?}=={:?}, self.vertical_vel={:?}", - entity.delta.y, - BlockPos::from(entity.pos()), - self.pos, - self.vertical_vel - ); + // println!( + // "entity.delta.y: {} {:?}=={:?}, self.vertical_vel={:?}", + // entity.delta.y, + // BlockPos::from(entity.pos()), + // self.pos, + // self.vertical_vel + // ); BlockPos::from(entity.pos()) == self.pos && match self.vertical_vel { VerticalVel::NoneMidair => entity.delta.y > -0.1 && entity.delta.y < 0.1, diff --git a/azalea/src/prelude.rs b/azalea/src/prelude.rs index 9fa1ac1a..30205e59 100755..100644 --- a/azalea/src/prelude.rs +++ b/azalea/src/prelude.rs @@ -2,5 +2,5 @@ pub use crate::bot::BotTrait; pub use crate::pathfinder::Trait; -pub use crate::plugins; +pub use crate::{plugins, swarm_plugins, Plugin}; pub use azalea_client::{Account, Client, Event}; diff --git a/azalea/src/start.rs b/azalea/src/start.rs new file mode 100644 index 00000000..c7d79261 --- /dev/null +++ b/azalea/src/start.rs @@ -0,0 +1,136 @@ +use crate::{bot, pathfinder, HandleFn}; +use azalea_client::{Account, Client, Plugins}; +use azalea_protocol::ServerAddress; +use std::{future::Future, sync::Arc}; +use thiserror::Error; + +/// A helper macro that generates a [`Plugins`] struct from a list of objects +/// that implement [`Plugin`]. +/// +/// ```rust,no_run +/// plugins![azalea_pathfinder::Plugin]; +/// ``` +/// +/// [`Plugin`]: crate::Plugin +#[macro_export] +macro_rules! plugins { + ($($plugin:expr),*) => { + { + let mut plugins = azalea::Plugins::new(); + $( + plugins.add($plugin); + )* + plugins + } + }; +} + +/// The options that are passed to [`azalea::start`]. +/// +/// [`azalea::start`]: crate::start() +pub struct Options<S, A, Fut> +where + A: TryInto<ServerAddress>, + Fut: Future<Output = Result<(), anyhow::Error>>, +{ + /// The address of the server that we're connecting to. This can be a + /// `&str`, [`ServerAddress`], or anything that implements + /// `TryInto<ServerAddress>`. + /// + /// [`ServerAddress`]: azalea_protocol::ServerAddress + pub address: A, + /// The account that's going to join the server. + pub account: Account, + /// The plugins that are going to be used. Plugins are external crates that + /// add extra functionality to Azalea. You should use the [`plugins`] macro + /// for this field. + /// + /// ```rust,no_run + /// plugins![azalea_pathfinder::Plugin] + /// ``` + pub plugins: Plugins, + /// A struct that contains the data that you want your bot to remember + /// across events. + /// + /// # Examples + /// + /// ```rust + /// use parking_lot::Mutex; + /// use std::sync::Arc; + /// + /// #[derive(Default, Clone)] + /// struct State { + /// farming: Arc<Mutex<bool>>, + /// } + /// ``` + pub state: S, + /// The function that's called whenever we get an event. + /// + /// # Examples + /// + /// ```rust + /// use azalea::prelude::*; + /// + /// async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { + /// Ok(()) + /// } + /// ``` + pub handle: HandleFn<Fut, S>, +} + +#[derive(Error, Debug)] +pub enum StartError { + #[error("Invalid address")] + InvalidAddress, + #[error("Join error: {0}")] + Join(#[from] azalea_client::JoinError), +} + +/// Join a server and start handling events. This function will run forever until +/// it gets disconnected from the server. +/// +/// # Examples +/// +/// ```rust,no_run +/// let error = azalea::start(azalea::Options { +/// account, +/// address: "localhost", +/// state: State::default(), +/// plugins: plugins![azalea_pathfinder::Plugin], +/// handle, +/// }).await; +/// ``` +pub async fn start< + S: Send + Sync + Clone + 'static, + A: Send + TryInto<ServerAddress>, + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +>( + options: Options<S, A, Fut>, +) -> Result<(), StartError> { + let address = match options.address.try_into() { + Ok(address) => address, + Err(_) => return Err(StartError::InvalidAddress), + }; + + let (mut bot, mut rx) = Client::join(&options.account, address).await?; + + let mut plugins = options.plugins; + // DEFAULT PLUGINS + plugins.add(bot::Plugin); + plugins.add(pathfinder::Plugin); + + bot.plugins = Arc::new(plugins.build()); + + let state = options.state; + + while let Some(event) = rx.recv().await { + let cloned_plugins = (*bot.plugins).clone(); + for plugin in cloned_plugins.into_iter() { + tokio::spawn(plugin.handle(event.clone(), bot.clone())); + } + + tokio::spawn((options.handle)(bot.clone(), event.clone(), state.clone())); + } + + Ok(()) +} diff --git a/azalea/src/swarm/chat.rs b/azalea/src/swarm/chat.rs new file mode 100644 index 00000000..a39632f5 --- /dev/null +++ b/azalea/src/swarm/chat.rs @@ -0,0 +1,147 @@ +//! Implements SwarmEvent::Chat + +// How the chat event works (to avoid firing the event multiple times): +// --- +// There's a shared queue of all the chat messages +// Each bot contains an index of the farthest message we've seen +// When a bot receives a chat messages, it looks into the queue to find the +// earliest instance of the message content that's after the bot's chat index. +// If it finds it, then its personal index is simply updated. Otherwise, fire +// the event and add to the queue. +// +// To make sure the queue doesn't grow too large, we keep a `chat_min_index` +// in Swarm that's set to the smallest index of all the bots, and we remove all +// messages from the queue that are before that index. + +use crate::{Swarm, SwarmEvent}; +use async_trait::async_trait; +use azalea_client::{ChatPacket, Client, Event}; +use parking_lot::Mutex; +use std::{collections::VecDeque, sync::Arc}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +#[derive(Clone)] +pub struct Plugin { + pub swarm_state: SwarmState, + pub tx: UnboundedSender<ChatPacket>, +} + +impl crate::Plugin for Plugin { + type State = State; + + fn build(&self) -> State { + State { + farthest_chat_index: Arc::new(Mutex::new(0)), + swarm_state: self.swarm_state.clone(), + tx: self.tx.clone(), + } + } +} + +#[derive(Clone)] +pub struct State { + pub farthest_chat_index: Arc<Mutex<usize>>, + pub tx: UnboundedSender<ChatPacket>, + pub swarm_state: SwarmState, +} + +#[derive(Clone)] +pub struct SwarmState { + pub chat_queue: Arc<Mutex<VecDeque<ChatPacket>>>, + pub chat_min_index: Arc<Mutex<usize>>, + pub rx: Arc<tokio::sync::Mutex<UnboundedReceiver<ChatPacket>>>, +} + +#[async_trait] +impl crate::PluginState for State { + async fn handle(self: Box<Self>, event: Event, _bot: Client) { + // we're allowed to access Plugin::swarm_state since it's shared for every bot + if let Event::Chat(m) = event { + // When a bot receives a chat messages, it looks into the queue to find the + // earliest instance of the message content that's after the bot's chat index. + // If it finds it, then its personal index is simply updated. Otherwise, fire + // the event and add to the queue. + + let mut chat_queue = self.swarm_state.chat_queue.lock(); + let chat_min_index = self.swarm_state.chat_min_index.lock(); + let mut farthest_chat_index = self.farthest_chat_index.lock(); + + let actual_vec_index = *farthest_chat_index - *chat_min_index; + + // go through the queue and find the first message that's after the bot's index + let mut found = false; + for (i, msg) in chat_queue.iter().enumerate().skip(actual_vec_index) { + if msg == &m { + // found the message, update the index + *farthest_chat_index = i + *chat_min_index + 1; + found = true; + break; + } + } + + if !found { + // didn't find the message, so fire the swarm event and add to the queue + self.tx + .send(m.clone()) + .expect("failed to send chat message to swarm"); + chat_queue.push_back(m); + *farthest_chat_index = chat_queue.len() - 1 + *chat_min_index; + } + } + } +} + +impl SwarmState { + pub fn new<S>(swarm: Swarm<S>) -> (Self, UnboundedSender<ChatPacket>) + where + S: Send + Sync + Clone + 'static, + { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let swarm_state = SwarmState { + chat_queue: Arc::new(Mutex::new(VecDeque::new())), + chat_min_index: Arc::new(Mutex::new(0)), + rx: Arc::new(tokio::sync::Mutex::new(rx)), + }; + tokio::spawn(swarm_state.clone().start(swarm)); + + (swarm_state, tx) + } + async fn start<S>(self, swarm: Swarm<S>) + where + S: Send + Sync + Clone + 'static, + { + // it should never be locked unless we reused the same plugin for two swarms (bad) + let mut rx = self.rx.lock().await; + while let Some(m) = rx.recv().await { + swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap(); + + // To make sure the queue doesn't grow too large, we keep a `chat_min_index` + // in Swarm that's set to the smallest index of all the bots, and we remove all + // messages from the queue that are before that index. + + let chat_min_index = *self.chat_min_index.lock(); + let mut new_chat_min_index = usize::MAX; + for (bot, _) in swarm.bot_datas.lock().iter() { + let this_farthest_chat_index = *bot + .plugins + .get::<State>() + .expect("Chat plugin not installed") + .farthest_chat_index + .lock(); + if this_farthest_chat_index < new_chat_min_index { + new_chat_min_index = this_farthest_chat_index; + } + } + + let mut chat_queue = self.chat_queue.lock(); + // remove all messages from the queue that are before the min index + for _ in 0..(new_chat_min_index - chat_min_index) { + chat_queue.pop_front(); + } + + // update the min index + *self.chat_min_index.lock() = new_chat_min_index; + } + } +} diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs new file mode 100644 index 00000000..c45014d2 --- /dev/null +++ b/azalea/src/swarm/mod.rs @@ -0,0 +1,447 @@ +/// Swarms are a way to conveniently control many bots. +mod chat; +mod plugins; + +pub use self::plugins::*; +use crate::{bot, HandleFn}; +use azalea_client::{Account, ChatPacket, Client, Event, JoinError, Plugins}; +use azalea_protocol::{ + connect::{Connection, ConnectionError}, + resolver::{self, ResolverError}, + ServerAddress, +}; +use azalea_world::WeakWorldContainer; +use futures::future::join_all; +use log::error; +use parking_lot::{Mutex, RwLock}; +use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; +use thiserror::Error; +use tokio::sync::mpsc::{self, UnboundedSender}; + +/// A helper macro that generates a [`SwarmPlugins`] struct from a list of objects +/// that implement [`SwarmPlugin`]. +/// +/// ```rust,no_run +/// swarm_plugins![azalea_pathfinder::Plugin]; +/// ``` +#[macro_export] +macro_rules! swarm_plugins { + ($($plugin:expr),*) => { + { + let mut plugins = azalea::SwarmPlugins::new(); + $( + plugins.add($plugin); + )* + plugins + } + }; +} + +/// A swarm is a way to conveniently control many bots at once, while also +/// being able to control bots at an individual level when desired. +/// +/// Swarms are created from the [`azalea::start_swarm`] function. +/// +/// The `S` type parameter is the type of the state for individual bots. +/// It's used to make the [`Swarm::add`] function work. +/// +/// [`azalea::start_swarm`]: fn.start_swarm.html +#[derive(Clone)] +pub struct Swarm<S> { + bot_datas: Arc<Mutex<Vec<(Client, S)>>>, + + resolved_address: SocketAddr, + address: ServerAddress, + pub worlds: Arc<RwLock<WeakWorldContainer>>, + /// Plugins that are set for new bots + plugins: Plugins, + + bots_tx: UnboundedSender<(Option<Event>, (Client, S))>, + swarm_tx: UnboundedSender<SwarmEvent>, +} + +/// An event about something that doesn't have to do with a single bot. +#[derive(Clone, Debug)] +pub enum SwarmEvent { + /// All the bots in the swarm have successfully joined the server. + Login, + /// The swarm was created. This is only fired once, and it's guaranteed to + /// be the first event to fire. + Init, + /// A bot got disconnected from the server. + /// + /// You can implement an auto-reconnect by calling [`Swarm::add`] + /// with the account from this event. + Disconnect(Account), + /// At least one bot received a chat message. + Chat(ChatPacket), +} + +pub type SwarmHandleFn<Fut, S, SS> = fn(Swarm<S>, SwarmEvent, SS) -> Fut; + +/// The options that are passed to [`azalea::start_swarm`]. +/// +/// [`azalea::start_swarm`]: crate::start_swarm() +pub struct SwarmOptions<S, SS, A, Fut, SwarmFut> +where + A: TryInto<ServerAddress>, + Fut: Future<Output = Result<(), anyhow::Error>>, + SwarmFut: Future<Output = Result<(), anyhow::Error>>, +{ + /// The address of the server that we're connecting to. This can be a + /// `&str`, [`ServerAddress`], or anything that implements + /// `TryInto<ServerAddress>`. + /// + /// [`ServerAddress`]: azalea_protocol::ServerAddress + pub address: A, + /// The accounts that are going to join the server. + pub accounts: Vec<Account>, + /// The plugins that are going to be used for all the bots. + /// + /// You can usually leave this as `plugins![]`. + pub plugins: Plugins, + /// The plugins that are going to be used for the swarm. + /// + /// You can usually leave this as `swarm_plugins![]`. + pub swarm_plugins: SwarmPlugins<S>, + /// The individual bot states. This must be the same length as `accounts`, + /// since each bot gets one state. + pub states: Vec<S>, + /// The state for the overall swarm. + pub swarm_state: SS, + /// The function that's called every time a bot receives an [`Event`]. + pub handle: HandleFn<Fut, S>, + /// The function that's called every time the swarm receives a [`SwarmEvent`]. + pub swarm_handle: SwarmHandleFn<SwarmFut, S, SS>, + + /// How long we should wait between each bot joining the server. Set to + /// None to have every bot connect at the same time. None is different than + /// a duration of 0, since if a duration is present the bots will wait for + /// the previous one to be ready. + pub join_delay: Option<std::time::Duration>, +} + +#[derive(Error, Debug)] +pub enum SwarmStartError { + #[error("Invalid address")] + InvalidAddress, + #[error(transparent)] + ResolveAddress(#[from] ResolverError), + #[error("Join error: {0}")] + Join(#[from] azalea_client::JoinError), +} + +/// Make a bot [`Swarm`]. +/// +/// [`Swarm`]: struct.Swarm.html +/// +/// # Examples +/// ```rust,no_run +/// use azalea::{prelude::*, Swarm, SwarmEvent}; +/// use azalea::{Account, Client, Event}; +/// use std::time::Duration; +/// +/// #[derive(Default, Clone)] +/// struct State {} +/// +/// #[derive(Default, Clone)] +/// struct SwarmState {} +/// +/// #[tokio::main] +/// async fn main() -> anyhow::Result<()> { +/// let mut accounts = Vec::new(); +/// let mut states = Vec::new(); +/// +/// for i in 0..10 { +/// accounts.push(Account::offline(&format!("bot{}", i))); +/// states.push(State::default()); +/// } +/// +/// loop { +/// let e = azalea::start_swarm(azalea::SwarmOptions { +/// accounts: accounts.clone(), +/// address: "localhost", +/// +/// states: states.clone(), +/// swarm_state: SwarmState::default(), +/// +/// plugins: plugins![], +/// swarm_plugins: swarm_plugins![], +/// +/// handle, +/// swarm_handle, +/// +/// join_delay: Some(Duration::from_millis(1000)), +/// }) +/// .await; +/// println!("{e:?}"); +/// } +/// } +/// +/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> { +/// match &event { +/// _ => {} +/// } +/// Ok(()) +/// } +/// +/// async fn swarm_handle( +/// mut swarm: Swarm<State>, +/// event: SwarmEvent, +/// _state: SwarmState, +/// ) -> anyhow::Result<()> { +/// match &event { +/// SwarmEvent::Disconnect(account) => { +/// // automatically reconnect after 5 seconds +/// tokio::time::sleep(Duration::from_secs(5)).await; +/// swarm.add(account, State::default()).await?; +/// } +/// SwarmEvent::Chat(m) => { +/// println!("{}", m.message().to_ansi(None)); +/// } +/// _ => {} +/// } +/// Ok(()) +/// } +pub async fn start_swarm< + S: Send + Sync + Clone + 'static, + SS: Send + Sync + Clone + 'static, + A: Send + TryInto<ServerAddress>, + Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, + SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static, +>( + options: SwarmOptions<S, SS, A, Fut, SwarmFut>, +) -> Result<(), SwarmStartError> { + assert_eq!( + options.accounts.len(), + options.states.len(), + "There must be exactly one state per bot." + ); + + // convert the TryInto<ServerAddress> into a ServerAddress + let address: ServerAddress = match options.address.try_into() { + Ok(address) => address, + Err(_) => return Err(SwarmStartError::InvalidAddress), + }; + + // resolve the address + let resolved_address = resolver::resolve_address(&address).await?; + + let world_container = Arc::new(RwLock::new(WeakWorldContainer::default())); + + let mut plugins = options.plugins; + let swarm_plugins = options.swarm_plugins; + + // DEFAULT CLIENT PLUGINS + plugins.add(bot::Plugin); + plugins.add(crate::pathfinder::Plugin); + // DEFAULT SWARM PLUGINS + + // we can't modify the swarm plugins after this + let (bots_tx, mut bots_rx) = mpsc::unbounded_channel(); + let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel(); + + let mut swarm = Swarm { + bot_datas: Arc::new(Mutex::new(Vec::new())), + + resolved_address, + address, + worlds: world_container, + plugins, + + bots_tx, + + swarm_tx: swarm_tx.clone(), + }; + + { + // the chat plugin is hacky and needs the swarm to be passed like this + let (chat_swarm_state, chat_tx) = chat::SwarmState::new(swarm.clone()); + swarm.plugins.add(chat::Plugin { + swarm_state: chat_swarm_state, + tx: chat_tx, + }); + } + + let swarm_plugins = swarm_plugins.build(); + + let mut swarm_clone = swarm.clone(); + let join_task = tokio::spawn(async move { + if let Some(join_delay) = options.join_delay { + // if there's a join delay, then join one by one + for (account, state) in options.accounts.iter().zip(options.states) { + swarm_clone + .add_with_exponential_backoff(account, state.clone()) + .await; + tokio::time::sleep(join_delay).await; + } + } else { + let swarm_borrow = &swarm_clone; + join_all(options.accounts.iter().zip(options.states).map( + async move |(account, state)| -> Result<(), JoinError> { + swarm_borrow + .clone() + .add_with_exponential_backoff(account, state.clone()) + .await; + Ok(()) + }, + )) + .await; + } + }); + + let swarm_state = options.swarm_state; + let mut internal_state = InternalSwarmState::default(); + + // Watch swarm_rx and send those events to the plugins and swarm_handle. + let swarm_clone = swarm.clone(); + let swarm_plugins_clone = swarm_plugins.clone(); + tokio::spawn(async move { + while let Some(event) = swarm_rx.recv().await { + for plugin in swarm_plugins_clone.clone().into_iter() { + tokio::spawn(plugin.handle(event.clone(), swarm_clone.clone())); + } + tokio::spawn((options.swarm_handle)( + swarm_clone.clone(), + event, + swarm_state.clone(), + )); + } + }); + + // bot events + while let Some((Some(event), (bot, state))) = bots_rx.recv().await { + // bot event handling + let cloned_plugins = (*bot.plugins).clone(); + for plugin in cloned_plugins.into_iter() { + tokio::spawn(plugin.handle(event.clone(), bot.clone())); + } + + // swarm event handling + // remove this #[allow] when more checks are added + #[allow(clippy::single_match)] + match &event { + Event::Login => { + internal_state.bots_joined += 1; + if internal_state.bots_joined == swarm.bot_datas.lock().len() { + swarm_tx.send(SwarmEvent::Login).unwrap(); + } + } + _ => {} + } + + tokio::spawn((options.handle)(bot, event, state)); + } + + join_task.abort(); + + Ok(()) +} + +impl<S> Swarm<S> +where + S: Send + Sync + Clone + 'static, +{ + /// Add a new account to the swarm. You can remove it later by calling [`Client::disconnect`]. + pub async fn add(&mut self, account: &Account, state: S) -> Result<Client, JoinError> { + let conn = Connection::new(&self.resolved_address).await?; + let (conn, game_profile) = Client::handshake(conn, account, &self.address.clone()).await?; + + // tx is moved to the bot so it can send us events + // rx is used to receive events from the bot + let (tx, mut rx) = mpsc::channel(1); + let mut bot = Client::new(game_profile, conn, Some(self.worlds.clone())); + tx.send(Event::Init).await.expect("Failed to send event"); + bot.start_tasks(tx); + + bot.plugins = Arc::new(self.plugins.clone().build()); + + let cloned_bots_tx = self.bots_tx.clone(); + let cloned_bot = bot.clone(); + let cloned_state = state.clone(); + let owned_account = account.clone(); + let bot_datas = self.bot_datas.clone(); + let swarm_tx = self.swarm_tx.clone(); + // send the init event immediately so it's the first thing we get + swarm_tx.send(SwarmEvent::Init).unwrap(); + tokio::spawn(async move { + while let Some(event) = rx.recv().await { + // we can't handle events here (since we can't copy the handler), + // they're handled above in start_swarm + if let Err(e) = + cloned_bots_tx.send((Some(event), (cloned_bot.clone(), cloned_state.clone()))) + { + error!("Error sending event to swarm: {e}"); + } + } + // the bot disconnected, so we remove it from the swarm + let mut bot_datas = bot_datas.lock(); + let index = bot_datas + .iter() + .position(|(b, _)| b.profile.uuid == cloned_bot.profile.uuid) + .expect("bot disconnected but not found in swarm"); + bot_datas.remove(index); + + swarm_tx + .send(SwarmEvent::Disconnect(owned_account)) + .unwrap(); + }); + + self.bot_datas.lock().push((bot.clone(), state.clone())); + + Ok(bot) + } + + /// Add a new account to the swarm, retrying if it couldn't join. This will + /// run forever until the bot joins or the task is aborted. + /// + /// Exponential backoff means if it fails joining it will initially wait 10 + /// seconds, then 20, then 40, up to 2 minutes. + pub async fn add_with_exponential_backoff(&mut self, account: &Account, state: S) -> Client { + let mut disconnects = 0; + loop { + match self.add(account, state.clone()).await { + Ok(bot) => return bot, + Err(e) => { + disconnects += 1; + let delay = (Duration::from_secs(5) * 2u32.pow(disconnects)) + .min(Duration::from_secs(120)); + let username = account.username.clone(); + error!("Error joining {username}: {e}. Waiting {delay:?} and trying again."); + tokio::time::sleep(delay).await; + } + } + } + } +} + +impl<S> IntoIterator for Swarm<S> +where + S: Send + Sync + Clone + 'static, +{ + type Item = (Client, S); + type IntoIter = std::vec::IntoIter<Self::Item>; + + /// Iterate over the bots and their states in this swarm. + /// + /// ```rust,no_run + /// for (bot, state) in swarm { + /// // ... + /// } + /// ``` + fn into_iter(self) -> Self::IntoIter { + self.bot_datas.lock().clone().into_iter() + } +} + +#[derive(Default)] +struct InternalSwarmState { + /// The number of bots connected to the server + pub bots_joined: usize, +} + +impl From<ConnectionError> for SwarmStartError { + fn from(e: ConnectionError) -> Self { + SwarmStartError::from(JoinError::from(e)) + } +} diff --git a/azalea/src/swarm/plugins.rs b/azalea/src/swarm/plugins.rs new file mode 100644 index 00000000..0c7cf2ae --- /dev/null +++ b/azalea/src/swarm/plugins.rs @@ -0,0 +1,134 @@ +use crate::{Swarm, SwarmEvent}; +use async_trait::async_trait; +use nohash_hasher::NoHashHasher; +use std::{ + any::{Any, TypeId}, + collections::HashMap, + hash::BuildHasherDefault, +}; + +type U64Hasher = BuildHasherDefault<NoHashHasher<u64>>; + +// kind of based on https://docs.rs/http/latest/src/http/extensions.rs.html +/// A map of plugin ids to [`SwarmPlugin`] trait objects. The client stores +/// this so we can keep the state for our [`Swarm`] plugins. +/// +/// If you're using azalea, you should generate this from the `swarm_plugins!` macro. +#[derive(Clone, Default)] +pub struct SwarmPlugins<S> { + map: Option<HashMap<TypeId, Box<dyn SwarmPlugin<S>>, U64Hasher>>, +} + +#[derive(Clone)] +pub struct SwarmPluginStates<S> { + map: Option<HashMap<TypeId, Box<dyn SwarmPluginState<S>>, U64Hasher>>, +} + +impl<S> SwarmPluginStates<S> { + pub fn get<T: SwarmPluginState<S>>(&self) -> Option<&T> { + self.map + .as_ref() + .and_then(|map| map.get(&TypeId::of::<T>())) + .and_then(|boxed| (boxed.as_ref() as &dyn Any).downcast_ref::<T>()) + } +} + +impl<S> SwarmPlugins<S> +where + S: 'static, +{ + /// Create a new empty set of plugins. + pub fn new() -> Self { + Self { map: None } + } + + /// Add a new plugin to this set. + pub fn add<T: SwarmPlugin<S>>(&mut self, plugin: T) { + if self.map.is_none() { + self.map = Some(HashMap::with_hasher(BuildHasherDefault::default())); + } + self.map + .as_mut() + .unwrap() + .insert(TypeId::of::<T>(), Box::new(plugin)); + } + + /// Build our plugin states from this set of plugins. Note that if you're + /// using `azalea` you'll probably never need to use this as it's called + /// for you. + pub fn build(self) -> SwarmPluginStates<S> { + if self.map.is_none() { + return SwarmPluginStates { map: None }; + } + let mut map = HashMap::with_hasher(BuildHasherDefault::default()); + for (id, plugin) in self.map.unwrap().into_iter() { + map.insert(id, plugin.build()); + } + SwarmPluginStates { map: Some(map) } + } +} + +impl<S> IntoIterator for SwarmPluginStates<S> { + type Item = Box<dyn SwarmPluginState<S>>; + type IntoIter = std::vec::IntoIter<Self::Item>; + + /// Iterate over the plugin states. + fn into_iter(self) -> Self::IntoIter { + self.map + .map(|map| map.into_values().collect::<Vec<_>>()) + .unwrap_or_default() + .into_iter() + } +} + +/// A `SwarmPluginState` keeps the current state of a plugin for a client. All +/// the fields must be atomic. Unique `SwarmPluginState`s are built from +/// [`SwarmPlugin`]s. +#[async_trait] +pub trait SwarmPluginState<S>: Send + Sync + SwarmPluginStateClone<S> + Any + 'static { + async fn handle(self: Box<Self>, event: SwarmEvent, swarm: Swarm<S>); +} + +/// Swarm plugins can keep their own personal state ([`SwarmPluginState`]), +/// listen to [`SwarmEvent`]s, and add new functions to [`Swarm`]. +pub trait SwarmPlugin<S>: Send + Sync + SwarmPluginClone<S> + Any + 'static { + fn build(&self) -> Box<dyn SwarmPluginState<S>>; +} + +/// An internal trait that allows SwarmPluginState to be cloned. +#[doc(hidden)] +pub trait SwarmPluginStateClone<S> { + fn clone_box(&self) -> Box<dyn SwarmPluginState<S>>; +} +impl<T, S> SwarmPluginStateClone<S> for T +where + T: 'static + SwarmPluginState<S> + Clone, +{ + fn clone_box(&self) -> Box<dyn SwarmPluginState<S>> { + Box::new(self.clone()) + } +} +impl<S> Clone for Box<dyn SwarmPluginState<S>> { + fn clone(&self) -> Self { + self.clone_box() + } +} + +/// An internal trait that allows SwarmPlugin to be cloned. +#[doc(hidden)] +pub trait SwarmPluginClone<S> { + fn clone_box(&self) -> Box<dyn SwarmPlugin<S>>; +} +impl<T, S> SwarmPluginClone<S> for T +where + T: 'static + SwarmPlugin<S> + Clone, +{ + fn clone_box(&self) -> Box<dyn SwarmPlugin<S>> { + Box::new(self.clone()) + } +} +impl<S> Clone for Box<dyn SwarmPlugin<S>> { + fn clone(&self) -> Self { + self.clone_box() + } +} |
