diff options
| author | mat <27899617+mat-1@users.noreply.github.com> | 2023-09-21 11:16:29 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-09-21 11:16:29 -0500 |
| commit | 7b3e2e4bf793466a351510c7fbbd08234e93bb0e (patch) | |
| tree | 7177a919de9982d9e3c7f36a76d2025696f465b6 /azalea-client | |
| parent | 83cce236145cdab1872a472a70943b669a880965 (diff) | |
| download | azalea-drasl-7b3e2e4bf793466a351510c7fbbd08234e93bb0e.tar.xz | |
1.20.2 (#99)
* add configuration state
* start updating to 23w31a
* implement a bit more of 23w31a
* chunk batching
* start adding configuration state
* ioasfhjgsd
* almost works
* configuration state mostly implemented
* handle other packets in configuration state and fix keepalive
* cleanup, fix warnings
* 23w32a
* fix some doctests
* 23w33a
* 23w35a
* 1.20.2-pre2
* fix system conflicts
* 1.20.2-pre4
* make tests compile
* tests pass
* 1.20.2-rc2
* 1.20.2
* Revert "1.20.2"
This reverts commit dd152fd265332ead333c919e585ded6d609d7468.
* didn't mean to commit that code
---------
Co-authored-by: mat <git@matdoes.dev>
Diffstat (limited to 'azalea-client')
| -rw-r--r-- | azalea-client/Cargo.toml | 2 | ||||
| -rw-r--r-- | azalea-client/src/chunk_batching.rs | 146 | ||||
| -rw-r--r-- | azalea-client/src/client.rs | 269 | ||||
| -rw-r--r-- | azalea-client/src/disconnect.rs | 24 | ||||
| -rw-r--r-- | azalea-client/src/events.rs | 18 | ||||
| -rw-r--r-- | azalea-client/src/interact.rs | 31 | ||||
| -rw-r--r-- | azalea-client/src/inventory.rs | 33 | ||||
| -rw-r--r-- | azalea-client/src/lib.rs | 9 | ||||
| -rw-r--r-- | azalea-client/src/local_player.rs | 133 | ||||
| -rw-r--r-- | azalea-client/src/mining.rs | 3 | ||||
| -rw-r--r-- | azalea-client/src/movement.rs | 77 | ||||
| -rw-r--r-- | azalea-client/src/packet_handling/configuration.rs | 204 | ||||
| -rw-r--r-- | azalea-client/src/packet_handling/game.rs (renamed from azalea-client/src/packet_handling.rs) | 514 | ||||
| -rw-r--r-- | azalea-client/src/packet_handling/mod.rs | 59 | ||||
| -rwxr-xr-x | azalea-client/src/ping.rs | 2 | ||||
| -rwxr-xr-x | azalea-client/src/player.rs | 2 | ||||
| -rw-r--r-- | azalea-client/src/raw_connection.rs | 174 | ||||
| -rw-r--r-- | azalea-client/src/received_registries.rs | 33 |
18 files changed, 1138 insertions, 595 deletions
diff --git a/azalea-client/Cargo.toml b/azalea-client/Cargo.toml index 4c26b218..de36860e 100644 --- a/azalea-client/Cargo.toml +++ b/azalea-client/Cargo.toml @@ -40,6 +40,8 @@ thiserror = "^1.0.48" tokio = { version = "^1.32.0", features = ["sync"] } uuid = "^1.4.1" azalea-entity = { version = "0.8.0", path = "../azalea-entity" } +serde_json = "1.0.104" +serde = "1.0.183" [features] default = ["log"] diff --git a/azalea-client/src/chunk_batching.rs b/azalea-client/src/chunk_batching.rs new file mode 100644 index 00000000..c0e8bb34 --- /dev/null +++ b/azalea-client/src/chunk_batching.rs @@ -0,0 +1,146 @@ +//! Used for Minecraft's chunk batching introduced in 23w31a (1.20.2). It's used +//! for making the server spread out how often it sends us chunk packets +//! depending on our receiving speed. + +use std::time::{Duration, Instant}; + +use azalea_protocol::packets::game::serverbound_chunk_batch_received_packet::ServerboundChunkBatchReceivedPacket; +use bevy_app::{App, Plugin, Update}; +use bevy_ecs::prelude::*; + +use crate::{ + interact::handle_block_interact_event, + inventory::InventorySet, + local_player::{handle_send_packet_event, SendPacketEvent}, + respawn::perform_respawn, +}; + +pub struct ChunkBatchingPlugin; +impl Plugin for ChunkBatchingPlugin { + fn build(&self, app: &mut App) { + app.add_systems( + Update, + ( + handle_chunk_batch_start_event, + handle_chunk_batch_finished_event, + ) + .chain() + .before(handle_send_packet_event) + .before(InventorySet) + .before(handle_block_interact_event) + .before(perform_respawn), + ) + .add_event::<ChunkBatchStartEvent>() + .add_event::<ChunkBatchFinishedEvent>(); + } +} + +#[derive(Component, Clone, Debug)] +pub struct ChunkBatchInfo { + pub start_time: Instant, + pub accumulator: ChunkReceiveSpeedAccumulator, +} + +#[derive(Event)] +pub struct ChunkBatchStartEvent { + pub entity: Entity, +} +#[derive(Event)] +pub struct ChunkBatchFinishedEvent { + pub entity: Entity, + pub batch_size: u32, +} + +pub fn handle_chunk_batch_start_event( + mut query: Query<&mut ChunkBatchInfo>, + mut events: EventReader<ChunkBatchStartEvent>, +) { + for event in events.iter() { + if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) { + chunk_batch_info.start_time = Instant::now(); + } + } +} + +pub fn handle_chunk_batch_finished_event( + mut query: Query<&mut ChunkBatchInfo>, + mut events: EventReader<ChunkBatchFinishedEvent>, + mut send_packets: EventWriter<SendPacketEvent>, +) { + for event in events.iter() { + if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) { + let batch_duration = chunk_batch_info.start_time.elapsed(); + if event.batch_size > 0 { + chunk_batch_info + .accumulator + .accumulate(event.batch_size, batch_duration); + } + let millis_per_chunk = + f64::max(0., chunk_batch_info.accumulator.get_millis_per_chunk()); + let desired_chunks_per_tick = if millis_per_chunk == 0. { + // make it the server's problem instead + f32::NAN + } else { + (25. / millis_per_chunk) as f32 + }; + send_packets.send(SendPacketEvent { + entity: event.entity, + packet: ServerboundChunkBatchReceivedPacket { + desired_chunks_per_tick, + } + .get(), + }); + } + } +} + +#[derive(Clone, Debug)] +pub struct ChunkReceiveSpeedAccumulator { + batch_sizes: Vec<u32>, + /// as milliseconds + batch_durations: Vec<u32>, + index: usize, + filled_size: usize, +} +impl ChunkReceiveSpeedAccumulator { + pub fn new(capacity: usize) -> Self { + Self { + batch_sizes: vec![0; capacity], + batch_durations: vec![0; capacity], + index: 0, + filled_size: 0, + } + } + + pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) { + self.batch_sizes[self.index] = batch_size; + self.batch_durations[self.index] = + f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32; + self.index = (self.index + 1) % self.batch_sizes.len(); + if self.filled_size < self.batch_sizes.len() { + self.filled_size += 1; + } + } + + pub fn get_millis_per_chunk(&self) -> f64 { + let mut total_batch_size = 0; + let mut total_batch_duration = 0; + for i in 0..self.filled_size { + total_batch_size += self.batch_sizes[i]; + total_batch_duration += self.batch_durations[i]; + } + if total_batch_size == 0 { + return 0.; + } + total_batch_duration as f64 / total_batch_size as f64 + } +} + +impl Default for ChunkBatchInfo { + fn default() -> Self { + Self { + start_time: Instant::now(), + accumulator: ChunkReceiveSpeedAccumulator::new(50), + } + } +} diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index 8424bf39..cd191e0f 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -1,26 +1,29 @@ use crate::{ attack::{self, AttackPlugin}, chat::ChatPlugin, + chunk_batching::{ChunkBatchInfo, ChunkBatchingPlugin}, disconnect::{DisconnectEvent, DisconnectPlugin}, events::{Event, EventPlugin, LocalPlayerEvents}, interact::{CurrentSequenceNumber, InteractPlugin}, inventory::{InventoryComponent, InventoryPlugin}, local_player::{ - death_event, handle_send_packet_event, GameProfileComponent, Hunger, LocalPlayer, - SendPacketEvent, + death_event, handle_send_packet_event, GameProfileComponent, Hunger, InstanceHolder, + PermissionLevel, PlayerAbilities, SendPacketEvent, TabList, }, mining::{self, MinePlugin}, movement::{LastSentLookDirection, PhysicsState, PlayerMovePlugin}, - packet_handling::{self, PacketHandlerPlugin, PacketReceiver}, + packet_handling::PacketHandlerPlugin, player::retroactively_add_game_profile_component, + raw_connection::RawConnection, respawn::RespawnPlugin, task_pool::TaskPoolPlugin, - Account, PlayerInfo, + Account, PlayerInfo, ReceivedRegistries, }; use azalea_auth::{game_profile::GameProfile, sessionserver::ClientSessionServerError}; +use azalea_buf::McBufWritable; use azalea_chat::FormattedText; -use azalea_core::Vec3; +use azalea_core::{ResourceLocation, Vec3}; use azalea_entity::{ indexing::{EntityIdIndex, Loaded}, metadata::Health, @@ -30,19 +33,21 @@ use azalea_physics::PhysicsPlugin; use azalea_protocol::{ connect::{Connection, ConnectionError}, packets::{ - game::{ - clientbound_player_abilities_packet::ClientboundPlayerAbilitiesPacket, - serverbound_client_information_packet::ServerboundClientInformationPacket, - ClientboundGamePacket, ServerboundGamePacket, + configuration::{ + serverbound_client_information_packet::ClientInformation, + ClientboundConfigurationPacket, ServerboundConfigurationPacket, }, - handshake::{ + game::ServerboundGamePacket, + handshaking::{ client_intention_packet::ClientIntentionPacket, ClientboundHandshakePacket, ServerboundHandshakePacket, }, login::{ - serverbound_custom_query_packet::ServerboundCustomQueryPacket, + serverbound_custom_query_answer_packet::ServerboundCustomQueryAnswerPacket, serverbound_hello_packet::ServerboundHelloPacket, - serverbound_key_packet::ServerboundKeyPacket, ClientboundLoginPacket, + serverbound_key_packet::ServerboundKeyPacket, + serverbound_login_acknowledged_packet::ServerboundLoginAcknowledgedPacket, + ClientboundLoginPacket, }, ConnectionProtocol, PROTOCOL_VERSION, }, @@ -59,10 +64,12 @@ use bevy_ecs::{ world::World, }; use bevy_time::{prelude::FixedTime, TimePlugin}; -use derive_more::{Deref, DerefMut}; +use derive_more::Deref; use log::{debug, error}; use parking_lot::{Mutex, RwLock}; -use std::{collections::HashMap, fmt::Debug, io, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, fmt::Debug, io, net::SocketAddr, ops::Deref, sync::Arc, time::Duration, +}; use thiserror::Error; use tokio::{ sync::{broadcast, mpsc}, @@ -71,7 +78,6 @@ use tokio::{ use uuid::Uuid; /// `Client` has the things that a user interacting with the library will want. -/// Things that a player in the world will want to know are in [`LocalPlayer`]. /// /// To make a new client, use either [`azalea::ClientBuilder`] or /// [`Client::join`]. @@ -105,62 +111,6 @@ pub struct Client { pub run_schedule_sender: mpsc::UnboundedSender<()>, } -/// A component that contains some of the "settings" for this client that are -/// sent to the server, such as render distance. This is only present on local -/// players. -pub type ClientInformation = ServerboundClientInformationPacket; - -/// A component that contains the abilities the player has, like flying -/// or instantly breaking blocks. This is only present on local players. -#[derive(Clone, Debug, Component, Default)] -pub struct PlayerAbilities { - pub invulnerable: bool, - pub flying: bool, - pub can_fly: bool, - /// Whether the player can instantly break blocks and can duplicate blocks - /// in their inventory. - pub instant_break: bool, - - pub flying_speed: f32, - /// Used for the fov - pub walking_speed: f32, -} -impl From<ClientboundPlayerAbilitiesPacket> for PlayerAbilities { - fn from(packet: ClientboundPlayerAbilitiesPacket) -> Self { - Self { - invulnerable: packet.flags.invulnerable, - flying: packet.flags.flying, - can_fly: packet.flags.can_fly, - instant_break: packet.flags.instant_break, - flying_speed: packet.flying_speed, - walking_speed: packet.walking_speed, - } - } -} - -/// Level must be 0..=4 -#[derive(Component, Clone, Default, Deref, DerefMut)] -pub struct PermissionLevel(pub u8); - -/// A component and resource that contains a map of player UUIDs to their -/// information in the tab list. -/// -/// This is a component on local players in case you want to get the tab list -/// that a certain client is seeing, and it's also a resource in case you know -/// that the server gives the same tab list to every player. -/// -/// ``` -/// # use azalea_client::TabList; -/// # fn example(client: &azalea_client::Client) { -/// let tab_list = client.component::<TabList>(); -/// println!("Online players:"); -/// for (uuid, player_info) in tab_list.iter() { -/// println!("- {} ({}ms)", player_info.profile.name, player_info.latency); -/// } -/// # } -#[derive(Component, Resource, Clone, Debug, Deref, DerefMut, Default)] -pub struct TabList(HashMap<Uuid, PlayerInfo>); - /// An error that happened while joining the server. #[derive(Error, Debug)] pub enum JoinError { @@ -261,71 +211,55 @@ impl Client { let entity = ecs_lock.lock().spawn(account.to_owned()).id(); let conn = Connection::new(resolved_address).await?; - let (conn, game_profile) = Self::handshake(conn, account, address).await?; + let (mut conn, game_profile) = Self::handshake(conn, account, address).await?; + + { + // quickly send the brand here + let mut brand_data = Vec::new(); + // they don't have to know :) + "vanilla".write_into(&mut brand_data).unwrap(); + conn.write( + azalea_protocol::packets::configuration::serverbound_custom_payload_packet::ServerboundCustomPayloadPacket { + identifier: ResourceLocation::new("brand"), + data: brand_data.into(), + } + .get(), + ).await?; + } + let (read_conn, write_conn) = conn.into_split(); + let (read_conn, write_conn) = (read_conn.raw, write_conn.raw); // we did the handshake, so now we're connected to the server let (tx, rx) = mpsc::unbounded_channel(); - let (packet_writer_sender, packet_writer_receiver) = mpsc::unbounded_channel(); - - // start receiving packets - let packet_receiver = packet_handling::PacketReceiver { - packets: Arc::new(Mutex::new(Vec::new())), - run_schedule_sender: run_schedule_sender.clone(), - }; - - let read_packets_task = tokio::spawn(packet_receiver.clone().read_task(read_conn)); - let write_packets_task = tokio::spawn( - packet_receiver - .clone() - .write_task(write_conn, packet_writer_receiver), - ); - - let local_player = crate::local_player::LocalPlayer::new( - entity, - packet_writer_sender, - // default to an empty world, it'll be set correctly later when we - // get the login packet - Arc::new(RwLock::new(Instance::default())), - read_packets_task, - write_packets_task, - ); - - ecs_lock - .lock() - .entity_mut(entity) - .insert(JoinedClientBundle { - local_player, - packet_receiver, - game_profile: GameProfileComponent(game_profile.clone()), - physics_state: PhysicsState::default(), - local_player_events: LocalPlayerEvents(tx), - inventory: InventoryComponent::default(), - client_information: ClientInformation::default(), - tab_list: TabList::default(), - current_sequence_number: CurrentSequenceNumber::default(), - last_sent_direction: LastSentLookDirection::default(), - abilities: PlayerAbilities::default(), - permission_level: PermissionLevel::default(), - hunger: Hunger::default(), - - entity_id_index: EntityIdIndex::default(), - - mining: mining::MineBundle::default(), - attack: attack::AttackBundle::default(), - - _local: LocalEntity, - _loaded: Loaded, - }); + let mut ecs = ecs_lock.lock(); + // we got the ConfigurationConnection, so the client is now connected :) let client = Client::new( - game_profile, + game_profile.clone(), entity, ecs_lock.clone(), run_schedule_sender.clone(), ); + + ecs.entity_mut(entity).insert(( + // these stay when we switch to the game state + LocalPlayerBundle { + raw_connection: RawConnection::new( + run_schedule_sender, + ConnectionProtocol::Configuration, + read_conn, + write_conn, + ), + received_registries: ReceivedRegistries::default(), + local_player_events: LocalPlayerEvents(tx), + game_profile: GameProfileComponent(game_profile), + }, + InConfigurationState, + )); + Ok((client, rx)) } @@ -340,7 +274,7 @@ impl Client { address: &ServerAddress, ) -> Result< ( - Connection<ClientboundGamePacket, ServerboundGamePacket>, + Connection<ClientboundConfigurationPacket, ServerboundConfigurationPacket>, GameProfile, ), JoinError, @@ -362,7 +296,9 @@ impl Client { conn.write( ServerboundHelloPacket { name: account.username.clone(), - profile_id: account.uuid, + // TODO: pretty sure this should generate an offline-mode uuid instead of just + // Uuid::default() + profile_id: account.uuid.unwrap_or_default(), } .get(), ) @@ -428,8 +364,13 @@ impl Client { conn.set_compression_threshold(p.compression_threshold); } ClientboundLoginPacket::GameProfile(p) => { - debug!("Got profile {:?}", p.game_profile); - break (conn.game(), p.game_profile); + debug!( + "Got profile {:?}. handshake is finished and we're now switching to the configuration state", + p.game_profile + ); + conn.write(ServerboundLoginAcknowledgedPacket {}.get()) + .await?; + break (conn.configuration(), p.game_profile); } ClientboundLoginPacket::LoginDisconnect(p) => { debug!("Got disconnect {:?}", p); @@ -438,7 +379,7 @@ impl Client { ClientboundLoginPacket::CustomQuery(p) => { debug!("Got custom query {:?}", p); conn.write( - ServerboundCustomQueryPacket { + ServerboundCustomQueryAnswerPacket { transaction_id: p.transaction_id, data: None, } @@ -453,9 +394,12 @@ impl Client { } /// Write a packet directly to the server. - pub fn write_packet(&self, packet: ServerboundGamePacket) { - self.local_player_mut(&mut self.ecs.lock()) - .write_packet(packet); + pub fn write_packet( + &self, + packet: ServerboundGamePacket, + ) -> Result<(), crate::raw_connection::WritePacketError> { + self.raw_connection_mut(&mut self.ecs.lock()) + .write_packet(packet) } /// Disconnect this client from the server by ending all tasks. @@ -468,14 +412,24 @@ impl Client { }); } - pub fn local_player<'a>(&'a self, ecs: &'a mut World) -> &'a LocalPlayer { - self.query::<&LocalPlayer>(ecs) + pub fn local_player<'a>(&'a self, ecs: &'a mut World) -> &'a InstanceHolder { + self.query::<&InstanceHolder>(ecs) } pub fn local_player_mut<'a>( &'a self, ecs: &'a mut World, - ) -> bevy_ecs::world::Mut<'a, LocalPlayer> { - self.query::<&mut LocalPlayer>(ecs) + ) -> bevy_ecs::world::Mut<'a, InstanceHolder> { + self.query::<&mut InstanceHolder>(ecs) + } + + pub fn raw_connection<'a>(&'a self, ecs: &'a mut World) -> &'a RawConnection { + self.query::<&RawConnection>(ecs) + } + pub fn raw_connection_mut<'a>( + &'a self, + ecs: &'a mut World, + ) -> bevy_ecs::world::Mut<'a, RawConnection> { + self.query::<&mut RawConnection>(ecs) } /// Get a component from this client. This will clone the component and @@ -538,8 +492,8 @@ impl Client { /// ``` pub async fn set_client_information( &self, - client_information: ServerboundClientInformationPacket, - ) -> Result<(), std::io::Error> { + client_information: ClientInformation, + ) -> Result<(), crate::raw_connection::WritePacketError> { { let mut ecs = self.ecs.lock(); let mut client_information_mut = self.query::<&mut ClientInformation>(&mut ecs); @@ -551,7 +505,7 @@ impl Client { "Sending client information (already logged in): {:?}", client_information ); - self.write_packet(client_information.get()); + self.write_packet(azalea_protocol::packets::game::serverbound_client_information_packet::ServerboundClientInformationPacket { information: client_information.clone() }.get())?; } Ok(()) @@ -606,21 +560,33 @@ impl Client { /// Get a map of player UUIDs to their information in the tab list. /// - /// This is a shortcut for `bot.component::<TabList>().0`. + /// This is a shortcut for `*bot.component::<TabList>()`. pub fn tab_list(&self) -> HashMap<Uuid, PlayerInfo> { - self.component::<TabList>().0 + self.component::<TabList>().deref().clone() } } -/// A bundle for the components that are present on a local player that received -/// a login packet. If you want to filter for this, just use [`LocalEntity`]. +/// The bundle of components that's shared when we're either in the +/// `configuration` or `game` state. +/// +/// For the components that are only present in the `game` state, see +/// [`JoinedClientBundle`] and for the ones in the `configuration` state, see +/// [`ConfigurationClientBundle`]. #[derive(Bundle)] -pub struct JoinedClientBundle { - pub local_player: LocalPlayer, - pub packet_receiver: PacketReceiver, +pub struct LocalPlayerBundle { + pub raw_connection: RawConnection, + pub received_registries: ReceivedRegistries, + pub local_player_events: LocalPlayerEvents, pub game_profile: GameProfileComponent, +} + +/// A bundle for the components that are present on a local player that is +/// currently in the `game` protocol state. If you want to filter for this, just +/// use [`LocalEntity`]. +#[derive(Bundle)] +pub struct JoinedClientBundle { + pub instance_holder: InstanceHolder, pub physics_state: PhysicsState, - pub local_player_events: LocalPlayerEvents, pub inventory: InventoryComponent, pub client_information: ClientInformation, pub tab_list: TabList, @@ -628,6 +594,7 @@ pub struct JoinedClientBundle { pub last_sent_direction: LastSentLookDirection, pub abilities: PlayerAbilities, pub permission_level: PermissionLevel, + pub chunk_batch_info: ChunkBatchInfo, pub hunger: Hunger, pub entity_id_index: EntityIdIndex, @@ -635,10 +602,15 @@ pub struct JoinedClientBundle { pub mining: mining::MineBundle, pub attack: attack::AttackBundle, - pub _local: LocalEntity, + pub _local_entity: LocalEntity, pub _loaded: Loaded, } +/// A marker component for local players that are currently in the +/// `configuration` state. +#[derive(Component)] +pub struct InConfigurationState; + pub struct AzaleaPlugin; impl Plugin for AzaleaPlugin { fn build(&self, app: &mut App) { @@ -790,6 +762,7 @@ impl PluginGroup for DefaultPlugins { .add(RespawnPlugin) .add(MinePlugin) .add(AttackPlugin) + .add(ChunkBatchingPlugin) .add(TickBroadcastPlugin); #[cfg(feature = "log")] { diff --git a/azalea-client/src/disconnect.rs b/azalea-client/src/disconnect.rs index 10aef7ba..966e5bb7 100644 --- a/azalea-client/src/disconnect.rs +++ b/azalea-client/src/disconnect.rs @@ -12,7 +12,7 @@ use bevy_ecs::{ }; use derive_more::Deref; -use crate::{client::JoinedClientBundle, LocalPlayer}; +use crate::{client::JoinedClientBundle, raw_connection::RawConnection}; pub struct DisconnectPlugin; impl Plugin for DisconnectPlugin { @@ -21,7 +21,7 @@ impl Plugin for DisconnectPlugin { PostUpdate, ( update_read_packets_task_running_component, - disconnect_on_read_packets_ended, + disconnect_on_connection_dead, remove_components_from_disconnected_players, ) .chain(), @@ -47,25 +47,23 @@ pub fn remove_components_from_disconnected_players( } #[derive(Component, Clone, Copy, Debug, Deref)] -pub struct ReadPacketsTaskRunning(bool); +pub struct IsConnectionAlive(bool); fn update_read_packets_task_running_component( + query: Query<(Entity, &RawConnection)>, mut commands: Commands, - local_player: Query<(Entity, &LocalPlayer)>, ) { - for (entity, local_player) in &local_player { - let running = !local_player.read_packets_task.is_finished(); - commands - .entity(entity) - .insert(ReadPacketsTaskRunning(running)); + for (entity, raw_connection) in &query { + let running = raw_connection.is_alive(); + commands.entity(entity).insert(IsConnectionAlive(running)); } } -fn disconnect_on_read_packets_ended( - local_player: Query<(Entity, &ReadPacketsTaskRunning), Changed<ReadPacketsTaskRunning>>, +fn disconnect_on_connection_dead( + query: Query<(Entity, &IsConnectionAlive), Changed<IsConnectionAlive>>, mut disconnect_events: EventWriter<DisconnectEvent>, ) { - for (entity, &read_packets_task_running) in &local_player { - if !*read_packets_task_running { + for (entity, &is_connection_alive) in &query { + if !*is_connection_alive { disconnect_events.send(DisconnectEvent { entity }); } } diff --git a/azalea-client/src/events.rs b/azalea-client/src/events.rs index 0d34d47b..17ebd4e8 100644 --- a/azalea-client/src/events.rs +++ b/azalea-client/src/events.rs @@ -20,7 +20,7 @@ use tokio::sync::mpsc; use crate::{ chat::{ChatPacket, ChatReceivedEvent}, - packet_handling::{ + packet_handling::game::{ AddPlayerEvent, DeathEvent, KeepAliveEvent, PacketEvent, RemovePlayerEvent, UpdatePlayerEvent, }, @@ -115,13 +115,13 @@ impl Plugin for EventPlugin { add_player_listener, update_player_listener, remove_player_listener, - death_listener, keepalive_listener, + death_listener, ), ) .add_systems( PreUpdate, - init_listener.before(crate::packet_handling::process_packet_events), + init_listener.before(crate::packet_handling::game::process_packet_events), ) .add_systems(FixedUpdate, tick_listener); } @@ -145,7 +145,7 @@ fn chat_listener(query: Query<&LocalPlayerEvents>, mut events: EventReader<ChatR for event in events.iter() { let local_player_events = query .get(event.entity) - .expect("Non-localplayer entities shouldn't be able to receive chat events"); + .expect("Non-local entities shouldn't be able to receive chat events"); local_player_events .send(Event::Chat(event.packet.clone())) .unwrap(); @@ -163,7 +163,7 @@ fn packet_listener(query: Query<&LocalPlayerEvents>, mut events: EventReader<Pac for event in events.iter() { let local_player_events = query .get(event.entity) - .expect("Non-localplayer entities shouldn't be able to receive add player events"); + .expect("Non-local entities shouldn't be able to receive add player events"); local_player_events .send(Event::Packet(Arc::new(event.packet.clone()))) .unwrap(); @@ -174,7 +174,7 @@ fn add_player_listener(query: Query<&LocalPlayerEvents>, mut events: EventReader for event in events.iter() { let local_player_events = query .get(event.entity) - .expect("Non-localplayer entities shouldn't be able to receive add player events"); + .expect("Non-local entities shouldn't be able to receive add player events"); local_player_events .send(Event::AddPlayer(event.info.clone())) .unwrap(); @@ -188,7 +188,7 @@ fn update_player_listener( for event in events.iter() { let local_player_events = query .get(event.entity) - .expect("Non-localplayer entities shouldn't be able to receive update player events"); + .expect("Non-local entities shouldn't be able to receive update player events"); local_player_events .send(Event::UpdatePlayer(event.info.clone())) .unwrap(); @@ -202,7 +202,7 @@ fn remove_player_listener( for event in events.iter() { let local_player_events = query .get(event.entity) - .expect("Non-localplayer entities shouldn't be able to receive remove player events"); + .expect("Non-local entities shouldn't be able to receive remove player events"); local_player_events .send(Event::RemovePlayer(event.info.clone())) .unwrap(); @@ -223,7 +223,7 @@ fn keepalive_listener(query: Query<&LocalPlayerEvents>, mut events: EventReader< for event in events.iter() { let local_player_events = query .get(event.entity) - .expect("Non-localplayer entities shouldn't be able to receive keepalive events"); + .expect("Non-local entities shouldn't be able to receive keepalive events"); local_player_events .send(Event::KeepAlive(event.id)) .unwrap(); diff --git a/azalea-client/src/interact.rs b/azalea-client/src/interact.rs index b28baca5..dc1306e3 100644 --- a/azalea-client/src/interact.rs +++ b/azalea-client/src/interact.rs @@ -27,10 +27,13 @@ use derive_more::{Deref, DerefMut}; use log::warn; use crate::{ - client::{PermissionLevel, PlayerAbilities}, + attack::handle_attack_event, inventory::{InventoryComponent, InventorySet}, - local_player::{handle_send_packet_event, LocalGameMode, SendPacketEvent}, - Client, LocalPlayer, + local_player::{ + handle_send_packet_event, LocalGameMode, PermissionLevel, PlayerAbilities, SendPacketEvent, + }, + respawn::perform_respawn, + Client, }; /// A plugin that allows clients to interact with blocks in the world. @@ -48,6 +51,9 @@ impl Plugin for InteractPlugin { handle_swing_arm_event, ) .before(handle_send_packet_event) + .after(InventorySet) + .after(perform_respawn) + .after(handle_attack_event) .chain(), update_modifiers_for_held_item .after(InventorySet) @@ -100,16 +106,12 @@ pub struct HitResultComponent(BlockHitResult); pub fn handle_block_interact_event( mut events: EventReader<BlockInteractEvent>, - mut query: Query<( - &LocalPlayer, - &mut CurrentSequenceNumber, - &HitResultComponent, - )>, + mut query: Query<(Entity, &mut CurrentSequenceNumber, &HitResultComponent)>, + mut send_packet_events: EventWriter<SendPacketEvent>, ) { for event in events.iter() { - let Ok((local_player, mut sequence_number, hit_result)) = query.get_mut(event.entity) - else { - warn!("Sent BlockInteractEvent for entity that isn't LocalPlayer"); + let Ok((entity, mut sequence_number, hit_result)) = query.get_mut(event.entity) else { + warn!("Sent BlockInteractEvent for entity that doesn't have the required components"); continue; }; @@ -141,14 +143,15 @@ pub fn handle_block_interact_event( } }; - local_player.write_packet( - ServerboundUseItemOnPacket { + send_packet_events.send(SendPacketEvent { + entity, + packet: ServerboundUseItemOnPacket { hand: InteractionHand::MainHand, block_hit, sequence: sequence_number.0, } .get(), - ) + }) } } diff --git a/azalea-client/src/inventory.rs b/azalea-client/src/inventory.rs index da5376fc..2e8478d4 100644 --- a/azalea-client/src/inventory.rs +++ b/azalea-client/src/inventory.rs @@ -25,7 +25,11 @@ use bevy_ecs::{ }; use log::warn; -use crate::{client::PlayerAbilities, local_player::handle_send_packet_event, Client, LocalPlayer}; +use crate::{ + local_player::{handle_send_packet_event, PlayerAbilities, SendPacketEvent}, + respawn::perform_respawn, + Client, +}; pub struct InventoryPlugin; impl Plugin for InventoryPlugin { @@ -45,7 +49,8 @@ impl Plugin for InventoryPlugin { handle_client_side_close_container_event, ) .chain() - .in_set(InventorySet), + .in_set(InventorySet) + .before(perform_respawn), ); } } @@ -599,12 +604,13 @@ pub struct CloseContainerEvent { pub id: u8, } fn handle_container_close_event( + query: Query<(Entity, &InventoryComponent)>, mut events: EventReader<CloseContainerEvent>, mut client_side_events: EventWriter<ClientSideCloseContainerEvent>, - query: Query<(&LocalPlayer, &InventoryComponent)>, + mut send_packet_events: EventWriter<SendPacketEvent>, ) { for event in events.iter() { - let (local_player, inventory) = query.get(event.entity).unwrap(); + let (entity, inventory) = query.get(event.entity).unwrap(); if event.id != inventory.id { warn!( "Tried to close container with ID {}, but the current container ID is {}", @@ -613,12 +619,13 @@ fn handle_container_close_event( continue; } - local_player.write_packet( - ServerboundContainerClosePacket { + send_packet_events.send(SendPacketEvent { + entity, + packet: ServerboundContainerClosePacket { container_id: inventory.id, } .get(), - ); + }); client_side_events.send(ClientSideCloseContainerEvent { entity: event.entity, }); @@ -650,11 +657,12 @@ pub struct ContainerClickEvent { pub operation: ClickOperation, } pub fn handle_container_click_event( + mut query: Query<(Entity, &mut InventoryComponent)>, mut events: EventReader<ContainerClickEvent>, - mut query: Query<(&mut InventoryComponent, &LocalPlayer)>, + mut send_packet_events: EventWriter<SendPacketEvent>, ) { for event in events.iter() { - let (mut inventory, local_player) = query.get_mut(event.entity).unwrap(); + let (entity, mut inventory) = query.get_mut(event.entity).unwrap(); if inventory.id != event.window_id { warn!( "Tried to click container with ID {}, but the current container ID is {}", @@ -678,8 +686,9 @@ pub fn handle_container_click_event( } } - local_player.write_packet( - ServerboundContainerClickPacket { + send_packet_events.send(SendPacketEvent { + entity, + packet: ServerboundContainerClickPacket { container_id: event.window_id, state_id: inventory.state_id, slot_num: event.operation.slot_num().map(|n| n as i16).unwrap_or(-999), @@ -689,7 +698,7 @@ pub fn handle_container_click_event( carried_item: inventory.carried.clone(), } .get(), - ) + }) } } diff --git a/azalea-client/src/lib.rs b/azalea-client/src/lib.rs index 40544540..25c75bcf 100644 --- a/azalea-client/src/lib.rs +++ b/azalea-client/src/lib.rs @@ -13,6 +13,7 @@ mod account; pub mod attack; pub mod chat; +pub mod chunk_batching; mod client; pub mod disconnect; mod entity_query; @@ -26,18 +27,20 @@ pub mod movement; pub mod packet_handling; pub mod ping; mod player; +pub mod raw_connection; pub mod received_registries; pub mod respawn; pub mod task_pool; pub use account::{Account, AccountOpts}; +pub use azalea_protocol::packets::configuration::serverbound_client_information_packet::ClientInformation; pub use client::{ - start_ecs_runner, Client, ClientInformation, DefaultPlugins, JoinError, JoinedClientBundle, - TabList, TickBroadcast, + start_ecs_runner, Client, DefaultPlugins, JoinError, JoinedClientBundle, TickBroadcast, }; pub use events::Event; -pub use local_player::{GameProfileComponent, LocalPlayer, SendPacketEvent}; +pub use local_player::{GameProfileComponent, InstanceHolder, SendPacketEvent, TabList}; pub use movement::{ PhysicsState, SprintDirection, StartSprintEvent, StartWalkEvent, WalkDirection, }; pub use player::PlayerInfo; +pub use received_registries::ReceivedRegistries; diff --git a/azalea-client/src/local_player.rs b/azalea-client/src/local_player.rs index 1bcb0948..2989f36e 100644 --- a/azalea-client/src/local_player.rs +++ b/azalea-client/src/local_player.rs @@ -1,51 +1,40 @@ -use std::{io, sync::Arc}; +use std::{collections::HashMap, io, sync::Arc}; use azalea_auth::game_profile::GameProfile; use azalea_core::GameMode; use azalea_entity::Dead; -use azalea_protocol::packets::game::ServerboundGamePacket; +use azalea_protocol::packets::game::{ + clientbound_player_abilities_packet::ClientboundPlayerAbilitiesPacket, ServerboundGamePacket, +}; use azalea_world::{Instance, PartialInstance}; use bevy_ecs::{ - component::Component, entity::Entity, event::EventReader, prelude::Event, query::Added, + component::Component, entity::Entity, event::EventReader, prelude::*, query::Added, system::Query, }; use derive_more::{Deref, DerefMut}; +use log::error; use parking_lot::RwLock; use thiserror::Error; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::sync::mpsc; +use uuid::Uuid; use crate::{ events::{Event as AzaleaEvent, LocalPlayerEvents}, - ClientInformation, + raw_connection::RawConnection, + ClientInformation, PlayerInfo, }; -/// This is a component for our local player entities that are probably in a -/// world. If you have access to a [`Client`], you probably don't need to care -/// about this since `Client` gives you access to everything here. -/// -/// You can also use the [`LocalEntity`] marker component for queries if you're -/// only checking for a local player and don't need the contents of this -/// component. -/// -/// [`LocalEntity`]: azalea_entity::LocalEntity -/// [`Client`]: crate::Client +/// A component that keeps strong references to our [`PartialInstance`] and +/// [`Instance`] for local players. #[derive(Component)] -pub struct LocalPlayer { - pub packet_writer: mpsc::UnboundedSender<ServerboundGamePacket>, - +pub struct InstanceHolder { /// The partial instance is the world this client currently has loaded. It /// has a limited render distance. pub partial_instance: Arc<RwLock<PartialInstance>>, /// The world is the combined [`PartialInstance`]s of all clients in the /// same world. (Only relevant if you're using a shared world, i.e. a /// swarm) - pub world: Arc<RwLock<Instance>>, - - /// A task that reads packets from the server. The client is disconnected - /// when this task ends. - pub(crate) read_packets_task: JoinHandle<()>, - /// A task that writes packets from the server. - pub(crate) write_packets_task: JoinHandle<()>, + pub instance: Arc<RwLock<Instance>>, } /// A component only present in players that contains the [`GameProfile`] (which @@ -64,6 +53,53 @@ pub struct LocalGameMode { pub previous: Option<GameMode>, } +/// A component that contains the abilities the player has, like flying +/// or instantly breaking blocks. This is only present on local players. +#[derive(Clone, Debug, Component, Default)] +pub struct PlayerAbilities { + pub invulnerable: bool, + pub flying: bool, + pub can_fly: bool, + /// Whether the player can instantly break blocks and can duplicate blocks + /// in their inventory. + pub instant_break: bool, + + pub flying_speed: f32, + /// Used for the fov + pub walking_speed: f32, +} +impl From<ClientboundPlayerAbilitiesPacket> for PlayerAbilities { + fn from(packet: ClientboundPlayerAbilitiesPacket) -> Self { + Self { + invulnerable: packet.flags.invulnerable, + flying: packet.flags.flying, + can_fly: packet.flags.can_fly, + instant_break: packet.flags.instant_break, + flying_speed: packet.flying_speed, + walking_speed: packet.walking_speed, + } + } +} + +/// Level must be 0..=4 +#[derive(Component, Clone, Default, Deref, DerefMut)] +pub struct PermissionLevel(pub u8); + +/// A component that contains a map of player UUIDs to their information in the +/// tab list. +/// +/// ``` +/// # use azalea_client::TabList; +/// # fn example(client: &azalea_client::Client) { +/// let tab_list = client.component::<TabList>(); +/// println!("Online players:"); +/// for (uuid, player_info) in tab_list.iter() { +/// println!("- {} ({}ms)", player_info.profile.name, player_info.latency); +/// } +/// # } +#[derive(Component, Resource, Clone, Debug, Deref, DerefMut, Default)] +pub struct TabList(HashMap<Uuid, PlayerInfo>); + #[derive(Component, Clone)] pub struct Hunger { /// The main hunger bar. Goes from 0 to 20. @@ -83,50 +119,24 @@ impl Default for Hunger { } } -impl LocalPlayer { - /// Create a new `LocalPlayer`. - pub fn new( - entity: Entity, - packet_writer: mpsc::UnboundedSender<ServerboundGamePacket>, - world: Arc<RwLock<Instance>>, - read_packets_task: JoinHandle<()>, - write_packets_task: JoinHandle<()>, - ) -> Self { +impl InstanceHolder { + /// Create a new `InstanceHolder`. + pub fn new(entity: Entity, world: Arc<RwLock<Instance>>) -> Self { let client_information = ClientInformation::default(); - LocalPlayer { - packet_writer, - - world, + InstanceHolder { + instance: world, partial_instance: Arc::new(RwLock::new(PartialInstance::new( azalea_world::calculate_chunk_storage_range( client_information.view_distance.into(), ), Some(entity), ))), - - read_packets_task, - write_packets_task, } } - - /// Write a packet directly to the server. - pub fn write_packet(&self, packet: ServerboundGamePacket) { - self.packet_writer - .send(packet) - .expect("write_packet shouldn't be able to be called if the connection is closed"); - } -} - -impl Drop for LocalPlayer { - /// Stop every active task when the `LocalPlayer` is dropped. - fn drop(&mut self) { - self.read_packets_task.abort(); - self.write_packets_task.abort(); - } } -/// Send the "Death" event for [`LocalPlayer`]s that died with no reason. +/// Send the "Death" event for [`LocalEntity`]s that died with no reason. pub fn death_event(query: Query<&LocalPlayerEvents, Added<Dead>>) { for local_player_events in &query { local_player_events.send(AzaleaEvent::Death(None)).unwrap(); @@ -160,11 +170,14 @@ pub struct SendPacketEvent { pub fn handle_send_packet_event( mut send_packet_events: EventReader<SendPacketEvent>, - mut query: Query<&mut LocalPlayer>, + mut query: Query<&mut RawConnection>, ) { for event in send_packet_events.iter() { - if let Ok(local_player) = query.get_mut(event.entity) { - local_player.write_packet(event.packet.clone()); + if let Ok(raw_connection) = query.get_mut(event.entity) { + // debug!("Sending packet: {:?}", event.packet); + if let Err(e) = raw_connection.write_packet(event.packet.clone()) { + error!("Failed to send packet: {e}"); + } } } } diff --git a/azalea-client/src/mining.rs b/azalea-client/src/mining.rs index 336bfe24..5db357b8 100644 --- a/azalea-client/src/mining.rs +++ b/azalea-client/src/mining.rs @@ -12,13 +12,12 @@ use bevy_ecs::prelude::*; use derive_more::{Deref, DerefMut}; use crate::{ - client::{PermissionLevel, PlayerAbilities}, interact::{ can_use_game_master_blocks, check_is_interaction_restricted, CurrentSequenceNumber, HitResultComponent, SwingArmEvent, }, inventory::{InventoryComponent, InventorySet}, - local_player::{LocalGameMode, SendPacketEvent}, + local_player::{LocalGameMode, PermissionLevel, PlayerAbilities, SendPacketEvent}, Client, }; diff --git a/azalea-client/src/movement.rs b/azalea-client/src/movement.rs index 3bdcdeac..782e98ff 100644 --- a/azalea-client/src/movement.rs +++ b/azalea-client/src/movement.rs @@ -1,8 +1,8 @@ use crate::client::Client; -use crate::local_player::LocalPlayer; +use crate::local_player::SendPacketEvent; use azalea_entity::{metadata::Sprinting, Attributes, Jumping}; use azalea_entity::{InLoadedChunk, LastSentPosition, LookDirection, Physics, Position}; -use azalea_physics::PhysicsSet; +use azalea_physics::{ai_step, PhysicsSet}; use azalea_protocol::packets::game::serverbound_player_command_packet::ServerboundPlayerCommandPacket; use azalea_protocol::packets::game::{ serverbound_move_player_pos_packet::ServerboundMovePlayerPosPacket, @@ -12,7 +12,7 @@ use azalea_protocol::packets::game::{ }; use azalea_world::{MinecraftEntityId, MoveEntityError}; use bevy_app::{App, FixedUpdate, Plugin, Update}; -use bevy_ecs::prelude::Event; +use bevy_ecs::prelude::{Event, EventWriter}; use bevy_ecs::{ component::Component, entity::Entity, event::EventReader, query::With, schedule::IntoSystemConfigs, system::Query, @@ -48,9 +48,11 @@ impl Plugin for PlayerMovePlugin { .add_systems( FixedUpdate, ( - local_player_ai_step + (tick_controls, local_player_ai_step) + .chain() .in_set(PhysicsSet) - .before(azalea_physics::ai_step), + .before(ai_step), + send_sprinting_if_needed.after(azalea_entity::update_in_loaded_chunk), send_position.after(PhysicsSet), ) .chain(), @@ -118,33 +120,28 @@ pub struct PhysicsState { pub fn send_position( mut query: Query< ( - &MinecraftEntityId, - &mut LocalPlayer, - &mut PhysicsState, + Entity, &Position, + &LookDirection, + &mut PhysicsState, &mut LastSentPosition, &mut Physics, - &LookDirection, &mut LastSentLookDirection, - &Sprinting, ), With<InLoadedChunk>, >, + mut send_packet_events: EventWriter<SendPacketEvent>, ) { for ( - id, - mut local_player, - mut physics_state, + entity, position, + direction, + mut physics_state, mut last_sent_position, mut physics, - direction, mut last_direction, - sprinting, ) in query.iter_mut() { - local_player.send_sprinting_if_needed(id, sprinting, &mut physics_state); - let packet = { // TODO: the camera being able to be controlled by other entities isn't // implemented yet if !self.is_controlled_camera() { return }; @@ -225,18 +222,16 @@ pub fn send_position( }; if let Some(packet) = packet { - local_player.write_packet(packet); + send_packet_events.send(SendPacketEvent { entity, packet }); } } } -impl LocalPlayer { - fn send_sprinting_if_needed( - &mut self, - id: &MinecraftEntityId, - sprinting: &Sprinting, - physics_state: &mut PhysicsState, - ) { +fn send_sprinting_if_needed( + mut query: Query<(Entity, &MinecraftEntityId, &Sprinting, &mut PhysicsState)>, + mut send_packet_events: EventWriter<SendPacketEvent>, +) { + for (entity, minecraft_entity_id, sprinting, mut physics_state) in query.iter_mut() { let was_sprinting = physics_state.was_sprinting; if **sprinting != was_sprinting { let sprinting_action = if **sprinting { @@ -244,21 +239,26 @@ impl LocalPlayer { } else { azalea_protocol::packets::game::serverbound_player_command_packet::Action::StopSprinting }; - self.write_packet( - ServerboundPlayerCommandPacket { - id: **id, + send_packet_events.send(SendPacketEvent { + entity, + packet: ServerboundPlayerCommandPacket { + id: **minecraft_entity_id, action: sprinting_action, data: 0, } .get(), - ); + }); physics_state.was_sprinting = **sprinting; } } +} + +/// Update the impulse from self.move_direction. The multipler is used for +/// sneaking. +pub(crate) fn tick_controls(mut query: Query<&mut PhysicsState>) { + for mut physics_state in query.iter_mut() { + let multiplier: Option<f32> = None; - /// Update the impulse from self.move_direction. The multipler is used for - /// sneaking. - pub(crate) fn tick_controls(multiplier: Option<f32>, physics_state: &mut PhysicsState) { let mut forward_impulse: f32 = 0.; let mut left_impulse: f32 = 0.; let move_direction = physics_state.move_direction; @@ -296,18 +296,11 @@ impl LocalPlayer { /// automatically by the client. pub fn local_player_ai_step( mut query: Query< - ( - &mut PhysicsState, - &mut Physics, - &mut Sprinting, - &mut Attributes, - ), + (&PhysicsState, &mut Physics, &mut Sprinting, &mut Attributes), With<InLoadedChunk>, >, ) { - for (mut physics_state, mut physics, mut sprinting, mut attributes) in query.iter_mut() { - LocalPlayer::tick_controls(None, &mut physics_state); - + for (physics_state, mut physics, mut sprinting, mut attributes) in query.iter_mut() { // server ai step physics.xxa = physics_state.left_impulse; physics.zza = physics_state.forward_impulse; @@ -325,7 +318,7 @@ pub fn local_player_ai_step( && ( // !self.is_in_water() // || self.is_underwater() && - has_enough_impulse_to_start_sprinting(&physics_state) + has_enough_impulse_to_start_sprinting(physics_state) && has_enough_food_to_sprint // && !self.using_item() // && !self.has_effect(MobEffects.BLINDNESS) diff --git a/azalea-client/src/packet_handling/configuration.rs b/azalea-client/src/packet_handling/configuration.rs new file mode 100644 index 00000000..6930e739 --- /dev/null +++ b/azalea-client/src/packet_handling/configuration.rs @@ -0,0 +1,204 @@ +use std::io::Cursor; +use std::sync::Arc; + +use azalea_entity::indexing::{EntityIdIndex, Loaded}; +use azalea_protocol::packets::configuration::serverbound_finish_configuration_packet::ServerboundFinishConfigurationPacket; +use azalea_protocol::packets::configuration::serverbound_keep_alive_packet::ServerboundKeepAlivePacket; +use azalea_protocol::packets::configuration::serverbound_pong_packet::ServerboundPongPacket; +use azalea_protocol::packets::configuration::serverbound_resource_pack_packet::ServerboundResourcePackPacket; +use azalea_protocol::packets::configuration::ClientboundConfigurationPacket; +use azalea_protocol::packets::ConnectionProtocol; +use azalea_protocol::read::deserialize_packet; +use azalea_world::Instance; +use bevy_ecs::prelude::*; +use bevy_ecs::system::SystemState; +use log::{debug, error, warn}; +use parking_lot::RwLock; + +use crate::client::InConfigurationState; +use crate::disconnect::DisconnectEvent; +use crate::local_player::Hunger; +use crate::packet_handling::game::KeepAliveEvent; +use crate::raw_connection::RawConnection; +use crate::ReceivedRegistries; + +#[derive(Event, Debug, Clone)] +pub struct PacketEvent { + /// The client entity that received the packet. + pub entity: Entity, + /// The packet that was actually received. + pub packet: ClientboundConfigurationPacket, +} + +pub fn send_packet_events( + query: Query<(Entity, &RawConnection), With<InConfigurationState>>, + mut packet_events: ResMut<Events<PacketEvent>>, +) { + // we manually clear and send the events at the beginning of each update + // since otherwise it'd cause issues with events in process_packet_events + // running twice + packet_events.clear(); + for (player_entity, raw_connection) in &query { + let packets_lock = raw_connection.incoming_packet_queue(); + let mut packets = packets_lock.lock(); + if !packets.is_empty() { + for raw_packet in packets.iter() { + let packet = match deserialize_packet::<ClientboundConfigurationPacket>( + &mut Cursor::new(raw_packet), + ) { + Ok(packet) => packet, + Err(err) => { + error!("failed to read packet: {:?}", err); + continue; + } + }; + packet_events.send(PacketEvent { + entity: player_entity, + packet: packet.clone(), + }); + } + // clear the packets right after we read them + packets.clear(); + } + } +} + +pub fn process_packet_events(ecs: &mut World) { + let mut events_owned = Vec::new(); + let mut system_state: SystemState<EventReader<PacketEvent>> = SystemState::new(ecs); + let mut events = system_state.get_mut(ecs); + for PacketEvent { + entity: player_entity, + packet, + } in events.iter() + { + // we do this so `ecs` isn't borrowed for the whole loop + events_owned.push((*player_entity, packet.clone())); + } + for (player_entity, packet) in events_owned { + match packet { + ClientboundConfigurationPacket::RegistryData(p) => { + let mut system_state: SystemState<Query<&mut ReceivedRegistries>> = + SystemState::new(ecs); + let mut query = system_state.get_mut(ecs); + let mut received_registries = query.get_mut(player_entity).unwrap(); + + let new_received_registries = p.registry_holder.registries; + // override the old registries with the new ones + // but if a registry wasn't sent, keep the old one + for (registry_name, registry) in new_received_registries { + received_registries + .registries + .insert(registry_name, registry); + } + } + + ClientboundConfigurationPacket::CustomPayload(p) => { + debug!("Got custom payload packet {p:?}"); + } + ClientboundConfigurationPacket::Disconnect(p) => { + warn!("Got disconnect packet {p:?}"); + let mut system_state: SystemState<EventWriter<DisconnectEvent>> = + SystemState::new(ecs); + let mut disconnect_events = system_state.get_mut(ecs); + disconnect_events.send(DisconnectEvent { + entity: player_entity, + }); + } + ClientboundConfigurationPacket::FinishConfiguration(p) => { + debug!("got FinishConfiguration packet: {p:?}"); + + let mut system_state: SystemState<Query<&mut RawConnection>> = + SystemState::new(ecs); + let mut query = system_state.get_mut(ecs); + let mut raw_connection = query.get_mut(player_entity).unwrap(); + + let instance_holder = crate::local_player::InstanceHolder::new( + player_entity, + // default to an empty world, it'll be set correctly later when we + // get the login packet + Arc::new(RwLock::new(Instance::default())), + ); + + raw_connection + .write_packet(ServerboundFinishConfigurationPacket {}.get()) + .expect( + "we should be in the right state and encoding this packet shouldn't fail", + ); + raw_connection.set_state(ConnectionProtocol::Game); + + // these components are added now that we're going to be in the Game state + ecs.entity_mut(player_entity) + .remove::<InConfigurationState>() + .insert(crate::JoinedClientBundle { + instance_holder, + physics_state: crate::PhysicsState::default(), + inventory: crate::inventory::InventoryComponent::default(), + client_information: crate::ClientInformation::default(), + tab_list: crate::local_player::TabList::default(), + current_sequence_number: crate::interact::CurrentSequenceNumber::default(), + last_sent_direction: crate::movement::LastSentLookDirection::default(), + abilities: crate::local_player::PlayerAbilities::default(), + permission_level: crate::local_player::PermissionLevel::default(), + hunger: Hunger::default(), + chunk_batch_info: crate::chunk_batching::ChunkBatchInfo::default(), + + entity_id_index: EntityIdIndex::default(), + + mining: crate::mining::MineBundle::default(), + attack: crate::attack::AttackBundle::default(), + + _local_entity: azalea_entity::LocalEntity, + _loaded: Loaded, + }); + } + ClientboundConfigurationPacket::KeepAlive(p) => { + debug!("Got keep alive packet (in configuration) {p:?} for {player_entity:?}"); + + let mut system_state: SystemState<( + Query<&RawConnection>, + EventWriter<KeepAliveEvent>, + )> = SystemState::new(ecs); + let (query, mut keepalive_events) = system_state.get_mut(ecs); + let raw_connection = query.get(player_entity).unwrap(); + + keepalive_events.send(KeepAliveEvent { + entity: player_entity, + id: p.id, + }); + raw_connection + .write_packet(ServerboundKeepAlivePacket { id: p.id }.get()) + .unwrap(); + } + ClientboundConfigurationPacket::Ping(p) => { + debug!("Got ping packet {p:?}"); + + let mut system_state: SystemState<Query<&RawConnection>> = SystemState::new(ecs); + let mut query = system_state.get_mut(ecs); + let raw_connection = query.get_mut(player_entity).unwrap(); + + raw_connection + .write_packet(ServerboundPongPacket { id: p.id }.get()) + .unwrap(); + } + ClientboundConfigurationPacket::ResourcePack(p) => { + debug!("Got resource pack packet {p:?}"); + + let mut system_state: SystemState<Query<&RawConnection>> = SystemState::new(ecs); + let mut query = system_state.get_mut(ecs); + let raw_connection = query.get_mut(player_entity).unwrap(); + + // always accept resource pack + raw_connection.write_packet( + ServerboundResourcePackPacket { action: azalea_protocol::packets::configuration::serverbound_resource_pack_packet::Action::Accepted }.get() + ).unwrap(); + } + ClientboundConfigurationPacket::UpdateEnabledFeatures(p) => { + debug!("Got update enabled features packet {p:?}"); + } + ClientboundConfigurationPacket::UpdateTags(_p) => { + debug!("Got update tags packet"); + } + } + } +} diff --git a/azalea-client/src/packet_handling.rs b/azalea-client/src/packet_handling/game.rs index 6ac657d7..e0a8b017 100644 --- a/azalea-client/src/packet_handling.rs +++ b/azalea-client/src/packet_handling/game.rs @@ -4,62 +4,49 @@ use std::{ sync::{Arc, Weak}, }; -use azalea_buf::McBufWritable; use azalea_chat::FormattedText; use azalea_core::{ChunkPos, GameMode, ResourceLocation, Vec3}; use azalea_entity::{ indexing::{EntityIdIndex, EntityUuidIndex}, metadata::{apply_metadata, Health, PlayerMetadataBundle}, - Dead, EntityBundle, EntityKind, EntityUpdateSet, LastSentPosition, LoadedBy, LookDirection, + Dead, EntityBundle, EntityKind, LastSentPosition, LoadedBy, LocalEntity, LookDirection, Physics, PlayerBundle, Position, RelativeEntityUpdate, }; use azalea_nbt::NbtCompound; use azalea_protocol::{ - connect::{ReadConnection, WriteConnection}, packets::game::{ clientbound_player_combat_kill_packet::ClientboundPlayerCombatKillPacket, serverbound_accept_teleportation_packet::ServerboundAcceptTeleportationPacket, - serverbound_custom_payload_packet::ServerboundCustomPayloadPacket, serverbound_keep_alive_packet::ServerboundKeepAlivePacket, serverbound_move_player_pos_rot_packet::ServerboundMovePlayerPosRotPacket, serverbound_pong_packet::ServerboundPongPacket, ClientboundGamePacket, - ServerboundGamePacket, }, - read::ReadPacketError, + read::deserialize_packet, }; use azalea_world::{Instance, InstanceContainer, InstanceName, MinecraftEntityId, PartialInstance}; -use bevy_app::{App, First, Plugin, PreUpdate, Update}; -use bevy_ecs::{ - component::Component, - entity::Entity, - event::{EventReader, EventWriter, Events}, - prelude::Event, - query::Changed, - schedule::IntoSystemConfigs, - system::{Commands, Query, Res, ResMut, SystemState}, - world::World, -}; +use bevy_ecs::{prelude::*, system::SystemState}; use log::{debug, error, trace, warn}; -use parking_lot::{Mutex, RwLock}; -use tokio::sync::mpsc; +use parking_lot::RwLock; use crate::{ chat::{ChatPacket, ChatReceivedEvent}, - client::{PlayerAbilities, TabList}, + chunk_batching, disconnect::DisconnectEvent, - events::death_listener, inventory::{ ClientSideCloseContainerEvent, InventoryComponent, MenuOpenedEvent, SetContainerContentEvent, }, - local_player::{GameProfileComponent, Hunger, LocalGameMode, LocalPlayer}, - received_registries::ReceivedRegistries, - ClientInformation, PlayerInfo, + local_player::{ + GameProfileComponent, Hunger, InstanceHolder, LocalGameMode, PlayerAbilities, + SendPacketEvent, TabList, + }, + raw_connection::RawConnection, + ClientInformation, PlayerInfo, ReceivedRegistries, }; /// An event that's sent when we receive a packet. /// ``` -/// # use azalea_client::packet_handling::PacketEvent; +/// # use azalea_client::packet_handling::game::PacketEvent; /// # use azalea_protocol::packets::game::ClientboundGamePacket; /// # use bevy_ecs::event::EventReader; /// @@ -85,30 +72,6 @@ pub struct PacketEvent { pub packet: ClientboundGamePacket, } -pub struct PacketHandlerPlugin; - -impl Plugin for PacketHandlerPlugin { - fn build(&self, app: &mut App) { - app.add_systems(First, send_packet_events) - .add_systems( - PreUpdate, - process_packet_events - // we want to index and deindex right after - .before(EntityUpdateSet::Deindex), - ) - .add_systems(Update, death_event_on_0_health.before(death_listener)) - .init_resource::<Events<PacketEvent>>() - .add_event::<AddPlayerEvent>() - .add_event::<RemovePlayerEvent>() - .add_event::<UpdatePlayerEvent>() - .add_event::<ChatReceivedEvent>() - .add_event::<DeathEvent>() - .add_event::<KeepAliveEvent>() - .add_event::<ResourcePackEvent>() - .add_event::<InstanceLoadedEvent>(); - } -} - /// A player joined the game (or more specifically, was added to the tab /// list of a local player). #[derive(Event, Debug, Clone)] @@ -143,20 +106,6 @@ pub struct DeathEvent { pub packet: Option<ClientboundPlayerCombatKillPacket>, } -pub fn death_event_on_0_health( - query: Query<(Entity, &Health), Changed<Health>>, - mut death_events: EventWriter<DeathEvent>, -) { - for (entity, health) in query.iter() { - if **health == 0. { - death_events.send(DeathEvent { - entity, - packet: None, - }); - } - } -} - /// A KeepAlive packet is sent from the server to verify that the client is /// still connected. #[derive(Event, Debug, Clone)] @@ -187,25 +136,28 @@ pub struct InstanceLoadedEvent { pub instance: Weak<RwLock<Instance>>, } -/// Something that receives packets from the server. -#[derive(Event, Component, Clone)] -pub struct PacketReceiver { - pub packets: Arc<Mutex<Vec<ClientboundGamePacket>>>, - pub run_schedule_sender: mpsc::UnboundedSender<()>, -} - pub fn send_packet_events( - query: Query<(Entity, &PacketReceiver)>, + query: Query<(Entity, &RawConnection), With<LocalEntity>>, mut packet_events: ResMut<Events<PacketEvent>>, ) { // we manually clear and send the events at the beginning of each update // since otherwise it'd cause issues with events in process_packet_events // running twice packet_events.clear(); - for (player_entity, packet_receiver) in &query { - let mut packets = packet_receiver.packets.lock(); + for (player_entity, raw_connection) in &query { + let packets_lock = raw_connection.incoming_packet_queue(); + let mut packets = packets_lock.lock(); if !packets.is_empty() { - for packet in packets.iter() { + for raw_packet in packets.iter() { + let packet = + match deserialize_packet::<ClientboundGamePacket>(&mut Cursor::new(raw_packet)) + { + Ok(packet) => packet, + Err(err) => { + error!("failed to read packet: {:?}", err); + continue; + } + }; packet_events.send(PacketEvent { entity: player_entity, packet: packet.clone(), @@ -238,37 +190,60 @@ pub fn process_packet_events(ecs: &mut World) { let mut system_state: SystemState<( Commands, Query<( - &mut LocalPlayer, - &mut EntityIdIndex, &GameProfileComponent, &ClientInformation, + &ReceivedRegistries, + Option<&mut InstanceName>, + &mut EntityIdIndex, + &mut InstanceHolder, )>, EventWriter<InstanceLoadedEvent>, ResMut<InstanceContainer>, + EventWriter<SendPacketEvent>, )> = SystemState::new(ecs); - let (mut commands, mut query, mut instance_loaded_events, mut instance_container) = - system_state.get_mut(ecs); - let (mut local_player, mut entity_id_index, game_profile, client_information) = - query.get_mut(player_entity).unwrap(); + let ( + mut commands, + mut query, + mut instance_loaded_events, + mut instance_container, + mut send_packet_events, + ) = system_state.get_mut(ecs); + let ( + game_profile, + client_information, + received_registries, + instance_name, + mut entity_id_index, + mut instance_holder, + ) = query.get_mut(player_entity).unwrap(); { - let received_registries = ReceivedRegistries(p.registry_holder.root); + let new_instance_name = p.common.dimension.clone(); - let dimension = &received_registries - .dimension_type + if let Some(mut instance_name) = instance_name { + *instance_name = instance_name.clone(); + } else { + commands + .entity(player_entity) + .insert(InstanceName(new_instance_name.clone())); + } + + let Some(dimension_type) = received_registries.dimension_type() else { + error!("Server didn't send dimension type registry, can't log in"); + continue; + }; + let dimension = &dimension_type .value .iter() - .find(|t| t.name == p.dimension_type) + .find(|t| t.name == p.common.dimension_type) .unwrap_or_else(|| { - panic!("No dimension_type with name {}", p.dimension_type) + panic!("No dimension_type with name {}", p.common.dimension_type) }) .element; - let new_instance_name = p.dimension.clone(); - // add this world to the instance_container (or don't if it's already // there) - let instance = instance_container.insert( + let weak_instance = instance_container.insert( new_instance_name.clone(), dimension.height, dimension.min_y, @@ -276,22 +251,22 @@ pub fn process_packet_events(ecs: &mut World) { instance_loaded_events.send(InstanceLoadedEvent { entity: player_entity, name: new_instance_name.clone(), - instance: Arc::downgrade(&instance), + instance: Arc::downgrade(&weak_instance), }); // set the partial_world to an empty world // (when we add chunks or entities those will be in the // instance_container) - *local_player.partial_instance.write() = PartialInstance::new( + *instance_holder.partial_instance.write() = PartialInstance::new( azalea_world::calculate_chunk_storage_range( client_information.view_distance.into(), ), // this argument makes it so other clients don't update this player entity - // in a shared world + // in a shared instance Some(player_entity), ); - local_player.world = instance; + instance_holder.instance = weak_instance; let player_bundle = PlayerBundle { entity: EntityBundle::new( @@ -306,11 +281,11 @@ pub fn process_packet_events(ecs: &mut World) { commands.entity(player_entity).insert(( MinecraftEntityId(p.player_id), LocalGameMode { - current: p.game_type, - previous: p.previous_game_type.into(), + current: p.common.game_type, + previous: p.common.previous_game_type.into(), }, // this gets overwritten later by the SetHealth packet - received_registries, + received_registries.clone(), player_bundle, )); @@ -318,41 +293,59 @@ pub fn process_packet_events(ecs: &mut World) { entity_id_index.insert(MinecraftEntityId(p.player_id), player_entity); } - // brand - let mut brand_data = Vec::new(); - // they don't have to know :) - "vanilla".write_into(&mut brand_data).unwrap(); - local_player.write_packet( - ServerboundCustomPayloadPacket { - identifier: ResourceLocation::new("brand"), - data: brand_data.into(), - } - .get(), - ); - // send the client information that we have set - log::debug!( + debug!( "Sending client information because login: {:?}", client_information ); - local_player.write_packet(client_information.clone().get()); + send_packet_events.send(SendPacketEvent { + entity: player_entity, + packet: azalea_protocol::packets::game::serverbound_client_information_packet::ServerboundClientInformationPacket { information: client_information.clone() }.get(), + }); system_state.apply(ecs); } ClientboundGamePacket::SetChunkCacheRadius(p) => { - debug!("Got set chunk cache radius packet {:?}", p); + debug!("Got set chunk cache radius packet {p:?}"); } + + ClientboundGamePacket::ChunkBatchStart(_p) => { + // the packet is empty, just a marker to tell us when the batch starts and ends + debug!("Got chunk batch start"); + let mut system_state: SystemState< + EventWriter<chunk_batching::ChunkBatchStartEvent>, + > = SystemState::new(ecs); + let mut chunk_batch_start_events = system_state.get_mut(ecs); + + chunk_batch_start_events.send(chunk_batching::ChunkBatchStartEvent { + entity: player_entity, + }); + } + ClientboundGamePacket::ChunkBatchFinished(p) => { + debug!("Got chunk batch finished {p:?}"); + + let mut system_state: SystemState< + EventWriter<chunk_batching::ChunkBatchFinishedEvent>, + > = SystemState::new(ecs); + let mut chunk_batch_start_events = system_state.get_mut(ecs); + + chunk_batch_start_events.send(chunk_batching::ChunkBatchFinishedEvent { + entity: player_entity, + batch_size: p.batch_size, + }); + } + ClientboundGamePacket::CustomPayload(p) => { - debug!("Got custom payload packet {:?}", p); + debug!("Got custom payload packet {p:?}"); } ClientboundGamePacket::ChangeDifficulty(p) => { - debug!("Got difficulty packet {:?}", p); + debug!("Got difficulty packet {p:?}"); } ClientboundGamePacket::Commands(_p) => { debug!("Got declare commands packet"); } ClientboundGamePacket::PlayerAbilities(p) => { - debug!("Got player abilities packet {:?}", p); + debug!("Got player abilities packet {p:?}"); let mut system_state: SystemState<Query<&mut PlayerAbilities>> = SystemState::new(ecs); let mut query = system_state.get_mut(ecs); @@ -361,53 +354,46 @@ pub fn process_packet_events(ecs: &mut World) { *player_abilities = PlayerAbilities::from(p); } ClientboundGamePacket::SetCarriedItem(p) => { - debug!("Got set carried item packet {:?}", p); + debug!("Got set carried item packet {p:?}"); } ClientboundGamePacket::UpdateTags(_p) => { debug!("Got update tags packet"); } ClientboundGamePacket::Disconnect(p) => { - warn!("Got disconnect packet {:?}", p); + warn!("Got disconnect packet {p:?}"); let mut system_state: SystemState<EventWriter<DisconnectEvent>> = SystemState::new(ecs); let mut disconnect_events = system_state.get_mut(ecs); disconnect_events.send(DisconnectEvent { entity: player_entity, }); - // bye - return; } ClientboundGamePacket::UpdateRecipes(_p) => { debug!("Got update recipes packet"); } ClientboundGamePacket::EntityEvent(_p) => { - // debug!("Got entity event packet {:?}", p); + // debug!("Got entity event packet {p:?}"); } ClientboundGamePacket::Recipe(_p) => { debug!("Got recipe packet"); } ClientboundGamePacket::PlayerPosition(p) => { // TODO: reply with teleport confirm - debug!("Got player position packet {:?}", p); + debug!("Got player position packet {p:?}"); #[allow(clippy::type_complexity)] - let mut system_state: SystemState< + let mut system_state: SystemState<( Query<( - &mut LocalPlayer, &mut Physics, &mut LookDirection, &mut Position, &mut LastSentPosition, )>, - > = SystemState::new(ecs); - let mut query = system_state.get_mut(ecs); - let Ok(( - local_player, - mut physics, - mut direction, - mut position, - mut last_sent_position, - )) = query.get_mut(player_entity) + EventWriter<SendPacketEvent>, + )> = SystemState::new(ecs); + let (mut query, mut send_packet_events) = system_state.get_mut(ecs); + let Ok((mut physics, mut direction, mut position, mut last_sent_position)) = + query.get_mut(player_entity) else { continue; }; @@ -470,9 +456,13 @@ pub fn process_packet_events(ecs: &mut World) { **position = new_pos; } - local_player.write_packet(ServerboundAcceptTeleportationPacket { id: p.id }.get()); - local_player.write_packet( - ServerboundMovePlayerPosRotPacket { + send_packet_events.send(SendPacketEvent { + entity: player_entity, + packet: ServerboundAcceptTeleportationPacket { id: p.id }.get(), + }); + send_packet_events.send(SendPacketEvent { + entity: player_entity, + packet: ServerboundMovePlayerPosRotPacket { x: new_pos.x, y: new_pos.y, z: new_pos.z, @@ -482,10 +472,10 @@ pub fn process_packet_events(ecs: &mut World) { on_ground: false, } .get(), - ); + }); } ClientboundGamePacket::PlayerInfoUpdate(p) => { - debug!("Got player info packet {:?}", p); + debug!("Got player info packet {p:?}"); #[allow(clippy::type_complexity)] let mut system_state: SystemState<( @@ -564,9 +554,10 @@ pub fn process_packet_events(ecs: &mut World) { } } ClientboundGamePacket::SetChunkCacheCenter(p) => { - debug!("Got chunk cache center packet {:?}", p); + debug!("Got chunk cache center packet {p:?}"); - let mut system_state: SystemState<Query<&mut LocalPlayer>> = SystemState::new(ecs); + let mut system_state: SystemState<Query<&mut InstanceHolder>> = + SystemState::new(ecs); let mut query = system_state.get_mut(ecs); let local_player = query.get_mut(player_entity).unwrap(); let mut partial_world = local_player.partial_instance.write(); @@ -575,13 +566,14 @@ pub fn process_packet_events(ecs: &mut World) { } ClientboundGamePacket::ChunksBiomes(_) => {} ClientboundGamePacket::LightUpdate(_p) => { - // debug!("Got light update packet {:?}", p); + // debug!("Got light update packet {p:?}"); } ClientboundGamePacket::LevelChunkWithLight(p) => { debug!("Got chunk with light packet {} {}", p.x, p.z); let pos = ChunkPos::new(p.x, p.z); - let mut system_state: SystemState<Query<&mut LocalPlayer>> = SystemState::new(ecs); + let mut system_state: SystemState<Query<&mut InstanceHolder>> = + SystemState::new(ecs); let mut query = system_state.get_mut(ecs); let local_player = query.get_mut(player_entity).unwrap(); @@ -590,7 +582,7 @@ pub fn process_packet_events(ecs: &mut World) { // parse it again. This is only used when we have a shared // world, since we check that the chunk isn't currently owned // by this client. - let shared_chunk = local_player.world.read().chunks.get(&pos); + let shared_chunk = local_player.instance.read().chunks.get(&pos); let this_client_has_chunk = local_player .partial_instance .read() @@ -598,7 +590,7 @@ pub fn process_packet_events(ecs: &mut World) { .limited_get(&pos) .is_some(); - let mut world = local_player.world.write(); + let mut world = local_player.instance.write(); let mut partial_world = local_player.partial_instance.write(); if !this_client_has_chunk { @@ -641,13 +633,14 @@ pub fn process_packet_events(ecs: &mut World) { #[allow(clippy::type_complexity)] let mut system_state: SystemState<( Commands, - Query<(&mut EntityIdIndex, Option<&InstanceName>)>, + Query<(&mut EntityIdIndex, Option<&InstanceName>, Option<&TabList>)>, Res<InstanceContainer>, ResMut<EntityUuidIndex>, )> = SystemState::new(ecs); let (mut commands, mut query, instance_container, mut entity_uuid_index) = system_state.get_mut(ecs); - let (mut entity_id_index, instance_name) = query.get_mut(player_entity).unwrap(); + let (mut entity_id_index, instance_name, tab_list) = + query.get_mut(player_entity).unwrap(); if let Some(instance_name) = instance_name { let bundle = p.as_entity_bundle((**instance_name).clone()); @@ -669,6 +662,15 @@ pub fn process_packet_events(ecs: &mut World) { entity_uuid_index.insert(p.uuid, spawned.id()); } + if let Some(tab_list) = tab_list { + // technically this makes it possible for non-player entities to have + // GameProfileComponents but the server would have to be doing something + // really weird + if let Some(player_info) = tab_list.get(&p.uuid) { + spawned.insert(GameProfileComponent(player_info.profile.clone())); + } + } + // the bundle doesn't include the default entity metadata so we add that // separately p.apply_metadata(&mut spawned); @@ -679,16 +681,16 @@ pub fn process_packet_events(ecs: &mut World) { system_state.apply(ecs); } ClientboundGamePacket::SetEntityData(p) => { - debug!("Got set entity data packet {:?}", p); + debug!("Got set entity data packet {p:?}"); #[allow(clippy::type_complexity)] let mut system_state: SystemState<( Commands, - Query<(&EntityIdIndex, &LocalPlayer)>, + Query<(&EntityIdIndex, &InstanceHolder)>, Query<&EntityKind>, )> = SystemState::new(ecs); let (mut commands, mut query, entity_kind_query) = system_state.get_mut(ecs); - let (entity_id_index, local_player) = query.get_mut(player_entity).unwrap(); + let (entity_id_index, instance_holder) = query.get_mut(player_entity).unwrap(); let entity = entity_id_index.get(&MinecraftEntityId(p.id)); @@ -701,7 +703,7 @@ pub fn process_packet_events(ecs: &mut World) { // we use RelativeEntityUpdate because it makes sure changes aren't made // multiple times commands.entity(entity).add(RelativeEntityUpdate { - partial_world: local_player.partial_instance.clone(), + partial_world: instance_holder.partial_instance.clone(), update: Box::new(move |entity| { let entity_id = entity.id(); entity.world_scope(|world| { @@ -722,55 +724,25 @@ pub fn process_packet_events(ecs: &mut World) { system_state.apply(ecs); } ClientboundGamePacket::UpdateAttributes(_p) => { - // debug!("Got update attributes packet {:?}", p); + // debug!("Got update attributes packet {p:?}"); } ClientboundGamePacket::SetEntityMotion(_p) => { - // debug!("Got entity velocity packet {:?}", p); + // debug!("Got entity velocity packet {p:?}"); } ClientboundGamePacket::SetEntityLink(p) => { debug!("Got set entity link packet {p:?}"); } - ClientboundGamePacket::AddPlayer(p) => { - debug!("Got add player packet {p:?}"); - - #[allow(clippy::type_complexity)] - let mut system_state: SystemState<( - Commands, - Query<(&mut EntityIdIndex, &TabList, Option<&InstanceName>)>, - )> = SystemState::new(ecs); - let (mut commands, mut query) = system_state.get_mut(ecs); - let (mut entity_id_index, tab_list, world_name) = - query.get_mut(player_entity).unwrap(); - - if let Some(InstanceName(world_name)) = world_name { - let bundle = p.as_player_bundle(world_name.clone()); - let mut spawned = commands.spawn(( - MinecraftEntityId(p.id), - LoadedBy(HashSet::from([player_entity])), - bundle, - )); - entity_id_index.insert(MinecraftEntityId(p.id), spawned.id()); - - if let Some(player_info) = tab_list.get(&p.uuid) { - spawned.insert(GameProfileComponent(player_info.profile.clone())); - } - } else { - warn!("got add player packet but we haven't gotten a login packet yet"); - } - - system_state.apply(ecs); - } ClientboundGamePacket::InitializeBorder(p) => { - debug!("Got initialize border packet {:?}", p); + debug!("Got initialize border packet {p:?}"); } ClientboundGamePacket::SetTime(_p) => { - // debug!("Got set time packet {:?}", p); + // debug!("Got set time packet {p:?}"); } ClientboundGamePacket::SetDefaultSpawnPosition(p) => { - debug!("Got set default spawn position packet {:?}", p); + debug!("Got set default spawn position packet {p:?}"); } ClientboundGamePacket::SetHealth(p) => { - debug!("Got set health packet {:?}", p); + debug!("Got set health packet {p:?}"); let mut system_state: SystemState<Query<(&mut Health, &mut Hunger)>> = SystemState::new(ecs); @@ -785,22 +757,22 @@ pub fn process_packet_events(ecs: &mut World) { // the Death event. } ClientboundGamePacket::SetExperience(p) => { - debug!("Got set experience packet {:?}", p); + debug!("Got set experience packet {p:?}"); } ClientboundGamePacket::TeleportEntity(p) => { let mut system_state: SystemState<( Commands, - Query<(&EntityIdIndex, &LocalPlayer)>, + Query<(&EntityIdIndex, &InstanceHolder)>, )> = SystemState::new(ecs); let (mut commands, mut query) = system_state.get_mut(ecs); - let (entity_id_index, local_player) = query.get_mut(player_entity).unwrap(); + let (entity_id_index, instance_holder) = query.get_mut(player_entity).unwrap(); let entity = entity_id_index.get(&MinecraftEntityId(p.id)); if let Some(entity) = entity { let new_pos = p.position; commands.entity(entity).add(RelativeEntityUpdate { - partial_world: local_player.partial_instance.clone(), + partial_world: instance_holder.partial_instance.clone(), update: Box::new(move |entity| { let mut position = entity.get_mut::<Position>().unwrap(); if new_pos != **position { @@ -815,25 +787,25 @@ pub fn process_packet_events(ecs: &mut World) { system_state.apply(ecs); } ClientboundGamePacket::UpdateAdvancements(p) => { - debug!("Got update advancements packet {:?}", p); + debug!("Got update advancements packet {p:?}"); } ClientboundGamePacket::RotateHead(_p) => { - // debug!("Got rotate head packet {:?}", p); + // debug!("Got rotate head packet {p:?}"); } ClientboundGamePacket::MoveEntityPos(p) => { let mut system_state: SystemState<( Commands, - Query<(&EntityIdIndex, &LocalPlayer)>, + Query<(&EntityIdIndex, &InstanceHolder)>, )> = SystemState::new(ecs); let (mut commands, mut query) = system_state.get_mut(ecs); - let (entity_id_index, local_player) = query.get_mut(player_entity).unwrap(); + let (entity_id_index, instance_holder) = query.get_mut(player_entity).unwrap(); let entity = entity_id_index.get(&MinecraftEntityId(p.entity_id)); if let Some(entity) = entity { let delta = p.delta.clone(); commands.entity(entity).add(RelativeEntityUpdate { - partial_world: local_player.partial_instance.clone(), + partial_world: instance_holder.partial_instance.clone(), update: Box::new(move |entity_mut| { let mut position = entity_mut.get_mut::<Position>().unwrap(); let new_pos = position.with_delta(&delta); @@ -854,17 +826,17 @@ pub fn process_packet_events(ecs: &mut World) { ClientboundGamePacket::MoveEntityPosRot(p) => { let mut system_state: SystemState<( Commands, - Query<(&EntityIdIndex, &LocalPlayer)>, + Query<(&EntityIdIndex, &InstanceHolder)>, )> = SystemState::new(ecs); let (mut commands, mut query) = system_state.get_mut(ecs); - let (entity_id_index, local_player) = query.get_mut(player_entity).unwrap(); + let (entity_id_index, instance_holder) = query.get_mut(player_entity).unwrap(); let entity = entity_id_index.get(&MinecraftEntityId(p.entity_id)); if let Some(entity) = entity { let delta = p.delta.clone(); commands.entity(entity).add(RelativeEntityUpdate { - partial_world: local_player.partial_instance.clone(), + partial_world: instance_holder.partial_instance.clone(), update: Box::new(move |entity_mut| { let mut position = entity_mut.get_mut::<Position>().unwrap(); let new_pos = position.with_delta(&delta); @@ -884,25 +856,25 @@ pub fn process_packet_events(ecs: &mut World) { } ClientboundGamePacket::MoveEntityRot(_p) => { - // debug!("Got move entity rot packet {:?}", p); + // debug!("Got move entity rot packet {p:?}"); } ClientboundGamePacket::KeepAlive(p) => { debug!("Got keep alive packet {p:?} for {player_entity:?}"); let mut system_state: SystemState<( - Query<&mut LocalPlayer>, EventWriter<KeepAliveEvent>, + EventWriter<SendPacketEvent>, )> = SystemState::new(ecs); - let (mut query, mut keepalive_events) = system_state.get_mut(ecs); + let (mut keepalive_events, mut send_packet_events) = system_state.get_mut(ecs); keepalive_events.send(KeepAliveEvent { entity: player_entity, id: p.id, }); - - let local_player = query.get_mut(player_entity).unwrap(); - local_player.write_packet(ServerboundKeepAlivePacket { id: p.id }.get()); - debug!("Sent keep alive packet {p:?} for {player_entity:?}"); + send_packet_events.send(SendPacketEvent { + entity: player_entity, + packet: ServerboundKeepAlivePacket { id: p.id }.get(), + }); } ClientboundGamePacket::RemoveEntities(p) => { debug!("Got remove entities packet {:?}", p); @@ -933,7 +905,7 @@ pub fn process_packet_events(ecs: &mut World) { } } ClientboundGamePacket::PlayerChat(p) => { - debug!("Got player chat packet {:?}", p); + debug!("Got player chat packet {p:?}"); let mut system_state: SystemState<EventWriter<ChatReceivedEvent>> = SystemState::new(ecs); @@ -945,7 +917,7 @@ pub fn process_packet_events(ecs: &mut World) { }); } ClientboundGamePacket::SystemChat(p) => { - debug!("Got system chat packet {:?}", p); + debug!("Got system chat packet {p:?}"); let mut system_state: SystemState<EventWriter<ChatReceivedEvent>> = SystemState::new(ecs); @@ -957,32 +929,34 @@ pub fn process_packet_events(ecs: &mut World) { }); } ClientboundGamePacket::Sound(_p) => { - // debug!("Got sound packet {:?}", p); + // debug!("Got sound packet {p:?}"); } ClientboundGamePacket::LevelEvent(p) => { - debug!("Got level event packet {:?}", p); + debug!("Got level event packet {p:?}"); } ClientboundGamePacket::BlockUpdate(p) => { - debug!("Got block update packet {:?}", p); + debug!("Got block update packet {p:?}"); - let mut system_state: SystemState<Query<&mut LocalPlayer>> = SystemState::new(ecs); + let mut system_state: SystemState<Query<&mut InstanceHolder>> = + SystemState::new(ecs); let mut query = system_state.get_mut(ecs); let local_player = query.get_mut(player_entity).unwrap(); - let world = local_player.world.write(); + let world = local_player.instance.write(); world.chunks.set_block_state(&p.pos, p.block_state); } ClientboundGamePacket::Animate(p) => { - debug!("Got animate packet {:?}", p); + debug!("Got animate packet {p:?}"); } ClientboundGamePacket::SectionBlocksUpdate(p) => { - debug!("Got section blocks update packet {:?}", p); - let mut system_state: SystemState<Query<&mut LocalPlayer>> = SystemState::new(ecs); + debug!("Got section blocks update packet {p:?}"); + let mut system_state: SystemState<Query<&mut InstanceHolder>> = + SystemState::new(ecs); let mut query = system_state.get_mut(ecs); let local_player = query.get_mut(player_entity).unwrap(); - let world = local_player.world.write(); + let world = local_player.instance.write(); for state in &p.states { world @@ -993,7 +967,7 @@ pub fn process_packet_events(ecs: &mut World) { ClientboundGamePacket::GameEvent(p) => { use azalea_protocol::packets::game::clientbound_game_event_packet::EventType; - debug!("Got game event packet {:?}", p); + debug!("Got game event packet {p:?}"); #[allow(clippy::single_match)] match p.event { @@ -1010,16 +984,16 @@ pub fn process_packet_events(ecs: &mut World) { } } ClientboundGamePacket::LevelParticles(p) => { - debug!("Got level particles packet {:?}", p); + debug!("Got level particles packet {p:?}"); } ClientboundGamePacket::ServerData(p) => { - debug!("Got server data packet {:?}", p); + debug!("Got server data packet {p:?}"); } ClientboundGamePacket::SetEquipment(p) => { - debug!("Got set equipment packet {:?}", p); + debug!("Got set equipment packet {p:?}"); } ClientboundGamePacket::UpdateMobEffect(p) => { - debug!("Got update mob effect packet {:?}", p); + debug!("Got update mob effect packet {p:?}"); } ClientboundGamePacket::AddExperienceOrb(_) => {} ClientboundGamePacket::AwardStats(_) => {} @@ -1027,12 +1001,12 @@ pub fn process_packet_events(ecs: &mut World) { ClientboundGamePacket::BlockDestruction(_) => {} ClientboundGamePacket::BlockEntityData(_) => {} ClientboundGamePacket::BlockEvent(p) => { - debug!("Got block event packet {:?}", p); + debug!("Got block event packet {p:?}"); } ClientboundGamePacket::BossEvent(_) => {} ClientboundGamePacket::CommandSuggestions(_) => {} ClientboundGamePacket::ContainerSetContent(p) => { - debug!("Got container set content packet {:?}", p); + debug!("Got container set content packet {p:?}"); let mut system_state: SystemState<( Query<&mut InventoryComponent>, @@ -1058,7 +1032,7 @@ pub fn process_packet_events(ecs: &mut World) { } } ClientboundGamePacket::ContainerSetData(p) => { - debug!("Got container set data packet {:?}", p); + debug!("Got container set data packet {p:?}"); // let mut system_state: SystemState<Query<&mut // InventoryComponent>> = // SystemState::new(ecs); @@ -1072,7 +1046,7 @@ pub fn process_packet_events(ecs: &mut World) { // see https://wiki.vg/Protocol#Set_Container_Property } ClientboundGamePacket::ContainerSetSlot(p) => { - debug!("Got container set slot packet {:?}", p); + debug!("Got container set slot packet {p:?}"); let mut system_state: SystemState<Query<&mut InventoryComponent>> = SystemState::new(ecs); @@ -1129,7 +1103,7 @@ pub fn process_packet_events(ecs: &mut World) { ClientboundGamePacket::MoveVehicle(_) => {} ClientboundGamePacket::OpenBook(_) => {} ClientboundGamePacket::OpenScreen(p) => { - debug!("Got open screen packet {:?}", p); + debug!("Got open screen packet {p:?}"); let mut system_state: SystemState<EventWriter<MenuOpenedEvent>> = SystemState::new(ecs); let mut menu_opened_events = system_state.get_mut(ecs); @@ -1142,19 +1116,25 @@ pub fn process_packet_events(ecs: &mut World) { } ClientboundGamePacket::OpenSignEditor(_) => {} ClientboundGamePacket::Ping(p) => { - trace!("Got ping packet {:?}", p); + debug!("Got ping packet {p:?}"); - let mut system_state: SystemState<Query<&mut LocalPlayer>> = SystemState::new(ecs); - let mut query = system_state.get_mut(ecs); + let mut system_state: SystemState<EventWriter<SendPacketEvent>> = + SystemState::new(ecs); + let mut send_packet_events = system_state.get_mut(ecs); - let local_player = query.get_mut(player_entity).unwrap(); - local_player.write_packet(ServerboundPongPacket { id: p.id }.get()); + send_packet_events.send(SendPacketEvent { + entity: player_entity, + packet: ServerboundPongPacket { id: p.id }.get(), + }); + } + ClientboundGamePacket::PongResponse(p) => { + debug!("Got pong response packet {p:?}"); } ClientboundGamePacket::PlaceGhostRecipe(_) => {} ClientboundGamePacket::PlayerCombatEnd(_) => {} ClientboundGamePacket::PlayerCombatEnter(_) => {} ClientboundGamePacket::PlayerCombatKill(p) => { - debug!("Got player kill packet {:?}", p); + debug!("Got player kill packet {p:?}"); #[allow(clippy::type_complexity)] let mut system_state: SystemState<( @@ -1178,7 +1158,7 @@ pub fn process_packet_events(ecs: &mut World) { ClientboundGamePacket::PlayerLookAt(_) => {} ClientboundGamePacket::RemoveMobEffect(_) => {} ClientboundGamePacket::ResourcePack(p) => { - debug!("Got resource pack packet {:?}", p); + debug!("Got resource pack packet {p:?}"); let mut system_state: SystemState<EventWriter<ResourcePackEvent>> = SystemState::new(ecs); @@ -1195,13 +1175,13 @@ pub fn process_packet_events(ecs: &mut World) { system_state.apply(ecs); } ClientboundGamePacket::Respawn(p) => { - debug!("Got respawn packet {:?}", p); + debug!("Got respawn packet {p:?}"); #[allow(clippy::type_complexity)] let mut system_state: SystemState<( Commands, Query<( - &mut LocalPlayer, + &mut InstanceHolder, &GameProfileComponent, &ClientInformation, &ReceivedRegistries, @@ -1211,25 +1191,29 @@ pub fn process_packet_events(ecs: &mut World) { )> = SystemState::new(ecs); let (mut commands, mut query, mut instance_loaded_events, mut instance_container) = system_state.get_mut(ecs); - let (mut local_player, game_profile, client_information, received_registries) = + let (mut instance_holder, game_profile, client_information, received_registries) = query.get_mut(player_entity).unwrap(); { - let dimension = &received_registries - .dimension_type + let new_instance_name = p.common.dimension.clone(); + + let Some(dimension_type) = received_registries.dimension_type() else { + error!("Server didn't send dimension type registry, can't log in"); + continue; + }; + + let dimension = &dimension_type .value .iter() - .find(|t| t.name == p.dimension_type) + .find(|t| t.name == p.common.dimension_type) .unwrap_or_else(|| { - panic!("No dimension_type with name {}", p.dimension_type) + panic!("No dimension_type with name {}", p.common.dimension_type) }) .element; - let new_instance_name = p.dimension.clone(); - // add this world to the instance_container (or don't if it's already // there) - let instance = instance_container.insert( + let weak_instance = instance_container.insert( new_instance_name.clone(), dimension.height, dimension.min_y, @@ -1237,19 +1221,19 @@ pub fn process_packet_events(ecs: &mut World) { instance_loaded_events.send(InstanceLoadedEvent { entity: player_entity, name: new_instance_name.clone(), - instance: Arc::downgrade(&instance), + instance: Arc::downgrade(&weak_instance), }); // set the partial_world to an empty world // (when we add chunks or entities those will be in the // instance_container) - *local_player.partial_instance.write() = PartialInstance::new( + *instance_holder.partial_instance.write() = PartialInstance::new( azalea_world::calculate_chunk_storage_range( client_information.view_distance.into(), ), Some(player_entity), ); - local_player.world = instance; + instance_holder.instance = weak_instance; // this resets a bunch of our components like physics and stuff let player_bundle = PlayerBundle { @@ -1264,8 +1248,8 @@ pub fn process_packet_events(ecs: &mut World) { // update the local gamemode and metadata things commands.entity(player_entity).insert(( LocalGameMode { - current: p.game_type, - previous: p.previous_game_type.into(), + current: p.common.game_type, + previous: p.common.previous_game_type.into(), }, player_bundle, )); @@ -1301,49 +1285,11 @@ pub fn process_packet_events(ecs: &mut World) { ClientboundGamePacket::TagQuery(_) => {} ClientboundGamePacket::TakeItemEntity(_) => {} ClientboundGamePacket::DisguisedChat(_) => {} - ClientboundGamePacket::UpdateEnabledFeatures(_) => {} ClientboundGamePacket::Bundle(_) => {} ClientboundGamePacket::DamageEvent(_) => {} ClientboundGamePacket::HurtAnimation(_) => {} - } - } -} - -impl PacketReceiver { - /// Loop that reads from the connection and adds the packets to the queue + - /// runs the schedule. - pub async fn read_task(self, mut read_conn: ReadConnection<ClientboundGamePacket>) { - loop { - match read_conn.read().await { - Ok(packet) => { - self.packets.lock().push(packet); - // tell the client to run all the systems - self.run_schedule_sender.send(()).unwrap(); - } - Err(error) => { - if !matches!(*error, ReadPacketError::ConnectionClosed) { - error!("Error reading packet from Client: {error:?}"); - } - break; - } - } - } - } - /// Consume the [`ServerboundGamePacket`] queue and actually write the - /// packets to the server. It's like this so writing packets doesn't need to - /// be awaited. - pub async fn write_task( - self, - mut write_conn: WriteConnection<ServerboundGamePacket>, - mut write_receiver: mpsc::UnboundedReceiver<ServerboundGamePacket>, - ) { - while let Some(packet) = write_receiver.recv().await { - if let Err(err) = write_conn.write(packet).await { - error!("Disconnecting because we couldn't write a packet: {err}."); - break; - }; + ClientboundGamePacket::StartConfiguration(_) => todo!(), } - // receiver is automatically closed when it's dropped } } diff --git a/azalea-client/src/packet_handling/mod.rs b/azalea-client/src/packet_handling/mod.rs new file mode 100644 index 00000000..35bdfc04 --- /dev/null +++ b/azalea-client/src/packet_handling/mod.rs @@ -0,0 +1,59 @@ +use azalea_entity::{metadata::Health, EntityUpdateSet}; +use bevy_app::{App, First, Plugin, PreUpdate, Update}; +use bevy_ecs::prelude::*; + +use crate::{chat::ChatReceivedEvent, events::death_listener}; + +use self::game::{ + AddPlayerEvent, DeathEvent, InstanceLoadedEvent, KeepAliveEvent, RemovePlayerEvent, + ResourcePackEvent, UpdatePlayerEvent, +}; + +pub mod configuration; +pub mod game; + +pub struct PacketHandlerPlugin; + +pub fn death_event_on_0_health( + query: Query<(Entity, &Health), Changed<Health>>, + mut death_events: EventWriter<DeathEvent>, +) { + for (entity, health) in query.iter() { + if **health == 0. { + death_events.send(DeathEvent { + entity, + packet: None, + }); + } + } +} + +impl Plugin for PacketHandlerPlugin { + fn build(&self, app: &mut App) { + app.add_systems( + First, + (game::send_packet_events, configuration::send_packet_events), + ) + .add_systems( + PreUpdate, + ( + game::process_packet_events, + configuration::process_packet_events, + ) + // we want to index and deindex right after + .before(EntityUpdateSet::Deindex), + ) + .add_systems(Update, death_event_on_0_health.before(death_listener)) + // we do this instead of add_event so we can handle the events ourselves + .init_resource::<Events<game::PacketEvent>>() + .init_resource::<Events<configuration::PacketEvent>>() + .add_event::<AddPlayerEvent>() + .add_event::<RemovePlayerEvent>() + .add_event::<UpdatePlayerEvent>() + .add_event::<ChatReceivedEvent>() + .add_event::<DeathEvent>() + .add_event::<KeepAliveEvent>() + .add_event::<ResourcePackEvent>() + .add_event::<InstanceLoadedEvent>(); + } +} diff --git a/azalea-client/src/ping.rs b/azalea-client/src/ping.rs index 8acde7a5..9064065c 100755 --- a/azalea-client/src/ping.rs +++ b/azalea-client/src/ping.rs @@ -3,7 +3,7 @@ use azalea_protocol::{ connect::{Connection, ConnectionError}, packets::{ - handshake::client_intention_packet::ClientIntentionPacket, + handshaking::client_intention_packet::ClientIntentionPacket, status::{ clientbound_status_response_packet::ClientboundStatusResponsePacket, serverbound_status_request_packet::ServerboundStatusRequestPacket, diff --git a/azalea-client/src/player.rs b/azalea-client/src/player.rs index a94340ab..1aba172a 100755 --- a/azalea-client/src/player.rs +++ b/azalea-client/src/player.rs @@ -8,7 +8,7 @@ use bevy_ecs::{ }; use uuid::Uuid; -use crate::{packet_handling::AddPlayerEvent, GameProfileComponent}; +use crate::{packet_handling::game::AddPlayerEvent, GameProfileComponent}; /// A player in the tab list. #[derive(Debug, Clone)] diff --git a/azalea-client/src/raw_connection.rs b/azalea-client/src/raw_connection.rs new file mode 100644 index 00000000..0df13a60 --- /dev/null +++ b/azalea-client/src/raw_connection.rs @@ -0,0 +1,174 @@ +use std::fmt::Debug; +use std::sync::Arc; + +use azalea_protocol::{ + connect::{RawReadConnection, RawWriteConnection}, + packets::{ConnectionProtocol, ProtocolPacket}, + read::ReadPacketError, + write::serialize_packet, +}; +use bevy_ecs::prelude::*; +use log::error; +use parking_lot::Mutex; +use thiserror::Error; +use tokio::sync::mpsc; + +/// A component for clients that can read and write packets to the server. This +/// works with raw bytes, so you'll have to serialize/deserialize packets +/// yourself. It will do the compression and encryption for you though. +#[derive(Component)] +pub struct RawConnection { + reader: RawConnectionReader, + writer: RawConnectionWriter, + + /// Packets sent to this will be sent to the server. + + /// A task that reads packets from the server. The client is disconnected + /// when this task ends. + read_packets_task: tokio::task::JoinHandle<()>, + /// A task that writes packets from the server. + write_packets_task: tokio::task::JoinHandle<()>, + + connection_protocol: ConnectionProtocol, +} + +#[derive(Clone)] +struct RawConnectionReader { + pub incoming_packet_queue: Arc<Mutex<Vec<Vec<u8>>>>, + pub run_schedule_sender: mpsc::UnboundedSender<()>, +} +#[derive(Clone)] +struct RawConnectionWriter { + pub outgoing_packets_sender: mpsc::UnboundedSender<Vec<u8>>, +} + +#[derive(Error, Debug)] +pub enum WritePacketError { + #[error("Wrong protocol state: expected {expected:?}, got {got:?}")] + WrongState { + expected: ConnectionProtocol, + got: ConnectionProtocol, + }, + #[error(transparent)] + Encoding(#[from] azalea_protocol::write::PacketEncodeError), +} + +impl RawConnection { + pub fn new( + run_schedule_sender: mpsc::UnboundedSender<()>, + connection_protocol: ConnectionProtocol, + raw_read_connection: RawReadConnection, + raw_write_connection: RawWriteConnection, + ) -> Self { + let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel(); + + let incoming_packet_queue = Arc::new(Mutex::new(Vec::new())); + + let reader = RawConnectionReader { + incoming_packet_queue: incoming_packet_queue.clone(), + run_schedule_sender, + }; + let writer = RawConnectionWriter { + outgoing_packets_sender, + }; + + let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection)); + let write_packets_task = tokio::spawn( + writer + .clone() + .write_task(raw_write_connection, outgoing_packets_receiver), + ); + + Self { + reader, + writer, + read_packets_task, + write_packets_task, + connection_protocol, + } + } + + pub fn write_raw_packet(&self, raw_packet: Vec<u8>) { + self.writer + .outgoing_packets_sender + .send(raw_packet) + .unwrap(); + } + + /// Write the packet with the given state to the server. + /// + /// # Errors + /// + /// Returns an error if the packet is not valid for the current state, or if + /// encoding it failed somehow (like it's too big or something). + pub fn write_packet<P: ProtocolPacket + Debug>( + &self, + packet: P, + ) -> Result<(), WritePacketError> { + let raw_packet = serialize_packet(&packet)?; + self.write_raw_packet(raw_packet); + Ok(()) + } + + /// Returns whether the connection is still alive. + pub fn is_alive(&self) -> bool { + !self.read_packets_task.is_finished() + } + + pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Vec<u8>>>> { + self.reader.incoming_packet_queue.clone() + } + + pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) { + self.connection_protocol = connection_protocol; + } +} + +impl RawConnectionReader { + /// Loop that reads from the connection and adds the packets to the queue + + /// runs the schedule. + pub async fn read_task(self, mut read_conn: RawReadConnection) { + loop { + match read_conn.read().await { + Ok(raw_packet) => { + self.incoming_packet_queue.lock().push(raw_packet); + // tell the client to run all the systems + self.run_schedule_sender.send(()).unwrap(); + } + Err(error) => { + if !matches!(*error, ReadPacketError::ConnectionClosed) { + error!("Error reading packet from Client: {error:?}"); + } + break; + } + } + } + } +} + +impl RawConnectionWriter { + /// Consume the [`ServerboundGamePacket`] queue and actually write the + /// packets to the server. It's like this so writing packets doesn't need to + /// be awaited. + pub async fn write_task( + self, + mut write_conn: RawWriteConnection, + mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Vec<u8>>, + ) { + while let Some(raw_packet) = outgoing_packets_receiver.recv().await { + if let Err(err) = write_conn.write(&raw_packet).await { + error!("Disconnecting because we couldn't write a packet: {err}."); + break; + }; + } + // receiver is automatically closed when it's dropped + } +} + +impl Drop for RawConnection { + /// Stop every active task when this `RawConnection` is dropped. + fn drop(&mut self) { + self.read_packets_task.abort(); + self.write_packets_task.abort(); + } +} diff --git a/azalea-client/src/received_registries.rs b/azalea-client/src/received_registries.rs index 845527ae..024f5222 100644 --- a/azalea-client/src/received_registries.rs +++ b/azalea-client/src/received_registries.rs @@ -1,7 +1,28 @@ -use azalea_protocol::packets::game::clientbound_login_packet::registry::RegistryRoot; -use bevy_ecs::component::Component; -use derive_more::Deref; +use std::collections::HashMap; -/// The registries that the server sent us on login. -#[derive(Clone, Debug, Component, Deref)] -pub struct ReceivedRegistries(pub RegistryRoot); +use azalea_core::ResourceLocation; +use azalea_nbt::Nbt; +use azalea_protocol::packets::configuration::clientbound_registry_data_packet::registry::{ + DimensionTypeElement, RegistryType, +}; +use bevy_ecs::prelude::*; +use serde::de::DeserializeOwned; + +/// The registries that were sent to us during the configuration state. +#[derive(Default, Component, Clone)] +pub struct ReceivedRegistries { + pub registries: HashMap<ResourceLocation, Nbt>, +} + +impl ReceivedRegistries { + fn get<T: DeserializeOwned>(&self, name: &ResourceLocation) -> Option<T> { + let nbt = self.registries.get(name)?; + serde_json::from_value(serde_json::to_value(nbt).ok()?).ok() + } + + /// Get the dimension type registry, or `None` if it doesn't exist. You + /// should do some type of error handling if this returns `None`. + pub fn dimension_type(&self) -> Option<RegistryType<DimensionTypeElement>> { + self.get(&ResourceLocation::new("minecraft:dimension_type")) + } +} |
