diff options
| author | mat <27899617+mat-1@users.noreply.github.com> | 2023-09-21 11:16:29 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-09-21 11:16:29 -0500 |
| commit | 7b3e2e4bf793466a351510c7fbbd08234e93bb0e (patch) | |
| tree | 7177a919de9982d9e3c7f36a76d2025696f465b6 /azalea-client/src/raw_connection.rs | |
| parent | 83cce236145cdab1872a472a70943b669a880965 (diff) | |
| download | azalea-drasl-7b3e2e4bf793466a351510c7fbbd08234e93bb0e.tar.xz | |
1.20.2 (#99)
* add configuration state
* start updating to 23w31a
* implement a bit more of 23w31a
* chunk batching
* start adding configuration state
* ioasfhjgsd
* almost works
* configuration state mostly implemented
* handle other packets in configuration state and fix keepalive
* cleanup, fix warnings
* 23w32a
* fix some doctests
* 23w33a
* 23w35a
* 1.20.2-pre2
* fix system conflicts
* 1.20.2-pre4
* make tests compile
* tests pass
* 1.20.2-rc2
* 1.20.2
* Revert "1.20.2"
This reverts commit dd152fd265332ead333c919e585ded6d609d7468.
* didn't mean to commit that code
---------
Co-authored-by: mat <git@matdoes.dev>
Diffstat (limited to 'azalea-client/src/raw_connection.rs')
| -rw-r--r-- | azalea-client/src/raw_connection.rs | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/azalea-client/src/raw_connection.rs b/azalea-client/src/raw_connection.rs new file mode 100644 index 00000000..0df13a60 --- /dev/null +++ b/azalea-client/src/raw_connection.rs @@ -0,0 +1,174 @@ +use std::fmt::Debug; +use std::sync::Arc; + +use azalea_protocol::{ + connect::{RawReadConnection, RawWriteConnection}, + packets::{ConnectionProtocol, ProtocolPacket}, + read::ReadPacketError, + write::serialize_packet, +}; +use bevy_ecs::prelude::*; +use log::error; +use parking_lot::Mutex; +use thiserror::Error; +use tokio::sync::mpsc; + +/// A component for clients that can read and write packets to the server. This +/// works with raw bytes, so you'll have to serialize/deserialize packets +/// yourself. It will do the compression and encryption for you though. +#[derive(Component)] +pub struct RawConnection { + reader: RawConnectionReader, + writer: RawConnectionWriter, + + /// Packets sent to this will be sent to the server. + + /// A task that reads packets from the server. The client is disconnected + /// when this task ends. + read_packets_task: tokio::task::JoinHandle<()>, + /// A task that writes packets from the server. + write_packets_task: tokio::task::JoinHandle<()>, + + connection_protocol: ConnectionProtocol, +} + +#[derive(Clone)] +struct RawConnectionReader { + pub incoming_packet_queue: Arc<Mutex<Vec<Vec<u8>>>>, + pub run_schedule_sender: mpsc::UnboundedSender<()>, +} +#[derive(Clone)] +struct RawConnectionWriter { + pub outgoing_packets_sender: mpsc::UnboundedSender<Vec<u8>>, +} + +#[derive(Error, Debug)] +pub enum WritePacketError { + #[error("Wrong protocol state: expected {expected:?}, got {got:?}")] + WrongState { + expected: ConnectionProtocol, + got: ConnectionProtocol, + }, + #[error(transparent)] + Encoding(#[from] azalea_protocol::write::PacketEncodeError), +} + +impl RawConnection { + pub fn new( + run_schedule_sender: mpsc::UnboundedSender<()>, + connection_protocol: ConnectionProtocol, + raw_read_connection: RawReadConnection, + raw_write_connection: RawWriteConnection, + ) -> Self { + let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel(); + + let incoming_packet_queue = Arc::new(Mutex::new(Vec::new())); + + let reader = RawConnectionReader { + incoming_packet_queue: incoming_packet_queue.clone(), + run_schedule_sender, + }; + let writer = RawConnectionWriter { + outgoing_packets_sender, + }; + + let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection)); + let write_packets_task = tokio::spawn( + writer + .clone() + .write_task(raw_write_connection, outgoing_packets_receiver), + ); + + Self { + reader, + writer, + read_packets_task, + write_packets_task, + connection_protocol, + } + } + + pub fn write_raw_packet(&self, raw_packet: Vec<u8>) { + self.writer + .outgoing_packets_sender + .send(raw_packet) + .unwrap(); + } + + /// Write the packet with the given state to the server. + /// + /// # Errors + /// + /// Returns an error if the packet is not valid for the current state, or if + /// encoding it failed somehow (like it's too big or something). + pub fn write_packet<P: ProtocolPacket + Debug>( + &self, + packet: P, + ) -> Result<(), WritePacketError> { + let raw_packet = serialize_packet(&packet)?; + self.write_raw_packet(raw_packet); + Ok(()) + } + + /// Returns whether the connection is still alive. + pub fn is_alive(&self) -> bool { + !self.read_packets_task.is_finished() + } + + pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Vec<u8>>>> { + self.reader.incoming_packet_queue.clone() + } + + pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) { + self.connection_protocol = connection_protocol; + } +} + +impl RawConnectionReader { + /// Loop that reads from the connection and adds the packets to the queue + + /// runs the schedule. + pub async fn read_task(self, mut read_conn: RawReadConnection) { + loop { + match read_conn.read().await { + Ok(raw_packet) => { + self.incoming_packet_queue.lock().push(raw_packet); + // tell the client to run all the systems + self.run_schedule_sender.send(()).unwrap(); + } + Err(error) => { + if !matches!(*error, ReadPacketError::ConnectionClosed) { + error!("Error reading packet from Client: {error:?}"); + } + break; + } + } + } + } +} + +impl RawConnectionWriter { + /// Consume the [`ServerboundGamePacket`] queue and actually write the + /// packets to the server. It's like this so writing packets doesn't need to + /// be awaited. + pub async fn write_task( + self, + mut write_conn: RawWriteConnection, + mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Vec<u8>>, + ) { + while let Some(raw_packet) = outgoing_packets_receiver.recv().await { + if let Err(err) = write_conn.write(&raw_packet).await { + error!("Disconnecting because we couldn't write a packet: {err}."); + break; + }; + } + // receiver is automatically closed when it's dropped + } +} + +impl Drop for RawConnection { + /// Stop every active task when this `RawConnection` is dropped. + fn drop(&mut self) { + self.read_packets_task.abort(); + self.write_packets_task.abort(); + } +} |
