diff options
author | Lizzy Fleckenstein <eliasfleckenstein@web.de> | 2023-02-15 21:54:25 +0100 |
---|---|---|
committer | Lizzy Fleckenstein <eliasfleckenstein@web.de> | 2023-02-15 21:55:47 +0100 |
commit | 1d4ebed25ff3e05d2fac70a040901fd3ea3029eb (patch) | |
tree | e4d8d52177ffd907064a6dc19d61bd3aa482ffcd /src/share.rs | |
parent | 45d7cd0049b9349de428945c4a7c9b73cb0f461d (diff) | |
download | mt_rudp-1d4ebed25ff3e05d2fac70a040901fd3ea3029eb.tar.xz |
Rework structure
Diffstat (limited to 'src/share.rs')
-rw-r--r-- | src/share.rs | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/src/share.rs b/src/share.rs new file mode 100644 index 0000000..e0d2d2b --- /dev/null +++ b/src/share.rs @@ -0,0 +1,112 @@ +use super::*; +use std::{collections::HashMap, io, sync::Arc, time::Duration}; +use tokio::{ + sync::{mpsc, watch, Mutex, RwLock}, + task::JoinSet, +}; + +#[derive(Debug)] +pub(crate) struct Ack { + pub(crate) tx: watch::Sender<bool>, + pub(crate) rx: watch::Receiver<bool>, + pub(crate) data: Vec<u8>, +} + +#[derive(Debug)] +pub(crate) struct Chan { + pub(crate) acks: HashMap<u16, Ack>, + pub(crate) seqnum: u16, +} + +#[derive(Debug)] +pub(crate) struct RudpShare<S: UdpSender> { + pub(crate) id: u16, + pub(crate) remote_id: RwLock<u16>, + pub(crate) chans: Vec<Mutex<Chan>>, + pub(crate) udp_tx: S, + pub(crate) close_tx: watch::Sender<bool>, + pub(crate) tasks: Mutex<JoinSet<()>>, +} + +pub async fn new<S: UdpSender, R: UdpReceiver>( + id: u16, + remote_id: u16, + udp_tx: S, + udp_rx: R, +) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> { + let (pkt_tx, pkt_rx) = mpsc::unbounded_channel(); + let (close_tx, close_rx) = watch::channel(false); + + let share = Arc::new(RudpShare { + id, + remote_id: RwLock::new(remote_id), + udp_tx, + close_tx, + chans: (0..NUM_CHANS) + .map(|_| { + Mutex::new(Chan { + acks: HashMap::new(), + seqnum: INIT_SEQNUM, + }) + }) + .collect(), + tasks: Mutex::new(JoinSet::new()), + }); + + let mut tasks = share.tasks.lock().await; + + let recv_share = Arc::clone(&share); + let recv_close = close_rx.clone(); + tasks + /*.build_task() + .name("recv")*/ + .spawn(async move { + let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx); + worker.run().await; + }); + + let resend_share = Arc::clone(&share); + let mut resend_close = close_rx.clone(); + tasks + /*.build_task() + .name("resend")*/ + .spawn(async move { + ticker!(Duration::from_millis(500), resend_close, { + for chan in resend_share.chans.iter() { + for (_, ack) in chan.lock().await.acks.iter() { + resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?) + } + } + }); + }); + + let ping_share = Arc::clone(&share); + let mut ping_close = close_rx.clone(); + tasks + /*.build_task() + .name("ping")*/ + .spawn(async move { + ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, { + ping_share + .send( + PktType::Ctl, + Pkt { + chan: 0, + unrel: false, + data: &[CtlType::Ping as u8], + }, + ) + .await + .ok(); + }); + }); + + drop(tasks); + + Ok(( + RudpSender { + share: Arc::clone(&share), + }, + RudpReceiver { share, pkt_rx }, + )) +} |