From 567c6f4f2c39976d170111b816806453636f8241 Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 1 May 2022 21:54:03 -0500 Subject: Reduce usage of AsyncRead We already receive everything from the server when it tells us the length, so we can actually just treat the stream as a Read instead of an AsyncRead. --- azalea-protocol/src/read.rs | 56 ++++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 34 deletions(-) (limited to 'azalea-protocol/src/read.rs') diff --git a/azalea-protocol/src/read.rs b/azalea-protocol/src/read.rs index 9afdb0e9..dd80bbc3 100755 --- a/azalea-protocol/src/read.rs +++ b/azalea-protocol/src/read.rs @@ -1,12 +1,16 @@ +use crate::{ + connect::PacketFlow, + mc_buf::{read_varint_async, Readable}, + packets::ProtocolPacket, +}; +use azalea_crypto::Aes128CfbDec; +use flate2::read::ZlibDecoder; use std::{ cell::Cell, + io::Read, pin::Pin, task::{Context, Poll}, }; - -use crate::{connect::PacketFlow, mc_buf::Readable, packets::ProtocolPacket}; -use async_compression::tokio::bufread::ZlibDecoder; -use azalea_crypto::Aes128CfbDec; use tokio::io::{AsyncRead, AsyncReadExt}; async fn frame_splitter(mut stream: &mut R) -> Result, String> @@ -14,7 +18,7 @@ where R: AsyncRead + std::marker::Unpin + std::marker::Send, { // Packet Length - let length_result = stream.read_varint().await; + let length_result = read_varint_async(&mut stream).await; match length_result { Ok(length) => { let mut buf = vec![0; length as usize]; @@ -30,16 +34,13 @@ where } } -async fn packet_decoder( - stream: &mut R, +fn packet_decoder( + stream: &mut impl Read, flow: &PacketFlow, -) -> Result -where - R: AsyncRead + std::marker::Unpin + std::marker::Send, -{ +) -> Result { // Packet ID - let packet_id = stream.read_varint().await?; - P::read(packet_id.try_into().unwrap(), flow, stream).await + let packet_id = stream.read_varint()?; + P::read(packet_id.try_into().unwrap(), flow, stream) } // this is always true in multiplayer, false in singleplayer @@ -47,22 +48,16 @@ static VALIDATE_DECOMPRESSED: bool = true; pub static MAXIMUM_UNCOMPRESSED_LENGTH: u32 = 2097152; -async fn compression_decoder( - stream: &mut R, +fn compression_decoder( + stream: &mut impl Read, compression_threshold: u32, -) -> Result, String> -where - R: AsyncRead + std::marker::Unpin + std::marker::Send, -{ +) -> Result, String> { // Data Length - let n: u32 = stream.read_varint().await?.try_into().unwrap(); + let n: u32 = stream.read_varint()?.try_into().unwrap(); if n == 0 { // no data size, no compression let mut buf = vec![]; - stream - .read_to_end(&mut buf) - .await - .map_err(|e| e.to_string())?; + stream.read_to_end(&mut buf).map_err(|e| e.to_string())?; return Ok(buf); } @@ -81,17 +76,10 @@ where } } - let mut buf = vec![]; - stream - .read_to_end(&mut buf) - .await - .map_err(|e| e.to_string())?; - let mut decoded_buf = vec![]; - let mut decoder = ZlibDecoder::new(buf.as_slice()); + let mut decoder = ZlibDecoder::new(stream); decoder .read_to_end(&mut decoded_buf) - .await .map_err(|e| e.to_string())?; Ok(decoded_buf) @@ -161,11 +149,11 @@ where // "decompressing packet ({}ms)", // start_time.elapsed().as_millis() // ); - buf = compression_decoder(&mut buf.as_slice(), compression_threshold).await?; + buf = compression_decoder(&mut &buf[..], compression_threshold)?; } // println!("decoding packet ({}ms)", start_time.elapsed().as_millis()); - let packet = packet_decoder(&mut buf.as_slice(), flow).await?; + let packet = packet_decoder(&mut buf.as_slice(), flow)?; // println!("decoded packet ({}ms)", start_time.elapsed().as_millis()); Ok(packet) -- cgit v1.2.3 From e1b6bc965a3f71d64b4dc3075da21c578ab5b508 Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 1 May 2022 22:10:28 -0500 Subject: Update read.rs --- azalea-protocol/src/read.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'azalea-protocol/src/read.rs') diff --git a/azalea-protocol/src/read.rs b/azalea-protocol/src/read.rs index dd80bbc3..b4ba17ea 100755 --- a/azalea-protocol/src/read.rs +++ b/azalea-protocol/src/read.rs @@ -149,7 +149,7 @@ where // "decompressing packet ({}ms)", // start_time.elapsed().as_millis() // ); - buf = compression_decoder(&mut &buf[..], compression_threshold)?; + buf = compression_decoder(&mut buf.as_slice(), compression_threshold)?; } // println!("decoding packet ({}ms)", start_time.elapsed().as_millis()); -- cgit v1.2.3