aboutsummaryrefslogtreecommitdiff
path: root/azalea-client/src/plugins/join.rs
diff options
context:
space:
mode:
authormat <27899617+mat-1@users.noreply.github.com>2025-05-02 15:55:58 -0500
committerGitHub <noreply@github.com>2025-05-02 15:55:58 -0500
commit9a40b65bc1912298a43de43fd6e8477a8622a832 (patch)
treec429c62489926d6bbfc1675fea5a1860378d7a00 /azalea-client/src/plugins/join.rs
parent52e34de95cd64a1c8ae1177cd7bc1d67fbab3c71 (diff)
downloadazalea-drasl-9a40b65bc1912298a43de43fd6e8477a8622a832.tar.xz
Add AutoReconnectPlugin (#221)
* add AutoReconnectPlugin * merge main * start simplifying swarm internals * fix Swarm::into_iter, handler functions, DisconnectEvent, and add some more docs * add ClientBuilder/SwarmBuilder::reconnect_after * fix a doctest * reword SwarmEvent::Disconnect doc * better behavior when we try to join twice * reconnect on ConnectionFailedEvent too * autoreconnect is less breaking now
Diffstat (limited to 'azalea-client/src/plugins/join.rs')
-rw-r--r--azalea-client/src/plugins/join.rs116
1 files changed, 92 insertions, 24 deletions
diff --git a/azalea-client/src/plugins/join.rs b/azalea-client/src/plugins/join.rs
index 3f47d90c..e31c64c4 100644
--- a/azalea-client/src/plugins/join.rs
+++ b/azalea-client/src/plugins/join.rs
@@ -1,4 +1,4 @@
-use std::{net::SocketAddr, sync::Arc};
+use std::{io, net::SocketAddr, sync::Arc};
use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
use azalea_protocol::{
@@ -29,22 +29,54 @@ use crate::{
pub struct JoinPlugin;
impl Plugin for JoinPlugin {
fn build(&self, app: &mut App) {
- app.add_event::<StartJoinServerEvent>().add_systems(
- Update,
- (handle_start_join_server_event, poll_create_connection_task),
- );
+ app.add_event::<StartJoinServerEvent>()
+ .add_event::<ConnectionFailedEvent>()
+ .add_systems(
+ Update,
+ (
+ handle_start_join_server_event.before(super::login::poll_auth_task),
+ poll_create_connection_task,
+ handle_connection_failed_events,
+ )
+ .chain(),
+ );
}
}
+/// An event to make a client join the server and be added to our swarm.
+///
+/// This won't do anything if a client with the Account UUID is already
+/// connected to the server.
#[derive(Event, Debug)]
pub struct StartJoinServerEvent {
pub account: Account,
+ pub connect_opts: ConnectOpts,
+ pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
+
+ pub start_join_callback_tx: Option<StartJoinCallback>,
+}
+
+/// Options for how the connection to the server will be made. These are
+/// persisted on reconnects.
+///
+/// This is inserted as a component on clients to make auto-reconnecting work.
+#[derive(Debug, Clone, Component)]
+pub struct ConnectOpts {
pub address: ServerAddress,
pub resolved_address: SocketAddr,
pub proxy: Option<Proxy>,
- pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
+}
- pub start_join_callback_tx: Option<StartJoinCallback>,
+/// An event that's sent when creating the TCP connection and sending the first
+/// packet fails.
+///
+/// This isn't sent if we're kicked later, see [`DisconnectEvent`].
+///
+/// [`DisconnectEvent`]: crate::disconnect::DisconnectEvent
+#[derive(Event)]
+pub struct ConnectionFailedEvent {
+ pub entity: Entity,
+ pub error: ConnectionError,
}
// this is mpsc instead of oneshot so it can be cloned (since it's sent in an
@@ -56,11 +88,30 @@ pub fn handle_start_join_server_event(
mut commands: Commands,
mut events: EventReader<StartJoinServerEvent>,
mut entity_uuid_index: ResMut<EntityUuidIndex>,
+ connection_query: Query<&RawConnection>,
) {
for event in events.read() {
let uuid = event.account.uuid_or_offline();
let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
debug!("Reusing entity {entity:?} for client");
+
+ // check if it's already connected
+ if let Ok(conn) = connection_query.get(entity) {
+ if conn.is_alive() {
+ if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
+ warn!(
+ "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
+ );
+ let _ = start_join_callback_tx.0.send(Ok(entity));
+ } else {
+ warn!(
+ "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
+ );
+ }
+ return;
+ }
+ }
+
entity
} else {
let entity = commands.spawn_empty().id();
@@ -71,12 +122,15 @@ pub fn handle_start_join_server_event(
};
let mut entity_mut = commands.entity(entity);
+
entity_mut.insert((
// add the Account to the entity now so plugins can access it earlier
event.account.to_owned(),
// localentity is always present for our clients, even if we're not actually logged
// in
LocalEntity,
+ // ConnectOpts is inserted as a component here
+ event.connect_opts.clone(),
// we don't insert InLoginState until we actually create the connection. note that
// there's no InHandshakeState component since we switch off of the handshake state
// immediately when the connection is created
@@ -92,11 +146,9 @@ pub fn handle_start_join_server_event(
}
let task_pool = IoTaskPool::get();
- let resolved_addr = event.resolved_address;
- let address = event.address.clone();
- let proxy = event.proxy.clone();
+ let connect_opts = event.connect_opts.clone();
let task = task_pool.spawn(async_compat::Compat::new(
- create_conn_and_send_intention_packet(resolved_addr, address, proxy),
+ create_conn_and_send_intention_packet(connect_opts),
));
entity_mut.insert(CreateConnectionTask(task));
@@ -104,20 +156,18 @@ pub fn handle_start_join_server_event(
}
async fn create_conn_and_send_intention_packet(
- resolved_addr: SocketAddr,
- address: ServerAddress,
- proxy: Option<Proxy>,
+ opts: ConnectOpts,
) -> Result<LoginConn, ConnectionError> {
- let mut conn = if let Some(proxy) = proxy {
- Connection::new_with_proxy(&resolved_addr, proxy).await?
+ let mut conn = if let Some(proxy) = opts.proxy {
+ Connection::new_with_proxy(&opts.resolved_address, proxy).await?
} else {
- Connection::new(&resolved_addr).await?
+ Connection::new(&opts.resolved_address).await?
};
conn.write(ServerboundIntention {
protocol_version: PROTOCOL_VERSION,
- hostname: address.host.clone(),
- port: address.port,
+ hostname: opts.address.host.clone(),
+ port: opts.address.port,
intention: ClientIntention::Login,
})
.await?;
@@ -140,6 +190,7 @@ pub fn poll_create_connection_task(
&Account,
Option<&StartJoinCallback>,
)>,
+ mut connection_failed_events: EventWriter<ConnectionFailedEvent>,
) {
for (entity, mut task, account, mut start_join_callback) in query.iter_mut() {
if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
@@ -147,11 +198,9 @@ pub fn poll_create_connection_task(
entity_mut.remove::<CreateConnectionTask>();
let conn = match poll_res {
Ok(conn) => conn,
- Err(err) => {
- warn!("failed to create connection: {err}");
- if let Some(cb) = start_join_callback.take() {
- let _ = cb.0.send(Err(err.into()));
- }
+ Err(error) => {
+ warn!("failed to create connection: {error}");
+ connection_failed_events.write(ConnectionFailedEvent { entity, error });
return;
}
};
@@ -196,3 +245,22 @@ pub fn poll_create_connection_task(
}
}
}
+
+pub fn handle_connection_failed_events(
+ mut events: EventReader<ConnectionFailedEvent>,
+ query: Query<&StartJoinCallback>,
+) {
+ for event in events.read() {
+ let Ok(start_join_callback) = query.get(event.entity) else {
+ // the StartJoinCallback isn't required to be present, so this is fine
+ continue;
+ };
+
+ // io::Error isn't clonable, so we create a new one based on the `kind` and
+ // `to_string`,
+ let ConnectionError::Io(err) = &event.error;
+ let cloned_err = ConnectionError::Io(io::Error::new(err.kind(), err.to_string()));
+
+ let _ = start_join_callback.0.send(Err(cloned_err.into()));
+ }
+}