aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/client.rs6
-rw-r--r--src/main.rs2
-rw-r--r--src/recv_worker.rs72
3 files changed, 57 insertions, 23 deletions
diff --git a/src/client.rs b/src/client.rs
index e486488..81c1bfb 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -19,7 +19,7 @@ pub struct Receiver {
impl UdpReceiver for Receiver {
fn recv(&self) -> io::Result<Vec<u8>> {
let mut buffer = Vec::new();
- buffer.resize(crate::UDP_PKT_SIZE, 0);
+ buffer.resize(UDP_PKT_SIZE, 0);
let len = self.sock.recv(&mut buffer)?;
buffer.truncate(len);
@@ -28,11 +28,11 @@ impl UdpReceiver for Receiver {
}
}
-pub fn connect(addr: &str) -> io::Result<(crate::RudpSender<Sender>, crate::RudpReceiver<Sender>)> {
+pub fn connect(addr: &str) -> io::Result<(RudpSender<Sender>, RudpReceiver<Sender>)> {
let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0")?);
sock.connect(addr)?;
- Ok(crate::new(
+ Ok(new(
PeerID::Srv as u16,
PeerID::Nil as u16,
Sender {
diff --git a/src/main.rs b/src/main.rs
index 2a5ff2f..aadb5cc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,6 @@
#![feature(yeet_expr)]
#![feature(cursor_remaining)]
+#![feature(hash_drain_filter)]
mod client;
pub mod error;
mod recv_worker;
@@ -19,6 +20,7 @@ pub const UDP_PKT_SIZE: usize = 512;
pub const NUM_CHANS: usize = 3;
pub const REL_BUFFER: usize = 0x8000;
pub const INIT_SEQNUM: u16 = 65500;
+pub const TIMEOUT: u64 = 30;
pub trait UdpSender: Send + Sync + 'static {
fn send(&self, data: Vec<u8>) -> io::Result<()>;
diff --git a/src/recv_worker.rs b/src/recv_worker.rs
index 578cf2e..2cd8197 100644
--- a/src/recv_worker.rs
+++ b/src/recv_worker.rs
@@ -2,25 +2,33 @@ use crate::{error::Error, *};
use byteorder::{BigEndian, ReadBytesExt};
use std::{
cell::Cell,
+ collections::HashMap,
io, result,
- sync::{mpsc, Arc},
+ sync::{mpsc, Arc, Mutex, Weak},
+ thread, time,
};
fn to_seqnum(seqnum: u16) -> usize {
(seqnum as usize) & (REL_BUFFER - 1)
}
-struct RelChan {
+type PktTx = mpsc::Sender<InPkt>;
+type Result = result::Result<(), Error>;
+
+struct Split {
+ timestamp: time::Instant,
+}
+
+struct Chan {
packets: Vec<Cell<Option<Vec<u8>>>>, // in the good old days this used to be called char **
+ splits: HashMap<u16, Split>,
seqnum: u16,
num: u8,
}
-type PktTx = mpsc::Sender<InPkt>;
-type Result = result::Result<(), Error>;
-
pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
share: Arc<RudpShare<S>>,
+ chans: Arc<Vec<Mutex<Chan>>>,
pkt_tx: PktTx,
udp_rx: R,
}
@@ -31,20 +39,41 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
udp_rx,
share,
pkt_tx,
+ chans: Arc::new(
+ (0..NUM_CHANS as u8)
+ .map(|num| {
+ Mutex::new(Chan {
+ num,
+ packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
+ seqnum: INIT_SEQNUM,
+ splits: HashMap::new(),
+ })
+ })
+ .collect(),
+ ),
}
}
pub fn run(&self) {
- let mut recv_chans = (0..NUM_CHANS as u8)
- .map(|num| RelChan {
- num,
- packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
- seqnum: INIT_SEQNUM,
- })
- .collect();
+ let cleanup_chans = Arc::downgrade(&self.chans);
+ thread::spawn(move || {
+ let timeout = time::Duration::from_secs(TIMEOUT);
+
+ while let Some(chans) = Weak::upgrade(&cleanup_chans) {
+ for chan in chans.iter() {
+ let mut ch = chan.lock().unwrap();
+ ch.splits = ch
+ .splits
+ .drain_filter(|_k, v| v.timestamp.elapsed() < timeout)
+ .collect();
+ }
+
+ thread::sleep(timeout);
+ }
+ });
loop {
- if let Err(e) = self.handle(self.recv_pkt(&mut recv_chans)) {
+ if let Err(e) = self.handle(self.recv_pkt()) {
if let Error::LocalDisco = e {
self.share
.send(
@@ -62,7 +91,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
}
}
- fn recv_pkt(&self, chans: &mut Vec<RelChan>) -> Result {
+ fn recv_pkt(&self) -> Result {
use Error::*;
// todo: reset timeout
@@ -76,14 +105,17 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
let peer_id = cursor.read_u16::<BigEndian>()?;
let n_chan = cursor.read_u8()?;
- let chan = chans
- .get_mut(n_chan as usize)
- .ok_or(InvalidChannel(n_chan))?;
-
- self.process_pkt(cursor, chan)
+ let mut chan = self
+ .chans
+ .get(n_chan as usize)
+ .ok_or(InvalidChannel(n_chan))?
+ .lock()
+ .unwrap();
+
+ self.process_pkt(cursor, &mut chan)
}
- fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut RelChan) -> Result {
+ fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut Chan) -> Result {
use CtlType::*;
use Error::*;
use PktType::*;