use crate::{error::Error, *}; use byteorder::{BigEndian, ReadBytesExt}; use std::{ cell::Cell, collections::HashMap, io, result, sync::{mpsc, Arc, Mutex, Weak}, thread, time, }; fn to_seqnum(seqnum: u16) -> usize { (seqnum as usize) & (REL_BUFFER - 1) } type PktTx = mpsc::Sender; type Result = result::Result<(), Error>; struct Split { timestamp: time::Instant, } struct Chan { packets: Vec>>>, // in the good old days this used to be called char ** splits: HashMap, seqnum: u16, num: u8, } pub struct RecvWorker { share: Arc>, chans: Arc>>, pkt_tx: PktTx, udp_rx: R, } impl RecvWorker { pub fn new(udp_rx: R, share: Arc>, pkt_tx: PktTx) -> Self { Self { udp_rx, share, pkt_tx, chans: Arc::new( (0..NUM_CHANS as u8) .map(|num| { Mutex::new(Chan { num, packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(), seqnum: INIT_SEQNUM, splits: HashMap::new(), }) }) .collect(), ), } } pub fn run(&self) { let cleanup_chans = Arc::downgrade(&self.chans); thread::spawn(move || { let timeout = time::Duration::from_secs(TIMEOUT); while let Some(chans) = Weak::upgrade(&cleanup_chans) { for chan in chans.iter() { let mut ch = chan.lock().unwrap(); ch.splits = ch .splits .drain_filter(|_k, v| v.timestamp.elapsed() < timeout) .collect(); } thread::sleep(timeout); } }); loop { if let Err(e) = self.handle(self.recv_pkt()) { if let Error::LocalDisco = e { self.share .send( PktType::Ctl, Pkt { unrel: true, chan: 0, data: &[CtlType::Disco as u8], }, ) .ok(); } break; } } } fn recv_pkt(&self) -> Result { use Error::*; // todo: reset timeout let mut cursor = io::Cursor::new(self.udp_rx.recv()?); let proto_id = cursor.read_u32::()?; if proto_id != PROTO_ID { do yeet InvalidProtoId(proto_id); } let peer_id = cursor.read_u16::()?; let n_chan = cursor.read_u8()?; let mut chan = self .chans .get(n_chan as usize) .ok_or(InvalidChannel(n_chan))? .lock() .unwrap(); self.process_pkt(cursor, &mut chan) } fn process_pkt(&self, mut cursor: io::Cursor>, chan: &mut Chan) -> Result { use CtlType::*; use Error::*; use PktType::*; match cursor.read_u8()?.try_into()? { Ctl => match cursor.read_u8()?.try_into()? { Disco => return Err(RemoteDisco), _ => {} }, Orig => { println!("Orig"); self.pkt_tx.send(Ok(Pkt { chan: chan.num, unrel: true, data: cursor.remaining_slice().into(), }))?; } Split => { println!("Split"); dbg!(cursor.remaining_slice()); } Rel => { println!("Rel"); let seqnum = cursor.read_u16::()?; chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into())); while let Some(pkt) = chan.packets[to_seqnum(chan.seqnum)].take() { self.handle(self.process_pkt(io::Cursor::new(pkt), chan))?; chan.seqnum = chan.seqnum.overflowing_add(1).0; } } } Ok(()) } fn handle(&self, res: Result) -> Result { use Error::*; match res { Ok(v) => Ok(v), Err(RemoteDisco) => Err(RemoteDisco), Err(LocalDisco) => Err(LocalDisco), Err(e) => Ok(self.pkt_tx.send(Err(e))?), } } }