diff options
| author | mat <27899617+mat-1@users.noreply.github.com> | 2025-04-17 16:16:51 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-04-17 16:16:51 -0500 |
| commit | 3f60bdadac1a02e1109148bbbe5a8a3545f13849 (patch) | |
| tree | 6c0460be61e715c1b789f81b16ce4c0fb986c3b4 /azalea-client/src/raw_connection.rs | |
| parent | 1989f4ec979c138f8f466ccebadca335eb2917d6 (diff) | |
| download | azalea-drasl-3f60bdadac1a02e1109148bbbe5a8a3545f13849.tar.xz | |
Move login state to the ECS (#213)
* use packet handlers code for login custom_query
* initial broken implementation for ecs-only login
* fixes
* run Update schedule 60 times per second and delete code related to run_schedule_sender
* fix tests
* fix online-mode
* reply to query packets in a separate system and make it easier for plugins to disable individual replies
* remove unused imports
Diffstat (limited to 'azalea-client/src/raw_connection.rs')
| -rw-r--r-- | azalea-client/src/raw_connection.rs | 208 |
1 files changed, 0 insertions, 208 deletions
diff --git a/azalea-client/src/raw_connection.rs b/azalea-client/src/raw_connection.rs deleted file mode 100644 index 97e93f16..00000000 --- a/azalea-client/src/raw_connection.rs +++ /dev/null @@ -1,208 +0,0 @@ -use std::fmt::Debug; -use std::sync::Arc; - -use azalea_protocol::{ - connect::{RawReadConnection, RawWriteConnection}, - packets::{ConnectionProtocol, Packet, ProtocolPacket}, - read::ReadPacketError, - write::serialize_packet, -}; -use bevy_ecs::prelude::*; -use parking_lot::Mutex; -use thiserror::Error; -use tokio::sync::mpsc::{ - self, - error::{SendError, TrySendError}, -}; -use tracing::error; - -/// A component for clients that can read and write packets to the server. This -/// works with raw bytes, so you'll have to serialize/deserialize packets -/// yourself. It will do the compression and encryption for you though. -#[derive(Component)] -pub struct RawConnection { - pub reader: RawConnectionReader, - pub writer: RawConnectionWriter, - - /// Packets sent to this will be sent to the server. - /// A task that reads packets from the server. The client is disconnected - /// when this task ends. - pub read_packets_task: tokio::task::JoinHandle<()>, - /// A task that writes packets from the server. - pub write_packets_task: tokio::task::JoinHandle<()>, - - pub connection_protocol: ConnectionProtocol, -} - -#[derive(Clone)] -pub struct RawConnectionReader { - pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>, - pub run_schedule_sender: mpsc::Sender<()>, -} -#[derive(Clone)] -pub struct RawConnectionWriter { - pub outgoing_packets_sender: mpsc::UnboundedSender<Box<[u8]>>, -} - -#[derive(Error, Debug)] -pub enum WritePacketError { - #[error("Wrong protocol state: expected {expected:?}, got {got:?}")] - WrongState { - expected: ConnectionProtocol, - got: ConnectionProtocol, - }, - #[error(transparent)] - Encoding(#[from] azalea_protocol::write::PacketEncodeError), - #[error(transparent)] - SendError { - #[from] - #[backtrace] - source: SendError<Box<[u8]>>, - }, -} - -impl RawConnection { - pub fn new( - run_schedule_sender: mpsc::Sender<()>, - connection_protocol: ConnectionProtocol, - raw_read_connection: RawReadConnection, - raw_write_connection: RawWriteConnection, - ) -> Self { - let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel(); - - let incoming_packet_queue = Arc::new(Mutex::new(Vec::new())); - - let reader = RawConnectionReader { - incoming_packet_queue: incoming_packet_queue.clone(), - run_schedule_sender, - }; - let writer = RawConnectionWriter { - outgoing_packets_sender, - }; - - let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection)); - let write_packets_task = tokio::spawn( - writer - .clone() - .write_task(raw_write_connection, outgoing_packets_receiver), - ); - - Self { - reader, - writer, - read_packets_task, - write_packets_task, - connection_protocol, - } - } - - pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> { - self.writer.outgoing_packets_sender.send(raw_packet)?; - Ok(()) - } - - /// Write the packet with the given state to the server. - /// - /// # Errors - /// - /// Returns an error if the packet is not valid for the current state, or if - /// encoding it failed somehow (like it's too big or something). - pub fn write_packet<P: ProtocolPacket + Debug>( - &self, - packet: impl Packet<P>, - ) -> Result<(), WritePacketError> { - let packet = packet.into_variant(); - let raw_packet = serialize_packet(&packet)?; - self.write_raw_packet(raw_packet)?; - - Ok(()) - } - - /// Returns whether the connection is still alive. - pub fn is_alive(&self) -> bool { - !self.read_packets_task.is_finished() - } - - pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Box<[u8]>>>> { - self.reader.incoming_packet_queue.clone() - } - - pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) { - self.connection_protocol = connection_protocol; - } -} - -impl RawConnectionReader { - /// Loop that reads from the connection and adds the packets to the queue + - /// runs the schedule. - pub async fn read_task(self, mut read_conn: RawReadConnection) { - fn log_for_error(error: &ReadPacketError) { - if !matches!(*error, ReadPacketError::ConnectionClosed) { - error!("Error reading packet from Client: {error:?}"); - } - } - - loop { - match read_conn.read().await { - Ok(raw_packet) => { - let mut incoming_packet_queue = self.incoming_packet_queue.lock(); - - incoming_packet_queue.push(raw_packet); - // this makes it so packets received at the same time are guaranteed to be - // handled in the same tick. this is also an attempt at making it so we can't - // receive any packets in the ticks/updates after being disconnected. - loop { - let raw_packet = match read_conn.try_read() { - Ok(p) => p, - Err(err) => { - log_for_error(&err); - return; - } - }; - let Some(raw_packet) = raw_packet else { break }; - incoming_packet_queue.push(raw_packet); - } - - // tell the client to run all the systems - if self.run_schedule_sender.try_send(()) == Err(TrySendError::Closed(())) { - // the client was dropped - break; - } - } - Err(err) => { - log_for_error(&err); - return; - } - } - } - } -} - -impl RawConnectionWriter { - /// Consume the [`ServerboundGamePacket`] queue and actually write the - /// packets to the server. It's like this so writing packets doesn't need to - /// be awaited. - /// - /// [`ServerboundGamePacket`]: azalea_protocol::packets::game::ServerboundGamePacket - pub async fn write_task( - self, - mut write_conn: RawWriteConnection, - mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Box<[u8]>>, - ) { - while let Some(raw_packet) = outgoing_packets_receiver.recv().await { - if let Err(err) = write_conn.write(&raw_packet).await { - error!("Disconnecting because we couldn't write a packet: {err}."); - break; - }; - } - // receiver is automatically closed when it's dropped - } -} - -impl Drop for RawConnection { - /// Stop every active task when this `RawConnection` is dropped. - fn drop(&mut self) { - self.read_packets_task.abort(); - self.write_packets_task.abort(); - } -} |
