diff options
Diffstat (limited to 'src/recv.rs')
-rw-r--r-- | src/recv.rs | 55 |
1 files changed, 38 insertions, 17 deletions
diff --git a/src/recv.rs b/src/recv.rs index 15811f2..2fabe3a 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -5,6 +5,7 @@ use std::{ cell::{Cell, OnceCell}, collections::HashMap, io, + pin::Pin, sync::Arc, time::{Duration, Instant}, }; @@ -65,6 +66,8 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { } pub async fn run(&self) { + use Error::*; + let cleanup_chans = Arc::clone(&self.chans); let mut cleanup_close = self.close.clone(); self.share @@ -90,36 +93,54 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { }); let mut close = self.close.clone(); + let timeout = tokio::time::sleep(Duration::from_secs(TIMEOUT)); + tokio::pin!(timeout); + loop { - if let Err(e) = self.handle(self.recv_pkt(&mut close).await) { - if let Error::LocalDisco = e { - self.share - .send( - PktType::Ctl, - Pkt { - unrel: true, - chan: 0, - data: &[CtlType::Disco as u8], - }, - ) - .await - .ok(); + if let Err(e) = self.handle(self.recv_pkt(&mut close, timeout.as_mut()).await) { + // TODO: figure out whether this is a good idea + if let RemoteDisco(to) = e { + self.pkt_tx.send(Err(RemoteDisco(to))).ok(); } + + match e { + // anon5's mt notifies the peer on timeout, C++ MT does not + LocalDisco /*| RemoteDisco(true)*/ => drop( + self.share + .send( + PktType::Ctl, + Pkt { + unrel: true, + chan: 0, + data: &[CtlType::Disco as u8], + }, + ) + .await + .ok(), + ), + _ => {} + } + break; } } } - async fn recv_pkt(&self, close: &mut watch::Receiver<bool>) -> Result<()> { + async fn recv_pkt( + &self, + close: &mut watch::Receiver<bool>, + timeout: Pin<&mut tokio::time::Sleep>, + ) -> Result<()> { use Error::*; // TODO: reset timeout let mut cursor = io::Cursor::new(tokio::select! { pkt = self.udp_rx.recv() => pkt?, + _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)), _ = close.changed() => return Err(LocalDisco), }); - println!("recv"); + timeout.reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT)); let proto_id = cursor.read_u32::<BigEndian>()?; if proto_id != PROTO_ID { @@ -179,7 +200,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { } CtlType::Disco => { println!("Disco"); - return Err(RemoteDisco); + return Err(RemoteDisco(false)); } }, PktType::Orig => { @@ -275,7 +296,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { match res { Ok(v) => Ok(v), - Err(RemoteDisco) => Err(RemoteDisco), + Err(RemoteDisco(to)) => Err(RemoteDisco(to)), Err(LocalDisco) => Err(LocalDisco), Err(e) => Ok(self.pkt_tx.send(Err(e))?), } |