From fd23bb3a2b57d43c115005dcd70f1e18bb005032 Mon Sep 17 00:00:00 2001 From: Lizzy Fleckenstein Date: Fri, 6 Jan 2023 17:45:16 +0100 Subject: clean shutdown; send reliables --- src/send.rs | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 src/send.rs (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs new file mode 100644 index 0000000..89c15c7 --- /dev/null +++ b/src/send.rs @@ -0,0 +1,55 @@ +use crate::*; +use tokio::sync::watch; + +type AckResult = io::Result>>; + +impl RudpSender { + pub async fn send(&self, pkt: Pkt<&[u8]>) -> AckResult { + self.share.send(PktType::Orig, pkt).await // TODO: splits + } +} + +impl RudpShare { + pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> AckResult { + let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + 2 + 1 + pkt.data.len()); + buf.write_u32::(PROTO_ID)?; + buf.write_u16::(*self.remote_id.read().await)?; + buf.write_u8(pkt.chan as u8)?; + + let mut chan = self.chans[pkt.chan as usize].lock().await; + let seqnum = chan.seqnum; + + if !pkt.unrel { + buf.write_u8(PktType::Rel as u8)?; + buf.write_u16::(seqnum)?; + } + + buf.write_u8(tp as u8)?; + buf.write(pkt.data)?; + + self.send_raw(&buf).await?; + + if pkt.unrel { + Ok(None) + } else { + // TODO: reliable window + let (tx, rx) = watch::channel(false); + chan.acks.insert( + seqnum, + Ack { + tx, + rx: rx.clone(), + data: buf, + }, + ); + chan.seqnum += 1; + + Ok(Some(rx)) + } + } + + pub async fn send_raw(&self, data: &[u8]) -> io::Result<()> { + self.udp_tx.send(data).await + // TODO: reset ping timeout + } +} -- cgit v1.2.3