diff options
| author | mat <git@matdoes.dev> | 2025-12-14 23:16:42 -0330 |
|---|---|---|
| committer | mat <git@matdoes.dev> | 2025-12-14 23:16:42 -0330 |
| commit | 9745766cff8907e6c6bb87b4ab852acaeadd6ef9 (patch) | |
| tree | 3bdfadccee346cf6d2c0ccb2a2704f564ac9b625 | |
| parent | dcbd690f21665e22ea250024a1aa85dec34e6c9e (diff) | |
| download | azalea-drasl-9745766cff8907e6c6bb87b4ab852acaeadd6ef9.tar.xz | |
cleanup azalea crate, move client/swarm builders into separate modules
| -rw-r--r-- | azalea/src/builder.rs | 201 | ||||
| -rw-r--r-- | azalea/src/lib.rs | 207 | ||||
| -rw-r--r-- | azalea/src/swarm/builder.rs | 607 | ||||
| -rw-r--r-- | azalea/src/swarm/mod.rs | 610 |
4 files changed, 827 insertions, 798 deletions
diff --git a/azalea/src/builder.rs b/azalea/src/builder.rs new file mode 100644 index 00000000..d0318424 --- /dev/null +++ b/azalea/src/builder.rs @@ -0,0 +1,201 @@ +use std::time::Duration; + +use azalea_client::{Account, DefaultPlugins}; +use azalea_protocol::address::ResolvableAddr; +use bevy_app::{AppExit, Plugins}; +use bevy_ecs::component::Component; + +use crate::{ + HandleFn, JoinOpts, NoState, + bot::DefaultBotPlugins, + swarm::{self, SwarmBuilder}, +}; + +/// A builder for creating new [`Client`]s. This is the recommended way of +/// making a bot. +/// +/// ```no_run +/// # use azalea::prelude::*; +/// # #[tokio::main] +/// # async fn main() { +/// ClientBuilder::new() +/// .set_handler(handle) +/// .start(Account::offline("bot"), "localhost") +/// .await; +/// # } +/// # #[derive(Clone, Component, Default)] +/// # pub struct State; +/// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { +/// # Ok(()) +/// # } +/// ``` +pub struct ClientBuilder<S, R> +where + S: Default + Send + Sync + Clone + Component + 'static, + R: Send + 'static, + Self: Send, +{ + /// Internally, ClientBuilder is just a wrapper over SwarmBuilder since it's + /// technically just a subset of it so we can avoid duplicating code this + /// way. + swarm: SwarmBuilder<S, swarm::NoSwarmState, R, ()>, +} +impl ClientBuilder<NoState, ()> { + /// Start building a client that can join the world. + #[must_use] + pub fn new() -> Self { + Self::new_without_plugins() + .add_plugins(DefaultPlugins) + .add_plugins(DefaultBotPlugins) + } + + /// [`Self::new`] but without adding the plugins by default. + /// + /// This is useful if you want to disable a default plugin. This also exists + /// for swarms, see [`SwarmBuilder::new_without_plugins`]. + /// + /// Note that you can also disable `LogPlugin` by disabling the `log` + /// feature. + /// + /// You **must** add [`DefaultPlugins`] and [`DefaultBotPlugins`] to this. + /// + /// ``` + /// # use azalea::prelude::*; + /// use azalea::app::PluginGroup; + /// + /// let client_builder = ClientBuilder::new_without_plugins() + /// .add_plugins( + /// azalea::DefaultPlugins + /// .build() + /// .disable::<azalea::chat_signing::ChatSigningPlugin>(), + /// ) + /// .add_plugins(azalea::bot::DefaultBotPlugins); + /// # client_builder.set_handler(handle); + /// # #[derive(Clone, Component, Default)] + /// # pub struct State; + /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { + /// # Ok(()) + /// # } + /// ``` + #[must_use] + pub fn new_without_plugins() -> Self { + Self { + swarm: SwarmBuilder::new_without_plugins(), + } + } + + /// Set the function that's called every time a bot receives an [`Event`]. + /// This is the way to handle normal per-bot events. + /// + /// Currently, you can have up to one client handler. + /// + /// Note that if you're creating clients directly from the ECS using + /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then + /// the handler function won't be called for that client. This shouldn't be + /// a concern for most bots, though. + /// + /// ``` + /// # use azalea::prelude::*; + /// # let client_builder = azalea::ClientBuilder::new(); + /// client_builder.set_handler(handle); + /// + /// # #[derive(Clone, Component, Default)] + /// # pub struct State; + /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { + /// Ok(()) + /// } + /// ``` + /// + /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent + #[must_use] + pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> ClientBuilder<S, R> + where + S: Default + Send + Sync + Clone + Component + 'static, + Fut: Future<Output = R> + Send + 'static, + R: Send + 'static, + { + ClientBuilder { + swarm: self.swarm.set_handler(handler), + } + } +} +impl<S, R> ClientBuilder<S, R> +where + S: Default + Send + Sync + Clone + Component + 'static, + R: Send + 'static, +{ + /// Set the client state instead of initializing defaults. + #[must_use] + pub fn set_state(mut self, state: S) -> Self { + self.swarm.states = vec![state]; + self + } + /// Add a group of plugins to the client. + /// + /// See [`Self::new_without_plugins`] to learn how to disable default + /// plugins. + #[must_use] + pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self { + self.swarm = self.swarm.add_plugins(plugins); + self + } + + /// Configures the auto-reconnection behavior for our bot. + /// + /// If this is `Some`, then it'll set the default reconnection delay for our + /// bot (how long it'll wait after being kicked before it tries + /// rejoining). if it's `None`, then auto-reconnecting will be disabled. + /// + /// If this function isn't called, then our client will reconnect after + /// [`DEFAULT_RECONNECT_DELAY`]. + /// + /// [`DEFAULT_RECONNECT_DELAY`]: azalea_client::auto_reconnect::DEFAULT_RECONNECT_DELAY + #[must_use] + pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self { + self.swarm.reconnect_after = delay.into(); + self + } + + /// Build this `ClientBuilder` into an actual [`Client`] and join the given + /// server. + /// + /// If the client can't join, it'll keep retrying forever until it can. + /// + /// The `address` argument can be a `&str`, [`ServerAddr`], + /// [`ResolvedAddr`], or anything else that implements [`ResolvableAddr`]. + /// + /// # Errors + /// + /// This will error if the given address is invalid or couldn't be resolved + /// to a Minecraft server. + /// + /// [`ServerAddr`]: azalea_protocol::address::ServerAddr + /// [`ResolvedAddr`]: azalea_protocol::address::ResolvedAddr + pub async fn start(mut self, account: Account, address: impl ResolvableAddr) -> AppExit { + self.swarm.accounts = vec![(account, JoinOpts::default())]; + if self.swarm.states.is_empty() { + self.swarm.states = vec![S::default()]; + } + self.swarm.start(address).await + } + + /// Do the same as [`Self::start`], but allow passing in custom join + /// options. + pub async fn start_with_opts( + mut self, + account: Account, + address: impl ResolvableAddr, + opts: JoinOpts, + ) -> AppExit { + self.swarm.accounts = vec![(account, opts.clone())]; + if self.swarm.states.is_empty() { + self.swarm.states = vec![S::default()]; + } + self.swarm.start_with_opts(address, opts).await + } +} +impl Default for ClientBuilder<NoState, ()> { + fn default() -> Self { + Self::new() + } +} diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs index 44c1fe0c..b10b7fc8 100644 --- a/azalea/src/lib.rs +++ b/azalea/src/lib.rs @@ -5,15 +5,15 @@ pub mod accept_resource_packs; pub mod auto_respawn; pub mod auto_tool; pub mod bot; +mod builder; pub mod container; pub mod nearest_entity; pub mod pathfinder; pub mod prelude; pub mod swarm; -use std::{net::SocketAddr, time::Duration}; +use std::net::SocketAddr; -use app::Plugins; pub use azalea_auth as auth; pub use azalea_block as block; #[doc(hidden)] @@ -37,216 +37,23 @@ pub use azalea_core::position::{BlockPos, Vec3}; pub use azalea_entity as entity; pub use azalea_physics as physics; pub use azalea_protocol as protocol; -use azalea_protocol::address::{ResolvableAddr, ServerAddr}; +use azalea_protocol::{address::ServerAddr, connect::Proxy}; pub use azalea_registry as registry; #[doc(hidden)] #[deprecated(note = "renamed to `Identifier`.")] -pub use azalea_registry::identifier::Identifier as ResourceLocation; +pub type ResourceLocation = azalea_registry::identifier::Identifier; pub use azalea_registry::identifier::Identifier; pub use azalea_world as world; pub use bevy_app as app; -use bevy_app::AppExit; pub use bevy_ecs as ecs; -use ecs::component::Component; -use futures::{Future, future::BoxFuture}; -use protocol::connect::Proxy; -use swarm::SwarmBuilder; - -use crate::bot::DefaultBotPlugins; +use bevy_ecs::component::Component; +pub use builder::ClientBuilder; +use futures::future::BoxFuture; pub type BoxHandleFn<S, R> = Box<dyn Fn(Client, azalea_client::Event, S) -> BoxFuture<'static, R> + Send>; pub type HandleFn<S, Fut> = fn(Client, azalea_client::Event, S) -> Fut; -/// A builder for creating new [`Client`]s. This is the recommended way of -/// making a bot. -/// -/// ```no_run -/// # use azalea::prelude::*; -/// # #[tokio::main] -/// # async fn main() { -/// ClientBuilder::new() -/// .set_handler(handle) -/// .start(Account::offline("bot"), "localhost") -/// .await; -/// # } -/// # #[derive(Clone, Component, Default)] -/// # pub struct State; -/// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { -/// # Ok(()) -/// # } -/// ``` -pub struct ClientBuilder<S, R> -where - S: Default + Send + Sync + Clone + Component + 'static, - R: Send + 'static, - Self: Send, -{ - /// Internally, ClientBuilder is just a wrapper over SwarmBuilder since it's - /// technically just a subset of it so we can avoid duplicating code this - /// way. - swarm: SwarmBuilder<S, swarm::NoSwarmState, R, ()>, -} -impl ClientBuilder<NoState, ()> { - /// Start building a client that can join the world. - #[must_use] - pub fn new() -> Self { - Self::new_without_plugins() - .add_plugins(DefaultPlugins) - .add_plugins(DefaultBotPlugins) - } - - /// [`Self::new`] but without adding the plugins by default. - /// - /// This is useful if you want to disable a default plugin. This also exists - /// for swarms, see [`SwarmBuilder::new_without_plugins`]. - /// - /// Note that you can also disable `LogPlugin` by disabling the `log` - /// feature. - /// - /// You **must** add [`DefaultPlugins`] and [`DefaultBotPlugins`] to this. - /// - /// ``` - /// # use azalea::prelude::*; - /// use azalea::app::PluginGroup; - /// - /// let client_builder = ClientBuilder::new_without_plugins() - /// .add_plugins( - /// azalea::DefaultPlugins - /// .build() - /// .disable::<azalea::chat_signing::ChatSigningPlugin>(), - /// ) - /// .add_plugins(azalea::bot::DefaultBotPlugins); - /// # client_builder.set_handler(handle); - /// # #[derive(Clone, Component, Default)] - /// # pub struct State; - /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { - /// # Ok(()) - /// # } - /// ``` - #[must_use] - pub fn new_without_plugins() -> Self { - Self { - swarm: SwarmBuilder::new_without_plugins(), - } - } - - /// Set the function that's called every time a bot receives an [`Event`]. - /// This is the way to handle normal per-bot events. - /// - /// Currently, you can have up to one client handler. - /// - /// Note that if you're creating clients directly from the ECS using - /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then - /// the handler function won't be called for that client. This shouldn't be - /// a concern for most bots, though. - /// - /// ``` - /// # use azalea::prelude::*; - /// # let client_builder = azalea::ClientBuilder::new(); - /// client_builder.set_handler(handle); - /// - /// # #[derive(Clone, Component, Default)] - /// # pub struct State; - /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { - /// Ok(()) - /// } - /// ``` - /// - /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent - #[must_use] - pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> ClientBuilder<S, R> - where - S: Default + Send + Sync + Clone + Component + 'static, - Fut: Future<Output = R> + Send + 'static, - R: Send + 'static, - { - ClientBuilder { - swarm: self.swarm.set_handler(handler), - } - } -} -impl<S, R> ClientBuilder<S, R> -where - S: Default + Send + Sync + Clone + Component + 'static, - R: Send + 'static, -{ - /// Set the client state instead of initializing defaults. - #[must_use] - pub fn set_state(mut self, state: S) -> Self { - self.swarm.states = vec![state]; - self - } - /// Add a group of plugins to the client. - /// - /// See [`Self::new_without_plugins`] to learn how to disable default - /// plugins. - #[must_use] - pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self { - self.swarm = self.swarm.add_plugins(plugins); - self - } - - /// Configures the auto-reconnection behavior for our bot. - /// - /// If this is `Some`, then it'll set the default reconnection delay for our - /// bot (how long it'll wait after being kicked before it tries - /// rejoining). if it's `None`, then auto-reconnecting will be disabled. - /// - /// If this function isn't called, then our client will reconnect after - /// [`DEFAULT_RECONNECT_DELAY`]. - /// - /// [`DEFAULT_RECONNECT_DELAY`]: azalea_client::auto_reconnect::DEFAULT_RECONNECT_DELAY - #[must_use] - pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self { - self.swarm.reconnect_after = delay.into(); - self - } - - /// Build this `ClientBuilder` into an actual [`Client`] and join the given - /// server. - /// - /// If the client can't join, it'll keep retrying forever until it can. - /// - /// The `address` argument can be a `&str`, [`ServerAddr`], - /// [`ResolvedAddr`], or anything else that implements [`ResolvableAddr`]. - /// - /// # Errors - /// - /// This will error if the given address is invalid or couldn't be resolved - /// to a Minecraft server. - /// - /// [`ServerAddr`]: azalea_protocol::address::ServerAddr - /// [`ResolvedAddr`]: azalea_protocol::address::ResolvedAddr - pub async fn start(mut self, account: Account, address: impl ResolvableAddr) -> AppExit { - self.swarm.accounts = vec![(account, JoinOpts::default())]; - if self.swarm.states.is_empty() { - self.swarm.states = vec![S::default()]; - } - self.swarm.start(address).await - } - - /// Do the same as [`Self::start`], but allow passing in custom join - /// options. - pub async fn start_with_opts( - mut self, - account: Account, - address: impl ResolvableAddr, - opts: JoinOpts, - ) -> AppExit { - self.swarm.accounts = vec![(account, opts.clone())]; - if self.swarm.states.is_empty() { - self.swarm.states = vec![S::default()]; - } - self.swarm.start_with_opts(address, opts).await - } -} -impl Default for ClientBuilder<NoState, ()> { - fn default() -> Self { - Self::new() - } -} - /// A marker that can be used in place of a State in [`ClientBuilder`] or /// [`SwarmBuilder`]. /// diff --git a/azalea/src/swarm/builder.rs b/azalea/src/swarm/builder.rs new file mode 100644 index 00000000..412ec67e --- /dev/null +++ b/azalea/src/swarm/builder.rs @@ -0,0 +1,607 @@ +use std::{ + collections::{HashMap, hash_map}, + mem, + sync::{ + Arc, + atomic::{self, AtomicBool}, + }, + time::Duration, +}; + +use azalea_client::{ + Account, DefaultPlugins, + auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY}, + start_ecs_runner, +}; +use azalea_protocol::address::{ResolvableAddr, ResolvedAddr}; +use azalea_world::InstanceContainer; +use bevy_app::{App, AppExit, Plugins, SubApp}; +use bevy_ecs::{component::Component, resource::Resource}; +use futures::future::join_all; +use parking_lot::RwLock; +use tokio::{sync::mpsc, task}; +use tracing::{debug, error, warn}; + +use crate::{ + BoxHandleFn, HandleFn, JoinOpts, NoState, + bot::DefaultBotPlugins, + swarm::{ + BoxSwarmHandleFn, DefaultSwarmPlugins, NoSwarmState, Swarm, SwarmEvent, SwarmHandleFn, + }, +}; + +/// Create a new [`Swarm`]. +/// +/// The generics of this struct stand for the following: +/// - S: State +/// - SS: Swarm State +/// - R: Return type of the handler +/// - SR: Return type of the swarm handler +/// +/// You shouldn't have to manually set them though, they'll be inferred for you. +pub struct SwarmBuilder<S, SS, R, SR> +where + S: Send + Sync + Clone + Component + 'static, + SS: Default + Send + Sync + Clone + Resource + 'static, + Self: Send, +{ + // SubApp is used instead of App to make it Send + pub(crate) app: SubApp, + /// The accounts and proxies that are going to join the server. + pub(crate) accounts: Vec<(Account, JoinOpts)>, + /// The individual bot states. + /// + /// This must be the same length as `accounts`, since each bot gets one + /// state. + pub(crate) states: Vec<S>, + /// The state for the overall swarm. + pub(crate) swarm_state: SS, + /// The function that's called every time a bot receives an [`Event`]. + pub(crate) handler: Option<BoxHandleFn<S, R>>, + /// The function that's called every time the swarm receives a + /// [`SwarmEvent`]. + pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>, + + /// How long we should wait between each bot joining the server. + /// + /// If this is None, every bot will connect at the same time. None is + /// different than a duration of 0, since if a duration is present the + /// bots will wait for the previous one to be ready. + pub(crate) join_delay: Option<Duration>, + + /// The default reconnection delay for our bots. + /// + /// This will change the value of the [`AutoReconnectDelay`] resource. + pub(crate) reconnect_after: Option<Duration>, +} +impl SwarmBuilder<NoState, NoSwarmState, (), ()> { + /// Start creating the swarm. + #[must_use] + pub fn new() -> Self { + Self::new_without_plugins() + .add_plugins(DefaultPlugins) + .add_plugins(DefaultBotPlugins) + .add_plugins(DefaultSwarmPlugins) + } + + /// [`Self::new`] but without adding the plugins by default. + /// + /// This is useful if you want to disable a default plugin. This also exists + /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`]. + /// + /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and + /// [`DefaultSwarmPlugins`] to this. + /// + /// ``` + /// # use azalea::{prelude::*, swarm::prelude::*}; + /// use azalea::app::PluginGroup; + /// + /// let swarm_builder = SwarmBuilder::new_without_plugins() + /// .add_plugins(azalea::DefaultPlugins.build().disable::<azalea::chat_signing::ChatSigningPlugin>()) + /// .add_plugins(azalea::bot::DefaultBotPlugins) + /// .add_plugins(azalea::swarm::DefaultSwarmPlugins); + /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle); + /// # #[derive(Clone, Component, Default, Resource)] + /// # pub struct State; + /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { + /// # Ok(()) + /// # } + /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> { + /// # Ok(()) + /// # } + /// ``` + /// + /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins + #[must_use] + pub fn new_without_plugins() -> Self { + SwarmBuilder { + // we create the app here so plugins can add onto it. + // the schedules won't run until [`Self::start`] is called. + + // `App::new()` is used instead of `SubApp::new()` so the necessary resources are + // initialized + app: mem::take(App::new().main_mut()), + accounts: Vec::new(), + states: Vec::new(), + swarm_state: NoSwarmState, + handler: None, + swarm_handler: None, + join_delay: None, + reconnect_after: Some(DEFAULT_RECONNECT_DELAY), + } + } +} + +impl<SS, SR> SwarmBuilder<NoState, SS, (), SR> +where + SS: Default + Send + Sync + Clone + Resource + 'static, +{ + /// Set the function that's called every time a bot receives an + /// [`enum@Event`]. This is the intended way to handle normal per-bot + /// events. + /// + /// Currently you can have up to one handler. + /// + /// Note that if you're creating clients directly from the ECS using + /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then + /// the handler function won't be called for that client. This also applies + /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for + /// most bots, though. + /// + /// ``` + /// # use azalea::{prelude::*, swarm::prelude::*}; + /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle); + /// swarm_builder.set_handler(handle); + /// + /// #[derive(Clone, Component, Default)] + /// struct State {} + /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { + /// Ok(()) + /// } + /// + /// # #[derive(Clone, Default, Resource)] + /// # struct SwarmState {} + /// # async fn swarm_handle( + /// # mut swarm: Swarm, + /// # event: SwarmEvent, + /// # state: SwarmState, + /// # ) -> anyhow::Result<()> { + /// # Ok(()) + /// # } + /// ``` + /// + /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent + #[must_use] + pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR> + where + Fut: Future<Output = R> + Send + 'static, + S: Send + Sync + Clone + Component + Default + 'static, + { + SwarmBuilder { + handler: Some(Box::new(move |bot, event, state: S| { + Box::pin(handler(bot, event, state)) + })), + // if we added accounts before the State was set, we've gotta set it to the default now + states: vec![S::default(); self.accounts.len()], + app: self.app, + ..self + } + } +} + +impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()> +where + S: Send + Sync + Clone + Component + 'static, +{ + /// Set the function that's called every time the swarm receives a + /// [`SwarmEvent`]. This is the intended way to handle global swarm events. + /// + /// Currently you can have up to one swarm handler. + /// + /// Note that if you're creating clients directly from the ECS using + /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then + /// this handler function won't be called for that client. This also applies + /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for + /// most bots, though. + /// + /// ``` + /// # use azalea::{prelude::*, swarm::prelude::*}; + /// # let swarm_builder = SwarmBuilder::new().set_handler(handle); + /// swarm_builder.set_swarm_handler(swarm_handle); + /// + /// # #[derive(Clone, Component, Default)] + /// # struct State {} + /// + /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { + /// # Ok(()) + /// # } + /// + /// #[derive(Clone, Default, Resource)] + /// struct SwarmState {} + /// async fn swarm_handle( + /// mut swarm: Swarm, + /// event: SwarmEvent, + /// state: SwarmState, + /// ) -> anyhow::Result<()> { + /// Ok(()) + /// } + /// ``` + /// + /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent + #[must_use] + pub fn set_swarm_handler<SS, Fut, SR>( + self, + handler: SwarmHandleFn<SS, Fut>, + ) -> SwarmBuilder<S, SS, R, SR> + where + SS: Default + Send + Sync + Clone + Resource + 'static, + Fut: Future<Output = SR> + Send + 'static, + { + SwarmBuilder { + handler: self.handler, + app: self.app, + accounts: self.accounts, + states: self.states, + swarm_state: SS::default(), + swarm_handler: Some(Box::new(move |swarm, event, state| { + Box::pin(handler(swarm, event, state)) + })), + join_delay: self.join_delay, + reconnect_after: self.reconnect_after, + } + } +} + +impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR> +where + S: Send + Sync + Clone + Component + 'static, + SS: Default + Send + Sync + Clone + Resource + 'static, + R: Send + 'static, + SR: Send + 'static, +{ + /// Add a vec of [`Account`]s to the swarm. + /// + /// Use [`Self::add_account`] to only add one account. If you want the + /// clients to have different default states, add them one at a time with + /// [`Self::add_account_with_state`]. + /// + /// By default, every account will join at the same time, you can add a + /// delay with [`Self::join_delay`]. + #[must_use] + pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self + where + S: Default, + { + for account in accounts { + self = self.add_account(account); + } + + self + } + + /// Add a single new [`Account`] to the swarm. + /// + /// Use [`Self::add_accounts`] to add multiple accounts at a time. + /// + /// This will make the state for this client be the default, use + /// [`Self::add_account_with_state`] to avoid that. + #[must_use] + pub fn add_account(self, account: Account) -> Self + where + S: Default, + { + self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default()) + } + + /// Add an account with a custom initial state. + /// + /// Use just [`Self::add_account`] to use the `Default` implementation for + /// the state. + #[must_use] + pub fn add_account_with_state(self, account: Account, state: S) -> Self { + self.add_account_with_state_and_opts(account, state, JoinOpts::default()) + } + + /// Add an account with a custom initial state. + /// + /// Use just [`Self::add_account`] to use the `Default` implementation for + /// the state. + #[must_use] + pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self + where + S: Default, + { + self.add_account_with_state_and_opts(account, S::default(), opts) + } + + /// Same as [`Self::add_account_with_state`], but allow passing in custom + /// join options. + #[must_use] + pub fn add_account_with_state_and_opts( + mut self, + account: Account, + state: S, + join_opts: JoinOpts, + ) -> Self { + self.accounts.push((account, join_opts)); + self.states.push(state); + self + } + + /// Set the swarm state instead of initializing defaults. + #[must_use] + pub fn set_swarm_state(mut self, swarm_state: SS) -> Self { + self.swarm_state = swarm_state; + self + } + + /// Add one or more plugins to this swarm. + /// + /// See [`Self::new_without_plugins`] to learn how to disable default + /// plugins. + #[must_use] + pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self { + self.app.add_plugins(plugins); + self + } + + /// Set how long we should wait between each bot joining the server. + /// + /// By default, every bot will connect at the same time. If you set this + /// field, however, the bots will wait for the previous one to have + /// connected and *then* they'll wait the given duration. + #[must_use] + pub fn join_delay(mut self, delay: Duration) -> Self { + self.join_delay = Some(delay); + self + } + + /// Configures the auto-reconnection behavior for our bots. + /// + /// If this is `Some`, then it'll set the default reconnection delay for our + /// bots (how long they'll wait after being kicked before they try + /// rejoining). if it's `None`, then auto-reconnecting will be disabled. + /// + /// If this function isn't called, then our clients will reconnect after + /// [`DEFAULT_RECONNECT_DELAY`]. + #[must_use] + pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self { + self.reconnect_after = delay.into(); + self + } + + /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given + /// server. + /// + /// The `address` argument can be a `&str`, [`ServerAddr`], + /// [`ResolvedAddr`], or anything else that implements [`ResolvableAddr`]. + /// + /// [`ServerAddr`]: azalea_protocol::address::ServerAddr + pub async fn start(self, address: impl ResolvableAddr) -> AppExit { + self.start_with_opts(address, JoinOpts::default()).await + } + + #[doc(hidden)] + #[deprecated = "renamed to `start_with_opts`."] + pub async fn start_with_default_opts( + self, + address: impl ResolvableAddr, + default_join_opts: JoinOpts, + ) -> AppExit { + self.start_with_opts(address, default_join_opts).await + } + + /// Do the same as [`Self::start`], but allow passing in default join + /// options for the bots. + pub async fn start_with_opts( + mut self, + address: impl ResolvableAddr, + join_opts: JoinOpts, + ) -> AppExit { + assert_eq!( + self.accounts.len(), + self.states.len(), + "There must be exactly one state per bot." + ); + + debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION")); + + let address = if let Some(socket_addr) = join_opts.custom_socket_addr { + let server_addr = if let Some(server_addr) = join_opts + .custom_server_addr + .clone() + .or_else(|| address.clone().server_addr().ok()) + { + server_addr + } else { + error!( + "Failed to parse address: {address:?}. If this was expected, consider passing in a `ServerAddr` instead." + ); + return AppExit::error(); + }; + + ResolvedAddr { + server: server_addr, + socket: socket_addr, + } + } else { + let Ok(addr) = address.clone().resolve().await else { + error!( + "Failed to resolve address: {address:?}. If this was expected, consider resolving the address earlier with `ResolvableAddr::resolve`." + ); + return AppExit::error(); + }; + addr + }; + + let instance_container = Arc::new(RwLock::new(InstanceContainer::default())); + + // we can't modify the swarm plugins after this + let (bots_tx, mut bots_rx) = mpsc::unbounded_channel(); + let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel(); + + swarm_tx.send(SwarmEvent::Init).unwrap(); + + let main_schedule_label = self.app.update_schedule.unwrap(); + + let local_set = task::LocalSet::new(); + + local_set.run_until(async move { + // start_ecs_runner must be run inside of the LocalSet + let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app); + + let swarm = Swarm { + ecs_lock: ecs_lock.clone(), + + address: Arc::new(RwLock::new(address)), + instance_container, + + bots_tx, + + swarm_tx: swarm_tx.clone(), + }; + + // run the main schedule so the startup systems run + { + let mut ecs = ecs_lock.lock(); + ecs.insert_resource(swarm.clone()); + ecs.insert_resource(self.swarm_state.clone()); + if let Some(reconnect_after) = self.reconnect_after { + ecs.insert_resource(AutoReconnectDelay { + delay: reconnect_after, + }); + } else { + ecs.remove_resource::<AutoReconnectDelay>(); + } + ecs.run_schedule(main_schedule_label); + ecs.clear_trackers(); + } + + // only do this after we inserted the Swarm and state resources to avoid errors + // where Res<Swarm> is inaccessible + start_running_systems(); + + // SwarmBuilder (self) isn't Send so we have to take all the things we need out + // of it + let swarm_clone = swarm.clone(); + let join_delay = self.join_delay; + let accounts = self.accounts.clone(); + let states = self.states.clone(); + + task::spawn_local(async move { + if let Some(join_delay) = join_delay { + // if there's a join delay, then join one by one + for ((account, bot_join_opts), state) in accounts.iter().zip(states) { + let mut join_opts = join_opts.clone(); + join_opts.update(bot_join_opts); + let _ = swarm_clone.add_with_opts(account, state, &join_opts).await; + tokio::time::sleep(join_delay).await; + } + } else { + // otherwise, join all at once + let swarm_borrow = &swarm_clone; + join_all(accounts.iter().zip(states).map( + |((account, bot_join_opts), state)| async { + let mut join_opts = join_opts.clone(); + join_opts.update(bot_join_opts); + let _ = swarm_borrow + .clone() + .add_with_opts(account, state, &join_opts) + .await; + }, + )) + .await; + } + + swarm_tx.send(SwarmEvent::Login).unwrap(); + }); + + let swarm_state = self.swarm_state; + + // Watch swarm_rx and send those events to the swarm_handle. + let swarm_clone = swarm.clone(); + let swarm_handler_task = task::spawn_local(async move { + while let Some(event) = swarm_rx.recv().await { + if let Some(swarm_handler) = &self.swarm_handler { + task::spawn_local((swarm_handler)( + swarm_clone.clone(), + event, + swarm_state.clone(), + )); + } + } + + unreachable!( + "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail" + ); + }); + + // bot events + let client_handler_task = task::spawn_local(async move { + while let Some((Some(first_event), first_bot)) = bots_rx.recv().await { + if bots_rx.len() > 1_000 { + static WARNED: AtomicBool = AtomicBool::new(false); + if !WARNED.swap(true, atomic::Ordering::Relaxed) { + warn!("the Client Event channel has more than 1000 items!") + } + } + + if let Some(handler) = &self.handler { + let ecs_mutex = first_bot.ecs.clone(); + let mut ecs = ecs_mutex.lock(); + let mut query = ecs.query::<Option<&S>>(); + let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else { + error!( + "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.", + first_bot.username(), + first_bot.entity + ); + continue; + }; + let first_bot_entity = first_bot.entity; + let first_bot_state = first_bot_state.clone(); + + task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone())); + + // this makes it not have to keep locking the ecs + let mut states = HashMap::new(); + states.insert(first_bot_entity, first_bot_state); + while let Ok((Some(event), bot)) = bots_rx.try_recv() { + let state = match states.entry(bot.entity) { + hash_map::Entry::Occupied(e) => e.into_mut(), + hash_map::Entry::Vacant(e) => { + let Ok(Some(state)) = query.get(&ecs, bot.entity) else { + error!( + "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.", + bot.username(), + bot.entity + ); + continue; + }; + let state = state.clone(); + e.insert(state) + } + }; + task::spawn_local((handler)(bot, event, state.clone())); + } + } + } + }); + + let app_exit = appexit_rx + .await + .expect("appexit_tx shouldn't be dropped by the ECS runner before sending"); + + swarm_handler_task.abort(); + client_handler_task.abort(); + + app_exit + }).await + } +} + +impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> { + fn default() -> Self { + Self::new() + } +} diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index 360c9939..db5dd394 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -2,39 +2,29 @@ //! //! See [`Swarm`] for more information. +mod builder; mod chat; mod events; pub mod prelude; -use std::{ - collections::{HashMap, hash_map}, - future::Future, - mem, - sync::{ - Arc, - atomic::{self, AtomicBool}, - }, - time::Duration, +use std::sync::{ + Arc, + atomic::{self, AtomicBool}, }; -use azalea_client::{ - Account, Client, DefaultPlugins, Event, StartClientOpts, - auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY}, - chat::ChatPacket, - join::ConnectOpts, - start_ecs_runner, -}; +use azalea_client::{Account, Client, Event, StartClientOpts, chat::ChatPacket, join::ConnectOpts}; use azalea_entity::LocalEntity; -use azalea_protocol::address::{ResolvableAddr, ResolvedAddr}; +use azalea_protocol::address::ResolvedAddr; use azalea_world::InstanceContainer; -use bevy_app::{App, AppExit, PluginGroup, PluginGroupBuilder, Plugins, SubApp}; +use bevy_app::{PluginGroup, PluginGroupBuilder}; use bevy_ecs::prelude::*; -use futures::future::{BoxFuture, join_all}; +pub use builder::SwarmBuilder; +use futures::future::BoxFuture; use parking_lot::{Mutex, RwLock}; use tokio::{sync::mpsc, task}; use tracing::{debug, error, warn}; -use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState}; +use crate::JoinOpts; /// A swarm is a way to conveniently control many bots at once, while also /// being able to control bots at an individual level when desired. @@ -55,585 +45,9 @@ pub struct Swarm { pub instance_container: Arc<RwLock<InstanceContainer>>, /// This is used internally to make the client handler function work. - bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>, + pub(crate) bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>, /// This is used internally to make the swarm handler function work. - swarm_tx: mpsc::UnboundedSender<SwarmEvent>, -} - -/// Create a new [`Swarm`]. -/// -/// The generics of this struct stand for the following: -/// - S: State -/// - SS: Swarm State -/// - R: Return type of the handler -/// - SR: Return type of the swarm handler -/// -/// You shouldn't have to manually set them though, they'll be inferred for you. -pub struct SwarmBuilder<S, SS, R, SR> -where - S: Send + Sync + Clone + Component + 'static, - SS: Default + Send + Sync + Clone + Resource + 'static, - Self: Send, -{ - // SubApp is used instead of App to make it Send - pub(crate) app: SubApp, - /// The accounts and proxies that are going to join the server. - pub(crate) accounts: Vec<(Account, JoinOpts)>, - /// The individual bot states. - /// - /// This must be the same length as `accounts`, since each bot gets one - /// state. - pub(crate) states: Vec<S>, - /// The state for the overall swarm. - pub(crate) swarm_state: SS, - /// The function that's called every time a bot receives an [`Event`]. - pub(crate) handler: Option<BoxHandleFn<S, R>>, - /// The function that's called every time the swarm receives a - /// [`SwarmEvent`]. - pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>, - - /// How long we should wait between each bot joining the server. - /// - /// If this is None, every bot will connect at the same time. None is - /// different than a duration of 0, since if a duration is present the - /// bots will wait for the previous one to be ready. - pub(crate) join_delay: Option<Duration>, - - /// The default reconnection delay for our bots. - /// - /// This will change the value of the [`AutoReconnectDelay`] resource. - pub(crate) reconnect_after: Option<Duration>, -} -impl SwarmBuilder<NoState, NoSwarmState, (), ()> { - /// Start creating the swarm. - #[must_use] - pub fn new() -> Self { - Self::new_without_plugins() - .add_plugins(DefaultPlugins) - .add_plugins(DefaultBotPlugins) - .add_plugins(DefaultSwarmPlugins) - } - - /// [`Self::new`] but without adding the plugins by default. - /// - /// This is useful if you want to disable a default plugin. This also exists - /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`]. - /// - /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and - /// [`DefaultSwarmPlugins`] to this. - /// - /// ``` - /// # use azalea::{prelude::*, swarm::prelude::*}; - /// use azalea::app::PluginGroup; - /// - /// let swarm_builder = SwarmBuilder::new_without_plugins() - /// .add_plugins(azalea::DefaultPlugins.build().disable::<azalea::chat_signing::ChatSigningPlugin>()) - /// .add_plugins(azalea::bot::DefaultBotPlugins) - /// .add_plugins(azalea::swarm::DefaultSwarmPlugins); - /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle); - /// # #[derive(Clone, Component, Default, Resource)] - /// # pub struct State; - /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { - /// # Ok(()) - /// # } - /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> { - /// # Ok(()) - /// # } - /// ``` - /// - /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins - #[must_use] - pub fn new_without_plugins() -> Self { - SwarmBuilder { - // we create the app here so plugins can add onto it. - // the schedules won't run until [`Self::start`] is called. - - // `App::new()` is used instead of `SubApp::new()` so the necessary resources are - // initialized - app: mem::take(App::new().main_mut()), - accounts: Vec::new(), - states: Vec::new(), - swarm_state: NoSwarmState, - handler: None, - swarm_handler: None, - join_delay: None, - reconnect_after: Some(DEFAULT_RECONNECT_DELAY), - } - } -} - -impl<SS, SR> SwarmBuilder<NoState, SS, (), SR> -where - SS: Default + Send + Sync + Clone + Resource + 'static, -{ - /// Set the function that's called every time a bot receives an - /// [`enum@Event`]. This is the intended way to handle normal per-bot - /// events. - /// - /// Currently you can have up to one handler. - /// - /// Note that if you're creating clients directly from the ECS using - /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then - /// the handler function won't be called for that client. This also applies - /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for - /// most bots, though. - /// - /// ``` - /// # use azalea::{prelude::*, swarm::prelude::*}; - /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle); - /// swarm_builder.set_handler(handle); - /// - /// #[derive(Clone, Component, Default)] - /// struct State {} - /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { - /// Ok(()) - /// } - /// - /// # #[derive(Clone, Default, Resource)] - /// # struct SwarmState {} - /// # async fn swarm_handle( - /// # mut swarm: Swarm, - /// # event: SwarmEvent, - /// # state: SwarmState, - /// # ) -> anyhow::Result<()> { - /// # Ok(()) - /// # } - /// ``` - /// - /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent - #[must_use] - pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR> - where - Fut: Future<Output = R> + Send + 'static, - S: Send + Sync + Clone + Component + Default + 'static, - { - SwarmBuilder { - handler: Some(Box::new(move |bot, event, state: S| { - Box::pin(handler(bot, event, state)) - })), - // if we added accounts before the State was set, we've gotta set it to the default now - states: vec![S::default(); self.accounts.len()], - app: self.app, - ..self - } - } -} - -impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()> -where - S: Send + Sync + Clone + Component + 'static, -{ - /// Set the function that's called every time the swarm receives a - /// [`SwarmEvent`]. This is the intended way to handle global swarm events. - /// - /// Currently you can have up to one swarm handler. - /// - /// Note that if you're creating clients directly from the ECS using - /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then - /// this handler function won't be called for that client. This also applies - /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for - /// most bots, though. - /// - /// ``` - /// # use azalea::{prelude::*, swarm::prelude::*}; - /// # let swarm_builder = SwarmBuilder::new().set_handler(handle); - /// swarm_builder.set_swarm_handler(swarm_handle); - /// - /// # #[derive(Clone, Component, Default)] - /// # struct State {} - /// - /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> { - /// # Ok(()) - /// # } - /// - /// #[derive(Clone, Default, Resource)] - /// struct SwarmState {} - /// async fn swarm_handle( - /// mut swarm: Swarm, - /// event: SwarmEvent, - /// state: SwarmState, - /// ) -> anyhow::Result<()> { - /// Ok(()) - /// } - /// ``` - /// - /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent - #[must_use] - pub fn set_swarm_handler<SS, Fut, SR>( - self, - handler: SwarmHandleFn<SS, Fut>, - ) -> SwarmBuilder<S, SS, R, SR> - where - SS: Default + Send + Sync + Clone + Resource + 'static, - Fut: Future<Output = SR> + Send + 'static, - { - SwarmBuilder { - handler: self.handler, - app: self.app, - accounts: self.accounts, - states: self.states, - swarm_state: SS::default(), - swarm_handler: Some(Box::new(move |swarm, event, state| { - Box::pin(handler(swarm, event, state)) - })), - join_delay: self.join_delay, - reconnect_after: self.reconnect_after, - } - } -} - -impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR> -where - S: Send + Sync + Clone + Component + 'static, - SS: Default + Send + Sync + Clone + Resource + 'static, - R: Send + 'static, - SR: Send + 'static, -{ - /// Add a vec of [`Account`]s to the swarm. - /// - /// Use [`Self::add_account`] to only add one account. If you want the - /// clients to have different default states, add them one at a time with - /// [`Self::add_account_with_state`]. - /// - /// By default, every account will join at the same time, you can add a - /// delay with [`Self::join_delay`]. - #[must_use] - pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self - where - S: Default, - { - for account in accounts { - self = self.add_account(account); - } - - self - } - - /// Add a single new [`Account`] to the swarm. - /// - /// Use [`Self::add_accounts`] to add multiple accounts at a time. - /// - /// This will make the state for this client be the default, use - /// [`Self::add_account_with_state`] to avoid that. - #[must_use] - pub fn add_account(self, account: Account) -> Self - where - S: Default, - { - self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default()) - } - - /// Add an account with a custom initial state. - /// - /// Use just [`Self::add_account`] to use the `Default` implementation for - /// the state. - #[must_use] - pub fn add_account_with_state(self, account: Account, state: S) -> Self { - self.add_account_with_state_and_opts(account, state, JoinOpts::default()) - } - - /// Add an account with a custom initial state. - /// - /// Use just [`Self::add_account`] to use the `Default` implementation for - /// the state. - #[must_use] - pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self - where - S: Default, - { - self.add_account_with_state_and_opts(account, S::default(), opts) - } - - /// Same as [`Self::add_account_with_state`], but allow passing in custom - /// join options. - #[must_use] - pub fn add_account_with_state_and_opts( - mut self, - account: Account, - state: S, - join_opts: JoinOpts, - ) -> Self { - self.accounts.push((account, join_opts)); - self.states.push(state); - self - } - - /// Set the swarm state instead of initializing defaults. - #[must_use] - pub fn set_swarm_state(mut self, swarm_state: SS) -> Self { - self.swarm_state = swarm_state; - self - } - - /// Add one or more plugins to this swarm. - /// - /// See [`Self::new_without_plugins`] to learn how to disable default - /// plugins. - #[must_use] - pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self { - self.app.add_plugins(plugins); - self - } - - /// Set how long we should wait between each bot joining the server. - /// - /// By default, every bot will connect at the same time. If you set this - /// field, however, the bots will wait for the previous one to have - /// connected and *then* they'll wait the given duration. - #[must_use] - pub fn join_delay(mut self, delay: Duration) -> Self { - self.join_delay = Some(delay); - self - } - - /// Configures the auto-reconnection behavior for our bots. - /// - /// If this is `Some`, then it'll set the default reconnection delay for our - /// bots (how long they'll wait after being kicked before they try - /// rejoining). if it's `None`, then auto-reconnecting will be disabled. - /// - /// If this function isn't called, then our clients will reconnect after - /// [`DEFAULT_RECONNECT_DELAY`]. - #[must_use] - pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self { - self.reconnect_after = delay.into(); - self - } - - /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given - /// server. - /// - /// The `address` argument can be a `&str`, [`ServerAddr`], - /// [`ResolvedAddr`], or anything else that implements [`ResolvableAddr`]. - /// - /// [`ServerAddr`]: azalea_protocol::address::ServerAddr - pub async fn start(self, address: impl ResolvableAddr) -> AppExit { - self.start_with_opts(address, JoinOpts::default()).await - } - - #[doc(hidden)] - #[deprecated = "renamed to `start_with_opts`."] - pub async fn start_with_default_opts( - self, - address: impl ResolvableAddr, - default_join_opts: JoinOpts, - ) -> AppExit { - self.start_with_opts(address, default_join_opts).await - } - - /// Do the same as [`Self::start`], but allow passing in default join - /// options for the bots. - pub async fn start_with_opts( - mut self, - address: impl ResolvableAddr, - join_opts: JoinOpts, - ) -> AppExit { - assert_eq!( - self.accounts.len(), - self.states.len(), - "There must be exactly one state per bot." - ); - - debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION")); - - let address = if let Some(socket_addr) = join_opts.custom_socket_addr { - let server_addr = if let Some(server_addr) = join_opts - .custom_server_addr - .clone() - .or_else(|| address.clone().server_addr().ok()) - { - server_addr - } else { - error!( - "Failed to parse address: {address:?}. If this was expected, consider passing in a `ServerAddr` instead." - ); - return AppExit::error(); - }; - - ResolvedAddr { - server: server_addr, - socket: socket_addr, - } - } else { - let Ok(addr) = address.clone().resolve().await else { - error!( - "Failed to resolve address: {address:?}. If this was expected, consider resolving the address earlier with `ResolvableAddr::resolve`." - ); - return AppExit::error(); - }; - addr - }; - - let instance_container = Arc::new(RwLock::new(InstanceContainer::default())); - - // we can't modify the swarm plugins after this - let (bots_tx, mut bots_rx) = mpsc::unbounded_channel(); - let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel(); - - swarm_tx.send(SwarmEvent::Init).unwrap(); - - let main_schedule_label = self.app.update_schedule.unwrap(); - - let local_set = task::LocalSet::new(); - - local_set.run_until(async move { - // start_ecs_runner must be run inside of the LocalSet - let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app); - - let swarm = Swarm { - ecs_lock: ecs_lock.clone(), - - address: Arc::new(RwLock::new(address)), - instance_container, - - bots_tx, - - swarm_tx: swarm_tx.clone(), - }; - - // run the main schedule so the startup systems run - { - let mut ecs = ecs_lock.lock(); - ecs.insert_resource(swarm.clone()); - ecs.insert_resource(self.swarm_state.clone()); - if let Some(reconnect_after) = self.reconnect_after { - ecs.insert_resource(AutoReconnectDelay { - delay: reconnect_after, - }); - } else { - ecs.remove_resource::<AutoReconnectDelay>(); - } - ecs.run_schedule(main_schedule_label); - ecs.clear_trackers(); - } - - // only do this after we inserted the Swarm and state resources to avoid errors - // where Res<Swarm> is inaccessible - start_running_systems(); - - // SwarmBuilder (self) isn't Send so we have to take all the things we need out - // of it - let swarm_clone = swarm.clone(); - let join_delay = self.join_delay; - let accounts = self.accounts.clone(); - let states = self.states.clone(); - - task::spawn_local(async move { - if let Some(join_delay) = join_delay { - // if there's a join delay, then join one by one - for ((account, bot_join_opts), state) in accounts.iter().zip(states) { - let mut join_opts = join_opts.clone(); - join_opts.update(bot_join_opts); - let _ = swarm_clone.add_with_opts(account, state, &join_opts).await; - tokio::time::sleep(join_delay).await; - } - } else { - // otherwise, join all at once - let swarm_borrow = &swarm_clone; - join_all(accounts.iter().zip(states).map( - |((account, bot_join_opts), state)| async { - let mut join_opts = join_opts.clone(); - join_opts.update(bot_join_opts); - let _ = swarm_borrow - .clone() - .add_with_opts(account, state, &join_opts) - .await; - }, - )) - .await; - } - - swarm_tx.send(SwarmEvent::Login).unwrap(); - }); - - let swarm_state = self.swarm_state; - - // Watch swarm_rx and send those events to the swarm_handle. - let swarm_clone = swarm.clone(); - let swarm_handler_task = task::spawn_local(async move { - while let Some(event) = swarm_rx.recv().await { - if let Some(swarm_handler) = &self.swarm_handler { - task::spawn_local((swarm_handler)( - swarm_clone.clone(), - event, - swarm_state.clone(), - )); - } - } - - unreachable!( - "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail" - ); - }); - - // bot events - let client_handler_task = task::spawn_local(async move { - while let Some((Some(first_event), first_bot)) = bots_rx.recv().await { - if bots_rx.len() > 1_000 { - static WARNED: AtomicBool = AtomicBool::new(false); - if !WARNED.swap(true, atomic::Ordering::Relaxed) { - warn!("the Client Event channel has more than 1000 items!") - } - } - - if let Some(handler) = &self.handler { - let ecs_mutex = first_bot.ecs.clone(); - let mut ecs = ecs_mutex.lock(); - let mut query = ecs.query::<Option<&S>>(); - let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else { - error!( - "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.", - first_bot.username(), - first_bot.entity - ); - continue; - }; - let first_bot_entity = first_bot.entity; - let first_bot_state = first_bot_state.clone(); - - task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone())); - - // this makes it not have to keep locking the ecs - let mut states = HashMap::new(); - states.insert(first_bot_entity, first_bot_state); - while let Ok((Some(event), bot)) = bots_rx.try_recv() { - let state = match states.entry(bot.entity) { - hash_map::Entry::Occupied(e) => e.into_mut(), - hash_map::Entry::Vacant(e) => { - let Ok(Some(state)) = query.get(&ecs, bot.entity) else { - error!( - "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.", - bot.username(), - bot.entity - ); - continue; - }; - let state = state.clone(); - e.insert(state) - } - }; - task::spawn_local((handler)(bot, event, state.clone())); - } - } - } - }); - - let app_exit = appexit_rx - .await - .expect("appexit_tx shouldn't be dropped by the ECS runner before sending"); - - swarm_handler_task.abort(); - client_handler_task.abort(); - - app_exit - }).await - } -} - -impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> { - fn default() -> Self { - Self::new() - } + pub(crate) swarm_tx: mpsc::UnboundedSender<SwarmEvent>, } /// An event about something that doesn't have to do with a single bot. |
