diff options
Diffstat (limited to 'src/send.rs')
-rw-r--r-- | src/send.rs | 77 |
1 files changed, 56 insertions, 21 deletions
diff --git a/src/send.rs b/src/send.rs index 2c449e1..90bbe2d 100644 --- a/src/send.rs +++ b/src/send.rs @@ -1,34 +1,57 @@ use super::*; use byteorder::{BigEndian, WriteBytesExt}; use std::{ + collections::HashMap, io::{self, Write}, sync::Arc, }; -use tokio::sync::watch; +use tokio::sync::{watch, Mutex, RwLock}; -pub type AckResult = io::Result<Option<watch::Receiver<bool>>>; +pub type Ack = Option<watch::Receiver<bool>>; -pub struct RudpSender<P: UdpPeer> { - pub(crate) share: Arc<RudpShare<P>>, +#[derive(Debug)] +pub(crate) struct AckWait { + pub(crate) tx: watch::Sender<bool>, + pub(crate) rx: watch::Receiver<bool>, + pub(crate) data: Vec<u8>, } -// derive(Clone) adds unwanted Clone trait bound to P parameter -impl<P: UdpPeer> Clone for RudpSender<P> { - fn clone(&self) -> Self { - Self { - share: Arc::clone(&self.share), - } - } +#[derive(Debug)] +pub(crate) struct Chan { + pub(crate) acks: HashMap<u16, AckWait>, + pub(crate) seqnum: u16, } -impl<P: UdpPeer> RudpSender<P> { - pub async fn send(&self, pkt: Pkt<'_>) -> AckResult { - self.share.send(PktType::Orig, pkt).await // TODO: splits - } +#[derive(Debug)] +pub struct Sender<S: UdpSender> { + pub(crate) id: u16, + pub(crate) remote_id: RwLock<u16>, + pub(crate) chans: [Mutex<Chan>; NUM_CHANS], + udp: S, + close: watch::Sender<bool>, } -impl<P: UdpPeer> RudpShare<P> { - pub async fn send(&self, tp: PktType, pkt: Pkt<'_>) -> AckResult { +impl<S: UdpSender> Sender<S> { + pub fn new(udp: S, close: watch::Sender<bool>, id: u16, remote_id: u16) -> Arc<Self> { + Arc::new(Self { + id, + remote_id: RwLock::new(remote_id), + udp, + close, + chans: std::array::from_fn(|_| { + Mutex::new(Chan { + acks: HashMap::new(), + seqnum: INIT_SEQNUM, + }) + }), + }) + } + + pub async fn send_rudp(&self, pkt: Pkt<'_>) -> io::Result<Ack> { + self.send_rudp_type(PktType::Orig, pkt).await // TODO: splits + } + + pub async fn send_rudp_type(&self, tp: PktType, pkt: Pkt<'_>) -> io::Result<Ack> { let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + 2 + 1 + pkt.data.len()); buf.write_u32::<BigEndian>(PROTO_ID)?; buf.write_u16::<BigEndian>(*self.remote_id.read().await)?; @@ -45,7 +68,7 @@ impl<P: UdpPeer> RudpShare<P> { buf.write_u8(tp as u8)?; buf.write_all(pkt.data.as_ref())?; - self.send_raw(&buf).await?; + self.send_udp(&buf).await?; if pkt.unrel { Ok(None) @@ -54,7 +77,7 @@ impl<P: UdpPeer> RudpShare<P> { let (tx, rx) = watch::channel(false); chan.acks.insert( seqnum, - Ack { + AckWait { tx, rx: rx.clone(), data: buf, @@ -66,11 +89,23 @@ impl<P: UdpPeer> RudpShare<P> { } } - pub async fn send_raw(&self, data: &[u8]) -> io::Result<()> { + pub async fn send_udp(&self, data: &[u8]) -> io::Result<()> { if data.len() > UDP_PKT_SIZE { panic!("splitting packets is not implemented yet"); } - self.udp_tx.send(data).await + self.udp.send(data).await + } + + pub async fn peer_id(&self) -> u16 { + self.id + } + + pub async fn is_server(&self) -> bool { + self.id == PeerID::Srv as u16 + } + + pub fn close(&self) { + self.close.send(true).ok(); } } |