diff options
author | Lizzy Fleckenstein <eliasfleckenstein@web.de> | 2023-01-06 17:45:16 +0100 |
---|---|---|
committer | Lizzy Fleckenstein <eliasfleckenstein@web.de> | 2023-01-06 17:45:16 +0100 |
commit | fd23bb3a2b57d43c115005dcd70f1e18bb005032 (patch) | |
tree | 7fa77c2db1faa55685e24a180bbd419a78d7be53 /src/send.rs | |
parent | d3b8019227137853406891e2aa84e0c8a9e3c31c (diff) | |
download | mt_rudp-fd23bb3a2b57d43c115005dcd70f1e18bb005032.tar.xz |
clean shutdown; send reliables
Diffstat (limited to 'src/send.rs')
-rw-r--r-- | src/send.rs | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/src/send.rs b/src/send.rs new file mode 100644 index 0000000..89c15c7 --- /dev/null +++ b/src/send.rs @@ -0,0 +1,55 @@ +use crate::*; +use tokio::sync::watch; + +type AckResult = io::Result<Option<watch::Receiver<bool>>>; + +impl<S: UdpSender> RudpSender<S> { + pub async fn send(&self, pkt: Pkt<&[u8]>) -> AckResult { + self.share.send(PktType::Orig, pkt).await // TODO: splits + } +} + +impl<S: UdpSender> RudpShare<S> { + pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> AckResult { + let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + 2 + 1 + pkt.data.len()); + buf.write_u32::<BigEndian>(PROTO_ID)?; + buf.write_u16::<BigEndian>(*self.remote_id.read().await)?; + buf.write_u8(pkt.chan as u8)?; + + let mut chan = self.chans[pkt.chan as usize].lock().await; + let seqnum = chan.seqnum; + + if !pkt.unrel { + buf.write_u8(PktType::Rel as u8)?; + buf.write_u16::<BigEndian>(seqnum)?; + } + + buf.write_u8(tp as u8)?; + buf.write(pkt.data)?; + + self.send_raw(&buf).await?; + + if pkt.unrel { + Ok(None) + } else { + // TODO: reliable window + let (tx, rx) = watch::channel(false); + chan.acks.insert( + seqnum, + Ack { + tx, + rx: rx.clone(), + data: buf, + }, + ); + chan.seqnum += 1; + + Ok(Some(rx)) + } + } + + pub async fn send_raw(&self, data: &[u8]) -> io::Result<()> { + self.udp_tx.send(data).await + // TODO: reset ping timeout + } +} |