From 7b3e2e4bf793466a351510c7fbbd08234e93bb0e Mon Sep 17 00:00:00 2001 From: mat <27899617+mat-1@users.noreply.github.com> Date: Thu, 21 Sep 2023 11:16:29 -0500 Subject: 1.20.2 (#99) * add configuration state * start updating to 23w31a * implement a bit more of 23w31a * chunk batching * start adding configuration state * ioasfhjgsd * almost works * configuration state mostly implemented * handle other packets in configuration state and fix keepalive * cleanup, fix warnings * 23w32a * fix some doctests * 23w33a * 23w35a * 1.20.2-pre2 * fix system conflicts * 1.20.2-pre4 * make tests compile * tests pass * 1.20.2-rc2 * 1.20.2 * Revert "1.20.2" This reverts commit dd152fd265332ead333c919e585ded6d609d7468. * didn't mean to commit that code --------- Co-authored-by: mat --- azalea-client/src/raw_connection.rs | 174 ++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 azalea-client/src/raw_connection.rs (limited to 'azalea-client/src/raw_connection.rs') diff --git a/azalea-client/src/raw_connection.rs b/azalea-client/src/raw_connection.rs new file mode 100644 index 00000000..0df13a60 --- /dev/null +++ b/azalea-client/src/raw_connection.rs @@ -0,0 +1,174 @@ +use std::fmt::Debug; +use std::sync::Arc; + +use azalea_protocol::{ + connect::{RawReadConnection, RawWriteConnection}, + packets::{ConnectionProtocol, ProtocolPacket}, + read::ReadPacketError, + write::serialize_packet, +}; +use bevy_ecs::prelude::*; +use log::error; +use parking_lot::Mutex; +use thiserror::Error; +use tokio::sync::mpsc; + +/// A component for clients that can read and write packets to the server. This +/// works with raw bytes, so you'll have to serialize/deserialize packets +/// yourself. It will do the compression and encryption for you though. +#[derive(Component)] +pub struct RawConnection { + reader: RawConnectionReader, + writer: RawConnectionWriter, + + /// Packets sent to this will be sent to the server. + + /// A task that reads packets from the server. The client is disconnected + /// when this task ends. + read_packets_task: tokio::task::JoinHandle<()>, + /// A task that writes packets from the server. + write_packets_task: tokio::task::JoinHandle<()>, + + connection_protocol: ConnectionProtocol, +} + +#[derive(Clone)] +struct RawConnectionReader { + pub incoming_packet_queue: Arc>>>, + pub run_schedule_sender: mpsc::UnboundedSender<()>, +} +#[derive(Clone)] +struct RawConnectionWriter { + pub outgoing_packets_sender: mpsc::UnboundedSender>, +} + +#[derive(Error, Debug)] +pub enum WritePacketError { + #[error("Wrong protocol state: expected {expected:?}, got {got:?}")] + WrongState { + expected: ConnectionProtocol, + got: ConnectionProtocol, + }, + #[error(transparent)] + Encoding(#[from] azalea_protocol::write::PacketEncodeError), +} + +impl RawConnection { + pub fn new( + run_schedule_sender: mpsc::UnboundedSender<()>, + connection_protocol: ConnectionProtocol, + raw_read_connection: RawReadConnection, + raw_write_connection: RawWriteConnection, + ) -> Self { + let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel(); + + let incoming_packet_queue = Arc::new(Mutex::new(Vec::new())); + + let reader = RawConnectionReader { + incoming_packet_queue: incoming_packet_queue.clone(), + run_schedule_sender, + }; + let writer = RawConnectionWriter { + outgoing_packets_sender, + }; + + let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection)); + let write_packets_task = tokio::spawn( + writer + .clone() + .write_task(raw_write_connection, outgoing_packets_receiver), + ); + + Self { + reader, + writer, + read_packets_task, + write_packets_task, + connection_protocol, + } + } + + pub fn write_raw_packet(&self, raw_packet: Vec) { + self.writer + .outgoing_packets_sender + .send(raw_packet) + .unwrap(); + } + + /// Write the packet with the given state to the server. + /// + /// # Errors + /// + /// Returns an error if the packet is not valid for the current state, or if + /// encoding it failed somehow (like it's too big or something). + pub fn write_packet( + &self, + packet: P, + ) -> Result<(), WritePacketError> { + let raw_packet = serialize_packet(&packet)?; + self.write_raw_packet(raw_packet); + Ok(()) + } + + /// Returns whether the connection is still alive. + pub fn is_alive(&self) -> bool { + !self.read_packets_task.is_finished() + } + + pub fn incoming_packet_queue(&self) -> Arc>>> { + self.reader.incoming_packet_queue.clone() + } + + pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) { + self.connection_protocol = connection_protocol; + } +} + +impl RawConnectionReader { + /// Loop that reads from the connection and adds the packets to the queue + + /// runs the schedule. + pub async fn read_task(self, mut read_conn: RawReadConnection) { + loop { + match read_conn.read().await { + Ok(raw_packet) => { + self.incoming_packet_queue.lock().push(raw_packet); + // tell the client to run all the systems + self.run_schedule_sender.send(()).unwrap(); + } + Err(error) => { + if !matches!(*error, ReadPacketError::ConnectionClosed) { + error!("Error reading packet from Client: {error:?}"); + } + break; + } + } + } + } +} + +impl RawConnectionWriter { + /// Consume the [`ServerboundGamePacket`] queue and actually write the + /// packets to the server. It's like this so writing packets doesn't need to + /// be awaited. + pub async fn write_task( + self, + mut write_conn: RawWriteConnection, + mut outgoing_packets_receiver: mpsc::UnboundedReceiver>, + ) { + while let Some(raw_packet) = outgoing_packets_receiver.recv().await { + if let Err(err) = write_conn.write(&raw_packet).await { + error!("Disconnecting because we couldn't write a packet: {err}."); + break; + }; + } + // receiver is automatically closed when it's dropped + } +} + +impl Drop for RawConnection { + /// Stop every active task when this `RawConnection` is dropped. + fn drop(&mut self) { + self.read_packets_task.abort(); + self.write_packets_task.abort(); + } +} -- cgit v1.2.3