aboutsummaryrefslogtreecommitdiff
path: root/azalea-protocol/src
diff options
context:
space:
mode:
authormat <git@matdoes.dev>2023-07-16 05:50:02 -0500
committermat <git@matdoes.dev>2023-07-16 05:50:02 -0500
commit0a83dc73b4c06b9300b8e16f8a30d512374262cd (patch)
tree62ecc67501300bb7ef6c1996cefc440a94f826ee /azalea-protocol/src
parent509c154b4d90515f8964f5a1a732106cb7fa0288 (diff)
downloadazalea-drasl-0a83dc73b4c06b9300b8e16f8a30d512374262cd.tar.xz
add try_read to connection
Diffstat (limited to 'azalea-protocol/src')
-rwxr-xr-xazalea-protocol/src/connect.rs19
-rwxr-xr-xazalea-protocol/src/read.rs65
2 files changed, 77 insertions, 7 deletions
diff --git a/azalea-protocol/src/connect.rs b/azalea-protocol/src/connect.rs
index cb837ba5..5df1d874 100755
--- a/azalea-protocol/src/connect.rs
+++ b/azalea-protocol/src/connect.rs
@@ -6,7 +6,7 @@ use crate::packets::login::clientbound_hello_packet::ClientboundHelloPacket;
use crate::packets::login::{ClientboundLoginPacket, ServerboundLoginPacket};
use crate::packets::status::{ClientboundStatusPacket, ServerboundStatusPacket};
use crate::packets::ProtocolPacket;
-use crate::read::{read_packet, ReadPacketError};
+use crate::read::{read_packet, try_read_packet, ReadPacketError};
use crate::write::write_packet;
use azalea_auth::game_profile::GameProfile;
use azalea_auth::sessionserver::{ClientSessionServerError, ServerSessionServerError};
@@ -140,6 +140,17 @@ where
)
.await
}
+
+ /// Try to read a packet from the stream, or return Ok(None) if there's no
+ /// packet.
+ pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
+ try_read_packet::<R, _>(
+ &mut self.read_stream,
+ &mut self.buffer,
+ self.compression_threshold,
+ &mut self.dec_cipher,
+ )
+ }
}
impl<W> WriteConnection<W>
where
@@ -183,6 +194,12 @@ where
self.reader.read().await
}
+ /// Try to read a packet from the other side of the connection, or return
+ /// Ok(None) if there's no packet to read.
+ pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
+ self.reader.try_read()
+ }
+
/// Write a packet to the other side of the connection.
pub async fn write(&mut self, packet: W) -> std::io::Result<()> {
self.writer.write(packet).await
diff --git a/azalea-protocol/src/read.rs b/azalea-protocol/src/read.rs
index d92897b9..bffb22bd 100755
--- a/azalea-protocol/src/read.rs
+++ b/azalea-protocol/src/read.rs
@@ -8,6 +8,7 @@ use bytes::Buf;
use bytes::BytesMut;
use flate2::read::ZlibDecoder;
use futures::StreamExt;
+use futures_lite::future;
use log::{log_enabled, trace};
use std::backtrace::Backtrace;
use std::{
@@ -204,6 +205,8 @@ pub fn compression_decoder(
/// same frame, so we need to store the packet data that's left to read.
///
/// The current protocol state must be passed as a generic.
+///
+/// For the non-waiting version, see [`try_read_packet`].
pub async fn read_packet<'a, P: ProtocolPacket + Debug, R>(
stream: &'a mut R,
buffer: &mut BytesMut,
@@ -214,12 +217,10 @@ where
R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync,
{
let mut framed = FramedRead::new(stream, BytesCodec::new());
- let mut buf = loop {
- if let Some(buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? {
+ loop {
+ if let Some(buf) = try_process_buffer::<P, R>(buffer, compression_threshold)? {
// we got a full packet!!
- break buf;
- } else {
- // no full packet yet :( keep reading
+ return Ok(buf);
};
// if we were given a cipher, decrypt the packet
@@ -234,6 +235,58 @@ where
} else {
return Err(Box::new(ReadPacketError::ConnectionClosed));
};
+ }
+}
+
+/// Try to read a single packet from a stream. Returns None if we haven't
+/// received a full packet yet.
+pub fn try_read_packet<P: ProtocolPacket + Debug, R>(
+ stream: &mut R,
+ buffer: &mut BytesMut,
+ compression_threshold: Option<u32>,
+ cipher: &mut Option<Aes128CfbDec>,
+) -> Result<Option<P>, Box<ReadPacketError>>
+where
+ R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync,
+{
+ let mut framed = FramedRead::new(stream, BytesCodec::new());
+ loop {
+ if let Some(buf) = try_process_buffer::<P, R>(buffer, compression_threshold)? {
+ // we got a full packet!!
+ return Ok(Some(buf));
+ };
+
+ // if we were given a cipher, decrypt the packet
+ if let Some(message) = future::block_on(future::poll_once(framed.next())) {
+ if let Some(message) = message {
+ let mut bytes = message.map_err(ReadPacketError::from)?;
+
+ if let Some(cipher) = cipher {
+ azalea_crypto::decrypt_packet(cipher, &mut bytes);
+ }
+
+ buffer.extend_from_slice(&bytes);
+ } else {
+ return Err(Box::new(ReadPacketError::ConnectionClosed));
+ }
+ } else {
+ return Ok(None);
+ };
+ }
+}
+
+/// Try to get a Minecraft packet from a buffer. Returns None if the packet
+/// isn't complete yet.
+pub fn try_process_buffer<P: ProtocolPacket + Debug, R>(
+ buffer: &mut BytesMut,
+ compression_threshold: Option<u32>,
+) -> Result<Option<P>, Box<ReadPacketError>>
+where
+ R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync,
+{
+ let Some(mut buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? else {
+ // no full packet yet :(
+ return Ok(None);
};
if let Some(compression_threshold) = compression_threshold {
@@ -255,5 +308,5 @@ where
let packet = packet_decoder(&mut Cursor::new(&buf[..]))?;
- Ok(packet)
+ Ok(Some(packet))
}