diff options
| author | mat <27899617+mat-1@users.noreply.github.com> | 2025-04-17 16:16:51 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-04-17 16:16:51 -0500 |
| commit | 3f60bdadac1a02e1109148bbbe5a8a3545f13849 (patch) | |
| tree | 6c0460be61e715c1b789f81b16ce4c0fb986c3b4 /azalea-client/src/plugins | |
| parent | 1989f4ec979c138f8f466ccebadca335eb2917d6 (diff) | |
| download | azalea-drasl-3f60bdadac1a02e1109148bbbe5a8a3545f13849.tar.xz | |
Move login state to the ECS (#213)
* use packet handlers code for login custom_query
* initial broken implementation for ecs-only login
* fixes
* run Update schedule 60 times per second and delete code related to run_schedule_sender
* fix tests
* fix online-mode
* reply to query packets in a separate system and make it easier for plugins to disable individual replies
* remove unused imports
Diffstat (limited to 'azalea-client/src/plugins')
| -rw-r--r-- | azalea-client/src/plugins/chat/mod.rs | 3 | ||||
| -rw-r--r-- | azalea-client/src/plugins/connection.rs | 369 | ||||
| -rw-r--r-- | azalea-client/src/plugins/disconnect.rs | 4 | ||||
| -rw-r--r-- | azalea-client/src/plugins/events.rs | 6 | ||||
| -rw-r--r-- | azalea-client/src/plugins/login.rs | 152 | ||||
| -rw-r--r-- | azalea-client/src/plugins/mod.rs | 43 | ||||
| -rw-r--r-- | azalea-client/src/plugins/packet/config/events.rs | 76 | ||||
| -rw-r--r-- | azalea-client/src/plugins/packet/config/mod.rs | 180 | ||||
| -rw-r--r-- | azalea-client/src/plugins/packet/game/events.rs | 87 | ||||
| -rw-r--r-- | azalea-client/src/plugins/packet/game/mod.rs | 315 | ||||
| -rw-r--r-- | azalea-client/src/plugins/packet/login.rs | 114 | ||||
| -rw-r--r-- | azalea-client/src/plugins/packet/login/events.rs | 86 | ||||
| -rw-r--r-- | azalea-client/src/plugins/packet/login/mod.rs | 145 | ||||
| -rw-r--r-- | azalea-client/src/plugins/packet/mod.rs | 84 |
14 files changed, 1090 insertions, 574 deletions
diff --git a/azalea-client/src/plugins/chat/mod.rs b/azalea-client/src/plugins/chat/mod.rs index 3d03d24e..8562f3ce 100644 --- a/azalea-client/src/plugins/chat/mod.rs +++ b/azalea-client/src/plugins/chat/mod.rs @@ -152,7 +152,6 @@ impl Client { content: message.to_string(), kind: ChatKind::Message, }); - let _ = self.run_schedule_sender.try_send(()); } /// Send a command packet to the server. The `command` argument should not @@ -166,7 +165,6 @@ impl Client { content: command.to_string(), kind: ChatKind::Command, }); - let _ = self.run_schedule_sender.try_send(()); } /// Send a message in chat. @@ -183,7 +181,6 @@ impl Client { entity: self.entity, content: content.to_string(), }); - let _ = self.run_schedule_sender.try_send(()); } } diff --git a/azalea-client/src/plugins/connection.rs b/azalea-client/src/plugins/connection.rs new file mode 100644 index 00000000..b462535e --- /dev/null +++ b/azalea-client/src/plugins/connection.rs @@ -0,0 +1,369 @@ +use std::{fmt::Debug, io::Cursor, mem, sync::Arc}; + +use azalea_crypto::Aes128CfbEnc; +use azalea_protocol::{ + connect::{RawReadConnection, RawWriteConnection}, + packets::{ + ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket, + game::ClientboundGamePacket, login::ClientboundLoginPacket, + }, + read::{ReadPacketError, deserialize_packet}, + write::serialize_packet, +}; +use bevy_app::prelude::*; +use bevy_ecs::prelude::*; +use bevy_tasks::{IoTaskPool, futures_lite::future}; +use thiserror::Error; +use tokio::{ + io::AsyncWriteExt, + net::tcp::OwnedWriteHalf, + sync::mpsc::{self}, +}; +use tracing::{debug, error, info, trace}; + +use super::packet::{ + config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent, +}; +use crate::packet::{config, game, login}; + +pub struct ConnectionPlugin; +impl Plugin for ConnectionPlugin { + fn build(&self, app: &mut App) { + app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain()); + } +} + +pub fn read_packets(ecs: &mut World) { + // receive_game_packet_events: EventWriter<ReceiveGamePacketEvent>, + let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>(); + let mut conn_query = ecs.query::<&mut RawConnection>(); + + let mut entities_handling_packets = Vec::new(); + let mut entities_with_injected_packets = Vec::new(); + for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) { + if !raw_conn.injected_clientbound_packets.is_empty() { + entities_with_injected_packets.push(( + entity, + mem::take(&mut raw_conn.injected_clientbound_packets), + )); + } + + if raw_conn.network.is_none() { + // no network connection, don't bother with the normal packet handling + continue; + } + + entities_handling_packets.push(entity); + } + + let mut queued_packet_events = QueuedPacketEvents::default(); + + // handle injected packets, see the comment on + // RawConnection::injected_clientbound_packets for more info + for (entity, raw_packets) in entities_with_injected_packets { + for raw_packet in raw_packets { + let conn = conn_query.get(ecs, entity).unwrap(); + let state = conn.state; + + trace!("Received injected packet with bytes: {raw_packet:?}"); + if let Err(e) = + handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events) + { + error!("Error reading injected packet: {e}"); + } + } + } + + for entity in entities_handling_packets { + loop { + let mut conn = conn_query.get_mut(ecs, entity).unwrap(); + let net_conn = conn.net_conn().unwrap(); + let read_res = net_conn.reader.try_read(); + let state = conn.state; + match read_res { + Ok(Some(raw_packet)) => { + let raw_packet = Arc::<[u8]>::from(raw_packet); + if let Err(e) = handle_raw_packet( + ecs, + &raw_packet, + entity, + state, + &mut queued_packet_events, + ) { + error!("Error reading packet: {e}"); + } + } + Ok(None) => { + // no packets available + break; + } + Err(err) => { + log_for_error(&err); + + if matches!( + &*err, + ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed + ) { + info!("Server closed connection"); + // ungraceful disconnect :( + conn.network = None; + // setting this will make us send a DisconnectEvent + conn.is_alive = false; + } + + break; + } + } + } + } + + queued_packet_events.send_events(ecs); +} + +fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) { + for mut conn in conn_query.iter_mut() { + if let Some(net_conn) = &mut conn.network { + // this needs to be done at some point every update to make sure packets are + // actually sent to the network + + net_conn.poll_writer(); + } + } +} + +#[derive(Default)] +pub struct QueuedPacketEvents { + login: Vec<ReceiveLoginPacketEvent>, + config: Vec<ReceiveConfigPacketEvent>, + game: Vec<ReceiveGamePacketEvent>, +} +impl QueuedPacketEvents { + fn send_events(&mut self, ecs: &mut World) { + ecs.send_event_batch(self.login.drain(..)); + ecs.send_event_batch(self.config.drain(..)); + ecs.send_event_batch(self.game.drain(..)); + } +} + +fn log_for_error(error: &ReadPacketError) { + if !matches!(*error, ReadPacketError::ConnectionClosed) { + error!("Error reading packet from Client: {error:?}"); + } +} + +/// The client's connection to the server. +#[derive(Component)] +pub struct RawConnection { + /// The network connection to the server. + /// + /// This isn't guaranteed to be present, for example during the main packet + /// handlers or at all times during tests. + /// + /// You shouldn't rely on this. Instead, use the events for sending packets + /// like [`SendPacketEvent`](crate::packet::game::SendPacketEvent) / + /// [`SendConfigPacketEvent`](crate::packet::config::SendConfigPacketEvent) + /// / [`SendLoginPacketEvent`](crate::packet::login::SendLoginPacketEvent). + /// + /// To check if we haven't disconnected from the server, use + /// [`Self::is_alive`]. + network: Option<NetworkConnection>, + pub state: ConnectionProtocol, + is_alive: bool, + + /// This exists for internal testing purposes and probably shouldn't be used + /// for normal bots. It's basically a way to make our client think it + /// received a packet from the server without needing to interact with the + /// network. + pub injected_clientbound_packets: Vec<Box<[u8]>>, +} +impl RawConnection { + pub fn new( + reader: RawReadConnection, + writer: RawWriteConnection, + state: ConnectionProtocol, + ) -> Self { + let task_pool = IoTaskPool::get(); + + let (network_packet_writer_tx, network_packet_writer_rx) = + mpsc::unbounded_channel::<Box<[u8]>>(); + + let writer_task = + task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream)); + + let mut conn = Self::new_networkless(state); + conn.network = Some(NetworkConnection { + reader, + enc_cipher: writer.enc_cipher, + network_packet_writer_tx, + writer_task, + }); + + conn + } + + pub fn new_networkless(state: ConnectionProtocol) -> Self { + Self { + network: None, + state, + is_alive: true, + injected_clientbound_packets: Vec::new(), + } + } + + pub fn is_alive(&self) -> bool { + self.is_alive + } + + /// Write a packet to the server without emitting any events. + /// + /// This is called by the handlers for [`SendPacketEvent`], + /// [`SendConfigPacketEvent`], and [`SendLoginPacketEvent`]. + /// + /// [`SendPacketEvent`]: crate::packet::game::SendPacketEvent + /// [`SendConfigPacketEvent`]: crate::packet::config::SendConfigPacketEvent + /// [`SendLoginPacketEvent`]: crate::packet::login::SendLoginPacketEvent + pub fn write<P: ProtocolPacket + Debug>( + &mut self, + packet: impl Packet<P>, + ) -> Result<(), WritePacketError> { + if let Some(network) = &mut self.network { + network.write(packet)?; + } else { + debug!( + "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead" + ); + } + Ok(()) + } + + pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> { + self.network.as_mut() + } +} + +pub fn handle_raw_packet( + ecs: &mut World, + raw_packet: &[u8], + entity: Entity, + state: ConnectionProtocol, + queued_packet_events: &mut QueuedPacketEvents, +) -> Result<(), Box<ReadPacketError>> { + let stream = &mut Cursor::new(raw_packet); + match state { + ConnectionProtocol::Handshake => { + unreachable!() + } + ConnectionProtocol::Game => { + let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?); + trace!("Packet: {packet:?}"); + game::process_packet(ecs, entity, packet.as_ref()); + queued_packet_events + .game + .push(ReceiveGamePacketEvent { entity, packet }); + } + ConnectionProtocol::Status => { + unreachable!() + } + ConnectionProtocol::Login => { + let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?); + trace!("Packet: {packet:?}"); + login::process_packet(ecs, entity, &packet); + queued_packet_events + .login + .push(ReceiveLoginPacketEvent { entity, packet }); + } + ConnectionProtocol::Configuration => { + let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?); + trace!("Packet: {packet:?}"); + config::process_packet(ecs, entity, &packet); + queued_packet_events + .config + .push(ReceiveConfigPacketEvent { entity, packet }); + } + }; + + Ok(()) +} + +pub struct NetworkConnection { + reader: RawReadConnection, + // compression threshold is in the RawReadConnection + pub enc_cipher: Option<Aes128CfbEnc>, + + pub writer_task: bevy_tasks::Task<()>, + /// A queue of raw TCP packets to send. These will not be modified further, + /// they should already be serialized and encrypted and everything before + /// being added here. + network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>, +} +impl NetworkConnection { + pub fn write<P: ProtocolPacket + Debug>( + &mut self, + packet: impl Packet<P>, + ) -> Result<(), WritePacketError> { + let packet = packet.into_variant(); + let raw_packet = serialize_packet(&packet)?; + self.write_raw(&raw_packet)?; + + Ok(()) + } + + pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> { + let network_packet = azalea_protocol::write::encode_to_network_packet( + raw_packet, + self.reader.compression_threshold, + &mut self.enc_cipher, + ); + self.network_packet_writer_tx + .send(network_packet.into_boxed_slice())?; + Ok(()) + } + + pub fn poll_writer(&mut self) { + let poll_once_res = future::poll_once(&mut self.writer_task); + future::block_on(poll_once_res); + } + + pub fn set_compression_threshold(&mut self, threshold: Option<u32>) { + trace!("Set compression threshold to {threshold:?}"); + self.reader.compression_threshold = threshold; + } + /// Set the encryption key that is used to encrypt and decrypt packets. It's + /// the same for both reading and writing. + pub fn set_encryption_key(&mut self, key: [u8; 16]) { + trace!("Enabled protocol encryption"); + let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key); + self.reader.dec_cipher = Some(dec_cipher); + self.enc_cipher = Some(enc_cipher); + } +} + +async fn write_task( + mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>, + mut write_half: OwnedWriteHalf, +) { + while let Some(network_packet) = network_packet_writer_rx.recv().await { + if let Err(e) = write_half.write_all(&network_packet).await { + debug!("Error writing packet to server: {e}"); + break; + }; + } + trace!("write task is done"); +} + +#[derive(Error, Debug)] +pub enum WritePacketError { + #[error("Wrong protocol state: expected {expected:?}, got {got:?}")] + WrongState { + expected: ConnectionProtocol, + got: ConnectionProtocol, + }, + #[error(transparent)] + Encoding(#[from] azalea_protocol::write::PacketEncodeError), + #[error(transparent)] + SendError { + #[from] + #[backtrace] + source: mpsc::error::SendError<Box<[u8]>>, + }, +} diff --git a/azalea-client/src/plugins/disconnect.rs b/azalea-client/src/plugins/disconnect.rs index bd10ac75..09606435 100644 --- a/azalea-client/src/plugins/disconnect.rs +++ b/azalea-client/src/plugins/disconnect.rs @@ -16,8 +16,8 @@ use derive_more::Deref; use tracing::trace; use crate::{ - InstanceHolder, client::JoinedClientBundle, events::LocalPlayerEvents, - raw_connection::RawConnection, + InstanceHolder, client::JoinedClientBundle, connection::RawConnection, + events::LocalPlayerEvents, }; pub struct DisconnectPlugin; diff --git a/azalea-client/src/plugins/events.rs b/azalea-client/src/plugins/events.rs index 64dcf4f5..85f50ea5 100644 --- a/azalea-client/src/plugins/events.rs +++ b/azalea-client/src/plugins/events.rs @@ -27,7 +27,7 @@ use crate::{ chat::{ChatPacket, ChatReceivedEvent}, disconnect::DisconnectEvent, packet::game::{ - AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceivePacketEvent, RemovePlayerEvent, + AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceiveGamePacketEvent, RemovePlayerEvent, UpdatePlayerEvent, }, }; @@ -157,7 +157,7 @@ impl Plugin for EventsPlugin { ) .add_systems( PreUpdate, - init_listener.before(crate::packet::game::process_packet_events), + init_listener.before(super::connection::read_packets), ) .add_systems(GameTick, tick_listener); } @@ -217,7 +217,7 @@ pub fn tick_listener(query: Query<&LocalPlayerEvents, With<InstanceName>>) { pub fn packet_listener( query: Query<&LocalPlayerEvents>, - mut events: EventReader<ReceivePacketEvent>, + mut events: EventReader<ReceiveGamePacketEvent>, ) { for event in events.read() { if let Ok(local_player_events) = query.get(event.entity) { diff --git a/azalea-client/src/plugins/login.rs b/azalea-client/src/plugins/login.rs new file mode 100644 index 00000000..385e9651 --- /dev/null +++ b/azalea-client/src/plugins/login.rs @@ -0,0 +1,152 @@ +use azalea_auth::sessionserver::ClientSessionServerError; +use azalea_protocol::packets::login::{ + ClientboundHello, ServerboundCustomQueryAnswer, ServerboundKey, +}; +use bevy_app::prelude::*; +use bevy_ecs::prelude::*; +use bevy_tasks::{IoTaskPool, Task, futures_lite::future}; +use tracing::{debug, error, trace}; + +use super::{ + connection::RawConnection, + packet::login::{ReceiveCustomQueryEvent, ReceiveHelloEvent, SendLoginPacketEvent}, +}; +use crate::{Account, JoinError}; + +/// Some systems that run during the `login` state. +pub struct LoginPlugin; +impl Plugin for LoginPlugin { + fn build(&self, app: &mut App) { + app.add_observer(handle_receive_hello_event) + .add_systems(Update, (poll_auth_task, reply_to_custom_queries)); + } +} + +fn handle_receive_hello_event(trigger: Trigger<ReceiveHelloEvent>, mut commands: Commands) { + let task_pool = IoTaskPool::get(); + + let account = trigger.account.clone(); + let packet = trigger.packet.clone(); + let player = trigger.entity(); + + let task = task_pool.spawn(auth_with_account(account, packet)); + commands.entity(player).insert(AuthTask(task)); +} + +fn poll_auth_task( + mut commands: Commands, + mut query: Query<(Entity, &mut AuthTask, &mut RawConnection)>, +) { + for (entity, mut auth_task, mut raw_conn) in query.iter_mut() { + if let Some(poll_res) = future::block_on(future::poll_once(&mut auth_task.0)) { + debug!("Finished auth"); + commands.entity(entity).remove::<AuthTask>(); + match poll_res { + Ok((packet, private_key)) => { + // we use this instead of SendLoginPacketEvent to ensure that it's sent right + // before encryption is enabled. i guess another option would be to make a + // Trigger+observer for set_encryption_key; the current implementation is + // simpler though. + if let Err(e) = raw_conn.write(packet) { + error!("Error sending key packet: {e:?}"); + } + if let Some(net_conn) = raw_conn.net_conn() { + net_conn.set_encryption_key(private_key); + } + } + Err(err) => { + error!("Error during authentication: {err:?}"); + } + } + } + } +} + +type PrivateKey = [u8; 16]; + +#[derive(Component)] +pub struct AuthTask(Task<Result<(ServerboundKey, PrivateKey), JoinError>>); + +pub async fn auth_with_account( + account: Account, + packet: ClientboundHello, +) -> Result<(ServerboundKey, PrivateKey), JoinError> { + let Ok(encrypt_res) = azalea_crypto::encrypt(&packet.public_key, &packet.challenge) else { + return Err(JoinError::EncryptionError(packet)); + }; + let key_packet = ServerboundKey { + key_bytes: encrypt_res.encrypted_public_key, + encrypted_challenge: encrypt_res.encrypted_challenge, + }; + let private_key = encrypt_res.secret_key; + + let Some(access_token) = &account.access_token else { + // offline mode account, no need to do auth + return Ok((key_packet, private_key)); + }; + + // keep track of the number of times we tried authenticating so we can give up + // after too many + let mut attempts: usize = 1; + + while let Err(err) = { + let access_token = access_token.lock().clone(); + + let uuid = &account + .uuid + .expect("Uuid must be present if access token is present."); + + // this is necessary since reqwest usually depends on tokio and we're using + // `futures` here + async_compat::Compat::new(async { + azalea_auth::sessionserver::join( + &access_token, + &packet.public_key, + &private_key, + uuid, + &packet.server_id, + ) + .await + }) + .await + } { + if attempts >= 2 { + // if this is the second attempt and we failed + // both times, give up + return Err(err.into()); + } + if matches!( + err, + ClientSessionServerError::InvalidSession | ClientSessionServerError::ForbiddenOperation + ) { + // uh oh, we got an invalid session and have + // to reauthenticate now + account.refresh().await?; + } else { + return Err(err.into()); + } + attempts += 1; + } + + Ok((key_packet, private_key)) +} + +pub fn reply_to_custom_queries( + mut commands: Commands, + mut events: EventReader<ReceiveCustomQueryEvent>, +) { + for event in events.read() { + trace!("Maybe replying to custom query: {event:?}"); + if event.disabled { + continue; + } + + commands.trigger(SendLoginPacketEvent::new( + event.entity, + ServerboundCustomQueryAnswer { + transaction_id: event.packet.transaction_id, + data: None, + }, + )); + } +} diff --git a/azalea-client/src/plugins/mod.rs b/azalea-client/src/plugins/mod.rs index 3b047ccb..16b34205 100644 --- a/azalea-client/src/plugins/mod.rs +++ b/azalea-client/src/plugins/mod.rs @@ -1,11 +1,15 @@ +use bevy_app::{PluginGroup, PluginGroupBuilder}; + pub mod attack; pub mod brand; pub mod chat; pub mod chunks; +pub mod connection; pub mod disconnect; pub mod events; pub mod interact; pub mod inventory; +pub mod login; pub mod mining; pub mod movement; pub mod packet; @@ -14,3 +18,42 @@ pub mod respawn; pub mod task_pool; pub mod tick_broadcast; pub mod tick_end; + +/// This plugin group will add all the default plugins necessary for Azalea to +/// work. +pub struct DefaultPlugins; + +impl PluginGroup for DefaultPlugins { + fn build(self) -> PluginGroupBuilder { + #[allow(unused_mut)] + let mut group = PluginGroupBuilder::start::<Self>() + .add(crate::client::AmbiguityLoggerPlugin) + .add(bevy_time::TimePlugin) + .add(packet::PacketPlugin) + .add(crate::client::AzaleaPlugin) + .add(azalea_entity::EntityPlugin) + .add(azalea_physics::PhysicsPlugin) + .add(events::EventsPlugin) + .add(task_pool::TaskPoolPlugin::default()) + .add(inventory::InventoryPlugin) + .add(chat::ChatPlugin) + .add(disconnect::DisconnectPlugin) + .add(movement::MovementPlugin) + .add(interact::InteractPlugin) + .add(respawn::RespawnPlugin) + .add(mining::MiningPlugin) + .add(attack::AttackPlugin) + .add(chunks::ChunksPlugin) + .add(tick_end::TickEndPlugin) + .add(brand::BrandPlugin) + .add(tick_broadcast::TickBroadcastPlugin) + .add(pong::PongPlugin) + .add(connection::ConnectionPlugin) + .add(login::LoginPlugin); + #[cfg(feature = "log")] + { + group = group.add(bevy_log::LogPlugin::default()); + } + group + } +} diff --git a/azalea-client/src/plugins/packet/config/events.rs b/azalea-client/src/plugins/packet/config/events.rs index 24a1157b..a9237e75 100644 --- a/azalea-client/src/plugins/packet/config/events.rs +++ b/azalea-client/src/plugins/packet/config/events.rs @@ -1,23 +1,20 @@ -use std::io::Cursor; +use std::sync::Arc; -use azalea_protocol::{ - packets::{ - Packet, - config::{ClientboundConfigPacket, ServerboundConfigPacket}, - }, - read::deserialize_packet, +use azalea_protocol::packets::{ + Packet, + config::{ClientboundConfigPacket, ServerboundConfigPacket}, }; use bevy_ecs::prelude::*; use tracing::{debug, error}; -use crate::{InConfigState, raw_connection::RawConnection}; +use crate::{InConfigState, connection::RawConnection}; #[derive(Event, Debug, Clone)] pub struct ReceiveConfigPacketEvent { /// The client entity that received the packet. pub entity: Entity, /// The packet that was actually received. - pub packet: ClientboundConfigPacket, + pub packet: Arc<ClientboundConfigPacket>, } /// An event for sending a packet to the server while we're in the @@ -39,7 +36,7 @@ pub fn handle_outgoing_packets_observer( mut query: Query<(&mut RawConnection, Option<&InConfigState>)>, ) { let event = trigger.event(); - if let Ok((raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) { + if let Ok((mut raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) { if in_configuration_state.is_none() { error!( "Tried to send a configuration packet {:?} while not in configuration state", @@ -47,8 +44,8 @@ pub fn handle_outgoing_packets_observer( ); return; } - debug!("Sending packet: {:?}", event.packet); - if let Err(e) = raw_conn.write_packet(event.packet.clone()) { + debug!("Sending config packet: {:?}", event.packet); + if let Err(e) = raw_conn.write(event.packet.clone()) { error!("Failed to send packet: {e}"); } } @@ -64,61 +61,6 @@ pub fn handle_outgoing_packets( } } -pub fn emit_receive_config_packet_events( - query: Query<(Entity, &RawConnection), With<InConfigState>>, - mut packet_events: ResMut<Events<ReceiveConfigPacketEvent>>, -) { - // we manually clear and send the events at the beginning of each update - // since otherwise it'd cause issues with events in process_packet_events - // running twice - packet_events.clear(); - for (player_entity, raw_conn) in &query { - let packets_lock = raw_conn.incoming_packet_queue(); - let mut packets = packets_lock.lock(); - if !packets.is_empty() { - let mut packets_read = 0; - for raw_packet in packets.iter() { - packets_read += 1; - let packet = match deserialize_packet::<ClientboundConfigPacket>(&mut Cursor::new( - raw_packet, - )) { - Ok(packet) => packet, - Err(err) => { - error!("failed to read packet: {err:?}"); - debug!("packet bytes: {raw_packet:?}"); - continue; - } - }; - - let should_interrupt = packet_interrupts(&packet); - - packet_events.send(ReceiveConfigPacketEvent { - entity: player_entity, - packet, - }); - - if should_interrupt { - break; - } - } - packets.drain(0..packets_read); - } - } -} - -/// Whether the given packet should make us stop deserializing the received -/// packets until next update. -/// -/// This is used for packets that can switch the client state. -fn packet_interrupts(packet: &ClientboundConfigPacket) -> bool { - matches!( - packet, - ClientboundConfigPacket::FinishConfiguration(_) - | ClientboundConfigPacket::Disconnect(_) - | ClientboundConfigPacket::Transfer(_) - ) -} - /// A Bevy trigger that's sent when our client receives a [`ClientboundPing`] /// packet in the config state. /// diff --git a/azalea-client/src/plugins/packet/config/mod.rs b/azalea-client/src/plugins/packet/config/mod.rs index ae601793..910019a6 100644 --- a/azalea-client/src/plugins/packet/config/mod.rs +++ b/azalea-client/src/plugins/packet/config/mod.rs @@ -1,65 +1,61 @@ mod events; +use std::io::Cursor; + use azalea_entity::LocalEntity; use azalea_protocol::packets::ConnectionProtocol; use azalea_protocol::packets::config::*; +use azalea_protocol::read::ReadPacketError; +use azalea_protocol::read::deserialize_packet; use bevy_ecs::prelude::*; -use bevy_ecs::system::SystemState; pub use events::*; use tracing::{debug, warn}; use super::as_system; use crate::client::InConfigState; +use crate::connection::RawConnection; use crate::disconnect::DisconnectEvent; use crate::packet::game::KeepAliveEvent; use crate::packet::game::ResourcePackEvent; -use crate::raw_connection::RawConnection; use crate::{InstanceHolder, declare_packet_handlers}; -pub fn process_packet_events(ecs: &mut World) { - let mut events_owned = Vec::new(); - let mut system_state: SystemState<EventReader<ReceiveConfigPacketEvent>> = - SystemState::new(ecs); - let mut events = system_state.get_mut(ecs); - for ReceiveConfigPacketEvent { - entity: player_entity, +pub fn process_raw_packet( + ecs: &mut World, + player: Entity, + raw_packet: &[u8], +) -> Result<(), Box<ReadPacketError>> { + let packet = deserialize_packet(&mut Cursor::new(raw_packet))?; + process_packet(ecs, player, &packet); + Ok(()) +} + +pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundConfigPacket) { + let mut handler = ConfigPacketHandler { player, ecs }; + + declare_packet_handlers!( + ClientboundConfigPacket, packet, - } in events.read() - { - // we do this so `ecs` isn't borrowed for the whole loop - events_owned.push((*player_entity, packet.clone())); - } - for (player_entity, packet) in events_owned { - let mut handler = ConfigPacketHandler { - player: player_entity, - ecs, - }; - - declare_packet_handlers!( - ClientboundConfigPacket, - packet, - handler, - [ - cookie_request, - custom_payload, - disconnect, - finish_configuration, - keep_alive, - ping, - reset_chat, - registry_data, - resource_pack_pop, - resource_pack_push, - store_cookie, - transfer, - update_enabled_features, - update_tags, - select_known_packs, - custom_report_details, - server_links, - ] - ); - } + handler, + [ + cookie_request, + custom_payload, + disconnect, + finish_configuration, + keep_alive, + ping, + reset_chat, + registry_data, + resource_pack_pop, + resource_pack_push, + store_cookie, + transfer, + update_enabled_features, + update_tags, + select_known_packs, + custom_report_details, + server_links, + ] + ); } pub struct ConfigPacketHandler<'a> { @@ -67,44 +63,45 @@ pub struct ConfigPacketHandler<'a> { pub player: Entity, } impl ConfigPacketHandler<'_> { - pub fn registry_data(&mut self, p: ClientboundRegistryData) { + pub fn registry_data(&mut self, p: &ClientboundRegistryData) { as_system::<Query<&mut InstanceHolder>>(self.ecs, |mut query| { let instance_holder = query.get_mut(self.player).unwrap(); let mut instance = instance_holder.instance.write(); // add the new registry data - instance.registries.append(p.registry_id, p.entries); + instance + .registries + .append(p.registry_id.clone(), p.entries.clone()); }); } - pub fn custom_payload(&mut self, p: ClientboundCustomPayload) { + pub fn custom_payload(&mut self, p: &ClientboundCustomPayload) { debug!("Got custom payload packet {p:?}"); } - pub fn disconnect(&mut self, p: ClientboundDisconnect) { + pub fn disconnect(&mut self, p: &ClientboundDisconnect) { warn!("Got disconnect packet {p:?}"); as_system::<EventWriter<_>>(self.ecs, |mut events| { events.send(DisconnectEvent { entity: self.player, - reason: Some(p.reason), + reason: Some(p.reason.clone()), }); }); } - pub fn finish_configuration(&mut self, p: ClientboundFinishConfiguration) { - debug!("got FinishConfiguration packet: {p:?}"); + pub fn finish_configuration(&mut self, _p: &ClientboundFinishConfiguration) { + debug!("got FinishConfiguration packet"); as_system::<(Commands, Query<&mut RawConnection>)>( self.ecs, |(mut commands, mut query)| { let mut raw_conn = query.get_mut(self.player).unwrap(); - raw_conn - .write_packet(ServerboundFinishConfiguration) - .expect( - "we should be in the right state and encoding this packet shouldn't fail", - ); - raw_conn.set_state(ConnectionProtocol::Game); + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundFinishConfiguration, + )); + raw_conn.state = ConnectionProtocol::Game; // these components are added now that we're going to be in the Game state commands @@ -120,34 +117,33 @@ impl ConfigPacketHandler<'_> { ); } - pub fn keep_alive(&mut self, p: ClientboundKeepAlive) { + pub fn keep_alive(&mut self, p: &ClientboundKeepAlive) { debug!( "Got keep alive packet (in configuration) {p:?} for {:?}", self.player ); - as_system::<(Query<&RawConnection>, EventWriter<_>)>(self.ecs, |(query, mut events)| { - let raw_conn = query.get(self.player).unwrap(); - + as_system::<(Commands, EventWriter<_>)>(self.ecs, |(mut commands, mut events)| { events.send(KeepAliveEvent { entity: self.player, id: p.id, }); - raw_conn - .write_packet(ServerboundKeepAlive { id: p.id }) - .unwrap(); + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundKeepAlive { id: p.id }, + )); }); } - pub fn ping(&mut self, p: ClientboundPing) { + pub fn ping(&mut self, p: &ClientboundPing) { debug!("Got ping packet (in configuration) {p:?}"); as_system::<Commands>(self.ecs, |mut commands| { - commands.trigger_targets(ConfigPingEvent(p), self.player); + commands.trigger_targets(ConfigPingEvent(p.clone()), self.player); }); } - pub fn resource_pack_push(&mut self, p: ClientboundResourcePackPush) { + pub fn resource_pack_push(&mut self, p: &ClientboundResourcePackPush) { debug!("Got resource pack push packet {p:?}"); as_system::<EventWriter<_>>(self.ecs, |mut events| { @@ -162,66 +158,64 @@ impl ConfigPacketHandler<'_> { }); } - pub fn resource_pack_pop(&mut self, p: ClientboundResourcePackPop) { + pub fn resource_pack_pop(&mut self, p: &ClientboundResourcePackPop) { debug!("Got resource pack pop packet {p:?}"); } - pub fn update_enabled_features(&mut self, p: ClientboundUpdateEnabledFeatures) { + pub fn update_enabled_features(&mut self, p: &ClientboundUpdateEnabledFeatures) { debug!("Got update enabled features packet {p:?}"); } - pub fn update_tags(&mut self, _p: ClientboundUpdateTags) { + pub fn update_tags(&mut self, _p: &ClientboundUpdateTags) { debug!("Got update tags packet"); } - pub fn cookie_request(&mut self, p: ClientboundCookieRequest) { + pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) { debug!("Got cookie request packet {p:?}"); - as_system::<Query<&RawConnection>>(self.ecs, |query| { - let raw_conn = query.get(self.player).unwrap(); - - raw_conn - .write_packet(ServerboundCookieResponse { - key: p.key, + as_system::<Commands>(self.ecs, |mut commands| { + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundCookieResponse { + key: p.key.clone(), // cookies aren't implemented payload: None, - }) - .unwrap(); + }, + )); }); } - pub fn reset_chat(&mut self, p: ClientboundResetChat) { + pub fn reset_chat(&mut self, p: &ClientboundResetChat) { debug!("Got reset chat packet {p:?}"); } - pub fn store_cookie(&mut self, p: ClientboundStoreCookie) { + pub fn store_cookie(&mut self, p: &ClientboundStoreCookie) { debug!("Got store cookie packet {p:?}"); } - pub fn transfer(&mut self, p: ClientboundTransfer) { + pub fn transfer(&mut self, p: &ClientboundTransfer) { debug!("Got transfer packet {p:?}"); } - pub fn select_known_packs(&mut self, p: ClientboundSelectKnownPacks) { + pub fn select_known_packs(&mut self, p: &ClientboundSelectKnownPacks) { debug!("Got select known packs packet {p:?}"); - as_system::<Query<&RawConnection>>(self.ecs, |query| { - let raw_conn = query.get(self.player).unwrap(); - + as_system::<Commands>(self.ecs, |mut commands| { // resource pack management isn't implemented - raw_conn - .write_packet(ServerboundSelectKnownPacks { + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundSelectKnownPacks { known_packs: vec![], - }) - .unwrap(); + }, + )); }); } - pub fn server_links(&mut self, p: ClientboundServerLinks) { + pub fn server_links(&mut self, p: &ClientboundServerLinks) { debug!("Got server links packet {p:?}"); } - pub fn custom_report_details(&mut self, p: ClientboundCustomReportDetails) { + pub fn custom_report_details(&mut self, p: &ClientboundCustomReportDetails) { debug!("Got custom report details packet {p:?}"); } } diff --git a/azalea-client/src/plugins/packet/game/events.rs b/azalea-client/src/plugins/packet/game/events.rs index ad81f9bd..68bfb4b3 100644 --- a/azalea-client/src/plugins/packet/game/events.rs +++ b/azalea-client/src/plugins/packet/game/events.rs @@ -1,33 +1,27 @@ -use std::{ - io::Cursor, - sync::{Arc, Weak}, -}; +use std::sync::{Arc, Weak}; use azalea_chat::FormattedText; use azalea_core::resource_location::ResourceLocation; -use azalea_protocol::{ - packets::{ - Packet, - game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket}, - }, - read::deserialize_packet, +use azalea_protocol::packets::{ + Packet, + game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket}, }; use azalea_world::Instance; use bevy_ecs::prelude::*; use parking_lot::RwLock; -use tracing::{debug, error}; +use tracing::error; use uuid::Uuid; -use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection}; +use crate::{PlayerInfo, client::InGameState, connection::RawConnection}; /// An event that's sent when we receive a packet. /// ``` -/// # use azalea_client::packet::game::ReceivePacketEvent; +/// # use azalea_client::packet::game::ReceiveGamePacketEvent; /// # use azalea_protocol::packets::game::ClientboundGamePacket; /// # use bevy_ecs::event::EventReader; /// -/// fn handle_packets(mut events: EventReader<ReceivePacketEvent>) { -/// for ReceivePacketEvent { +/// fn handle_packets(mut events: EventReader<ReceiveGamePacketEvent>) { +/// for ReceiveGamePacketEvent { /// entity, /// packet, /// } in events.read() { @@ -41,7 +35,7 @@ use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection}; /// } /// ``` #[derive(Event, Debug, Clone)] -pub struct ReceivePacketEvent { +pub struct ReceiveGamePacketEvent { /// The client entity that received the packet. pub entity: Entity, /// The packet that was actually received. @@ -67,7 +61,7 @@ pub fn handle_outgoing_packets_observer( ) { let event = trigger.event(); - if let Ok((raw_connection, in_game_state)) = query.get_mut(event.sent_by) { + if let Ok((mut raw_connection, in_game_state)) = query.get_mut(event.sent_by) { if in_game_state.is_none() { error!( "Tried to send a game packet {:?} while not in game state", @@ -76,8 +70,8 @@ pub fn handle_outgoing_packets_observer( return; } - // debug!("Sending packet: {:?}", event.packet); - if let Err(e) = raw_connection.write_packet(event.packet.clone()) { + // debug!("Sending game packet: {:?}", event.packet); + if let Err(e) = raw_connection.write(event.packet.clone()) { error!("Failed to send packet: {e}"); } } @@ -91,61 +85,6 @@ pub fn handle_outgoing_packets(mut commands: Commands, mut events: EventReader<S } } -pub fn emit_receive_packet_events( - query: Query<(Entity, &RawConnection), With<InGameState>>, - mut packet_events: ResMut<Events<ReceivePacketEvent>>, -) { - // we manually clear and send the events at the beginning of each update - // since otherwise it'd cause issues with events in process_packet_events - // running twice - packet_events.clear(); - for (player_entity, raw_connection) in &query { - let packets_lock = raw_connection.incoming_packet_queue(); - let mut packets = packets_lock.lock(); - if !packets.is_empty() { - let mut packets_read = 0; - for raw_packet in packets.iter() { - packets_read += 1; - let packet = - match deserialize_packet::<ClientboundGamePacket>(&mut Cursor::new(raw_packet)) - { - Ok(packet) => packet, - Err(err) => { - error!("failed to read packet: {err:?}"); - debug!("packet bytes: {raw_packet:?}"); - continue; - } - }; - - let should_interrupt = packet_interrupts(&packet); - - packet_events.send(ReceivePacketEvent { - entity: player_entity, - packet: Arc::new(packet), - }); - - if should_interrupt { - break; - } - } - packets.drain(0..packets_read); - } - } -} - -/// Whether the given packet should make us stop deserializing the received -/// packets until next update. -/// -/// This is used for packets that can switch the client state. -fn packet_interrupts(packet: &ClientboundGamePacket) -> bool { - matches!( - packet, - ClientboundGamePacket::StartConfiguration(_) - | ClientboundGamePacket::Disconnect(_) - | ClientboundGamePacket::Transfer(_) - ) -} - /// A player joined the game (or more specifically, was added to the tab /// list of a local player). #[derive(Event, Debug, Clone)] diff --git a/azalea-client/src/plugins/packet/game/mod.rs b/azalea-client/src/plugins/packet/game/mod.rs index 8d896e65..60531d3b 100644 --- a/azalea-client/src/plugins/packet/game/mod.rs +++ b/azalea-client/src/plugins/packet/game/mod.rs @@ -32,171 +32,150 @@ use crate::{ }, movement::{KnockbackEvent, KnockbackType}, packet::as_system, - raw_connection::RawConnection, }; -pub fn process_packet_events(ecs: &mut World) { - let mut events_owned = Vec::<(Entity, Arc<ClientboundGamePacket>)>::new(); - - { - let mut system_state = SystemState::<EventReader<ReceivePacketEvent>>::new(ecs); - let mut events = system_state.get_mut(ecs); - for ReceivePacketEvent { - entity: player_entity, - packet, - } in events.read() - { - // we do this so `ecs` isn't borrowed for the whole loop - events_owned.push((*player_entity, packet.clone())); - } - } - - for (player_entity, packet) in events_owned { - let mut handler = GamePacketHandler { - player: player_entity, - ecs, - }; - - // the order of these doesn't matter, that's decided by the protocol library - declare_packet_handlers!( - ClientboundGamePacket, - packet.as_ref(), - handler, - [ - login, - set_chunk_cache_radius, - chunk_batch_start, - chunk_batch_finished, - custom_payload, - change_difficulty, - commands, - player_abilities, - set_cursor_item, - update_tags, - disconnect, - update_recipes, - entity_event, - player_position, - player_info_update, - player_info_remove, - set_chunk_cache_center, - chunks_biomes, - light_update, - level_chunk_with_light, - add_entity, - set_entity_data, - update_attributes, - set_entity_motion, - set_entity_link, - initialize_border, - set_time, - set_default_spawn_position, - set_health, - set_experience, - teleport_entity, - update_advancements, - rotate_head, - move_entity_pos, - move_entity_pos_rot, - move_entity_rot, - keep_alive, - remove_entities, - player_chat, - system_chat, - disguised_chat, - sound, - level_event, - block_update, - animate, - section_blocks_update, - game_event, - level_particles, - server_data, - set_equipment, - update_mob_effect, - award_stats, - block_changed_ack, - block_destruction, - block_entity_data, - block_event, - boss_event, - command_suggestions, - container_set_content, - container_set_data, - container_set_slot, - container_close, - cooldown, - custom_chat_completions, - delete_chat, - explode, - forget_level_chunk, - horse_screen_open, - map_item_data, - merchant_offers, - move_vehicle, - open_book, - open_screen, - open_sign_editor, - ping, - place_ghost_recipe, - player_combat_end, - player_combat_enter, - player_combat_kill, - player_look_at, - remove_mob_effect, - resource_pack_push, - resource_pack_pop, - respawn, - start_configuration, - entity_position_sync, - select_advancements_tab, - set_action_bar_text, - set_border_center, - set_border_lerp_size, - set_border_size, - set_border_warning_delay, - set_border_warning_distance, - set_camera, - set_display_objective, - set_objective, - set_passengers, - set_player_team, - set_score, - set_simulation_distance, - set_subtitle_text, - set_title_text, - set_titles_animation, - clear_titles, - sound_entity, - stop_sound, - tab_list, - tag_query, - take_item_entity, - bundle_delimiter, - damage_event, - hurt_animation, - ticking_state, - ticking_step, - reset_score, - cookie_request, - debug_sample, - pong_response, - store_cookie, - transfer, - move_minecart_along_track, - set_held_slot, - set_player_inventory, - projectile_power, - custom_report_details, - server_links, - player_rotation, - recipe_book_add, - recipe_book_remove, - recipe_book_settings, - test_instance_block_status, - ] - ); - } +pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundGamePacket) { + let mut handler = GamePacketHandler { player, ecs }; + + // the order of these doesn't matter, that's decided by the protocol library + declare_packet_handlers!( + ClientboundGamePacket, + packet, + handler, + [ + login, + set_chunk_cache_radius, + chunk_batch_start, + chunk_batch_finished, + custom_payload, + change_difficulty, + commands, + player_abilities, + set_cursor_item, + update_tags, + disconnect, + update_recipes, + entity_event, + player_position, + player_info_update, + player_info_remove, + set_chunk_cache_center, + chunks_biomes, + light_update, + level_chunk_with_light, + add_entity, + set_entity_data, + update_attributes, + set_entity_motion, + set_entity_link, + initialize_border, + set_time, + set_default_spawn_position, + set_health, + set_experience, + teleport_entity, + update_advancements, + rotate_head, + move_entity_pos, + move_entity_pos_rot, + move_entity_rot, + keep_alive, + remove_entities, + player_chat, + system_chat, + disguised_chat, + sound, + level_event, + block_update, + animate, + section_blocks_update, + game_event, + level_particles, + server_data, + set_equipment, + update_mob_effect, + award_stats, + block_changed_ack, + block_destruction, + block_entity_data, + block_event, + boss_event, + command_suggestions, + container_set_content, + container_set_data, + container_set_slot, + container_close, + cooldown, + custom_chat_completions, + delete_chat, + explode, + forget_level_chunk, + horse_screen_open, + map_item_data, + merchant_offers, + move_vehicle, + open_book, + open_screen, + open_sign_editor, + ping, + place_ghost_recipe, + player_combat_end, + player_combat_enter, + player_combat_kill, + player_look_at, + remove_mob_effect, + resource_pack_push, + resource_pack_pop, + respawn, + start_configuration, + entity_position_sync, + select_advancements_tab, + set_action_bar_text, + set_border_center, + set_border_lerp_size, + set_border_size, + set_border_warning_delay, + set_border_warning_distance, + set_camera, + set_display_objective, + set_objective, + set_passengers, + set_player_team, + set_score, + set_simulation_distance, + set_subtitle_text, + set_title_text, + set_titles_animation, + clear_titles, + sound_entity, + stop_sound, + tab_list, + tag_query, + take_item_entity, + bundle_delimiter, + damage_event, + hurt_animation, + ticking_state, + ticking_step, + reset_score, + cookie_request, + debug_sample, + pong_response, + store_cookie, + transfer, + move_minecart_along_track, + set_held_slot, + set_player_inventory, + projectile_power, + custom_report_details, + server_links, + player_rotation, + recipe_book_add, + recipe_book_remove, + recipe_book_settings, + test_instance_block_status, + ] + ); } pub struct GamePacketHandler<'a> { @@ -342,7 +321,7 @@ impl GamePacketHandler<'_> { client_information ); commands.trigger(SendPacketEvent::new(self.player, - azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { information: client_information.clone() }, + azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { client_information: client_information.clone() }, )); }, ); @@ -1506,9 +1485,11 @@ impl GamePacketHandler<'_> { pub fn start_configuration(&mut self, _p: &ClientboundStartConfiguration) { debug!("Got start configuration packet"); - as_system::<(Query<&RawConnection>, Commands)>(self.ecs, |(query, mut commands)| { - let raw_conn = query.get(self.player).unwrap(); - let _ = raw_conn.write_packet(ServerboundConfigurationAcknowledged); + as_system::<Commands>(self.ecs, |mut commands| { + commands.trigger(SendPacketEvent::new( + self.player, + ServerboundConfigurationAcknowledged, + )); commands .entity(self.player) diff --git a/azalea-client/src/plugins/packet/login.rs b/azalea-client/src/plugins/packet/login.rs deleted file mode 100644 index 1bb07266..00000000 --- a/azalea-client/src/plugins/packet/login.rs +++ /dev/null @@ -1,114 +0,0 @@ -// login packets aren't actually handled here because compression/encryption -// would make packet handling a lot messier - -use std::{collections::HashSet, sync::Arc}; - -use azalea_protocol::packets::{ - Packet, - login::{ - ClientboundLoginPacket, ServerboundLoginPacket, - s_custom_query_answer::ServerboundCustomQueryAnswer, - }, -}; -use bevy_ecs::{prelude::*, system::SystemState}; -use derive_more::{Deref, DerefMut}; -use tokio::sync::mpsc; -use tracing::error; - -// this struct is defined here anyways though so it's consistent with the other -// ones - -/// An event that's sent when we receive a login packet from the server. Note -/// that if you want to handle this in a system, you must add -/// `.before(azalea::packet::login::process_packet_events)` to it -/// because that system clears the events. -#[derive(Event, Debug, Clone)] -pub struct LoginPacketEvent { - /// The client entity that received the packet. - pub entity: Entity, - /// The packet that was actually received. - pub packet: Arc<ClientboundLoginPacket>, -} - -/// Event for sending a login packet to the server. -#[derive(Event)] -pub struct SendLoginPacketEvent { - pub entity: Entity, - pub packet: ServerboundLoginPacket, -} -impl SendLoginPacketEvent { - pub fn new(entity: Entity, packet: impl Packet<ServerboundLoginPacket>) -> Self { - let packet = packet.into_variant(); - Self { entity, packet } - } -} - -#[derive(Component)] -pub struct LoginSendPacketQueue { - pub tx: mpsc::UnboundedSender<ServerboundLoginPacket>, -} - -/// A marker component for local players that are currently in the -/// `login` state. -#[derive(Component, Clone, Debug)] -pub struct InLoginState; - -pub fn handle_send_packet_event( - mut send_packet_events: EventReader<SendLoginPacketEvent>, - mut query: Query<&mut LoginSendPacketQueue>, -) { - for event in send_packet_events.read() { - if let Ok(queue) = query.get_mut(event.entity) { - let _ = queue.tx.send(event.packet.clone()); - } else { - error!("Sent SendPacketEvent for entity that doesn't have a LoginSendPacketQueue"); - } - } -} - -/// Plugins can add to this set if they want to handle a custom query packet -/// themselves. This component removed after the login state ends. -#[derive(Component, Default, Debug, Deref, DerefMut)] -pub struct IgnoreQueryIds(HashSet<u32>); - -pub fn process_packet_events(ecs: &mut World) { - let mut events_owned = Vec::new(); - let mut system_state: SystemState<ResMut<Events<LoginPacketEvent>>> = SystemState::new(ecs); - let mut events = system_state.get_mut(ecs); - for LoginPacketEvent { - entity: player_entity, - packet, - } in events.drain() - { - // we do this so `ecs` isn't borrowed for the whole loop - events_owned.push((player_entity, packet)); - } - for (player_entity, packet) in events_owned { - #[allow(clippy::single_match)] - match packet.as_ref() { - ClientboundLoginPacket::CustomQuery(p) => { - let mut system_state: SystemState<( - EventWriter<SendLoginPacketEvent>, - Query<&IgnoreQueryIds>, - )> = SystemState::new(ecs); - let (mut send_packet_events, query) = system_state.get_mut(ecs); - - let ignore_query_ids = query.get(player_entity).ok().map(|x| x.0.clone()); - if let Some(ignore_query_ids) = ignore_query_ids { - if ignore_query_ids.contains(&p.transaction_id) { - continue; - } - } - - send_packet_events.send(SendLoginPacketEvent::new( - player_entity, - ServerboundCustomQueryAnswer { - transaction_id: p.transaction_id, - data: None, - }, - )); - } - _ => {} - } - } -} diff --git a/azalea-client/src/plugins/packet/login/events.rs b/azalea-client/src/plugins/packet/login/events.rs new file mode 100644 index 00000000..fc7a6b22 --- /dev/null +++ b/azalea-client/src/plugins/packet/login/events.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use azalea_protocol::packets::{ + Packet, + login::{ + ClientboundCustomQuery, ClientboundHello, ClientboundLoginPacket, ServerboundLoginPacket, + }, +}; +use bevy_ecs::prelude::*; +use tracing::{debug, error}; + +use super::InLoginState; +use crate::{Account, connection::RawConnection}; + +#[derive(Event, Debug, Clone)] +pub struct ReceiveLoginPacketEvent { + /// The client entity that received the packet. + pub entity: Entity, + /// The packet that was actually received. + pub packet: Arc<ClientboundLoginPacket>, +} + +#[derive(Event, Debug, Clone)] +pub struct ReceiveHelloEvent { + pub account: Account, + pub packet: ClientboundHello, +} + +#[derive(Event, Debug, Clone)] +pub struct ReceiveCustomQueryEvent { + /// The client entity that received the packet. + pub entity: Entity, + pub packet: ClientboundCustomQuery, + /// A system can set this to `true` to make Azalea not reply to the query. + /// You must make sure you modify this before the + /// [`reply_to_custom_queries`] system runs. + /// + /// [`reply_to_custom_queries`]: crate::login::reply_to_custom_queries + pub disabled: bool, +} + +/// Event for sending a login packet to the server. +#[derive(Event, Debug, Clone)] +pub struct SendLoginPacketEvent { + pub sent_by: Entity, + pub packet: ServerboundLoginPacket, +} +impl SendLoginPacketEvent { + pub fn new(entity: Entity, packet: impl Packet<ServerboundLoginPacket>) -> Self { + let packet = packet.into_variant(); + Self { + sent_by: entity, + packet, + } + } +} + +pub fn handle_outgoing_packets_observer( + trigger: Trigger<SendLoginPacketEvent>, + mut query: Query<(&mut RawConnection, Option<&InLoginState>)>, +) { + let event = trigger.event(); + if let Ok((mut raw_conn, in_login_state)) = query.get_mut(event.sent_by) { + if in_login_state.is_none() { + error!( + "Tried to send a login packet {:?} while not in login state", + event.packet + ); + return; + } + debug!("Sending login packet: {:?}", event.packet); + if let Err(e) = raw_conn.write(event.packet.clone()) { + error!("Failed to send packet: {e}"); + } + } +} +/// A system that converts [`SendLoginPacketEvent`] events into triggers so +/// they get received by [`handle_outgoing_packets_observer`]. +pub fn handle_outgoing_packets( + mut commands: Commands, + mut events: EventReader<SendLoginPacketEvent>, +) { + for event in events.read() { + commands.trigger(event.clone()); + } +} diff --git a/azalea-client/src/plugins/packet/login/mod.rs b/azalea-client/src/plugins/packet/login/mod.rs new file mode 100644 index 00000000..d313a767 --- /dev/null +++ b/azalea-client/src/plugins/packet/login/mod.rs @@ -0,0 +1,145 @@ +// login packets aren't actually handled here because compression/encryption +// would make packet handling a lot messier + +mod events; + +use azalea_protocol::packets::{ + ConnectionProtocol, + login::{ + ClientboundCookieRequest, ClientboundCustomQuery, ClientboundHello, + ClientboundLoginCompression, ClientboundLoginDisconnect, ClientboundLoginFinished, + ClientboundLoginPacket, ServerboundCookieResponse, ServerboundLoginAcknowledged, + }, +}; +use bevy_ecs::prelude::*; +pub use events::*; +use tracing::{debug, error}; + +use super::as_system; +use crate::{ + Account, GameProfileComponent, InConfigState, connection::RawConnection, + declare_packet_handlers, disconnect::DisconnectEvent, +}; + +pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundLoginPacket) { + let mut handler = LoginPacketHandler { player, ecs }; + + declare_packet_handlers!( + ClientboundLoginPacket, + packet, + handler, + [ + hello, + login_disconnect, + login_finished, + login_compression, + custom_query, + cookie_request + ] + ); +} + +/// A marker component for local players that are currently in the +/// `login` state. +#[derive(Component, Clone, Debug)] +pub struct InLoginState; + +pub struct LoginPacketHandler<'a> { + pub ecs: &'a mut World, + pub player: Entity, +} +impl LoginPacketHandler<'_> { + pub fn hello(&mut self, p: &ClientboundHello) { + debug!("Got encryption request {p:?}"); + + as_system::<(Commands, Query<&Account>)>(self.ecs, |(mut commands, query)| { + let Ok(account) = query.get(self.player) else { + error!( + "Expected Account component to be present on player when receiving hello packet." + ); + return; + }; + commands.trigger_targets( + ReceiveHelloEvent { + account: account.clone(), + packet: p.clone(), + }, + self.player, + ); + }); + } + pub fn login_disconnect(&mut self, p: &ClientboundLoginDisconnect) { + debug!("Got disconnect {:?}", p); + + as_system::<EventWriter<_>>(self.ecs, |mut events| { + events.send(DisconnectEvent { + entity: self.player, + reason: Some(p.reason.clone()), + }); + }); + } + pub fn login_finished(&mut self, p: &ClientboundLoginFinished) { + debug!( + "Got profile {:?}. login is finished and we're now switching to the config state", + p.game_profile + ); + + as_system::<(Commands, Query<&mut RawConnection>)>( + self.ecs, + |(mut commands, mut query)| { + commands.trigger(SendLoginPacketEvent::new( + self.player, + ServerboundLoginAcknowledged, + )); + + commands + .entity(self.player) + .remove::<InLoginState>() + .insert(InConfigState) + .insert(GameProfileComponent(p.game_profile.clone())); + + let mut conn = query + .get_mut(self.player) + .expect("RawConnection component should be present when receiving packets"); + conn.state = ConnectionProtocol::Configuration; + }, + ); + } + pub fn login_compression(&mut self, p: &ClientboundLoginCompression) { + debug!("Got compression request {p:?}"); + + as_system::<Query<&mut RawConnection>>(self.ecs, |mut query| { + let mut conn = query + .get_mut(self.player) + .expect("RawConnection component should be present when receiving packets"); + if let Some(net_conn) = &mut conn.net_conn() { + net_conn.set_compression_threshold(Some(p.compression_threshold as u32)); + } + }) + } + pub fn custom_query(&mut self, p: &ClientboundCustomQuery) { + debug!("Got custom query {p:?}"); + + as_system::<EventWriter<ReceiveCustomQueryEvent>>(self.ecs, |mut events| { + events.send(ReceiveCustomQueryEvent { + entity: self.player, + packet: p.clone(), + disabled: false, + }); + }); + } + pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) { + debug!("Got cookie request {p:?}"); + + as_system::<Commands>(self.ecs, |mut commands| { + commands.trigger(SendLoginPacketEvent::new( + self.player, + ServerboundCookieResponse { + key: p.key.clone(), + // cookies aren't implemented + payload: None, + }, + )); + }); + } +} diff --git a/azalea-client/src/plugins/packet/mod.rs b/azalea-client/src/plugins/packet/mod.rs index 362154cc..1c14fa30 100644 --- a/azalea-client/src/plugins/packet/mod.rs +++ b/azalea-client/src/plugins/packet/mod.rs @@ -1,17 +1,11 @@ use azalea_entity::metadata::Health; -use bevy_app::{App, First, Plugin, PreUpdate, Update}; +use bevy_app::{App, Plugin, Update}; use bevy_ecs::{ prelude::*, system::{SystemParam, SystemState}, }; -use self::{ - game::{ - AddPlayerEvent, DeathEvent, InstanceLoadedEvent, KeepAliveEvent, RemovePlayerEvent, - ResourcePackEvent, UpdatePlayerEvent, - }, - login::{LoginPacketEvent, SendLoginPacketEvent}, -}; +use self::game::DeathEvent; use crate::{chat::ChatReceivedEvent, events::death_listener}; pub mod config; @@ -36,50 +30,38 @@ pub fn death_event_on_0_health( impl Plugin for PacketPlugin { fn build(&self, app: &mut App) { - app.add_systems( - First, - ( - game::emit_receive_packet_events, - config::emit_receive_config_packet_events, - ), - ) - .add_systems( - PreUpdate, - ( - game::process_packet_events, - config::process_packet_events, - login::handle_send_packet_event, - login::process_packet_events, - ), - ) - .add_observer(game::handle_outgoing_packets_observer) - .add_observer(config::handle_outgoing_packets_observer) - .add_systems( - Update, - ( + app.add_observer(game::handle_outgoing_packets_observer) + .add_observer(config::handle_outgoing_packets_observer) + .add_observer(login::handle_outgoing_packets_observer) + .add_systems( + Update, ( - config::handle_outgoing_packets, - game::handle_outgoing_packets, - ) - .chain(), - death_event_on_0_health.before(death_listener), - ), - ) - // we do this instead of add_event so we can handle the events ourselves - .init_resource::<Events<game::ReceivePacketEvent>>() - .init_resource::<Events<config::ReceiveConfigPacketEvent>>() - .add_event::<game::SendPacketEvent>() - .add_event::<config::SendConfigPacketEvent>() - .add_event::<AddPlayerEvent>() - .add_event::<RemovePlayerEvent>() - .add_event::<UpdatePlayerEvent>() - .add_event::<ChatReceivedEvent>() - .add_event::<DeathEvent>() - .add_event::<KeepAliveEvent>() - .add_event::<ResourcePackEvent>() - .add_event::<InstanceLoadedEvent>() - .add_event::<LoginPacketEvent>() - .add_event::<SendLoginPacketEvent>(); + ( + config::handle_outgoing_packets, + game::handle_outgoing_packets, + login::handle_outgoing_packets, + ) + .chain(), + death_event_on_0_health.before(death_listener), + ), + ) + .add_event::<game::ReceiveGamePacketEvent>() + .add_event::<config::ReceiveConfigPacketEvent>() + .add_event::<login::ReceiveLoginPacketEvent>() + // + .add_event::<game::SendPacketEvent>() + .add_event::<config::SendConfigPacketEvent>() + .add_event::<login::SendLoginPacketEvent>() + // + .add_event::<game::AddPlayerEvent>() + .add_event::<game::RemovePlayerEvent>() + .add_event::<game::UpdatePlayerEvent>() + .add_event::<ChatReceivedEvent>() + .add_event::<game::DeathEvent>() + .add_event::<game::KeepAliveEvent>() + .add_event::<game::ResourcePackEvent>() + .add_event::<game::InstanceLoadedEvent>() + .add_event::<login::ReceiveCustomQueryEvent>(); } } |
