aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/error.rs10
-rw-r--r--src/main.rs1
-rw-r--r--src/new.rs18
-rw-r--r--src/recv.rs55
-rw-r--r--src/send.rs2
5 files changed, 57 insertions, 29 deletions
diff --git a/src/error.rs b/src/error.rs
index f434804..7853f59 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -13,7 +13,7 @@ pub enum Error {
PeerIDAlreadySet,
InvalidChunkIndex(usize, usize),
InvalidChunkCount(usize, usize),
- RemoteDisco,
+ RemoteDisco(bool),
LocalDisco,
}
@@ -44,7 +44,7 @@ impl From<SendError<InPkt>> for Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use Error::*;
- write!(f, "RUDP error: ")?;
+ write!(f, "rudp: ")?;
match self {
IoError(err) => write!(f, "IO error: {}", err),
@@ -55,7 +55,11 @@ impl fmt::Display for Error {
PeerIDAlreadySet => write!(f, "peer ID already set"),
InvalidChunkIndex(i, n) => write!(f, "chunk index {i} bigger than chunk count {n}"),
InvalidChunkCount(o, n) => write!(f, "chunk count changed from {o} to {n}"),
- RemoteDisco => write!(f, "remote disconnected"),
+ RemoteDisco(to) => write!(
+ f,
+ "remote disconnected{}",
+ if *to { " (timeout)" } else { "" }
+ ),
LocalDisco => write!(f, "local disconnected"),
}
}
diff --git a/src/main.rs b/src/main.rs
index 69c8797..9a85183 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -186,7 +186,6 @@ async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::
data: &mtpkt,
})
.await?;
-
// handle incoming packets
while let Some(result) = rx.recv().await {
match result {
diff --git a/src/new.rs b/src/new.rs
index 6d4987e..9f820a9 100644
--- a/src/new.rs
+++ b/src/new.rs
@@ -59,13 +59,17 @@ pub async fn new<S: UdpSender, R: UdpReceiver>(
.name("ping")*/
.spawn(async move {
ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
- let pkt: Pkt<&[u8]> = Pkt {
- chan: 0,
- unrel: false,
- data: &[CtlType::Ping as u8],
- };
-
- ping_share.send(PktType::Ctl, pkt).await.ok();
+ ping_share
+ .send(
+ PktType::Ctl,
+ Pkt {
+ chan: 0,
+ unrel: false,
+ data: &[CtlType::Ping as u8],
+ },
+ )
+ .await
+ .ok();
});
});
diff --git a/src/recv.rs b/src/recv.rs
index 15811f2..2fabe3a 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -5,6 +5,7 @@ use std::{
cell::{Cell, OnceCell},
collections::HashMap,
io,
+ pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
@@ -65,6 +66,8 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
}
pub async fn run(&self) {
+ use Error::*;
+
let cleanup_chans = Arc::clone(&self.chans);
let mut cleanup_close = self.close.clone();
self.share
@@ -90,36 +93,54 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
});
let mut close = self.close.clone();
+ let timeout = tokio::time::sleep(Duration::from_secs(TIMEOUT));
+ tokio::pin!(timeout);
+
loop {
- if let Err(e) = self.handle(self.recv_pkt(&mut close).await) {
- if let Error::LocalDisco = e {
- self.share
- .send(
- PktType::Ctl,
- Pkt {
- unrel: true,
- chan: 0,
- data: &[CtlType::Disco as u8],
- },
- )
- .await
- .ok();
+ if let Err(e) = self.handle(self.recv_pkt(&mut close, timeout.as_mut()).await) {
+ // TODO: figure out whether this is a good idea
+ if let RemoteDisco(to) = e {
+ self.pkt_tx.send(Err(RemoteDisco(to))).ok();
}
+
+ match e {
+ // anon5's mt notifies the peer on timeout, C++ MT does not
+ LocalDisco /*| RemoteDisco(true)*/ => drop(
+ self.share
+ .send(
+ PktType::Ctl,
+ Pkt {
+ unrel: true,
+ chan: 0,
+ data: &[CtlType::Disco as u8],
+ },
+ )
+ .await
+ .ok(),
+ ),
+ _ => {}
+ }
+
break;
}
}
}
- async fn recv_pkt(&self, close: &mut watch::Receiver<bool>) -> Result<()> {
+ async fn recv_pkt(
+ &self,
+ close: &mut watch::Receiver<bool>,
+ timeout: Pin<&mut tokio::time::Sleep>,
+ ) -> Result<()> {
use Error::*;
// TODO: reset timeout
let mut cursor = io::Cursor::new(tokio::select! {
pkt = self.udp_rx.recv() => pkt?,
+ _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)),
_ = close.changed() => return Err(LocalDisco),
});
- println!("recv");
+ timeout.reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
let proto_id = cursor.read_u32::<BigEndian>()?;
if proto_id != PROTO_ID {
@@ -179,7 +200,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
}
CtlType::Disco => {
println!("Disco");
- return Err(RemoteDisco);
+ return Err(RemoteDisco(false));
}
},
PktType::Orig => {
@@ -275,7 +296,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
match res {
Ok(v) => Ok(v),
- Err(RemoteDisco) => Err(RemoteDisco),
+ Err(RemoteDisco(to)) => Err(RemoteDisco(to)),
Err(LocalDisco) => Err(LocalDisco),
Err(e) => Ok(self.pkt_tx.send(Err(e))?),
}
diff --git a/src/send.rs b/src/send.rs
index 89c15c7..20308e6 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -42,7 +42,7 @@ impl<S: UdpSender> RudpShare<S> {
data: buf,
},
);
- chan.seqnum += 1;
+ chan.seqnum = chan.seqnum.overflowing_add(1).0;
Ok(Some(rx))
}