aboutsummaryrefslogtreecommitdiff
path: root/src/new.rs
diff options
context:
space:
mode:
authorLizzy Fleckenstein <eliasfleckenstein@web.de>2023-01-06 17:45:16 +0100
committerLizzy Fleckenstein <eliasfleckenstein@web.de>2023-01-06 17:45:16 +0100
commitfd23bb3a2b57d43c115005dcd70f1e18bb005032 (patch)
tree7fa77c2db1faa55685e24a180bbd419a78d7be53 /src/new.rs
parentd3b8019227137853406891e2aa84e0c8a9e3c31c (diff)
downloadmt_rudp-fd23bb3a2b57d43c115005dcd70f1e18bb005032.tar.xz
clean shutdown; send reliables
Diffstat (limited to 'src/new.rs')
-rw-r--r--src/new.rs63
1 files changed, 63 insertions, 0 deletions
diff --git a/src/new.rs b/src/new.rs
new file mode 100644
index 0000000..a70b117
--- /dev/null
+++ b/src/new.rs
@@ -0,0 +1,63 @@
+use crate::*;
+
+pub async fn new<S: UdpSender, R: UdpReceiver>(
+ id: u16,
+ remote_id: u16,
+ udp_tx: S,
+ udp_rx: R,
+) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
+ let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
+ let (close_tx, close_rx) = watch::channel(false);
+
+ let share = Arc::new(RudpShare {
+ id,
+ remote_id: RwLock::new(remote_id),
+ udp_tx,
+ close_tx,
+ chans: (0..NUM_CHANS)
+ .map(|_| {
+ Mutex::new(Chan {
+ acks: HashMap::new(),
+ seqnum: INIT_SEQNUM,
+ })
+ })
+ .collect(),
+ tasks: Mutex::new(JoinSet::new()),
+ });
+
+ let mut tasks = share.tasks.lock().await;
+
+ let recv_share = Arc::clone(&share);
+ let recv_close = close_rx.clone();
+ tasks
+ /*.build_task()
+ .name("recv")*/
+ .spawn(async move {
+ let worker = recv::RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
+ worker.run().await;
+ });
+
+ let resend_share = Arc::clone(&share);
+ let mut resend_close = close_rx.clone();
+ tasks
+ /*.build_task()
+ .name("resend")*/
+ .spawn(async move {
+ ticker!(Duration::from_millis(500), resend_close, {
+ for chan in resend_share.chans.iter() {
+ for (_, ack) in chan.lock().await.acks.iter() {
+ resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
+ }
+ }
+ });
+ });
+
+ drop(tasks);
+
+ Ok((
+ RudpSender {
+ share: Arc::clone(&share),
+ },
+ RudpReceiver { share, pkt_rx },
+ ))
+}