aboutsummaryrefslogtreecommitdiff
path: root/src/recv_worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/recv_worker.rs')
-rw-r--r--src/recv_worker.rs34
1 files changed, 19 insertions, 15 deletions
diff --git a/src/recv_worker.rs b/src/recv_worker.rs
index 2cd8197..60cadeb 100644
--- a/src/recv_worker.rs
+++ b/src/recv_worker.rs
@@ -3,17 +3,17 @@ use byteorder::{BigEndian, ReadBytesExt};
use std::{
cell::Cell,
collections::HashMap,
- io, result,
- sync::{mpsc, Arc, Mutex, Weak},
- thread, time,
+ io,
+ sync::{Arc, Weak},
+ time,
};
+use tokio::sync::{mpsc, Mutex};
fn to_seqnum(seqnum: u16) -> usize {
(seqnum as usize) & (REL_BUFFER - 1)
}
-type PktTx = mpsc::Sender<InPkt>;
-type Result = result::Result<(), Error>;
+type Result = std::result::Result<(), Error>;
struct Split {
timestamp: time::Instant,
@@ -29,12 +29,12 @@ struct Chan {
pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
share: Arc<RudpShare<S>>,
chans: Arc<Vec<Mutex<Chan>>>,
- pkt_tx: PktTx,
+ pkt_tx: mpsc::UnboundedSender<InPkt>,
udp_rx: R,
}
impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
- pub fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: PktTx) -> Self {
+ pub async fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: mpsc::UnboundedSender<InPkt>) {
Self {
udp_rx,
share,
@@ -52,28 +52,31 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
.collect(),
),
}
+ .run()
+ .await
}
- pub fn run(&self) {
+ pub async fn run(&self) {
let cleanup_chans = Arc::downgrade(&self.chans);
- thread::spawn(move || {
+ tokio::spawn(async move {
let timeout = time::Duration::from_secs(TIMEOUT);
+ let mut interval = tokio::time::interval(timeout);
while let Some(chans) = Weak::upgrade(&cleanup_chans) {
for chan in chans.iter() {
- let mut ch = chan.lock().unwrap();
+ let mut ch = chan.lock().await;
ch.splits = ch
.splits
.drain_filter(|_k, v| v.timestamp.elapsed() < timeout)
.collect();
}
- thread::sleep(timeout);
+ interval.tick().await;
}
});
loop {
- if let Err(e) = self.handle(self.recv_pkt()) {
+ if let Err(e) = self.handle(self.recv_pkt().await) {
if let Error::LocalDisco = e {
self.share
.send(
@@ -84,6 +87,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
data: &[CtlType::Disco as u8],
},
)
+ .await
.ok();
}
break;
@@ -91,11 +95,11 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
}
}
- fn recv_pkt(&self) -> Result {
+ async fn recv_pkt(&self) -> Result {
use Error::*;
// todo: reset timeout
- let mut cursor = io::Cursor::new(self.udp_rx.recv()?);
+ let mut cursor = io::Cursor::new(self.udp_rx.recv().await?);
let proto_id = cursor.read_u32::<BigEndian>()?;
if proto_id != PROTO_ID {
@@ -110,7 +114,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
.get(n_chan as usize)
.ok_or(InvalidChannel(n_chan))?
.lock()
- .unwrap();
+ .await;
self.process_pkt(cursor, &mut chan)
}