From 000abfa13665abccf543b875d10c8c2a48dd75be Mon Sep 17 00:00:00 2001 From: mat Date: Sat, 18 Nov 2023 14:54:01 -0600 Subject: make loading chunks its own bevy system --- azalea-client/src/chunk_batching.rs | 159 -------------- azalea-client/src/chunks.rs | 230 +++++++++++++++++++++ azalea-client/src/client.rs | 4 +- azalea-client/src/lib.rs | 2 +- azalea-client/src/packet_handling/configuration.rs | 2 +- azalea-client/src/packet_handling/game.rs | 69 ++----- 6 files changed, 247 insertions(+), 219 deletions(-) delete mode 100644 azalea-client/src/chunk_batching.rs create mode 100644 azalea-client/src/chunks.rs (limited to 'azalea-client/src') diff --git a/azalea-client/src/chunk_batching.rs b/azalea-client/src/chunk_batching.rs deleted file mode 100644 index eda16442..00000000 --- a/azalea-client/src/chunk_batching.rs +++ /dev/null @@ -1,159 +0,0 @@ -//! Used for Minecraft's chunk batching introduced in 23w31a (1.20.2). It's used -//! for making the server spread out how often it sends us chunk packets -//! depending on our receiving speed. - -use std::time::{Duration, Instant}; - -use azalea_protocol::packets::game::serverbound_chunk_batch_received_packet::ServerboundChunkBatchReceivedPacket; -use bevy_app::{App, Plugin, Update}; -use bevy_ecs::prelude::*; - -use crate::{ - interact::handle_block_interact_event, - inventory::InventorySet, - local_player::{handle_send_packet_event, SendPacketEvent}, - respawn::perform_respawn, -}; - -pub struct ChunkBatchingPlugin; -impl Plugin for ChunkBatchingPlugin { - fn build(&self, app: &mut App) { - app.add_systems( - Update, - ( - handle_chunk_batch_start_event, - handle_chunk_batch_finished_event, - ) - .chain() - .before(handle_send_packet_event) - .before(InventorySet) - .before(handle_block_interact_event) - .before(perform_respawn), - ) - .add_event::() - .add_event::(); - } -} - -#[derive(Component, Clone, Debug)] -pub struct ChunkBatchInfo { - pub start_time: Instant, - pub aggregated_duration_per_chunk: Duration, - pub old_samples_weight: u32, -} - -impl ChunkBatchInfo { - pub fn batch_finished(&mut self, batch_size: u32) { - if batch_size == 0 { - return; - } - let batch_duration = self.start_time.elapsed(); - let duration_per_chunk = batch_duration / batch_size; - let clamped_duration = Duration::clamp( - duration_per_chunk, - self.aggregated_duration_per_chunk / 3, - self.aggregated_duration_per_chunk * 3, - ); - self.aggregated_duration_per_chunk = - ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration) - / (self.old_samples_weight + 1); - self.old_samples_weight = u32::min(49, self.old_samples_weight + 1); - } - - pub fn desired_chunks_per_tick(&self) -> f32 { - (7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32 - } -} - -#[derive(Event)] -pub struct ChunkBatchStartEvent { - pub entity: Entity, -} -#[derive(Event)] -pub struct ChunkBatchFinishedEvent { - pub entity: Entity, - pub batch_size: u32, -} - -pub fn handle_chunk_batch_start_event( - mut query: Query<&mut ChunkBatchInfo>, - mut events: EventReader, -) { - for event in events.read() { - if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) { - chunk_batch_info.start_time = Instant::now(); - } - } -} - -pub fn handle_chunk_batch_finished_event( - mut query: Query<&mut ChunkBatchInfo>, - mut events: EventReader, - mut send_packets: EventWriter, -) { - for event in events.read() { - if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) { - chunk_batch_info.batch_finished(event.batch_size); - let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick(); - send_packets.send(SendPacketEvent { - entity: event.entity, - packet: ServerboundChunkBatchReceivedPacket { - desired_chunks_per_tick, - } - .get(), - }); - } - } -} - -#[derive(Clone, Debug)] -pub struct ChunkReceiveSpeedAccumulator { - batch_sizes: Vec, - /// as milliseconds - batch_durations: Vec, - index: usize, - filled_size: usize, -} -impl ChunkReceiveSpeedAccumulator { - pub fn new(capacity: usize) -> Self { - Self { - batch_sizes: vec![0; capacity], - batch_durations: vec![0; capacity], - index: 0, - filled_size: 0, - } - } - - pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) { - self.batch_sizes[self.index] = batch_size; - self.batch_durations[self.index] = - f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32; - self.index = (self.index + 1) % self.batch_sizes.len(); - if self.filled_size < self.batch_sizes.len() { - self.filled_size += 1; - } - } - - pub fn get_millis_per_chunk(&self) -> f64 { - let mut total_batch_size = 0; - let mut total_batch_duration = 0; - for i in 0..self.filled_size { - total_batch_size += self.batch_sizes[i]; - total_batch_duration += self.batch_durations[i]; - } - if total_batch_size == 0 { - return 0.; - } - total_batch_duration as f64 / total_batch_size as f64 - } -} - -impl Default for ChunkBatchInfo { - fn default() -> Self { - Self { - start_time: Instant::now(), - aggregated_duration_per_chunk: Duration::from_millis(2), - old_samples_weight: 1, - } - } -} diff --git a/azalea-client/src/chunks.rs b/azalea-client/src/chunks.rs new file mode 100644 index 00000000..80c350c8 --- /dev/null +++ b/azalea-client/src/chunks.rs @@ -0,0 +1,230 @@ +//! Used for Minecraft's chunk batching introduced in 23w31a (1.20.2). It's used +//! for making the server spread out how often it sends us chunk packets +//! depending on our receiving speed. + +use std::{ + io::Cursor, + time::{Duration, Instant}, +}; + +use azalea_core::position::ChunkPos; +use azalea_nbt::NbtCompound; +use azalea_protocol::packets::game::{ + clientbound_level_chunk_with_light_packet::ClientboundLevelChunkWithLightPacket, + serverbound_chunk_batch_received_packet::ServerboundChunkBatchReceivedPacket, +}; +use bevy_app::{App, Plugin, Update}; +use bevy_ecs::prelude::*; +use tracing::{error, trace}; + +use crate::{ + interact::handle_block_interact_event, + inventory::InventorySet, + local_player::{handle_send_packet_event, SendPacketEvent}, + respawn::perform_respawn, + InstanceHolder, +}; + +pub struct ChunkPlugin; +impl Plugin for ChunkPlugin { + fn build(&self, app: &mut App) { + app.add_systems( + Update, + ( + handle_chunk_batch_start_event, + handle_receive_chunk_events, + handle_chunk_batch_finished_event, + ) + .chain() + .before(handle_send_packet_event) + .before(InventorySet) + .before(handle_block_interact_event) + .before(perform_respawn), + ) + .add_event::() + .add_event::() + .add_event::(); + } +} + +#[derive(Event)] +pub struct ReceiveChunkEvent { + pub entity: Entity, + pub packet: ClientboundLevelChunkWithLightPacket, +} + +#[derive(Component, Clone, Debug)] +pub struct ChunkBatchInfo { + pub start_time: Instant, + pub aggregated_duration_per_chunk: Duration, + pub old_samples_weight: u32, +} + +#[derive(Event)] +pub struct ChunkBatchStartEvent { + pub entity: Entity, +} +#[derive(Event)] +pub struct ChunkBatchFinishedEvent { + pub entity: Entity, + pub batch_size: u32, +} + +fn handle_receive_chunk_events( + mut events: EventReader, + mut query: Query<&mut InstanceHolder>, +) { + for event in events.read() { + let pos = ChunkPos::new(event.packet.x, event.packet.z); + + let local_player = query.get_mut(event.entity).unwrap(); + + // OPTIMIZATION: if we already know about the chunk from the + // shared world (and not ourselves), then we don't need to + // parse it again. This is only used when we have a shared + // world, since we check that the chunk isn't currently owned + // by this client. + let shared_chunk = local_player.instance.read().chunks.get(&pos); + let this_client_has_chunk = local_player + .partial_instance + .read() + .chunks + .limited_get(&pos) + .is_some(); + + let mut world = local_player.instance.write(); + let mut partial_world = local_player.partial_instance.write(); + + if !this_client_has_chunk { + if let Some(shared_chunk) = shared_chunk { + trace!("Skipping parsing chunk {pos:?} because we already know about it"); + partial_world.chunks.set_with_shared_reference( + &pos, + Some(shared_chunk.clone()), + &mut world.chunks, + ); + continue; + } + } + + let heightmaps = event.packet.chunk_data.heightmaps.as_compound(); + // necessary to make the unwrap_or work + let empty_nbt_compound = NbtCompound::default(); + let heightmaps = heightmaps.unwrap_or(&empty_nbt_compound); + + if let Err(e) = partial_world.chunks.replace_with_packet_data( + &pos, + &mut Cursor::new(&event.packet.chunk_data.data), + heightmaps, + &mut world.chunks, + ) { + error!("Couldn't set chunk data: {e}"); + } + } +} + +impl ChunkBatchInfo { + pub fn batch_finished(&mut self, batch_size: u32) { + if batch_size == 0 { + return; + } + let batch_duration = self.start_time.elapsed(); + let duration_per_chunk = batch_duration / batch_size; + let clamped_duration = Duration::clamp( + duration_per_chunk, + self.aggregated_duration_per_chunk / 3, + self.aggregated_duration_per_chunk * 3, + ); + self.aggregated_duration_per_chunk = + ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration) + / (self.old_samples_weight + 1); + self.old_samples_weight = u32::min(49, self.old_samples_weight + 1); + } + + pub fn desired_chunks_per_tick(&self) -> f32 { + (7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32 + } +} + +pub fn handle_chunk_batch_start_event( + mut query: Query<&mut ChunkBatchInfo>, + mut events: EventReader, +) { + for event in events.read() { + if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) { + chunk_batch_info.start_time = Instant::now(); + } + } +} + +pub fn handle_chunk_batch_finished_event( + mut query: Query<&mut ChunkBatchInfo>, + mut events: EventReader, + mut send_packets: EventWriter, +) { + for event in events.read() { + if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) { + chunk_batch_info.batch_finished(event.batch_size); + let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick(); + send_packets.send(SendPacketEvent { + entity: event.entity, + packet: ServerboundChunkBatchReceivedPacket { + desired_chunks_per_tick, + } + .get(), + }); + } + } +} + +#[derive(Clone, Debug)] +pub struct ChunkReceiveSpeedAccumulator { + batch_sizes: Vec, + /// as milliseconds + batch_durations: Vec, + index: usize, + filled_size: usize, +} +impl ChunkReceiveSpeedAccumulator { + pub fn new(capacity: usize) -> Self { + Self { + batch_sizes: vec![0; capacity], + batch_durations: vec![0; capacity], + index: 0, + filled_size: 0, + } + } + + pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) { + self.batch_sizes[self.index] = batch_size; + self.batch_durations[self.index] = + f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32; + self.index = (self.index + 1) % self.batch_sizes.len(); + if self.filled_size < self.batch_sizes.len() { + self.filled_size += 1; + } + } + + pub fn get_millis_per_chunk(&self) -> f64 { + let mut total_batch_size = 0; + let mut total_batch_duration = 0; + for i in 0..self.filled_size { + total_batch_size += self.batch_sizes[i]; + total_batch_duration += self.batch_durations[i]; + } + if total_batch_size == 0 { + return 0.; + } + total_batch_duration as f64 / total_batch_size as f64 + } +} + +impl Default for ChunkBatchInfo { + fn default() -> Self { + Self { + start_time: Instant::now(), + aggregated_duration_per_chunk: Duration::from_millis(2), + old_samples_weight: 1, + } + } +} diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index 96e4eb1c..4f1c4b9e 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -1,7 +1,7 @@ use crate::{ attack::{self, AttackPlugin}, chat::ChatPlugin, - chunk_batching::{ChunkBatchInfo, ChunkBatchingPlugin}, + chunks::{ChunkBatchInfo, ChunkPlugin}, disconnect::{DisconnectEvent, DisconnectPlugin}, events::{Event, EventPlugin, LocalPlayerEvents}, interact::{CurrentSequenceNumber, InteractPlugin}, @@ -782,7 +782,7 @@ impl PluginGroup for DefaultPlugins { .add(RespawnPlugin) .add(MinePlugin) .add(AttackPlugin) - .add(ChunkBatchingPlugin) + .add(ChunkPlugin) .add(TickBroadcastPlugin); #[cfg(feature = "log")] { diff --git a/azalea-client/src/lib.rs b/azalea-client/src/lib.rs index a7231ffc..bc5616e8 100644 --- a/azalea-client/src/lib.rs +++ b/azalea-client/src/lib.rs @@ -12,7 +12,7 @@ mod account; pub mod attack; pub mod chat; -pub mod chunk_batching; +pub mod chunks; mod client; pub mod disconnect; mod entity_query; diff --git a/azalea-client/src/packet_handling/configuration.rs b/azalea-client/src/packet_handling/configuration.rs index b82ed76f..f35e25b2 100644 --- a/azalea-client/src/packet_handling/configuration.rs +++ b/azalea-client/src/packet_handling/configuration.rs @@ -140,7 +140,7 @@ pub fn process_packet_events(ecs: &mut World) { abilities: crate::local_player::PlayerAbilities::default(), permission_level: crate::local_player::PermissionLevel::default(), hunger: Hunger::default(), - chunk_batch_info: crate::chunk_batching::ChunkBatchInfo::default(), + chunk_batch_info: crate::chunks::ChunkBatchInfo::default(), entity_id_index: EntityIdIndex::default(), diff --git a/azalea-client/src/packet_handling/game.rs b/azalea-client/src/packet_handling/game.rs index 3d61d790..71726142 100644 --- a/azalea-client/src/packet_handling/game.rs +++ b/azalea-client/src/packet_handling/game.rs @@ -16,7 +16,6 @@ use azalea_entity::{ Dead, EntityBundle, EntityKind, LastSentPosition, LoadedBy, LocalEntity, LookDirection, Physics, PlayerBundle, Position, RelativeEntityUpdate, }; -use azalea_nbt::NbtCompound; use azalea_protocol::{ packets::game::{ clientbound_player_combat_kill_packet::ClientboundPlayerCombatKillPacket, @@ -34,7 +33,7 @@ use tracing::{debug, error, trace, warn}; use crate::{ chat::{ChatPacket, ChatReceivedEvent}, - chunk_batching, + chunks, disconnect::DisconnectEvent, inventory::{ ClientSideCloseContainerEvent, InventoryComponent, MenuOpenedEvent, @@ -339,24 +338,22 @@ pub fn process_packet_events(ecs: &mut World) { ClientboundGamePacket::ChunkBatchStart(_p) => { // the packet is empty, just a marker to tell us when the batch starts and ends debug!("Got chunk batch start"); - let mut system_state: SystemState< - EventWriter, - > = SystemState::new(ecs); + let mut system_state: SystemState> = + SystemState::new(ecs); let mut chunk_batch_start_events = system_state.get_mut(ecs); - chunk_batch_start_events.send(chunk_batching::ChunkBatchStartEvent { + chunk_batch_start_events.send(chunks::ChunkBatchStartEvent { entity: player_entity, }); } ClientboundGamePacket::ChunkBatchFinished(p) => { debug!("Got chunk batch finished {p:?}"); - let mut system_state: SystemState< - EventWriter, - > = SystemState::new(ecs); + let mut system_state: SystemState> = + SystemState::new(ecs); let mut chunk_batch_start_events = system_state.get_mut(ecs); - chunk_batch_start_events.send(chunk_batching::ChunkBatchFinishedEvent { + chunk_batch_start_events.send(chunks::ChunkBatchFinishedEvent { entity: player_entity, batch_size: p.batch_size, }); @@ -597,54 +594,14 @@ pub fn process_packet_events(ecs: &mut World) { } ClientboundGamePacket::LevelChunkWithLight(p) => { debug!("Got chunk with light packet {} {}", p.x, p.z); - let pos = ChunkPos::new(p.x, p.z); - let mut system_state: SystemState> = + let mut system_state: SystemState> = SystemState::new(ecs); - let mut query = system_state.get_mut(ecs); - let local_player = query.get_mut(player_entity).unwrap(); - - // OPTIMIZATION: if we already know about the chunk from the - // shared world (and not ourselves), then we don't need to - // parse it again. This is only used when we have a shared - // world, since we check that the chunk isn't currently owned - // by this client. - let shared_chunk = local_player.instance.read().chunks.get(&pos); - let this_client_has_chunk = local_player - .partial_instance - .read() - .chunks - .limited_get(&pos) - .is_some(); - - let mut world = local_player.instance.write(); - let mut partial_world = local_player.partial_instance.write(); - - if !this_client_has_chunk { - if let Some(shared_chunk) = shared_chunk { - trace!("Skipping parsing chunk {pos:?} because we already know about it"); - partial_world.chunks.set_with_shared_reference( - &pos, - Some(shared_chunk.clone()), - &mut world.chunks, - ); - continue; - } - } - - let heightmaps = p.chunk_data.heightmaps.as_compound(); - // necessary to make the unwrap_or work - let empty_nbt_compound = NbtCompound::default(); - let heightmaps = heightmaps.unwrap_or(&empty_nbt_compound); - - if let Err(e) = partial_world.chunks.replace_with_packet_data( - &pos, - &mut Cursor::new(&p.chunk_data.data), - heightmaps, - &mut world.chunks, - ) { - error!("Couldn't set chunk data: {e}"); - } + let mut receive_chunk_events = system_state.get_mut(ecs); + receive_chunk_events.send(chunks::ReceiveChunkEvent { + entity: player_entity, + packet: p.clone(), + }); } ClientboundGamePacket::AddEntity(p) => { debug!("Got add entity packet {p:?}"); -- cgit v1.2.3