From 89b1fc1d8d4bd886d80af0fe1d492cc877bce022 Mon Sep 17 00:00:00 2001 From: Lizzy Fleckenstein Date: Sat, 25 Feb 2023 18:55:53 +0100 Subject: Use channels --- src/send.rs | 77 ++++++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 56 insertions(+), 21 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 2c449e1..90bbe2d 100644 --- a/src/send.rs +++ b/src/send.rs @@ -1,34 +1,57 @@ use super::*; use byteorder::{BigEndian, WriteBytesExt}; use std::{ + collections::HashMap, io::{self, Write}, sync::Arc, }; -use tokio::sync::watch; +use tokio::sync::{watch, Mutex, RwLock}; -pub type AckResult = io::Result>>; +pub type Ack = Option>; -pub struct RudpSender { - pub(crate) share: Arc>, +#[derive(Debug)] +pub(crate) struct AckWait { + pub(crate) tx: watch::Sender, + pub(crate) rx: watch::Receiver, + pub(crate) data: Vec, } -// derive(Clone) adds unwanted Clone trait bound to P parameter -impl Clone for RudpSender

{ - fn clone(&self) -> Self { - Self { - share: Arc::clone(&self.share), - } - } +#[derive(Debug)] +pub(crate) struct Chan { + pub(crate) acks: HashMap, + pub(crate) seqnum: u16, } -impl RudpSender

{ - pub async fn send(&self, pkt: Pkt<'_>) -> AckResult { - self.share.send(PktType::Orig, pkt).await // TODO: splits - } +#[derive(Debug)] +pub struct Sender { + pub(crate) id: u16, + pub(crate) remote_id: RwLock, + pub(crate) chans: [Mutex; NUM_CHANS], + udp: S, + close: watch::Sender, } -impl RudpShare

{ - pub async fn send(&self, tp: PktType, pkt: Pkt<'_>) -> AckResult { +impl Sender { + pub fn new(udp: S, close: watch::Sender, id: u16, remote_id: u16) -> Arc { + Arc::new(Self { + id, + remote_id: RwLock::new(remote_id), + udp, + close, + chans: std::array::from_fn(|_| { + Mutex::new(Chan { + acks: HashMap::new(), + seqnum: INIT_SEQNUM, + }) + }), + }) + } + + pub async fn send_rudp(&self, pkt: Pkt<'_>) -> io::Result { + self.send_rudp_type(PktType::Orig, pkt).await // TODO: splits + } + + pub async fn send_rudp_type(&self, tp: PktType, pkt: Pkt<'_>) -> io::Result { let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + 2 + 1 + pkt.data.len()); buf.write_u32::(PROTO_ID)?; buf.write_u16::(*self.remote_id.read().await)?; @@ -45,7 +68,7 @@ impl RudpShare

{ buf.write_u8(tp as u8)?; buf.write_all(pkt.data.as_ref())?; - self.send_raw(&buf).await?; + self.send_udp(&buf).await?; if pkt.unrel { Ok(None) @@ -54,7 +77,7 @@ impl RudpShare

{ let (tx, rx) = watch::channel(false); chan.acks.insert( seqnum, - Ack { + AckWait { tx, rx: rx.clone(), data: buf, @@ -66,11 +89,23 @@ impl RudpShare

{ } } - pub async fn send_raw(&self, data: &[u8]) -> io::Result<()> { + pub async fn send_udp(&self, data: &[u8]) -> io::Result<()> { if data.len() > UDP_PKT_SIZE { panic!("splitting packets is not implemented yet"); } - self.udp_tx.send(data).await + self.udp.send(data).await + } + + pub async fn peer_id(&self) -> u16 { + self.id + } + + pub async fn is_server(&self) -> bool { + self.id == PeerID::Srv as u16 + } + + pub fn close(&self) { + self.close.send(true).ok(); } } -- cgit v1.2.3