diff options
author | Lizzy Fleckenstein <eliasfleckenstein@web.de> | 2022-12-23 15:31:10 +0100 |
---|---|---|
committer | Lizzy Fleckenstein <eliasfleckenstein@web.de> | 2022-12-23 15:31:10 +0100 |
commit | 7b7721a8b5c73cef37d75a6f96e2d8a55668bbd5 (patch) | |
tree | bb16f12439efbb13a13886ab8d8b8dcf9c2178da | |
parent | 98b4178e7bbdc256cf270763a59d3dc7c8d3f957 (diff) | |
download | mt_rudp-7b7721a8b5c73cef37d75a6f96e2d8a55668bbd5.tar.xz |
splits infrastructure
-rw-r--r-- | src/client.rs | 6 | ||||
-rw-r--r-- | src/main.rs | 2 | ||||
-rw-r--r-- | src/recv_worker.rs | 72 |
3 files changed, 57 insertions, 23 deletions
diff --git a/src/client.rs b/src/client.rs index e486488..81c1bfb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,7 +19,7 @@ pub struct Receiver { impl UdpReceiver for Receiver { fn recv(&self) -> io::Result<Vec<u8>> { let mut buffer = Vec::new(); - buffer.resize(crate::UDP_PKT_SIZE, 0); + buffer.resize(UDP_PKT_SIZE, 0); let len = self.sock.recv(&mut buffer)?; buffer.truncate(len); @@ -28,11 +28,11 @@ impl UdpReceiver for Receiver { } } -pub fn connect(addr: &str) -> io::Result<(crate::RudpSender<Sender>, crate::RudpReceiver<Sender>)> { +pub fn connect(addr: &str) -> io::Result<(RudpSender<Sender>, RudpReceiver<Sender>)> { let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0")?); sock.connect(addr)?; - Ok(crate::new( + Ok(new( PeerID::Srv as u16, PeerID::Nil as u16, Sender { diff --git a/src/main.rs b/src/main.rs index 2a5ff2f..aadb5cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ #![feature(yeet_expr)] #![feature(cursor_remaining)] +#![feature(hash_drain_filter)] mod client; pub mod error; mod recv_worker; @@ -19,6 +20,7 @@ pub const UDP_PKT_SIZE: usize = 512; pub const NUM_CHANS: usize = 3; pub const REL_BUFFER: usize = 0x8000; pub const INIT_SEQNUM: u16 = 65500; +pub const TIMEOUT: u64 = 30; pub trait UdpSender: Send + Sync + 'static { fn send(&self, data: Vec<u8>) -> io::Result<()>; diff --git a/src/recv_worker.rs b/src/recv_worker.rs index 578cf2e..2cd8197 100644 --- a/src/recv_worker.rs +++ b/src/recv_worker.rs @@ -2,25 +2,33 @@ use crate::{error::Error, *}; use byteorder::{BigEndian, ReadBytesExt}; use std::{ cell::Cell, + collections::HashMap, io, result, - sync::{mpsc, Arc}, + sync::{mpsc, Arc, Mutex, Weak}, + thread, time, }; fn to_seqnum(seqnum: u16) -> usize { (seqnum as usize) & (REL_BUFFER - 1) } -struct RelChan { +type PktTx = mpsc::Sender<InPkt>; +type Result = result::Result<(), Error>; + +struct Split { + timestamp: time::Instant, +} + +struct Chan { packets: Vec<Cell<Option<Vec<u8>>>>, // in the good old days this used to be called char ** + splits: HashMap<u16, Split>, seqnum: u16, num: u8, } -type PktTx = mpsc::Sender<InPkt>; -type Result = result::Result<(), Error>; - pub struct RecvWorker<R: UdpReceiver, S: UdpSender> { share: Arc<RudpShare<S>>, + chans: Arc<Vec<Mutex<Chan>>>, pkt_tx: PktTx, udp_rx: R, } @@ -31,20 +39,41 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { udp_rx, share, pkt_tx, + chans: Arc::new( + (0..NUM_CHANS as u8) + .map(|num| { + Mutex::new(Chan { + num, + packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(), + seqnum: INIT_SEQNUM, + splits: HashMap::new(), + }) + }) + .collect(), + ), } } pub fn run(&self) { - let mut recv_chans = (0..NUM_CHANS as u8) - .map(|num| RelChan { - num, - packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(), - seqnum: INIT_SEQNUM, - }) - .collect(); + let cleanup_chans = Arc::downgrade(&self.chans); + thread::spawn(move || { + let timeout = time::Duration::from_secs(TIMEOUT); + + while let Some(chans) = Weak::upgrade(&cleanup_chans) { + for chan in chans.iter() { + let mut ch = chan.lock().unwrap(); + ch.splits = ch + .splits + .drain_filter(|_k, v| v.timestamp.elapsed() < timeout) + .collect(); + } + + thread::sleep(timeout); + } + }); loop { - if let Err(e) = self.handle(self.recv_pkt(&mut recv_chans)) { + if let Err(e) = self.handle(self.recv_pkt()) { if let Error::LocalDisco = e { self.share .send( @@ -62,7 +91,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { } } - fn recv_pkt(&self, chans: &mut Vec<RelChan>) -> Result { + fn recv_pkt(&self) -> Result { use Error::*; // todo: reset timeout @@ -76,14 +105,17 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> { let peer_id = cursor.read_u16::<BigEndian>()?; let n_chan = cursor.read_u8()?; - let chan = chans - .get_mut(n_chan as usize) - .ok_or(InvalidChannel(n_chan))?; - - self.process_pkt(cursor, chan) + let mut chan = self + .chans + .get(n_chan as usize) + .ok_or(InvalidChannel(n_chan))? + .lock() + .unwrap(); + + self.process_pkt(cursor, &mut chan) } - fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut RelChan) -> Result { + fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut Chan) -> Result { use CtlType::*; use Error::*; use PktType::*; |