path: root/src/share.rs
author    Lizzy Fleckenstein <eliasfleckenstein@web.de>  2023-02-15 21:54:25 +0100
committer Lizzy Fleckenstein <eliasfleckenstein@web.de>  2023-02-15 21:55:47 +0100
commit    1d4ebed25ff3e05d2fac70a040901fd3ea3029eb (patch)
tree      e4d8d52177ffd907064a6dc19d61bd3aa482ffcd /src/share.rs
parent    45d7cd0049b9349de428945c4a7c9b73cb0f461d (diff)
download  mt_rudp-1d4ebed25ff3e05d2fac70a040901fd3ea3029eb.tar.xz
Rework structure
Diffstat (limited to 'src/share.rs')
-rw-r--r--  src/share.rs  112
1 file changed, 112 insertions, 0 deletions
diff --git a/src/share.rs b/src/share.rs
new file mode 100644
index 0000000..e0d2d2b
--- /dev/null
+++ b/src/share.rs
@@ -0,0 +1,112 @@
+use super::*;
+use std::{collections::HashMap, io, sync::Arc, time::Duration};
+use tokio::{
+    sync::{mpsc, watch, Mutex, RwLock},
+    task::JoinSet,
+};
+
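+// A reliable packet that has been sent but not yet acknowledged: `data` holds the
+// raw bytes so it can be re-sent, and the watch channel signals once the ack arrives.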
+#[derive(Debug)]
+pub(crate) struct Ack {
+    pub(crate) tx: watch::Sender<bool>,
+    pub(crate) rx: watch::Receiver<bool>,
+    pub(crate) data: Vec<u8>,
+}
+
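+// Per-channel state: pending acks keyed by sequence number, plus the next
+// sequence number to assign to an outgoing reliable packet.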
+#[derive(Debug)]
+pub(crate) struct Chan {
+    pub(crate) acks: HashMap<u16, Ack>,
+    pub(crate) seqnum: u16,
+}
+
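+// Connection state shared between RudpSender, RudpReceiver and the background tasks.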
+#[derive(Debug)]
+pub(crate) struct RudpShare<S: UdpSender> {
+    pub(crate) id: u16,
+    pub(crate) remote_id: RwLock<u16>,
+    pub(crate) chans: Vec<Mutex<Chan>>,
+    pub(crate) udp_tx: S,
+    pub(crate) close_tx: watch::Sender<bool>,
+    pub(crate) tasks: Mutex<JoinSet<()>>,
+}
+
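+// Builds a connected sender/receiver pair around the given UDP halves and
+// spawns the background tasks (receive, resend, ping).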
+pub async fn new<S: UdpSender, R: UdpReceiver>(
+    id: u16,
+    remote_id: u16,
+    udp_tx: S,
+    udp_rx: R,
+) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
+    let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
+    let (close_tx, close_rx) = watch::channel(false);
+
+    let share = Arc::new(RudpShare {
+        id,
+        remote_id: RwLock::new(remote_id),
+        udp_tx,
+        close_tx,
+        chans: (0..NUM_CHANS)
+            .map(|_| {
+                Mutex::new(Chan {
+                    acks: HashMap::new(),
+                    seqnum: INIT_SEQNUM,
+                })
+            })
+            .collect(),
+        tasks: Mutex::new(JoinSet::new()),
+    });
+
+    let mut tasks = share.tasks.lock().await;
+
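+    // Receive task: reads incoming UDP packets and forwards them via pkt_tx.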
+    let recv_share = Arc::clone(&share);
+    let recv_close = close_rx.clone();
+    tasks
+        /*.build_task()
+        .name("recv")*/
+        .spawn(async move {
+            let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
+            worker.run().await;
+        });
+
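+    // Resend task: every 500ms, re-send all reliable packets that are still unacknowledged.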
+    let resend_share = Arc::clone(&share);
+    let mut resend_close = close_rx.clone();
+    tasks
+        /*.build_task()
+        .name("resend")*/
+        .spawn(async move {
+            ticker!(Duration::from_millis(500), resend_close, {
+                for chan in resend_share.chans.iter() {
+                    for (_, ack) in chan.lock().await.acks.iter() {
+                        resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
+                    }
+                }
+            });
+        });
+
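+    // Ping task: periodically send a Ctl ping packet to keep the connection alive.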
+    let ping_share = Arc::clone(&share);
+    let mut ping_close = close_rx.clone();
+    tasks
+        /*.build_task()
+        .name("ping")*/
+        .spawn(async move {
+            ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
+                ping_share
+                    .send(
+                        PktType::Ctl,
+                        Pkt {
+                            chan: 0,
+                            unrel: false,
+                            data: &[CtlType::Ping as u8],
+                        },
+                    )
+                    .await
+                    .ok();
+            });
+        });
+
+    drop(tasks);
+
+    Ok((
+        RudpSender {
+            share: Arc::clone(&share),
+        },
+        RudpReceiver { share, pkt_rx },
+    ))
+}
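
The ticker! macro used by the resend and ping tasks is defined elsewhere in mt_rudp and does not appear in this diff. A minimal sketch of the shape it is assumed to have (run the body at a fixed interval until the close watch channel fires); this is an illustration, not the crate's actual definition:

    // Assumed sketch of ticker!, not the definition from mt_rudp: runs $body
    // every $period until the close watch receiver ($close) reports a change.
    macro_rules! ticker {
        ($period:expr, $close:expr, $body:block) => {
            let mut interval = tokio::time::interval($period);
            loop {
                tokio::select! {
                    _ = interval.tick() => $body,
                    _ = $close.changed() => break,
                }
            }
        };
    }

Under that assumption, the close_tx/close_rx watch pair stored in RudpShare is what shuts the resend and ping loops down: sending a new value on close_tx causes each loop's changed() branch to complete and the task to exit.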