aboutsummaryrefslogtreecommitdiff
path: root/src/recv.rs
diff options
context:
space:
mode:
authorLizzy Fleckenstein <eliasfleckenstein@web.de>2023-01-06 19:48:23 +0100
committerLizzy Fleckenstein <eliasfleckenstein@web.de>2023-01-06 19:48:23 +0100
commit12bfebc06ed29fabbc4a4357e314b8fbde1b552d (patch)
treea5c1db628d00a6c93613d7bbeccb36d3f345ef6f /src/recv.rs
parente061b359fa6bbe142473989ce41a26c1334f7872 (diff)
downloadmt_rudp-12bfebc06ed29fabbc4a4357e314b8fbde1b552d.tar.xz
timeouts
Diffstat (limited to 'src/recv.rs')
-rw-r--r--src/recv.rs55
1 files changed, 38 insertions, 17 deletions
diff --git a/src/recv.rs b/src/recv.rs
index 15811f2..2fabe3a 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -5,6 +5,7 @@ use std::{
cell::{Cell, OnceCell},
collections::HashMap,
io,
+ pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
@@ -65,6 +66,8 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
}
pub async fn run(&self) {
+ use Error::*;
+
let cleanup_chans = Arc::clone(&self.chans);
let mut cleanup_close = self.close.clone();
self.share
@@ -90,36 +93,54 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
});
let mut close = self.close.clone();
+ let timeout = tokio::time::sleep(Duration::from_secs(TIMEOUT));
+ tokio::pin!(timeout);
+
loop {
- if let Err(e) = self.handle(self.recv_pkt(&mut close).await) {
- if let Error::LocalDisco = e {
- self.share
- .send(
- PktType::Ctl,
- Pkt {
- unrel: true,
- chan: 0,
- data: &[CtlType::Disco as u8],
- },
- )
- .await
- .ok();
+ if let Err(e) = self.handle(self.recv_pkt(&mut close, timeout.as_mut()).await) {
+ // TODO: figure out whether this is a good idea
+ if let RemoteDisco(to) = e {
+ self.pkt_tx.send(Err(RemoteDisco(to))).ok();
}
+
+ match e {
+ // anon5's mt notifies the peer on timeout, C++ MT does not
+ LocalDisco /*| RemoteDisco(true)*/ => drop(
+ self.share
+ .send(
+ PktType::Ctl,
+ Pkt {
+ unrel: true,
+ chan: 0,
+ data: &[CtlType::Disco as u8],
+ },
+ )
+ .await
+ .ok(),
+ ),
+ _ => {}
+ }
+
break;
}
}
}
- async fn recv_pkt(&self, close: &mut watch::Receiver<bool>) -> Result<()> {
+ async fn recv_pkt(
+ &self,
+ close: &mut watch::Receiver<bool>,
+ timeout: Pin<&mut tokio::time::Sleep>,
+ ) -> Result<()> {
use Error::*;
// TODO: reset timeout
let mut cursor = io::Cursor::new(tokio::select! {
pkt = self.udp_rx.recv() => pkt?,
+ _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)),
_ = close.changed() => return Err(LocalDisco),
});
- println!("recv");
+ timeout.reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
let proto_id = cursor.read_u32::<BigEndian>()?;
if proto_id != PROTO_ID {
@@ -179,7 +200,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
}
CtlType::Disco => {
println!("Disco");
- return Err(RemoteDisco);
+ return Err(RemoteDisco(false));
}
},
PktType::Orig => {
@@ -275,7 +296,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
match res {
Ok(v) => Ok(v),
- Err(RemoteDisco) => Err(RemoteDisco),
+ Err(RemoteDisco(to)) => Err(RemoteDisco(to)),
Err(LocalDisco) => Err(LocalDisco),
Err(e) => Ok(self.pkt_tx.send(Err(e))?),
}