diff options
Diffstat (limited to 'src/share.rs')
-rw-r--r-- | src/share.rs | 116 |
1 files changed, 44 insertions, 72 deletions
diff --git a/src/share.rs b/src/share.rs index a2afc4c..02e37b2 100644 --- a/src/share.rs +++ b/src/share.rs @@ -1,10 +1,6 @@ use super::*; -use drop_bomb::DropBomb; -use std::{borrow::Cow, collections::HashMap, io, sync::Arc, time::Duration}; -use tokio::{ - sync::{mpsc, watch, Mutex, RwLock}, - task::JoinSet, -}; +use std::{borrow::Cow, collections::HashMap, io, sync::Arc}; +use tokio::sync::{watch, Mutex, RwLock}; #[derive(Debug)] pub(crate) struct Ack { @@ -20,96 +16,72 @@ pub(crate) struct Chan { } #[derive(Debug)] -pub(crate) struct RudpShare<S: UdpSender> { +pub(crate) struct RudpShare<P: UdpPeer> { pub(crate) id: u16, pub(crate) remote_id: RwLock<u16>, - pub(crate) chans: Vec<Mutex<Chan>>, - pub(crate) udp_tx: S, - pub(crate) close_tx: watch::Sender<bool>, - pub(crate) tasks: Mutex<JoinSet<()>>, - pub(crate) bomb: Mutex<DropBomb>, + pub(crate) chans: [Mutex<Chan>; NUM_CHANS], + pub(crate) udp_tx: P::Sender, + pub(crate) close: watch::Sender<bool>, } -pub async fn new<S: UdpSender, R: UdpReceiver>( +pub async fn new<P: UdpPeer>( id: u16, remote_id: u16, - udp_tx: S, - udp_rx: R, -) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> { - let (pkt_tx, pkt_rx) = mpsc::unbounded_channel(); + udp_tx: P::Sender, + udp_rx: P::Receiver, +) -> io::Result<(RudpSender<P>, RudpReceiver<P>)> { let (close_tx, close_rx) = watch::channel(false); let share = Arc::new(RudpShare { id, remote_id: RwLock::new(remote_id), udp_tx, - close_tx, - chans: (0..NUM_CHANS) - .map(|_| { - Mutex::new(Chan { - acks: HashMap::new(), - seqnum: INIT_SEQNUM, - }) + close: close_tx, + chans: std::array::from_fn(|_| { + Mutex::new(Chan { + acks: HashMap::new(), + seqnum: INIT_SEQNUM, }) - .collect(), - tasks: Mutex::new(JoinSet::new()), - bomb: Mutex::new(DropBomb::new("rudp connection must be explicitly closed")), + }), }); - let mut tasks = share.tasks.lock().await; + Ok(( + RudpSender { + share: Arc::clone(&share), + }, + RudpReceiver::new(udp_rx, share, close_rx), + )) +} - let recv_share = Arc::clone(&share); - let recv_close = close_rx.clone(); - tasks - /*.build_task() - .name("recv")*/ - .spawn(async move { - let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx); - worker.run().await; - }); +macro_rules! impl_share { + ($T:ident) => { + impl<P: UdpPeer> $T<P> { + pub async fn peer_id(&self) -> u16 { + self.share.id + } - let resend_share = Arc::clone(&share); - let mut resend_close = close_rx.clone(); - tasks - /*.build_task() - .name("resend")*/ - .spawn(async move { - ticker!(Duration::from_millis(500), resend_close, { - for chan in resend_share.chans.iter() { - for (_, ack) in chan.lock().await.acks.iter() { - resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?) - } - } - }); - }); + pub async fn is_server(&self) -> bool { + self.share.id == PeerID::Srv as u16 + } - let ping_share = Arc::clone(&share); - let mut ping_close = close_rx.clone(); - tasks - /*.build_task() - .name("ping")*/ - .spawn(async move { - ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, { - ping_share + pub async fn close(self) { + self.share.close.send(true).ok(); // FIXME: handle err? + + self.share .send( PktType::Ctl, Pkt { + unrel: true, chan: 0, - unrel: false, - data: Cow::Borrowed(&[CtlType::Ping as u8]), + data: Cow::Borrowed(&[CtlType::Disco as u8]), }, ) .await .ok(); - }); - }); - - drop(tasks); - - Ok(( - RudpSender { - share: Arc::clone(&share), - }, - RudpReceiver { share, pkt_rx }, - )) + } + } + }; } + +impl_share!(RudpReceiver); +impl_share!(RudpSender); |