aboutsummaryrefslogtreecommitdiff
path: root/src/recv_worker.rs
diff options
context:
space:
mode:
authorLizzy Fleckenstein <eliasfleckenstein@web.de>2023-01-06 17:45:16 +0100
committerLizzy Fleckenstein <eliasfleckenstein@web.de>2023-01-06 17:45:16 +0100
commitfd23bb3a2b57d43c115005dcd70f1e18bb005032 (patch)
tree7fa77c2db1faa55685e24a180bbd419a78d7be53 /src/recv_worker.rs
parentd3b8019227137853406891e2aa84e0c8a9e3c31c (diff)
downloadmt_rudp-fd23bb3a2b57d43c115005dcd70f1e18bb005032.tar.xz
clean shutdown; send reliables
Diffstat (limited to 'src/recv_worker.rs')
-rw-r--r--src/recv_worker.rs256
1 files changed, 0 insertions, 256 deletions
diff --git a/src/recv_worker.rs b/src/recv_worker.rs
deleted file mode 100644
index 83b3273..0000000
--- a/src/recv_worker.rs
+++ /dev/null
@@ -1,256 +0,0 @@
-use crate::{error::Error, *};
-use async_recursion::async_recursion;
-use byteorder::{BigEndian, ReadBytesExt};
-use std::{
- cell::{Cell, OnceCell},
- collections::HashMap,
- io,
- sync::{Arc, Weak},
- time,
-};
-use tokio::sync::{mpsc, Mutex};
-
-fn to_seqnum(seqnum: u16) -> usize {
- (seqnum as usize) & (REL_BUFFER - 1)
-}
-
-type Result<T> = std::result::Result<T, Error>;
-
-struct Split {
- timestamp: Option<time::Instant>,
- chunks: Vec<OnceCell<Vec<u8>>>,
- got: usize,
-}
-
-struct Chan {
- packets: Vec<Cell<Option<Vec<u8>>>>, // char ** 😛
- splits: HashMap<u16, Split>,
- seqnum: u16,
- num: u8,
-}
-
-pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
- share: Arc<RudpShare<S>>,
- chans: Arc<Vec<Mutex<Chan>>>,
- pkt_tx: mpsc::UnboundedSender<InPkt>,
- udp_rx: R,
-}
-
-impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
- pub fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: mpsc::UnboundedSender<InPkt>) -> Self {
- Self {
- udp_rx,
- share,
- pkt_tx,
- chans: Arc::new(
- (0..NUM_CHANS as u8)
- .map(|num| {
- Mutex::new(Chan {
- num,
- packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
- seqnum: INIT_SEQNUM,
- splits: HashMap::new(),
- })
- })
- .collect(),
- ),
- }
- }
-
- pub async fn run(&self) {
- let cleanup_chans = Arc::downgrade(&self.chans);
- tokio::spawn(async move {
- let timeout = time::Duration::from_secs(TIMEOUT);
- let mut interval = tokio::time::interval(timeout);
-
- while let Some(chans) = Weak::upgrade(&cleanup_chans) {
- for chan in chans.iter() {
- let mut ch = chan.lock().await;
- ch.splits = ch
- .splits
- .drain_filter(
- |_k, v| !matches!(v.timestamp, Some(t) if t.elapsed() < timeout),
- )
- .collect();
- }
-
- interval.tick().await;
- }
- });
-
- loop {
- if let Err(e) = self.handle(self.recv_pkt().await) {
- if let Error::LocalDisco = e {
- self.share
- .send(
- PktType::Ctl,
- Pkt {
- unrel: true,
- chan: 0,
- data: &[CtlType::Disco as u8],
- },
- )
- .await
- .ok();
- }
- break;
- }
- }
- }
-
- async fn recv_pkt(&self) -> Result<()> {
- use Error::*;
-
- // todo: reset timeout
- let mut cursor = io::Cursor::new(self.udp_rx.recv().await?);
-
- let proto_id = cursor.read_u32::<BigEndian>()?;
- if proto_id != PROTO_ID {
- return Err(InvalidProtoId(proto_id));
- }
-
- let _peer_id = cursor.read_u16::<BigEndian>()?;
-
- let n_chan = cursor.read_u8()?;
- let mut chan = self
- .chans
- .get(n_chan as usize)
- .ok_or(InvalidChannel(n_chan))?
- .lock()
- .await;
-
- self.process_pkt(cursor, true, &mut chan).await
- }
-
- #[async_recursion]
- async fn process_pkt(
- &self,
- mut cursor: io::Cursor<Vec<u8>>,
- unrel: bool,
- chan: &mut Chan,
- ) -> Result<()> {
- use Error::*;
-
- match cursor.read_u8()?.try_into()? {
- PktType::Ctl => match cursor.read_u8()?.try_into()? {
- CtlType::Ack => {
- let seqnum = cursor.read_u16::<BigEndian>()?;
- if let Some((tx, _)) = self.share.ack_chans.lock().await.remove(&seqnum) {
- tx.send(true).ok();
- }
- }
- CtlType::SetPeerID => {
- let mut id = self.share.remote_id.write().await;
-
- if *id != PeerID::Nil as u16 {
- return Err(PeerIDAlreadySet);
- }
-
- *id = cursor.read_u16::<BigEndian>()?;
- }
- CtlType::Ping => {}
- CtlType::Disco => return Err(RemoteDisco),
- },
- PktType::Orig => {
- println!("Orig");
-
- self.pkt_tx.send(Ok(Pkt {
- chan: chan.num,
- unrel,
- data: cursor.remaining_slice().into(),
- }))?;
- }
- PktType::Split => {
- println!("Split");
-
- let seqnum = cursor.read_u16::<BigEndian>()?;
- let chunk_index = cursor.read_u16::<BigEndian>()? as usize;
- let chunk_count = cursor.read_u16::<BigEndian>()? as usize;
-
- let mut split = chan.splits.entry(seqnum).or_insert_with(|| Split {
- got: 0,
- chunks: (0..chunk_count).map(|_| OnceCell::new()).collect(),
- timestamp: None,
- });
-
- if split.chunks.len() != chunk_count {
- return Err(InvalidChunkCount(split.chunks.len(), chunk_count));
- }
-
- if split
- .chunks
- .get(chunk_index)
- .ok_or(InvalidChunkIndex(chunk_index, chunk_count))?
- .set(cursor.remaining_slice().into())
- .is_ok()
- {
- split.got += 1;
- }
-
- split.timestamp = if unrel {
- Some(time::Instant::now())
- } else {
- None
- };
-
- if split.got == chunk_count {
- self.pkt_tx.send(Ok(Pkt {
- chan: chan.num,
- unrel,
- data: split
- .chunks
- .iter()
- .flat_map(|chunk| chunk.get().unwrap().iter())
- .copied()
- .collect(),
- }))?;
-
- chan.splits.remove(&seqnum);
- }
- }
- PktType::Rel => {
- println!("Rel");
-
- let seqnum = cursor.read_u16::<BigEndian>()?;
- chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
-
- let mut ack_data = Vec::with_capacity(3);
- ack_data.write_u8(CtlType::Ack as u8)?;
- ack_data.write_u16::<BigEndian>(seqnum)?;
-
- self.share
- .send(
- PktType::Ctl,
- Pkt {
- unrel: true,
- chan: chan.num,
- data: &ack_data,
- },
- )
- .await?;
-
- fn next_pkt(chan: &mut Chan) -> Option<Vec<u8>> {
- chan.packets[to_seqnum(chan.seqnum)].take()
- }
-
- while let Some(pkt) = next_pkt(chan) {
- self.handle(self.process_pkt(io::Cursor::new(pkt), false, chan).await)?;
- chan.seqnum = chan.seqnum.overflowing_add(1).0;
- }
- }
- }
-
- Ok(())
- }
-
- fn handle(&self, res: Result<()>) -> Result<()> {
- use Error::*;
-
- match res {
- Ok(v) => Ok(v),
- Err(RemoteDisco) => Err(RemoteDisco),
- Err(LocalDisco) => Err(LocalDisco),
- Err(e) => Ok(self.pkt_tx.send(Err(e))?),
- }
- }
-}