From 12bfebc06ed29fabbc4a4357e314b8fbde1b552d Mon Sep 17 00:00:00 2001 From: Lizzy Fleckenstein Date: Fri, 6 Jan 2023 19:48:23 +0100 Subject: timeouts --- src/recv.rs | 55 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 17 deletions(-) (limited to 'src/recv.rs') diff --git a/src/recv.rs b/src/recv.rs index 15811f2..2fabe3a 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -5,6 +5,7 @@ use std::{ cell::{Cell, OnceCell}, collections::HashMap, io, + pin::Pin, sync::Arc, time::{Duration, Instant}, }; @@ -65,6 +66,8 @@ impl RecvWorker { } pub async fn run(&self) { + use Error::*; + let cleanup_chans = Arc::clone(&self.chans); let mut cleanup_close = self.close.clone(); self.share @@ -90,36 +93,54 @@ impl RecvWorker { }); let mut close = self.close.clone(); + let timeout = tokio::time::sleep(Duration::from_secs(TIMEOUT)); + tokio::pin!(timeout); + loop { - if let Err(e) = self.handle(self.recv_pkt(&mut close).await) { - if let Error::LocalDisco = e { - self.share - .send( - PktType::Ctl, - Pkt { - unrel: true, - chan: 0, - data: &[CtlType::Disco as u8], - }, - ) - .await - .ok(); + if let Err(e) = self.handle(self.recv_pkt(&mut close, timeout.as_mut()).await) { + // TODO: figure out whether this is a good idea + if let RemoteDisco(to) = e { + self.pkt_tx.send(Err(RemoteDisco(to))).ok(); } + + match e { + // anon5's mt notifies the peer on timeout, C++ MT does not + LocalDisco /*| RemoteDisco(true)*/ => drop( + self.share + .send( + PktType::Ctl, + Pkt { + unrel: true, + chan: 0, + data: &[CtlType::Disco as u8], + }, + ) + .await + .ok(), + ), + _ => {} + } + break; } } } - async fn recv_pkt(&self, close: &mut watch::Receiver) -> Result<()> { + async fn recv_pkt( + &self, + close: &mut watch::Receiver, + timeout: Pin<&mut tokio::time::Sleep>, + ) -> Result<()> { use Error::*; // TODO: reset timeout let mut cursor = io::Cursor::new(tokio::select! { pkt = self.udp_rx.recv() => pkt?, + _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)), _ = close.changed() => return Err(LocalDisco), }); - println!("recv"); + timeout.reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT)); let proto_id = cursor.read_u32::()?; if proto_id != PROTO_ID { @@ -179,7 +200,7 @@ impl RecvWorker { } CtlType::Disco => { println!("Disco"); - return Err(RemoteDisco); + return Err(RemoteDisco(false)); } }, PktType::Orig => { @@ -275,7 +296,7 @@ impl RecvWorker { match res { Ok(v) => Ok(v), - Err(RemoteDisco) => Err(RemoteDisco), + Err(RemoteDisco(to)) => Err(RemoteDisco(to)), Err(LocalDisco) => Err(LocalDisco), Err(e) => Ok(self.pkt_tx.send(Err(e))?), } -- cgit v1.2.3