diff options
Diffstat (limited to 'src/recv_worker.rs')
-rw-r--r-- | src/recv_worker.rs | 34 |
1 files changed, 19 insertions, 15 deletions
diff --git a/src/recv_worker.rs b/src/recv_worker.rs index 2cd8197..60cadeb 100644 --- a/src/recv_worker.rs +++ b/src/recv_worker.rs @@ -3,17 +3,17 @@ use byteorder::{BigEndian, ReadBytesExt}; use std::{ cell::Cell, collections::HashMap, - io, result, - sync::{mpsc, Arc, Mutex, Weak}, - thread, time, + io, + sync::{Arc, Weak}, + time, }; +use tokio::sync::{mpsc, Mutex}; fn to_seqnum(seqnum: u16) -> usize { (seqnum as usize) & (REL_BUFFER - 1) } -type PktTx = mpsc::Sender<InPkt>; -type Result = result::Result<(), Error>; +type Result = std::result::Result<(), Error>; struct Split { timestamp: time::Instant, @@ -29,12 +29,12 @@ struct Chan { pub struct RecvWorker<R: UdpReceiver, S: UdpSender> { share: Arc<RudpShare<S>>, chans: Arc<Vec<Mutex<Chan>>>, - pkt_tx: PktTx, + pkt_tx: mpsc::UnboundedSender<InPkt>, udp_rx: R, } impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { - pub fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: PktTx) -> Self { + pub async fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: mpsc::UnboundedSender<InPkt>) { Self { udp_rx, share, @@ -52,28 +52,31 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { .collect(), ), } + .run() + .await } - pub fn run(&self) { + pub async fn run(&self) { let cleanup_chans = Arc::downgrade(&self.chans); - thread::spawn(move || { + tokio::spawn(async move { let timeout = time::Duration::from_secs(TIMEOUT); + let mut interval = tokio::time::interval(timeout); while let Some(chans) = Weak::upgrade(&cleanup_chans) { for chan in chans.iter() { - let mut ch = chan.lock().unwrap(); + let mut ch = chan.lock().await; ch.splits = ch .splits .drain_filter(|_k, v| v.timestamp.elapsed() < timeout) .collect(); } - thread::sleep(timeout); + interval.tick().await; } }); loop { - if let Err(e) = self.handle(self.recv_pkt()) { + if let Err(e) = self.handle(self.recv_pkt().await) { if let Error::LocalDisco = e { self.share .send( @@ -84,6 +87,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { data: &[CtlType::Disco as u8], }, ) + .await .ok(); } break; @@ -91,11 +95,11 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { } } - fn recv_pkt(&self) -> Result { + async fn recv_pkt(&self) -> Result { use Error::*; // todo: reset timeout - let mut cursor = io::Cursor::new(self.udp_rx.recv()?); + let mut cursor = io::Cursor::new(self.udp_rx.recv().await?); let proto_id = cursor.read_u32::<BigEndian>()?; if proto_id != PROTO_ID { @@ -110,7 +114,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { .get(n_chan as usize) .ok_or(InvalidChannel(n_chan))? .lock() - .unwrap(); + .await; self.process_pkt(cursor, &mut chan) } |