aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormat <27899617+mat-1@users.noreply.github.com>2025-05-02 15:55:58 -0500
committerGitHub <noreply@github.com>2025-05-02 15:55:58 -0500
commit9a40b65bc1912298a43de43fd6e8477a8622a832 (patch)
treec429c62489926d6bbfc1675fea5a1860378d7a00
parent52e34de95cd64a1c8ae1177cd7bc1d67fbab3c71 (diff)
downloadazalea-drasl-9a40b65bc1912298a43de43fd6e8477a8622a832.tar.xz
Add AutoReconnectPlugin (#221)
* add AutoReconnectPlugin * merge main * start simplifying swarm internals * fix Swarm::into_iter, handler functions, DisconnectEvent, and add some more docs * add ClientBuilder/SwarmBuilder::reconnect_after * fix a doctest * reword SwarmEvent::Disconnect doc * better behavior when we try to join twice * reconnect on ConnectionFailedEvent too * autoreconnect is less breaking now
-rw-r--r--CHANGELOG.md1
-rw-r--r--azalea-chat/src/lib.rs2
-rw-r--r--azalea-client/src/client.rs65
-rw-r--r--azalea-client/src/plugins/auto_reconnect.rs138
-rw-r--r--azalea-client/src/plugins/disconnect.rs26
-rw-r--r--azalea-client/src/plugins/join.rs116
-rw-r--r--azalea-client/src/plugins/login.rs2
-rw-r--r--azalea-client/src/plugins/mod.rs4
-rw-r--r--azalea/examples/testbot/main.rs8
-rw-r--r--azalea/src/lib.rs26
-rw-r--r--azalea/src/swarm/mod.rs153
11 files changed, 422 insertions, 119 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f71d6b49..68702b49 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@ write down most non-trivial breaking changes.
- azalea and azalea-client now have a `packet-event` feature, which can be disabled for efficiency if you're not using `Event::Packet`.
- `StartJoinServerEvent` can now be used to join servers exclusively from the ECS without a Tokio runtime.
- `FormattedText::to_html` and `FormattedText::to_custom_format`.
+- Add auto-reconnecting which is enabled by default.
### Changed
diff --git a/azalea-chat/src/lib.rs b/azalea-chat/src/lib.rs
index faa54d70..f01d8835 100644
--- a/azalea-chat/src/lib.rs
+++ b/azalea-chat/src/lib.rs
@@ -8,4 +8,4 @@ pub mod style;
pub mod text_component;
pub mod translatable_component;
-pub use component::{FormattedText, DEFAULT_STYLE};
+pub use component::{DEFAULT_STYLE, FormattedText};
diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs
index 2ebf44b5..dc9a3d3e 100644
--- a/azalea-client/src/client.rs
+++ b/azalea-client/src/client.rs
@@ -54,7 +54,7 @@ use crate::{
events::Event,
interact::CurrentSequenceNumber,
inventory::Inventory,
- join::{StartJoinCallback, StartJoinServerEvent},
+ join::{ConnectOpts, StartJoinCallback, StartJoinServerEvent},
local_player::{
GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList,
},
@@ -107,22 +107,20 @@ pub enum JoinError {
Disconnect { reason: FormattedText },
}
-pub struct StartClientOpts<'a> {
+pub struct StartClientOpts {
pub ecs_lock: Arc<Mutex<World>>,
- pub account: &'a Account,
- pub address: &'a ServerAddress,
- pub resolved_address: &'a SocketAddr,
- pub proxy: Option<Proxy>,
+ pub account: Account,
+ pub connect_opts: ConnectOpts,
pub event_sender: Option<mpsc::UnboundedSender<Event>>,
}
-impl<'a> StartClientOpts<'a> {
+impl StartClientOpts {
pub fn new(
- account: &'a Account,
- address: &'a ServerAddress,
- resolved_address: &'a SocketAddr,
+ account: Account,
+ address: ServerAddress,
+ resolved_address: SocketAddr,
event_sender: Option<mpsc::UnboundedSender<Event>>,
- ) -> StartClientOpts<'a> {
+ ) -> StartClientOpts {
let mut app = App::new();
app.add_plugins(DefaultPlugins);
@@ -132,15 +130,17 @@ impl<'a> StartClientOpts<'a> {
Self {
ecs_lock,
account,
- address,
- resolved_address,
- proxy: None,
+ connect_opts: ConnectOpts {
+ address,
+ resolved_address,
+ proxy: None,
+ },
event_sender,
}
}
pub fn proxy(mut self, proxy: Proxy) -> Self {
- self.proxy = Some(proxy);
+ self.connect_opts.proxy = Some(proxy);
self
}
}
@@ -173,14 +173,14 @@ impl Client {
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let account = Account::offline("bot");
- /// let (client, rx) = Client::join(&account, "localhost").await?;
+ /// let (client, rx) = Client::join(account, "localhost").await?;
/// client.chat("Hello, world!");
/// client.disconnect();
/// Ok(())
/// }
/// ```
pub async fn join(
- account: &Account,
+ account: Account,
address: impl TryInto<ServerAddress>,
) -> Result<(Self, mpsc::UnboundedReceiver<Event>), JoinError> {
let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?;
@@ -189,8 +189,8 @@ impl Client {
let client = Self::start_client(StartClientOpts::new(
account,
- &address,
- &resolved_address,
+ address,
+ resolved_address,
Some(tx),
))
.await?;
@@ -198,7 +198,7 @@ impl Client {
}
pub async fn join_with_proxy(
- account: &Account,
+ account: Account,
address: impl TryInto<ServerAddress>,
proxy: Proxy,
) -> Result<(Self, mpsc::UnboundedReceiver<Event>), JoinError> {
@@ -207,7 +207,7 @@ impl Client {
let (tx, rx) = mpsc::unbounded_channel();
let client = Self::start_client(
- StartClientOpts::new(account, &address, &resolved_address, Some(tx)).proxy(proxy),
+ StartClientOpts::new(account, address, resolved_address, Some(tx)).proxy(proxy),
)
.await?;
Ok((client, rx))
@@ -219,28 +219,21 @@ impl Client {
StartClientOpts {
ecs_lock,
account,
- address,
- resolved_address,
- proxy,
+ connect_opts,
event_sender,
- }: StartClientOpts<'_>,
+ }: StartClientOpts,
) -> Result<Self, JoinError> {
// send a StartJoinServerEvent
let (start_join_callback_tx, mut start_join_callback_rx) =
mpsc::unbounded_channel::<Result<Entity, JoinError>>();
- {
- let mut ecs = ecs_lock.lock();
- ecs.send_event(StartJoinServerEvent {
- account: account.clone(),
- address: address.clone(),
- resolved_address: *resolved_address,
- proxy,
- event_sender: event_sender.clone(),
- start_join_callback_tx: Some(StartJoinCallback(start_join_callback_tx)),
- });
- }
+ ecs_lock.lock().send_event(StartJoinServerEvent {
+ account,
+ connect_opts,
+ event_sender,
+ start_join_callback_tx: Some(StartJoinCallback(start_join_callback_tx)),
+ });
let entity = start_join_callback_rx.recv().await.expect(
"StartJoinCallback should not be dropped before sending a message, this is a bug in Azalea",
diff --git a/azalea-client/src/plugins/auto_reconnect.rs b/azalea-client/src/plugins/auto_reconnect.rs
new file mode 100644
index 00000000..280aaa65
--- /dev/null
+++ b/azalea-client/src/plugins/auto_reconnect.rs
@@ -0,0 +1,138 @@
+//! Auto-reconnect to the server when the client is kicked.
+//!
+//! See [`AutoReconnectPlugin`] for more information.
+
+use std::time::{Duration, Instant};
+
+use bevy_app::prelude::*;
+use bevy_ecs::prelude::*;
+
+use super::{
+ disconnect::DisconnectEvent,
+ events::LocalPlayerEvents,
+ join::{ConnectOpts, ConnectionFailedEvent, StartJoinServerEvent},
+};
+use crate::Account;
+
+/// The default delay that Azalea will use for reconnecting our clients. See
+/// [`AutoReconnectPlugin`] for more information.
+pub const DEFAULT_RECONNECT_DELAY: Duration = Duration::from_secs(5);
+
+/// A default plugin that makes clients automatically rejoin the server when
+/// they're disconnected. The reconnect delay is configurable globally or
+/// per-client with the [`AutoReconnectDelay`] resource/component. Auto
+/// reconnecting can be disabled by removing the resource from the ECS.
+///
+/// The delay defaults to [`DEFAULT_RECONNECT_DELAY`].
+pub struct AutoReconnectPlugin;
+impl Plugin for AutoReconnectPlugin {
+ fn build(&self, app: &mut App) {
+ app.insert_resource(AutoReconnectDelay::new(DEFAULT_RECONNECT_DELAY))
+ .add_systems(
+ Update,
+ (start_rejoin_on_disconnect, rejoin_after_delay)
+ .chain()
+ .before(super::join::handle_start_join_server_event),
+ );
+ }
+}
+
+pub fn start_rejoin_on_disconnect(
+ mut commands: Commands,
+ mut disconnect_events: EventReader<DisconnectEvent>,
+ mut connection_failed_events: EventReader<ConnectionFailedEvent>,
+ auto_reconnect_delay_res: Option<Res<AutoReconnectDelay>>,
+ auto_reconnect_delay_query: Query<&AutoReconnectDelay>,
+) {
+ for entity in disconnect_events
+ .read()
+ .map(|e| e.entity)
+ .chain(connection_failed_events.read().map(|e| e.entity))
+ {
+ let Some(delay) = get_delay(
+ &auto_reconnect_delay_res,
+ auto_reconnect_delay_query,
+ entity,
+ ) else {
+ // no auto reconnect
+ continue;
+ };
+
+ let reconnect_after = Instant::now() + delay;
+ commands.entity(entity).insert(InternalReconnectAfter {
+ instant: reconnect_after,
+ });
+ }
+}
+
+fn get_delay(
+ auto_reconnect_delay_res: &Option<Res<AutoReconnectDelay>>,
+ auto_reconnect_delay_query: Query<&AutoReconnectDelay>,
+ entity: Entity,
+) -> Option<Duration> {
+ if let Ok(c) = auto_reconnect_delay_query.get(entity) {
+ Some(c.delay)
+ } else if let Some(r) = &auto_reconnect_delay_res {
+ Some(r.delay)
+ } else {
+ None
+ }
+}
+
+pub fn rejoin_after_delay(
+ mut commands: Commands,
+ mut join_events: EventWriter<StartJoinServerEvent>,
+ query: Query<(
+ Entity,
+ &InternalReconnectAfter,
+ &Account,
+ &ConnectOpts,
+ Option<&LocalPlayerEvents>,
+ )>,
+) {
+ for (entity, reconnect_after, account, connect_opts, local_player_events) in query.iter() {
+ if Instant::now() >= reconnect_after.instant {
+ // don't keep trying to reconnect
+ commands.entity(entity).remove::<InternalReconnectAfter>();
+
+ // our Entity will be reused since the account has the same uuid
+ join_events.write(StartJoinServerEvent {
+ account: account.clone(),
+ connect_opts: connect_opts.clone(),
+ // not actually necessary since we're reusing the same entity and LocalPlayerEvents
+ // isn't removed, but this is more readable and just in case it's changed in the
+ // future
+ event_sender: local_player_events.map(|e| e.0.clone()),
+ start_join_callback_tx: None,
+ });
+ }
+ }
+}
+
+/// A resource *and* component that indicates how long to wait before
+/// reconnecting when we're kicked.
+///
+/// Initially, it's a resource in the ECS set to 5 seconds. You can modify
+/// the resource to update the global reconnect delay, or insert it as a
+/// component to set the individual delay for a single client.
+///
+/// You can also remove this resource from the ECS to disable the default
+/// auto-reconnecting behavior. Inserting the resource/component again will not
+/// make clients that were already disconnected automatically reconnect.
+#[derive(Resource, Component, Debug, Clone)]
+pub struct AutoReconnectDelay {
+ pub delay: Duration,
+}
+impl AutoReconnectDelay {
+ pub fn new(delay: Duration) -> Self {
+ Self { delay }
+ }
+}
+
+/// This is inserted when we're disconnected and indicates when we'll reconnect.
+///
+/// This is set based on [`AutoReconnectDelay`].
+#[derive(Component, Debug, Clone)]
+pub struct InternalReconnectAfter {
+ pub instant: Instant,
+}
diff --git a/azalea-client/src/plugins/disconnect.rs b/azalea-client/src/plugins/disconnect.rs
index 343c25d8..987007c2 100644
--- a/azalea-client/src/plugins/disconnect.rs
+++ b/azalea-client/src/plugins/disconnect.rs
@@ -7,10 +7,7 @@ use bevy_ecs::prelude::*;
use derive_more::Deref;
use tracing::info;
-use crate::{
- InstanceHolder, client::JoinedClientBundle, connection::RawConnection,
- events::LocalPlayerEvents,
-};
+use crate::{InstanceHolder, client::JoinedClientBundle, connection::RawConnection};
pub struct DisconnectPlugin;
impl Plugin for DisconnectPlugin {
@@ -19,15 +16,28 @@ impl Plugin for DisconnectPlugin {
PostUpdate,
(
update_read_packets_task_running_component,
- disconnect_on_connection_dead,
remove_components_from_disconnected_players,
+ // this happens after `remove_components_from_disconnected_players` since that
+ // system removes `IsConnectionAlive`, which ensures that
+ // `DisconnectEvent` won't get called again from
+ // `disconnect_on_connection_dead`
+ disconnect_on_connection_dead,
)
.chain(),
);
}
}
-/// An event sent when a client is getting disconnected.
+/// An event sent when a client got disconnected from the server.
+///
+/// If the client was kicked with a reason, that reason will be present in the
+/// [`reason`](DisconnectEvent::reason) field.
+///
+/// This event won't be sent if creating the initial connection to the server
+/// failed, for that see [`ConnectionFailedEvent`].
+///
+/// [`ConnectionFailedEvent`]: crate::join::ConnectionFailedEvent
+
#[derive(Event)]
pub struct DisconnectEvent {
pub entity: Entity,
@@ -59,8 +69,8 @@ pub fn remove_components_from_disconnected_players(
.remove::<InLoadedChunk>()
// this makes it close the tcp connection
.remove::<RawConnection>()
- // swarm detects when this tx gets dropped to fire SwarmEvent::Disconnect
- .remove::<LocalPlayerEvents>();
+ // this makes it not send DisconnectEvent again
+ .remove::<IsConnectionAlive>();
// note that we don't remove the client from the ECS, so if they decide
// to reconnect they'll keep their state
diff --git a/azalea-client/src/plugins/join.rs b/azalea-client/src/plugins/join.rs
index 3f47d90c..e31c64c4 100644
--- a/azalea-client/src/plugins/join.rs
+++ b/azalea-client/src/plugins/join.rs
@@ -1,4 +1,4 @@
-use std::{net::SocketAddr, sync::Arc};
+use std::{io, net::SocketAddr, sync::Arc};
use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
use azalea_protocol::{
@@ -29,22 +29,54 @@ use crate::{
pub struct JoinPlugin;
impl Plugin for JoinPlugin {
fn build(&self, app: &mut App) {
- app.add_event::<StartJoinServerEvent>().add_systems(
- Update,
- (handle_start_join_server_event, poll_create_connection_task),
- );
+ app.add_event::<StartJoinServerEvent>()
+ .add_event::<ConnectionFailedEvent>()
+ .add_systems(
+ Update,
+ (
+ handle_start_join_server_event.before(super::login::poll_auth_task),
+ poll_create_connection_task,
+ handle_connection_failed_events,
+ )
+ .chain(),
+ );
}
}
+/// An event to make a client join the server and be added to our swarm.
+///
+/// This won't do anything if a client with the Account UUID is already
+/// connected to the server.
#[derive(Event, Debug)]
pub struct StartJoinServerEvent {
pub account: Account,
+ pub connect_opts: ConnectOpts,
+ pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
+
+ pub start_join_callback_tx: Option<StartJoinCallback>,
+}
+
+/// Options for how the connection to the server will be made. These are
+/// persisted on reconnects.
+///
+/// This is inserted as a component on clients to make auto-reconnecting work.
+#[derive(Debug, Clone, Component)]
+pub struct ConnectOpts {
pub address: ServerAddress,
pub resolved_address: SocketAddr,
pub proxy: Option<Proxy>,
- pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
+}
- pub start_join_callback_tx: Option<StartJoinCallback>,
+/// An event that's sent when creating the TCP connection and sending the first
+/// packet fails.
+///
+/// This isn't sent if we're kicked later, see [`DisconnectEvent`].
+///
+/// [`DisconnectEvent`]: crate::disconnect::DisconnectEvent
+#[derive(Event)]
+pub struct ConnectionFailedEvent {
+ pub entity: Entity,
+ pub error: ConnectionError,
}
// this is mpsc instead of oneshot so it can be cloned (since it's sent in an
@@ -56,11 +88,30 @@ pub fn handle_start_join_server_event(
mut commands: Commands,
mut events: EventReader<StartJoinServerEvent>,
mut entity_uuid_index: ResMut<EntityUuidIndex>,
+ connection_query: Query<&RawConnection>,
) {
for event in events.read() {
let uuid = event.account.uuid_or_offline();
let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
debug!("Reusing entity {entity:?} for client");
+
+ // check if it's already connected
+ if let Ok(conn) = connection_query.get(entity) {
+ if conn.is_alive() {
+ if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
+ warn!(
+ "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
+ );
+ let _ = start_join_callback_tx.0.send(Ok(entity));
+ } else {
+ warn!(
+ "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
+ );
+ }
+ return;
+ }
+ }
+
entity
} else {
let entity = commands.spawn_empty().id();
@@ -71,12 +122,15 @@ pub fn handle_start_join_server_event(
};
let mut entity_mut = commands.entity(entity);
+
entity_mut.insert((
// add the Account to the entity now so plugins can access it earlier
event.account.to_owned(),
// localentity is always present for our clients, even if we're not actually logged
// in
LocalEntity,
+ // ConnectOpts is inserted as a component here
+ event.connect_opts.clone(),
// we don't insert InLoginState until we actually create the connection. note that
// there's no InHandshakeState component since we switch off of the handshake state
// immediately when the connection is created
@@ -92,11 +146,9 @@ pub fn handle_start_join_server_event(
}
let task_pool = IoTaskPool::get();
- let resolved_addr = event.resolved_address;
- let address = event.address.clone();
- let proxy = event.proxy.clone();
+ let connect_opts = event.connect_opts.clone();
let task = task_pool.spawn(async_compat::Compat::new(
- create_conn_and_send_intention_packet(resolved_addr, address, proxy),
+ create_conn_and_send_intention_packet(connect_opts),
));
entity_mut.insert(CreateConnectionTask(task));
@@ -104,20 +156,18 @@ pub fn handle_start_join_server_event(
}
async fn create_conn_and_send_intention_packet(
- resolved_addr: SocketAddr,
- address: ServerAddress,
- proxy: Option<Proxy>,
+ opts: ConnectOpts,
) -> Result<LoginConn, ConnectionError> {
- let mut conn = if let Some(proxy) = proxy {
- Connection::new_with_proxy(&resolved_addr, proxy).await?
+ let mut conn = if let Some(proxy) = opts.proxy {
+ Connection::new_with_proxy(&opts.resolved_address, proxy).await?
} else {
- Connection::new(&resolved_addr).await?
+ Connection::new(&opts.resolved_address).await?
};
conn.write(ServerboundIntention {
protocol_version: PROTOCOL_VERSION,
- hostname: address.host.clone(),
- port: address.port,
+ hostname: opts.address.host.clone(),
+ port: opts.address.port,
intention: ClientIntention::Login,
})
.await?;
@@ -140,6 +190,7 @@ pub fn poll_create_connection_task(
&Account,
Option<&StartJoinCallback>,
)>,
+ mut connection_failed_events: EventWriter<ConnectionFailedEvent>,
) {
for (entity, mut task, account, mut start_join_callback) in query.iter_mut() {
if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
@@ -147,11 +198,9 @@ pub fn poll_create_connection_task(
entity_mut.remove::<CreateConnectionTask>();
let conn = match poll_res {
Ok(conn) => conn,
- Err(err) => {
- warn!("failed to create connection: {err}");
- if let Some(cb) = start_join_callback.take() {
- let _ = cb.0.send(Err(err.into()));
- }
+ Err(error) => {
+ warn!("failed to create connection: {error}");
+ connection_failed_events.write(ConnectionFailedEvent { entity, error });
return;
}
};
@@ -196,3 +245,22 @@ pub fn poll_create_connection_task(
}
}
}
+
+pub fn handle_connection_failed_events(
+ mut events: EventReader<ConnectionFailedEvent>,
+ query: Query<&StartJoinCallback>,
+) {
+ for event in events.read() {
+ let Ok(start_join_callback) = query.get(event.entity) else {
+ // the StartJoinCallback isn't required to be present, so this is fine
+ continue;
+ };
+
+ // io::Error isn't clonable, so we create a new one based on the `kind` and
+ // `to_string`,
+ let ConnectionError::Io(err) = &event.error;
+ let cloned_err = ConnectionError::Io(io::Error::new(err.kind(), err.to_string()));
+
+ let _ = start_join_callback.0.send(Err(cloned_err.into()));
+ }
+}
diff --git a/azalea-client/src/plugins/login.rs b/azalea-client/src/plugins/login.rs
index 9a871ac3..357769e9 100644
--- a/azalea-client/src/plugins/login.rs
+++ b/azalea-client/src/plugins/login.rs
@@ -33,7 +33,7 @@ fn handle_receive_hello_event(trigger: Trigger<ReceiveHelloEvent>, mut commands:
commands.entity(player).insert(AuthTask(task));
}
-fn poll_auth_task(
+pub fn poll_auth_task(
mut commands: Commands,
mut query: Query<(Entity, &mut AuthTask, &mut RawConnection)>,
) {
diff --git a/azalea-client/src/plugins/mod.rs b/azalea-client/src/plugins/mod.rs
index 431d59b2..f657b9e9 100644
--- a/azalea-client/src/plugins/mod.rs
+++ b/azalea-client/src/plugins/mod.rs
@@ -1,6 +1,7 @@
use bevy_app::{PluginGroup, PluginGroupBuilder};
pub mod attack;
+pub mod auto_reconnect;
pub mod brand;
pub mod chat;
pub mod chunks;
@@ -51,7 +52,8 @@ impl PluginGroup for DefaultPlugins {
.add(pong::PongPlugin)
.add(connection::ConnectionPlugin)
.add(login::LoginPlugin)
- .add(join::JoinPlugin);
+ .add(join::JoinPlugin)
+ .add(auto_reconnect::AutoReconnectPlugin);
#[cfg(feature = "log")]
{
group = group.add(bevy_log::LogPlugin::default());
diff --git a/azalea/examples/testbot/main.rs b/azalea/examples/testbot/main.rs
index c25904cf..6733d797 100644
--- a/azalea/examples/testbot/main.rs
+++ b/azalea/examples/testbot/main.rs
@@ -192,14 +192,10 @@ async fn handle(bot: Client, event: azalea::Event, state: State) -> anyhow::Resu
Ok(())
}
-async fn swarm_handle(swarm: Swarm, event: SwarmEvent, _state: SwarmState) -> anyhow::Result<()> {
+async fn swarm_handle(_swarm: Swarm, event: SwarmEvent, _state: SwarmState) -> anyhow::Result<()> {
match &event {
- SwarmEvent::Disconnect(account, join_opts) => {
+ SwarmEvent::Disconnect(account, _join_opts) => {
println!("bot got kicked! {}", account.username);
- tokio::time::sleep(Duration::from_secs(5)).await;
- swarm
- .add_and_retry_forever_with_opts(account, State::default(), join_opts)
- .await;
}
SwarmEvent::Chat(chat) => {
if chat.message().to_string() == "The particle was not visible for anybody" {
diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs
index d63ea6c3..3f388e42 100644
--- a/azalea/src/lib.rs
+++ b/azalea/src/lib.rs
@@ -14,6 +14,7 @@ pub mod prelude;
pub mod swarm;
use std::net::SocketAddr;
+use std::time::Duration;
use app::Plugins;
pub use azalea_auth as auth;
@@ -126,7 +127,12 @@ impl ClientBuilder<NoState, ()> {
/// Set the function that's called every time a bot receives an [`Event`].
/// This is the way to handle normal per-bot events.
///
- /// Currently you can have up to one client handler.
+ /// Currently, you can have up to one client handler.
+ ///
+ /// Note that if you're creating clients directly from the ECS using
+ /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
+ /// the handler function won't be called for that client. This shouldn't be
+ /// a concern for most bots, though.
///
/// ```
/// # use azalea::prelude::*;
@@ -139,6 +145,8 @@ impl ClientBuilder<NoState, ()> {
/// Ok(())
/// }
/// ```
+ ///
+ /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
#[must_use]
pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> ClientBuilder<S, R>
where
@@ -169,6 +177,22 @@ where
self
}
+ /// Configures the auto-reconnection behavior for our bot.
+ ///
+ /// If this is `Some`, then it'll set the default reconnection delay for our
+ /// bot (how long it'll wait after being kicked before it tries
+ /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
+ ///
+ /// If this function isn't called, then our client will reconnect after
+ /// [`DEFAULT_RECONNECT_DELAY`].
+ ///
+ /// [`DEFAULT_RECONNECT_DELAY`]: azalea_client::auto_reconnect::DEFAULT_RECONNECT_DELAY
+ #[must_use]
+ pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
+ self.swarm.reconnect_after = delay.into();
+ self
+ }
+
/// Build this `ClientBuilder` into an actual [`Client`] and join the given
/// server. If the client can't join, it'll keep retrying forever until it
/// can.
diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs
index 4fd5120a..57a12608 100644
--- a/azalea/src/swarm/mod.rs
+++ b/azalea/src/swarm/mod.rs
@@ -19,9 +19,13 @@ use std::{
};
use azalea_client::{
- Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts, chat::ChatPacket,
+ Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts,
+ auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
+ chat::ChatPacket,
+ join::ConnectOpts,
start_ecs_runner,
};
+use azalea_entity::LocalEntity;
use azalea_protocol::{ServerAddress, resolver};
use azalea_world::InstanceContainer;
use bevy_app::{App, PluginGroup, PluginGroupBuilder, Plugins, SubApp};
@@ -46,15 +50,15 @@ use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartEr
pub struct Swarm {
pub ecs_lock: Arc<Mutex<World>>,
- bots: Arc<Mutex<HashMap<Entity, Client>>>,
-
// the address is public and mutable so plugins can change it
pub resolved_address: Arc<RwLock<SocketAddr>>,
pub address: Arc<RwLock<ServerAddress>>,
pub instance_container: Arc<RwLock<InstanceContainer>>,
+ /// This is used internally to make the client handler function work.
bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
+ /// This is used internally to make the swarm handler function work.
swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
}
@@ -92,7 +96,11 @@ where
/// None to have every bot connect at the same time. None is different than
/// a duration of 0, since if a duration is present the bots will wait for
/// the previous one to be ready.
- pub(crate) join_delay: Option<std::time::Duration>,
+ pub(crate) join_delay: Option<Duration>,
+
+ /// The default reconnection delay for our bots. This will change the value
+ /// of the `AutoReconnectDelay` resource.
+ pub(crate) reconnect_after: Option<Duration>,
}
impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
/// Start creating the swarm.
@@ -144,6 +152,7 @@ impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
handler: None,
swarm_handler: None,
join_delay: None,
+ reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
}
}
}
@@ -157,6 +166,12 @@ where
///
/// Currently you can have up to one handler.
///
+ /// Note that if you're creating clients directly from the ECS using
+ /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
+ /// the handler function won't be called for that client. This also applies
+ /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
+ /// most bots, though.
+ ///
/// ```
/// # use azalea::{prelude::*, swarm::prelude::*};
/// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
@@ -178,6 +193,8 @@ where
/// # Ok(())
/// # }
/// ```
+ ///
+ /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
#[must_use]
pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
where
@@ -205,6 +222,12 @@ where
///
/// Currently you can have up to one swarm handler.
///
+ /// Note that if you're creating clients directly from the ECS using
+ /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
+ /// this handler function won't be called for that client. This also applies
+ /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
+ /// most bots, though.
+ ///
/// ```
/// # use azalea::{prelude::*, swarm::prelude::*};
/// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
@@ -227,6 +250,8 @@ where
/// Ok(())
/// }
/// ```
+ ///
+ /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
#[must_use]
pub fn set_swarm_handler<SS, Fut, SR>(
self,
@@ -246,6 +271,7 @@ where
Box::pin(handler(swarm, event, state))
})),
join_delay: self.join_delay,
+ reconnect_after: self.reconnect_after,
}
}
}
@@ -341,11 +367,25 @@ where
/// field, however, the bots will wait for the previous one to have
/// connected and *then* they'll wait the given duration.
#[must_use]
- pub fn join_delay(mut self, delay: std::time::Duration) -> Self {
+ pub fn join_delay(mut self, delay: Duration) -> Self {
self.join_delay = Some(delay);
self
}
+ /// Configures the auto-reconnection behavior for our bots.
+ ///
+ /// If this is `Some`, then it'll set the default reconnection delay for our
+ /// bots (how long they'll wait after being kicked before they try
+ /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
+ ///
+ /// If this function isn't called, then our clients will reconnect after
+ /// [`DEFAULT_RECONNECT_DELAY`].
+ #[must_use]
+ pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
+ self.reconnect_after = delay.into();
+ self
+ }
+
/// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
/// server.
///
@@ -406,7 +446,6 @@ where
let swarm = Swarm {
ecs_lock: ecs_lock.clone(),
- bots: Arc::new(Mutex::new(HashMap::new())),
resolved_address: Arc::new(RwLock::new(resolved_address)),
address: Arc::new(RwLock::new(address)),
@@ -422,6 +461,13 @@ where
let mut ecs = ecs_lock.lock();
ecs.insert_resource(swarm.clone());
ecs.insert_resource(self.swarm_state.clone());
+ if let Some(reconnect_after) = self.reconnect_after {
+ ecs.insert_resource(AutoReconnectDelay {
+ delay: reconnect_after,
+ });
+ } else {
+ ecs.remove_resource::<AutoReconnectDelay>();
+ }
ecs.run_schedule(main_schedule_label);
ecs.clear_trackers();
}
@@ -556,8 +602,12 @@ pub enum SwarmEvent {
Init,
/// A bot got disconnected from the server.
///
- /// You can implement an auto-reconnect by calling [`Swarm::add_with_opts`]
- /// with the account and options from this event.
+ /// If you'd like to implement special auto-reconnect behavior beyond what's
+ /// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`]
+ /// and then call [`Swarm::add_with_opts`] with the account and options
+ /// from this event.
+ ///
+ /// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after
Disconnect(Box<Account>, JoinOpts),
/// At least one bot received a chat message.
Chat(ChatPacket),
@@ -664,15 +714,18 @@ impl Swarm {
let resolved_address = join_opts
.custom_resolved_address
.unwrap_or_else(|| *self.resolved_address.read());
+ let proxy = join_opts.proxy.clone();
let (tx, rx) = mpsc::unbounded_channel();
let bot = Client::start_client(StartClientOpts {
ecs_lock: self.ecs_lock.clone(),
- account,
- address: &address,
- resolved_address: &resolved_address,
- proxy: join_opts.proxy.clone(),
+ account: account.clone(),
+ connect_opts: ConnectOpts {
+ address,
+ resolved_address,
+ proxy,
+ },
event_sender: Some(tx),
})
.await?;
@@ -682,31 +735,25 @@ impl Swarm {
ecs.entity_mut(bot.entity).insert(state);
}
- self.bots.lock().insert(bot.entity, bot.clone());
-
- let cloned_bots = self.bots.clone();
- let cloned_bots_tx = self.bots_tx.clone();
let cloned_bot = bot.clone();
let swarm_tx = self.swarm_tx.clone();
+ let bots_tx = self.bots_tx.clone();
+
let join_opts = join_opts.clone();
tokio::spawn(Self::event_copying_task(
- rx,
- cloned_bots,
- cloned_bots_tx,
- cloned_bot,
- swarm_tx,
- join_opts,
+ rx, swarm_tx, bots_tx, cloned_bot, join_opts,
));
Ok(bot)
}
+ /// Copy the events from a client's receiver into bots_tx, until the bot is
+ /// removed from the ECS.
async fn event_copying_task(
mut rx: mpsc::UnboundedReceiver<Event>,
- cloned_bots: Arc<Mutex<HashMap<Entity, Client>>>,
- cloned_bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
- cloned_bot: Client,
swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
+ bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
+ bot: Client,
join_opts: JoinOpts,
) {
while let Some(event) = rx.recv().await {
@@ -740,21 +787,33 @@ impl Swarm {
}
}
+ if let Event::Disconnect(_) = event {
+ debug!(
+ "sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
+ bot.entity
+ );
+ let account = bot
+ .get_component::<Account>()
+ .expect("bot is missing required Account component");
+ swarm_tx
+ .send(SwarmEvent::Disconnect(Box::new(account), join_opts.clone()))
+ .unwrap();
+ }
+
// we can't handle events here (since we can't copy the handler),
// they're handled above in SwarmBuilder::start
- if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) {
- error!("Error sending event to swarm: {e}");
+ if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
+ error!(
+ "Error sending event to swarm, aborting event_copying_task for {}: {e}",
+ bot.entity
+ );
+ break;
}
}
- debug!("client sender ended, removing from cloned_bots and sending SwarmEvent::Disconnect");
-
- cloned_bots.lock().remove(&cloned_bot.entity);
- let account = cloned_bot
- .get_component::<Account>()
- .expect("bot is missing required Account component");
- swarm_tx
- .send(SwarmEvent::Disconnect(Box::new(account), join_opts))
- .unwrap();
+ debug!(
+ "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
+ bot.entity
+ );
}
/// Add a new account to the swarm, retrying if it couldn't join. This will
@@ -807,6 +866,17 @@ impl Swarm {
}
}
}
+
+ /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
+ /// This will include clients that were disconnected without being removed
+ /// from the ECS.
+ ///
+ /// [`LocalEntity`]: azalea_entity::LocalEntity
+ pub fn client_entities(&self) -> Box<[Entity]> {
+ let mut ecs = self.ecs_lock.lock();
+ let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
+ query.iter(&ecs).collect::<Box<[Entity]>>()
+ }
}
impl IntoIterator for Swarm {
@@ -827,11 +897,12 @@ impl IntoIterator for Swarm {
/// # }
/// ```
fn into_iter(self) -> Self::IntoIter {
- self.bots
- .lock()
- .clone()
- .into_values()
- .collect::<Vec<_>>()
+ let client_entities = self.client_entities();
+
+ client_entities
+ .into_iter()
+ .map(|entity| Client::new(entity, self.ecs_lock.clone()))
+ .collect::<Box<[Client]>>()
.into_iter()
}
}