From fd23bb3a2b57d43c115005dcd70f1e18bb005032 Mon Sep 17 00:00:00 2001 From: Lizzy Fleckenstein Date: Fri, 6 Jan 2023 17:45:16 +0100 Subject: clean shutdown; send reliables --- src/main.rs | 169 ++++++++++++++++++++++++++++++++---------------------------- 1 file changed, 89 insertions(+), 80 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index 1f0aca0..0510db7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,19 +3,27 @@ #![feature(once_cell)] mod client; pub mod error; -mod recv_worker; +mod new; +mod recv; +mod send; use async_trait::async_trait; use byteorder::{BigEndian, WriteBytesExt}; pub use client::{connect, Sender as Client}; +pub use new::new; use num_enum::TryFromPrimitive; +use pretty_hex::PrettyHex; use std::{ collections::HashMap, io::{self, Write}, ops, sync::Arc, + time::Duration, +}; +use tokio::{ + sync::{mpsc, watch, Mutex, RwLock}, + task::JoinSet, }; -use tokio::sync::{mpsc, watch, Mutex, RwLock}; pub const PROTO_ID: u32 = 0x4f457403; pub const UDP_PKT_SIZE: usize = 512; @@ -24,9 +32,25 @@ pub const REL_BUFFER: usize = 0x8000; pub const INIT_SEQNUM: u16 = 65500; pub const TIMEOUT: u64 = 30; +mod ticker_mod { + #[macro_export] + macro_rules! ticker { + ($duration:expr, $close:expr, $body:block) => { + let mut interval = tokio::time::interval($duration); + + while tokio::select!{ + _ = interval.tick() => true, + _ = $close.changed() => false, + } $body + }; + } + + //pub(crate) use ticker; +} + #[async_trait] pub trait UdpSender: Send + Sync + 'static { - async fn send(&self, data: Vec) -> io::Result<()>; + async fn send(&self, data: &[u8]) -> io::Result<()>; } #[async_trait] @@ -69,14 +93,28 @@ pub struct Pkt { pub type Error = error::Error; pub type InPkt = Result>, Error>; -type AckChan = (watch::Sender, watch::Receiver); + +#[derive(Debug)] +struct Ack { + tx: watch::Sender, + rx: watch::Receiver, + data: Vec, +} + +#[derive(Debug)] +struct Chan { + acks: HashMap, + seqnum: u16, +} #[derive(Debug)] pub struct RudpShare { - pub id: u16, - pub remote_id: RwLock, - pub ack_chans: Mutex>, + id: u16, + remote_id: RwLock, + chans: Vec>, udp_tx: S, + close_tx: watch::Sender, + tasks: Mutex>, } #[derive(Debug)] @@ -90,44 +128,31 @@ pub struct RudpSender { share: Arc>, } -impl RudpShare { - pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> { - let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len()); - buf.write_u32::(PROTO_ID)?; - buf.write_u16::(*self.remote_id.read().await)?; - buf.write_u8(pkt.chan as u8)?; - buf.write_u8(tp as u8)?; - buf.write(pkt.data)?; - - self.udp_tx.send(buf).await?; - - Ok(()) - } -} +macro_rules! impl_share { + ($T:ident) => { + impl $T { + pub async fn peer_id(&self) -> u16 { + self.share.id + } -impl RudpSender { - pub async fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> { - self.share.send(PktType::Orig, pkt).await // TODO - } + pub async fn is_server(&self) -> bool { + self.share.id == PeerID::Srv as u16 + } - pub async fn peer_id(&self) -> u16 { - self.share.id - } + pub async fn close(self) { + self.share.close_tx.send(true).ok(); - pub async fn is_server(&self) -> bool { - self.share.id == PeerID::Srv as u16 - } + let mut tasks = self.share.tasks.lock().await; + while let Some(res) = tasks.join_next().await { + res.ok(); // TODO: handle error (?) + } + } + } + }; } -impl RudpReceiver { - pub async fn peer_id(&self) -> u16 { - self.share.id - } - - pub async fn is_server(&self) -> bool { - self.share.id == PeerID::Srv as u16 - } -} +impl_share!(RudpReceiver); +impl_share!(RudpSender); impl ops::Deref for RudpReceiver { type Target = mpsc::UnboundedReceiver; @@ -143,49 +168,16 @@ impl ops::DerefMut for RudpReceiver { } } -pub fn new( - id: u16, - remote_id: u16, - udp_tx: S, - udp_rx: R, -) -> (RudpSender, RudpReceiver) { - let (pkt_tx, pkt_rx) = mpsc::unbounded_channel(); - - let share = Arc::new(RudpShare { - id, - remote_id: RwLock::new(remote_id), - udp_tx, - ack_chans: Mutex::new(HashMap::new()), - }); - let recv_share = Arc::clone(&share); - - tokio::spawn(async { - let worker = recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx); - worker.run().await; - }); - - ( - RudpSender { - share: Arc::clone(&share), - }, - RudpReceiver { share, pkt_rx }, - ) -} - -// connect - -#[tokio::main] -async fn main() -> io::Result<()> { - let (tx, mut rx) = connect("127.0.0.1:30000").await?; - +async fn example(tx: &RudpSender, rx: &mut RudpReceiver) -> io::Result<()> { + // send hello packet let mut mtpkt = vec![]; mtpkt.write_u16::(2)?; // high level type mtpkt.write_u8(29)?; // serialize ver mtpkt.write_u16::(0)?; // compression modes mtpkt.write_u16::(40)?; // MinProtoVer mtpkt.write_u16::(40)?; // MaxProtoVer - mtpkt.write_u16::(3)?; // player name length - mtpkt.write(b"foo")?; // player name + mtpkt.write_u16::(6)?; // player name length + mtpkt.write(b"foobar")?; // player name tx.send(Pkt { unrel: true, @@ -194,17 +186,34 @@ async fn main() -> io::Result<()> { }) .await?; + // handle incoming packets while let Some(result) = rx.recv().await { match result { Ok(pkt) => { - io::stdout().write(pkt.data.as_slice())?; + println!("{}", pkt.data.hex_dump()); } Err(err) => eprintln!("Error: {}", err), } } - println!("disco"); - // close()ing rx is not needed because it has been consumed to the end + Ok(()) +} + +#[tokio::main] +async fn main() -> io::Result<()> { + let (tx, mut rx) = connect("127.0.0.1:30000").await?; + + tokio::select! { + _ = tokio::signal::ctrl_c() => println!("canceled"), + res = example(&tx, &mut rx) => { + res?; + println!("disconnected"); + } + } + + // close either the receiver or the sender + // this shuts down associated tasks + rx.close().await; Ok(()) } -- cgit v1.2.3