use crate::{prelude::*, ticker, Chan, RecvWorker, RudpShare};
use std::{collections::HashMap, io, sync::Arc, time::Duration};
use tokio::{
    sync::{mpsc, watch, Mutex, RwLock},
    task::JoinSet,
};

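/// Creates a new reliable-UDP connection on top of the given `UdpSender`/
/// `UdpReceiver` pair and returns its sending and receiving halves.
///
/// Three background tasks are spawned into the connection's shared `JoinSet`:
/// a receive worker, a resender for reliable packets that have not been
/// acknowledged yet, and a periodic keep-alive ping.
///
/// Rough usage sketch; the concrete `UdpSender`/`UdpReceiver` implementations
/// and the peer IDs are assumed to come from the caller:
///
/// ```ignore
/// let (tx, rx) = new(my_id, their_id, my_udp_tx, my_udp_rx).await?;
/// ```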
pub async fn new<S: UdpSender, R: UdpReceiver>(
    id: u16,
    remote_id: u16,
    udp_tx: S,
    udp_rx: R,
) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
    let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
    let (close_tx, close_rx) = watch::channel(false);

    let share = Arc::new(RudpShare {
        id,
        remote_id: RwLock::new(remote_id),
        udp_tx,
        close_tx,
        chans: (0..NUM_CHANS)
            .map(|_| {
                Mutex::new(Chan {
                    acks: HashMap::new(),
                    seqnum: INIT_SEQNUM,
                })
            })
            .collect(),
        tasks: Mutex::new(JoinSet::new()),
    });

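    // Spawn the background tasks into the connection's shared `JoinSet` so
    // they live and die with the connection itself.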
    let mut tasks = share.tasks.lock().await;

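    // Receive worker: reads incoming data from `udp_rx` and forwards parsed
    // packets to the receiving half through `pkt_tx` until close is signalled.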
    let recv_share = Arc::clone(&share);
    let recv_close = close_rx.clone();
    tasks
        /*.build_task()
        .name("recv")*/
        .spawn(async move {
            let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
            worker.run().await;
        });

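    // Resend task: every 500 ms, retransmit every reliable packet that is
    // still waiting for an ack.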
    let resend_share = Arc::clone(&share);
    let mut resend_close = close_rx.clone();
    tasks
        /*.build_task()
        .name("resend")*/
        .spawn(async move {
            ticker!(Duration::from_millis(500), resend_close, {
                for chan in resend_share.chans.iter() {
                    for (_, ack) in chan.lock().await.acks.iter() {
                        resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
                    }
                }
            });
        });

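    // Ping task: periodically send a control ping packet to keep the
    // connection alive.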
    let ping_share = Arc::clone(&share);
    let mut ping_close = close_rx.clone();
    tasks
        /*.build_task()
        .name("ping")*/
        .spawn(async move {
            ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
                ping_share
                    .send(
                        PktType::Ctl,
                        Pkt {
                            chan: 0,
                            unrel: false,
                            data: &[CtlType::Ping as u8],
                        },
                    )
                    .await
                    .ok();
            });
        });

    drop(tasks);

    Ok((
        RudpSender {
            share: Arc::clone(&share),
        },
        RudpReceiver { share, pkt_rx },
    ))
}