diff options
| -rw-r--r-- | azalea-client/src/client.rs | 39 | ||||
| -rw-r--r-- | azalea-client/src/movement.rs | 2 | ||||
| -rw-r--r-- | azalea-crypto/benches/my_benchmark.rs | 2 | ||||
| -rwxr-xr-x | azalea-protocol/src/connect.rs | 99 | ||||
| -rwxr-xr-x | azalea-protocol/src/read.rs | 5 |
5 files changed, 98 insertions, 49 deletions
diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index 14ebf9e9..9a6fd762 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -4,7 +4,7 @@ use azalea_block::BlockState; use azalea_chat::component::Component; use azalea_core::{ChunkPos, ResourceLocation, Vec3}; use azalea_protocol::{ - connect::{Connection, ConnectionError}, + connect::{Connection, ConnectionError, ReadConnection, WriteConnection}, packets::{ game::{ clientbound_player_chat_packet::ClientboundPlayerChatPacket, @@ -67,7 +67,8 @@ impl ChatPacket { #[derive(Clone)] pub struct Client { game_profile: GameProfile, - pub conn: Arc<tokio::sync::Mutex<Connection<ClientboundGamePacket, ServerboundGamePacket>>>, + pub read_conn: Arc<tokio::sync::Mutex<ReadConnection<ClientboundGamePacket>>>, + pub write_conn: Arc<tokio::sync::Mutex<WriteConnection<ServerboundGamePacket>>>, pub player: Arc<Mutex<Player>>, pub dimension: Arc<Mutex<Dimension>>, pub physics_state: Arc<Mutex<PhysicsState>>, @@ -185,14 +186,18 @@ impl Client { } }; - let conn = Arc::new(tokio::sync::Mutex::new(conn)); + let (read_conn, write_conn) = conn.into_split(); + + let read_conn = Arc::new(tokio::sync::Mutex::new(read_conn)); + let write_conn = Arc::new(tokio::sync::Mutex::new(write_conn)); let (tx, rx) = mpsc::unbounded_channel(); // we got the GameConnection, so the server is now connected :) let client = Client { game_profile, - conn, + read_conn, + write_conn, player: Arc::new(Mutex::new(Player::default())), dimension: Arc::new(Mutex::new(Dimension::default())), physics_state: Arc::new(Mutex::new(PhysicsState::default())), @@ -209,9 +214,14 @@ impl Client { Ok((client, rx)) } + /// Write a packet directly to the server. + pub async fn write_packet(&self, packet: ServerboundGamePacket) -> Result<(), std::io::Error> { + self.write_conn.lock().await.write(packet).await + } + async fn protocol_loop(client: Client, tx: UnboundedSender<Event>) { loop { - let r = client.conn.lock().await.read().await; + let r = client.read_conn.lock().await.read().await; match r { Ok(packet) => match Self::handle(&packet, &client, &tx).await { Ok(_) => {} @@ -323,10 +333,7 @@ impl Client { } client - .conn - .lock() - .await - .write( + .write_packet( ServerboundCustomPayloadPacket { identifier: ResourceLocation::new("brand").unwrap(), // they don't have to know :) @@ -444,12 +451,11 @@ impl Client { (new_pos, y_rot, x_rot) }; - let mut conn_lock = client.conn.lock().await; - conn_lock - .write(ServerboundAcceptTeleportationPacket { id: p.id }.get()) + client + .write_packet(ServerboundAcceptTeleportationPacket { id: p.id }.get()) .await?; - conn_lock - .write( + client + .write_packet( ServerboundMovePlayerPosRotPacket { x: new_pos.x, y: new_pos.y, @@ -567,10 +573,7 @@ impl Client { ClientboundGamePacket::KeepAlive(p) => { debug!("Got keep alive packet {:?}", p); client - .conn - .lock() - .await - .write(ServerboundKeepAlivePacket { id: p.id }.get()) + .write_packet(ServerboundKeepAlivePacket { id: p.id }.get()) .await?; } ClientboundGamePacket::RemoveEntities(p) => { diff --git a/azalea-client/src/movement.rs b/azalea-client/src/movement.rs index ddc44c0a..0a4a05e8 100644 --- a/azalea-client/src/movement.rs +++ b/azalea-client/src/movement.rs @@ -121,7 +121,7 @@ impl Client { }; if let Some(packet) = packet { - self.conn.lock().await.write(packet).await?; + self.write_packet(packet).await?; } Ok(()) diff --git a/azalea-crypto/benches/my_benchmark.rs b/azalea-crypto/benches/my_benchmark.rs index aeee8b07..e8c4ecad 100644 --- a/azalea-crypto/benches/my_benchmark.rs +++ b/azalea-crypto/benches/my_benchmark.rs @@ -2,7 +2,7 @@ use azalea_crypto::{create_cipher, decrypt_packet, encrypt_packet}; use criterion::{criterion_group, criterion_main, Criterion}; fn bench(c: &mut Criterion) { - let (mut enc, mut dec) = create_cipher(b"0123456789abcdef"); + let (mut enc, dec) = create_cipher(b"0123456789abcdef"); let mut packet = [0u8; 65536]; for i in 0..packet.len() { diff --git a/azalea-protocol/src/connect.rs b/azalea-protocol/src/connect.rs index dbca4214..3fdcecd3 100755 --- a/azalea-protocol/src/connect.rs +++ b/azalea-protocol/src/connect.rs @@ -12,37 +12,50 @@ use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc}; use std::fmt::Debug; use std::marker::PhantomData; use thiserror::Error; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; -pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> { - /// The buffered writer - pub stream: TcpStream, +pub struct ReadConnection<R: ProtocolPacket> { + pub read_stream: OwnedReadHalf, pub compression_threshold: Option<u32>, - pub enc_cipher: Option<Aes128CfbEnc>, pub dec_cipher: Option<Aes128CfbDec>, _reading: PhantomData<R>, +} + +pub struct WriteConnection<W: ProtocolPacket> { + pub write_stream: OwnedWriteHalf, + pub compression_threshold: Option<u32>, + pub enc_cipher: Option<Aes128CfbEnc>, _writing: PhantomData<W>, } -impl<R, W> Connection<R, W> +pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> { + pub reader: ReadConnection<R>, + pub writer: WriteConnection<W>, +} + +impl<R> ReadConnection<R> where R: ProtocolPacket + Debug, - W: ProtocolPacket + Debug, { pub async fn read(&mut self) -> Result<R, ReadPacketError> { read_packet::<R, _>( - &mut self.stream, + &mut self.read_stream, self.compression_threshold, &mut self.dec_cipher, ) .await } - +} +impl<W> WriteConnection<W> +where + W: ProtocolPacket + Debug, +{ /// Write a packet to the server pub async fn write(&mut self, packet: W) -> std::io::Result<()> { write_packet( packet, - &mut self.stream, + &mut self.write_stream, self.compression_threshold, &mut self.enc_cipher, ) @@ -50,6 +63,26 @@ where } } +impl<R, W> Connection<R, W> +where + R: ProtocolPacket + Debug, + W: ProtocolPacket + Debug, +{ + pub async fn read(&mut self) -> Result<R, ReadPacketError> { + self.reader.read().await + } + + /// Write a packet to the server + pub async fn write(&mut self, packet: W) -> std::io::Result<()> { + self.writer.write(packet).await + } + + /// Split the reader and writer into two objects. This doesn't allocate. + pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) { + (self.reader, self.writer) + } +} + #[derive(Error, Debug)] pub enum ConnectionError { #[error("{0}")] @@ -66,13 +99,21 @@ impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> { // enable tcp_nodelay stream.set_nodelay(true)?; + let (read_stream, write_stream) = stream.into_split(); + Ok(Connection { - stream, - compression_threshold: None, - enc_cipher: None, - dec_cipher: None, - _reading: PhantomData, - _writing: PhantomData, + reader: ReadConnection { + read_stream, + compression_threshold: None, + dec_cipher: None, + _reading: PhantomData, + }, + writer: WriteConnection { + write_stream, + compression_threshold: None, + enc_cipher: None, + _writing: PhantomData, + }, }) } @@ -89,17 +130,19 @@ impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> { pub fn set_compression_threshold(&mut self, threshold: i32) { // if you pass a threshold of less than 0, compression is disabled if threshold >= 0 { - self.compression_threshold = Some(threshold as u32); + self.reader.compression_threshold = Some(threshold as u32); + self.writer.compression_threshold = Some(threshold as u32); } else { - self.compression_threshold = None; + self.reader.compression_threshold = None; + self.writer.compression_threshold = None; } } pub fn set_encryption_key(&mut self, key: [u8; 16]) { // minecraft has a cipher decoder and encoder, i don't think it matters though? let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key); - self.enc_cipher = Some(enc_cipher); - self.dec_cipher = Some(dec_cipher); + self.writer.enc_cipher = Some(enc_cipher); + self.reader.dec_cipher = Some(dec_cipher); } pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> { @@ -120,12 +163,18 @@ where W2: ProtocolPacket + Debug, { Connection { - stream: connection.stream, - compression_threshold: connection.compression_threshold, - enc_cipher: connection.enc_cipher, - dec_cipher: connection.dec_cipher, - _reading: PhantomData, - _writing: PhantomData, + reader: ReadConnection { + read_stream: connection.reader.read_stream, + compression_threshold: connection.reader.compression_threshold, + dec_cipher: connection.reader.dec_cipher, + _reading: PhantomData, + }, + writer: WriteConnection { + compression_threshold: connection.writer.compression_threshold, + write_stream: connection.writer.write_stream, + enc_cipher: connection.writer.enc_cipher, + _writing: PhantomData, + }, } } } diff --git a/azalea-protocol/src/read.rs b/azalea-protocol/src/read.rs index 313fb412..3ff24f72 100755 --- a/azalea-protocol/src/read.rs +++ b/azalea-protocol/src/read.rs @@ -221,10 +221,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::packets::{ - game::{clientbound_player_chat_packet::ChatType, ClientboundGamePacket}, - handshake::ClientboundHandshakePacket, - }; + use crate::packets::game::{clientbound_player_chat_packet::ChatType, ClientboundGamePacket}; use std::io::Cursor; #[tokio::test] |
