diff options
author | Lizzy Fleckenstein <eliasfleckenstein@web.de> | 2022-12-23 15:31:10 +0100 |
---|---|---|
committer | Lizzy Fleckenstein <eliasfleckenstein@web.de> | 2022-12-23 15:31:10 +0100 |
commit | 7b7721a8b5c73cef37d75a6f96e2d8a55668bbd5 (patch) | |
tree | bb16f12439efbb13a13886ab8d8b8dcf9c2178da /src/recv_worker.rs | |
parent | 98b4178e7bbdc256cf270763a59d3dc7c8d3f957 (diff) | |
download | mt_rudp-7b7721a8b5c73cef37d75a6f96e2d8a55668bbd5.tar.xz |
splits infrastructure
Diffstat (limited to 'src/recv_worker.rs')
-rw-r--r-- | src/recv_worker.rs | 72 |
1 files changed, 52 insertions, 20 deletions
diff --git a/src/recv_worker.rs b/src/recv_worker.rs index 578cf2e..2cd8197 100644 --- a/src/recv_worker.rs +++ b/src/recv_worker.rs @@ -2,25 +2,33 @@ use crate::{error::Error, *}; use byteorder::{BigEndian, ReadBytesExt}; use std::{ cell::Cell, + collections::HashMap, io, result, - sync::{mpsc, Arc}, + sync::{mpsc, Arc, Mutex, Weak}, + thread, time, }; fn to_seqnum(seqnum: u16) -> usize { (seqnum as usize) & (REL_BUFFER - 1) } -struct RelChan { +type PktTx = mpsc::Sender<InPkt>; +type Result = result::Result<(), Error>; + +struct Split { + timestamp: time::Instant, +} + +struct Chan { packets: Vec<Cell<Option<Vec<u8>>>>, // in the good old days this used to be called char ** + splits: HashMap<u16, Split>, seqnum: u16, num: u8, } -type PktTx = mpsc::Sender<InPkt>; -type Result = result::Result<(), Error>; - pub struct RecvWorker<R: UdpReceiver, S: UdpSender> { share: Arc<RudpShare<S>>, + chans: Arc<Vec<Mutex<Chan>>>, pkt_tx: PktTx, udp_rx: R, } @@ -31,20 +39,41 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { udp_rx, share, pkt_tx, + chans: Arc::new( + (0..NUM_CHANS as u8) + .map(|num| { + Mutex::new(Chan { + num, + packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(), + seqnum: INIT_SEQNUM, + splits: HashMap::new(), + }) + }) + .collect(), + ), } } pub fn run(&self) { - let mut recv_chans = (0..NUM_CHANS as u8) - .map(|num| RelChan { - num, - packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(), - seqnum: INIT_SEQNUM, - }) - .collect(); + let cleanup_chans = Arc::downgrade(&self.chans); + thread::spawn(move || { + let timeout = time::Duration::from_secs(TIMEOUT); + + while let Some(chans) = Weak::upgrade(&cleanup_chans) { + for chan in chans.iter() { + let mut ch = chan.lock().unwrap(); + ch.splits = ch + .splits + .drain_filter(|_k, v| v.timestamp.elapsed() < timeout) + .collect(); + } + + thread::sleep(timeout); + } + }); loop { - if let Err(e) = self.handle(self.recv_pkt(&mut recv_chans)) { + if let Err(e) = self.handle(self.recv_pkt()) { if let Error::LocalDisco = e { self.share .send( @@ -62,7 +91,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { } } - fn recv_pkt(&self, chans: &mut Vec<RelChan>) -> Result { + fn recv_pkt(&self) -> Result { use Error::*; // todo: reset timeout @@ -76,14 +105,17 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { let peer_id = cursor.read_u16::<BigEndian>()?; let n_chan = cursor.read_u8()?; - let chan = chans - .get_mut(n_chan as usize) - .ok_or(InvalidChannel(n_chan))?; - - self.process_pkt(cursor, chan) + let mut chan = self + .chans + .get(n_chan as usize) + .ok_or(InvalidChannel(n_chan))? + .lock() + .unwrap(); + + self.process_pkt(cursor, &mut chan) } - fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut RelChan) -> Result { + fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut Chan) -> Result { use CtlType::*; use Error::*; use PktType::*; |