aboutsummaryrefslogtreecommitdiff
path: root/azalea-client/src/raw_connection.rs
diff options
context:
space:
mode:
authormat <27899617+mat-1@users.noreply.github.com>2023-09-21 11:16:29 -0500
committerGitHub <noreply@github.com>2023-09-21 11:16:29 -0500
commit7b3e2e4bf793466a351510c7fbbd08234e93bb0e (patch)
tree7177a919de9982d9e3c7f36a76d2025696f465b6 /azalea-client/src/raw_connection.rs
parent83cce236145cdab1872a472a70943b669a880965 (diff)
downloadazalea-drasl-7b3e2e4bf793466a351510c7fbbd08234e93bb0e.tar.xz
1.20.2 (#99)
* add configuration state * start updating to 23w31a * implement a bit more of 23w31a * chunk batching * start adding configuration state * ioasfhjgsd * almost works * configuration state mostly implemented * handle other packets in configuration state and fix keepalive * cleanup, fix warnings * 23w32a * fix some doctests * 23w33a * 23w35a * 1.20.2-pre2 * fix system conflicts * 1.20.2-pre4 * make tests compile * tests pass * 1.20.2-rc2 * 1.20.2 * Revert "1.20.2" This reverts commit dd152fd265332ead333c919e585ded6d609d7468. * didn't mean to commit that code --------- Co-authored-by: mat <git@matdoes.dev>
Diffstat (limited to 'azalea-client/src/raw_connection.rs')
-rw-r--r--azalea-client/src/raw_connection.rs174
1 files changed, 174 insertions, 0 deletions
diff --git a/azalea-client/src/raw_connection.rs b/azalea-client/src/raw_connection.rs
new file mode 100644
index 00000000..0df13a60
--- /dev/null
+++ b/azalea-client/src/raw_connection.rs
@@ -0,0 +1,174 @@
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use azalea_protocol::{
+ connect::{RawReadConnection, RawWriteConnection},
+ packets::{ConnectionProtocol, ProtocolPacket},
+ read::ReadPacketError,
+ write::serialize_packet,
+};
+use bevy_ecs::prelude::*;
+use log::error;
+use parking_lot::Mutex;
+use thiserror::Error;
+use tokio::sync::mpsc;
+
+/// A component for clients that can read and write packets to the server. This
+/// works with raw bytes, so you'll have to serialize/deserialize packets
+/// yourself. It will do the compression and encryption for you though.
+#[derive(Component)]
+pub struct RawConnection {
+ reader: RawConnectionReader,
+ writer: RawConnectionWriter,
+
+ /// Packets sent to this will be sent to the server.
+
+ /// A task that reads packets from the server. The client is disconnected
+ /// when this task ends.
+ read_packets_task: tokio::task::JoinHandle<()>,
+ /// A task that writes packets from the server.
+ write_packets_task: tokio::task::JoinHandle<()>,
+
+ connection_protocol: ConnectionProtocol,
+}
+
+#[derive(Clone)]
+struct RawConnectionReader {
+ pub incoming_packet_queue: Arc<Mutex<Vec<Vec<u8>>>>,
+ pub run_schedule_sender: mpsc::UnboundedSender<()>,
+}
+#[derive(Clone)]
+struct RawConnectionWriter {
+ pub outgoing_packets_sender: mpsc::UnboundedSender<Vec<u8>>,
+}
+
+#[derive(Error, Debug)]
+pub enum WritePacketError {
+ #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
+ WrongState {
+ expected: ConnectionProtocol,
+ got: ConnectionProtocol,
+ },
+ #[error(transparent)]
+ Encoding(#[from] azalea_protocol::write::PacketEncodeError),
+}
+
+impl RawConnection {
+ pub fn new(
+ run_schedule_sender: mpsc::UnboundedSender<()>,
+ connection_protocol: ConnectionProtocol,
+ raw_read_connection: RawReadConnection,
+ raw_write_connection: RawWriteConnection,
+ ) -> Self {
+ let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel();
+
+ let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
+
+ let reader = RawConnectionReader {
+ incoming_packet_queue: incoming_packet_queue.clone(),
+ run_schedule_sender,
+ };
+ let writer = RawConnectionWriter {
+ outgoing_packets_sender,
+ };
+
+ let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection));
+ let write_packets_task = tokio::spawn(
+ writer
+ .clone()
+ .write_task(raw_write_connection, outgoing_packets_receiver),
+ );
+
+ Self {
+ reader,
+ writer,
+ read_packets_task,
+ write_packets_task,
+ connection_protocol,
+ }
+ }
+
+ pub fn write_raw_packet(&self, raw_packet: Vec<u8>) {
+ self.writer
+ .outgoing_packets_sender
+ .send(raw_packet)
+ .unwrap();
+ }
+
+ /// Write the packet with the given state to the server.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if the packet is not valid for the current state, or if
+ /// encoding it failed somehow (like it's too big or something).
+ pub fn write_packet<P: ProtocolPacket + Debug>(
+ &self,
+ packet: P,
+ ) -> Result<(), WritePacketError> {
+ let raw_packet = serialize_packet(&packet)?;
+ self.write_raw_packet(raw_packet);
+ Ok(())
+ }
+
+ /// Returns whether the connection is still alive.
+ pub fn is_alive(&self) -> bool {
+ !self.read_packets_task.is_finished()
+ }
+
+ pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Vec<u8>>>> {
+ self.reader.incoming_packet_queue.clone()
+ }
+
+ pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) {
+ self.connection_protocol = connection_protocol;
+ }
+}
+
+impl RawConnectionReader {
+ /// Loop that reads from the connection and adds the packets to the queue +
+ /// runs the schedule.
+ pub async fn read_task(self, mut read_conn: RawReadConnection) {
+ loop {
+ match read_conn.read().await {
+ Ok(raw_packet) => {
+ self.incoming_packet_queue.lock().push(raw_packet);
+ // tell the client to run all the systems
+ self.run_schedule_sender.send(()).unwrap();
+ }
+ Err(error) => {
+ if !matches!(*error, ReadPacketError::ConnectionClosed) {
+ error!("Error reading packet from Client: {error:?}");
+ }
+ break;
+ }
+ }
+ }
+ }
+}
+
+impl RawConnectionWriter {
+ /// Consume the [`ServerboundGamePacket`] queue and actually write the
+ /// packets to the server. It's like this so writing packets doesn't need to
+ /// be awaited.
+ pub async fn write_task(
+ self,
+ mut write_conn: RawWriteConnection,
+ mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
+ ) {
+ while let Some(raw_packet) = outgoing_packets_receiver.recv().await {
+ if let Err(err) = write_conn.write(&raw_packet).await {
+ error!("Disconnecting because we couldn't write a packet: {err}.");
+ break;
+ };
+ }
+ // receiver is automatically closed when it's dropped
+ }
+}
+
+impl Drop for RawConnection {
+ /// Stop every active task when this `RawConnection` is dropped.
+ fn drop(&mut self) {
+ self.read_packets_task.abort();
+ self.write_packets_task.abort();
+ }
+}