aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormat <github@matdoes.dev>2022-10-07 23:56:23 -0500
committermat <github@matdoes.dev>2022-10-07 23:56:23 -0500
commit6f6289376a0d9ffe7e58506824e37f6b380961c3 (patch)
tree97956fc560b338fbef630f0d0617a248e0e8b336
parente9d8d0357ee63cce321e177bf19a8974699894ee (diff)
downloadazalea-drasl-6f6289376a0d9ffe7e58506824e37f6b380961c3.tar.xz
fix errors with rewritten packet reading
i forgot i never tested it before LMAO
-rwxr-xr-xCargo.lock95
-rw-r--r--azalea-buf/src/read.rs4
-rw-r--r--azalea-client/src/client.rs3
-rwxr-xr-xazalea-nbt/src/decode.rs6
-rwxr-xr-xazalea-protocol/Cargo.toml4
-rw-r--r--azalea-protocol/src/connect.rs2
-rwxr-xr-xazalea-protocol/src/lib.rs39
-rw-r--r--azalea-protocol/src/read.rs144
-rwxr-xr-xazalea-protocol/src/write.rs4
-rw-r--r--azalea/src/bot.rs10
-rw-r--r--bot/src/main.rs2
11 files changed, 196 insertions, 117 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ff10db62..128bea90 100755
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -271,6 +271,8 @@ dependencies = [
"byteorder",
"bytes",
"flate2",
+ "futures",
+ "futures-util",
"log",
"serde",
"serde_json",
@@ -634,46 +636,89 @@ dependencies = [
]
[[package]]
+name = "futures"
+version = "0.3.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-channel"
-version = "0.3.21"
+version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
+checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050"
dependencies = [
"futures-core",
+ "futures-sink",
]
[[package]]
name = "futures-core"
-version = "0.3.21"
+version = "0.3.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf"
+
+[[package]]
+name = "futures-executor"
+version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
[[package]]
name = "futures-io"
-version = "0.3.21"
+version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
[[package]]
name = "futures-sink"
-version = "0.3.21"
+version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
+checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56"
[[package]]
name = "futures-task"
-version = "0.3.21"
+version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
+checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1"
[[package]]
name = "futures-util"
-version = "0.3.21"
+version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
+checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90"
dependencies = [
+ "futures-channel",
"futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
"futures-task",
+ "memchr",
"pin-project-lite",
"pin-utils",
"slab",
@@ -1494,16 +1539,36 @@ dependencies = [
[[package]]
name = "tokio-util"
-version = "0.6.10"
+version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507"
+checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
- "log",
"pin-project-lite",
"tokio",
+ "tracing",
+]
+
+[[package]]
+name = "tracing"
+version = "0.1.35"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160"
+dependencies = [
+ "cfg-if",
+ "pin-project-lite",
+ "tracing-core",
+]
+
+[[package]]
+name = "tracing-core"
+version = "0.1.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b7358be39f2f274f322d2aaed611acc57f382e8eb1e5b48cb9ae30933495ce7"
+dependencies = [
+ "once_cell",
]
[[package]]
diff --git a/azalea-buf/src/read.rs b/azalea-buf/src/read.rs
index 29f351c6..575066c4 100644
--- a/azalea-buf/src/read.rs
+++ b/azalea-buf/src/read.rs
@@ -42,10 +42,10 @@ pub enum BufReadError {
}
fn read_bytes<'a>(buf: &'a mut Cursor<&[u8]>, length: usize) -> Result<&'a [u8], BufReadError> {
- if length > buf.get_ref().len() {
+ if length > (buf.get_ref().len() - buf.position() as usize) {
return Err(BufReadError::UnexpectedEof {
attempted_read: length,
- actual_read: buf.get_ref().len(),
+ actual_read: buf.get_ref().len() - buf.position() as usize,
});
}
let initial_position = buf.position() as usize;
diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs
index ed0a75e7..bbf78ee6 100644
--- a/azalea-client/src/client.rs
+++ b/azalea-client/src/client.rs
@@ -231,7 +231,8 @@ impl Client {
/// Write a packet directly to the server.
pub async fn write_packet(&self, packet: ServerboundGamePacket) -> Result<(), std::io::Error> {
- self.write_conn.lock().await.write(packet).await
+ self.write_conn.lock().await.write(packet).await?;
+ Ok(())
}
/// Disconnect from the server, ending all tasks.
diff --git a/azalea-nbt/src/decode.rs b/azalea-nbt/src/decode.rs
index 8a1dfab5..a811bb1f 100755
--- a/azalea-nbt/src/decode.rs
+++ b/azalea-nbt/src/decode.rs
@@ -9,7 +9,7 @@ use std::io::{BufRead, Read};
#[inline]
fn read_bytes<'a>(buf: &'a mut Cursor<&[u8]>, length: usize) -> Result<&'a [u8], Error> {
- if length > buf.get_ref().len() {
+ if length > (buf.get_ref().len() - buf.position() as usize) {
return Err(Error::UnexpectedEof);
}
let initial_position = buf.position() as usize;
@@ -95,7 +95,7 @@ impl Tag {
// integers.
11 => {
let length = stream.read_u32::<BE>()? as usize;
- if length * 4 > stream.get_ref().len() {
+ if length * 4 > (stream.get_ref().len() - stream.position() as usize) {
return Err(Error::UnexpectedEof);
}
let mut ints = Vec::with_capacity(length as usize);
@@ -108,7 +108,7 @@ impl Tag {
// integer (thus 4 bytes) and indicates the number of 8 byte longs.
12 => {
let length = stream.read_u32::<BE>()? as usize;
- if length * 8 > stream.get_ref().len() {
+ if length * 8 > (stream.get_ref().len() - stream.position() as usize) {
return Err(Error::UnexpectedEof);
}
let mut longs = Vec::with_capacity(length as usize);
diff --git a/azalea-protocol/Cargo.toml b/azalea-protocol/Cargo.toml
index 62de47d9..415c0aa9 100755
--- a/azalea-protocol/Cargo.toml
+++ b/azalea-protocol/Cargo.toml
@@ -24,12 +24,14 @@ azalea-world = {path = "../azalea-world", version = "^0.1.0"}
byteorder = "^1.4.3"
bytes = "^1.1.0"
flate2 = "1.0.23"
+futures = "0.3.24"
+futures-util = "0.3.24"
log = "0.4.17"
serde = {version = "1.0.130", features = ["serde_derive"]}
serde_json = "^1.0.72"
thiserror = "^1.0.34"
tokio = {version = "^1.19.2", features = ["io-util", "net", "macros"]}
-tokio-util = "^0.6.9"
+tokio-util = {version = "0.7.4", features = ["codec"]}
trust-dns-resolver = "^0.20.3"
uuid = "1.1.2"
diff --git a/azalea-protocol/src/connect.rs b/azalea-protocol/src/connect.rs
index bd55e406..d7b9bd1d 100644
--- a/azalea-protocol/src/connect.rs
+++ b/azalea-protocol/src/connect.rs
@@ -57,7 +57,7 @@ where
/// Write a packet to the server
pub async fn write(&mut self, packet: W) -> std::io::Result<()> {
write_packet(
- packet,
+ &packet,
&mut self.write_stream,
self.compression_threshold,
&mut self.enc_cipher,
diff --git a/azalea-protocol/src/lib.rs b/azalea-protocol/src/lib.rs
index 4da2ba90..58ffac0a 100755
--- a/azalea-protocol/src/lib.rs
+++ b/azalea-protocol/src/lib.rs
@@ -1,5 +1,9 @@
//! This lib is responsible for parsing Minecraft packets.
+// these two are necessary for thiserror backtraces
+#![feature(error_generic_member_access)]
+#![feature(provide_any)]
+
use std::net::IpAddr;
use std::str::FromStr;
@@ -78,12 +82,10 @@ mod tests {
}
.get();
let mut stream = Vec::new();
- write_packet(packet, &mut stream, None, &mut None)
+ write_packet(&packet, &mut stream, None, &mut None)
.await
.unwrap();
- println!("stream: {stream:?}");
-
let mut stream = Cursor::new(stream);
let _ = read_packet::<ServerboundLoginPacket, _>(
@@ -95,4 +97,35 @@ mod tests {
.await
.unwrap();
}
+
+ #[tokio::test]
+ async fn test_double_hello_packet() {
+ let packet = ServerboundHelloPacket {
+ username: "test".to_string(),
+ public_key: Some(ProfilePublicKeyData {
+ expires_at: 0,
+ key: b"idontthinkthisreallymattersijustwantittobelongforthetest".to_vec(),
+ key_signature: b"idontthinkthisreallymattersijustwantittobelongforthetest".to_vec(),
+ }),
+ profile_id: Some(Uuid::from_u128(0)),
+ }
+ .get();
+ let mut stream = Vec::new();
+ write_packet(&packet, &mut stream, None, &mut None)
+ .await
+ .unwrap();
+ write_packet(&packet, &mut stream, None, &mut None)
+ .await
+ .unwrap();
+ let mut stream = Cursor::new(stream);
+
+ let mut buffer = BytesMut::new();
+
+ let _ = read_packet::<ServerboundLoginPacket, _>(&mut stream, &mut buffer, None, &mut None)
+ .await
+ .unwrap();
+ let _ = read_packet::<ServerboundLoginPacket, _>(&mut stream, &mut buffer, None, &mut None)
+ .await
+ .unwrap();
+ }
}
diff --git a/azalea-protocol/src/read.rs b/azalea-protocol/src/read.rs
index eceede9d..4c398e96 100644
--- a/azalea-protocol/src/read.rs
+++ b/azalea-protocol/src/read.rs
@@ -5,16 +5,15 @@ use azalea_crypto::Aes128CfbDec;
use bytes::Buf;
use bytes::BytesMut;
use flate2::read::ZlibDecoder;
+use futures::StreamExt;
use log::{log_enabled, trace};
-use std::io::Cursor;
use std::{
- cell::Cell,
- io::Read,
- pin::Pin,
- task::{Context, Poll},
+ fmt::Debug,
+ io::{Cursor, Read},
};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio_util::codec::{BytesCodec, FramedRead};
#[derive(Error, Debug)]
pub enum ReadPacketError {
@@ -28,18 +27,28 @@ pub enum ReadPacketError {
UnknownPacketId { state_name: String, id: u32 },
#[error("Couldn't read packet id")]
ReadPacketId { source: BufReadError },
- #[error("Couldn't decompress packet")]
+ #[error(transparent)]
Decompress {
#[from]
+ #[backtrace]
source: DecompressionError,
},
- #[error("Frame splitter error")]
+ #[error(transparent)]
FrameSplitter {
#[from]
+ #[backtrace]
source: FrameSplitterError,
},
#[error("Leftover data after reading packet {packet_name}: {data:?}")]
LeftoverData { data: Vec<u8>, packet_name: String },
+ #[error(transparent)]
+ IoError {
+ #[from]
+ #[backtrace]
+ source: std::io::Error,
+ },
+ #[error("Connection closed")]
+ ConnectionClosed,
}
#[derive(Error, Debug)]
@@ -52,6 +61,7 @@ pub enum FrameSplitterError {
#[error("Io error")]
Io {
#[from]
+ #[backtrace]
source: std::io::Error,
},
#[error("Packet is longer than {max} bytes (is {size})")]
@@ -77,9 +87,9 @@ fn parse_frame(buffer: &mut BytesMut) -> Result<BytesMut, FrameSplitterError> {
},
};
- if length > buffer_copy.get_ref().len() {
+ if length > buffer_copy.remaining() {
return Err(FrameSplitterError::BadLength {
- max: buffer_copy.get_ref().len(),
+ max: buffer_copy.remaining(),
size: length,
});
}
@@ -88,49 +98,33 @@ fn parse_frame(buffer: &mut BytesMut) -> Result<BytesMut, FrameSplitterError> {
// from the real buffer now
// the length of the varint that says the length of the whole packet
- let varint_length = buffer.len() - buffer_copy.remaining();
- let _ = buffer.split_to(varint_length);
+ let varint_length = buffer.remaining() - buffer_copy.remaining();
+
+ buffer.advance(varint_length);
let data = buffer.split_to(length);
Ok(data)
}
-async fn frame_splitter<'a, R: ?Sized + Sized>(
- stream: &mut R,
- buffer: &'a mut BytesMut,
-) -> Result<Vec<u8>, FrameSplitterError>
-where
- R: AsyncRead + std::marker::Unpin + std::marker::Send,
-{
+fn frame_splitter<'a>(buffer: &'a mut BytesMut) -> Result<Option<Vec<u8>>, FrameSplitterError> {
// https://tokio.rs/tokio/tutorial/framing
- loop {
- let read_frame = parse_frame(buffer);
- match read_frame {
- Ok(frame) => return Ok(frame.to_vec()),
- Err(err) => match err {
- FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
- // we probably just haven't read enough yet
- }
- _ => return Err(err),
- },
- }
-
- let read_buf: usize = AsyncReadExt::read_buf(stream, buffer).await?;
- if 0 == read_buf {
- // The remote closed the connection. For this to be
- // a clean shutdown, there should be no data in the
- // read buffer. If there is, this means that the
- // peer closed the socket while sending a frame.
- if buffer.as_ref().is_empty() {
- return Err(FrameSplitterError::ConnectionClosed);
- } else {
- return Err(FrameSplitterError::ConnectionReset);
+ let read_frame = parse_frame(buffer);
+ match read_frame {
+ Ok(frame) => return Ok(Some(frame.to_vec())),
+ Err(err) => match err {
+ FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
+ // we probably just haven't read enough yet
}
- }
+ _ => return Err(err),
+ },
}
+
+ Ok(None)
}
-fn packet_decoder<P: ProtocolPacket>(stream: &mut Cursor<&[u8]>) -> Result<P, ReadPacketError> {
+fn packet_decoder<P: ProtocolPacket + Debug>(
+ stream: &mut Cursor<&[u8]>,
+) -> Result<P, ReadPacketError> {
// Packet ID
let packet_id =
u32::var_read_from(stream).map_err(|e| ReadPacketError::ReadPacketId { source: e })?;
@@ -152,6 +146,7 @@ pub enum DecompressionError {
#[error("Io error")]
Io {
#[from]
+ #[backtrace]
source: std::io::Error,
},
#[error("Badly compressed packet - size of {size} is below server threshold of {threshold}")]
@@ -197,42 +192,7 @@ fn compression_decoder(
Ok(decoded_buf)
}
-struct EncryptedStream<'a, R>
-where
- R: AsyncRead + std::marker::Unpin + std::marker::Send,
-{
- cipher: Cell<&'a mut Option<Aes128CfbDec>>,
- stream: &'a mut Pin<&'a mut R>,
-}
-
-impl<R> AsyncRead for EncryptedStream<'_, R>
-where
- R: AsyncRead + Unpin + Send,
-{
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut tokio::io::ReadBuf<'_>,
- ) -> Poll<std::io::Result<()>> {
- // i hate this
- let polled = self.as_mut().stream.as_mut().poll_read(cx, buf);
- match polled {
- Poll::Ready(r) => {
- // if we don't check for the remaining then we decrypt big packets incorrectly
- // (but only on linux and release mode for some reason LMAO)
- if buf.remaining() == 0 {
- if let Some(cipher) = self.as_mut().cipher.get_mut() {
- azalea_crypto::decrypt_packet(cipher, buf.filled_mut());
- }
- }
- Poll::Ready(r)
- }
- Poll::Pending => Poll::Pending,
- }
- }
-}
-
-pub async fn read_packet<'a, P: ProtocolPacket, R>(
+pub async fn read_packet<'a, P: ProtocolPacket + Debug, R>(
stream: &'a mut R,
buffer: &mut BytesMut,
compression_threshold: Option<u32>,
@@ -241,13 +201,29 @@ pub async fn read_packet<'a, P: ProtocolPacket, R>(
where
R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync,
{
- // if we were given a cipher, decrypt the packet
- let mut encrypted_stream = EncryptedStream {
- cipher: Cell::new(cipher),
- stream: &mut Pin::new(stream),
- };
+ let mut framed = FramedRead::new(stream, BytesCodec::new());
+ let mut buf = loop {
+ if let Some(buf) = frame_splitter(buffer)? {
+ // we got a full packet!!
+ break buf;
+ } else {
+ // no full packet yet :( keep reading
+ };
+
+ // if we were given a cipher, decrypt the packet
+ if let Some(message) = framed.next().await {
+ let mut bytes = message.unwrap();
+ println!("bytes: {:?}", bytes.len());
- let mut buf = frame_splitter(&mut encrypted_stream, buffer).await?;
+ if let Some(cipher) = cipher {
+ azalea_crypto::decrypt_packet(cipher, &mut bytes);
+ }
+
+ buffer.extend_from_slice(&bytes);
+ } else {
+ return Err(ReadPacketError::ConnectionClosed);
+ };
+ };
if let Some(compression_threshold) = compression_threshold {
buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)?;
diff --git a/azalea-protocol/src/write.rs b/azalea-protocol/src/write.rs
index b2ae2810..a04979a5 100755
--- a/azalea-protocol/src/write.rs
+++ b/azalea-protocol/src/write.rs
@@ -69,7 +69,7 @@ async fn compression_encoder(
}
pub async fn write_packet<P, W>(
- packet: P,
+ packet: &P,
stream: &mut W,
compression_threshold: Option<u32>,
cipher: &mut Option<Aes128CfbEnc>,
@@ -78,7 +78,7 @@ where
P: ProtocolPacket + Debug,
W: AsyncWrite + Unpin + Send,
{
- let mut buf = packet_encoder(&packet).unwrap();
+ let mut buf = packet_encoder(packet).unwrap();
if let Some(threshold) = compression_threshold {
buf = compression_encoder(&buf, threshold).await.unwrap();
}
diff --git a/azalea/src/bot.rs b/azalea/src/bot.rs
index a77e2a1c..1570fa5e 100644
--- a/azalea/src/bot.rs
+++ b/azalea/src/bot.rs
@@ -36,10 +36,12 @@ impl crate::Plugin for Plugin {
async fn handle(self: Arc<Self>, mut bot: Client, event: Arc<Event>) {
if let Event::Tick = *event {
let mut state = self.state.lock();
- if bot.jumping() {
- state.jumping_once = false;
- } else if state.jumping_once {
- bot.set_jumping(true);
+ if state.jumping_once {
+ if bot.jumping() {
+ state.jumping_once = false;
+ } else {
+ bot.set_jumping(true);
+ }
}
}
}
diff --git a/bot/src/main.rs b/bot/src/main.rs
index 0a291fd8..2f79ad26 100644
--- a/bot/src/main.rs
+++ b/bot/src/main.rs
@@ -25,7 +25,7 @@ async fn main() {
async fn handle(bot: Client, event: Arc<Event>, _state: Arc<Mutex<State>>) -> anyhow::Result<()> {
if let Event::Tick = *event {
- bot.jump();
+ // bot.jump();
}
Ok(())