#![feature(cursor_remaining)] #![feature(hash_drain_filter)] #![feature(once_cell)] mod client; pub mod error; mod new; mod recv; mod send; use async_trait::async_trait; use byteorder::{BigEndian, WriteBytesExt}; pub use client::{connect, Sender as Client}; pub use new::new; use num_enum::TryFromPrimitive; use pretty_hex::PrettyHex; use std::{ collections::HashMap, io::{self, Write}, ops, sync::Arc, time::Duration, }; use tokio::{ sync::{mpsc, watch, Mutex, RwLock}, task::JoinSet, }; pub const PROTO_ID: u32 = 0x4f457403; pub const UDP_PKT_SIZE: usize = 512; pub const NUM_CHANS: usize = 3; pub const REL_BUFFER: usize = 0x8000; pub const INIT_SEQNUM: u16 = 65500; pub const TIMEOUT: u64 = 30; pub const PING_TIMEOUT: u64 = 5; mod ticker_mod { #[macro_export] macro_rules! ticker { ($duration:expr, $close:expr, $body:block) => { let mut interval = tokio::time::interval($duration); while tokio::select!{ _ = interval.tick() => true, _ = $close.changed() => false, } $body }; } //pub(crate) use ticker; } #[async_trait] pub trait UdpSender: Send + Sync + 'static { async fn send(&self, data: &[u8]) -> io::Result<()>; } #[async_trait] pub trait UdpReceiver: Send + Sync + 'static { async fn recv(&self) -> io::Result>; } #[derive(Debug, Copy, Clone, PartialEq)] #[repr(u16)] pub enum PeerID { Nil = 0, Srv, CltMin, } #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] #[repr(u8)] pub enum PktType { Ctl = 0, Orig, Split, Rel, } #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] #[repr(u8)] pub enum CtlType { Ack = 0, SetPeerID, Ping, Disco, } #[derive(Debug)] pub struct Pkt { unrel: bool, chan: u8, data: T, } pub type Error = error::Error; pub type InPkt = Result>, Error>; #[derive(Debug)] struct Ack { tx: watch::Sender, rx: watch::Receiver, data: Vec, } #[derive(Debug)] struct Chan { acks: HashMap, seqnum: u16, } #[derive(Debug)] pub struct RudpShare { id: u16, remote_id: RwLock, chans: Vec>, udp_tx: S, close_tx: watch::Sender, tasks: Mutex>, } #[derive(Debug)] pub struct RudpReceiver { share: Arc>, pkt_rx: mpsc::UnboundedReceiver, } #[derive(Debug)] pub struct RudpSender { share: Arc>, } macro_rules! impl_share { ($T:ident) => { impl $T { pub async fn peer_id(&self) -> u16 { self.share.id } pub async fn is_server(&self) -> bool { self.share.id == PeerID::Srv as u16 } pub async fn close(self) { self.share.close_tx.send(true).ok(); let mut tasks = self.share.tasks.lock().await; while let Some(res) = tasks.join_next().await { res.ok(); // TODO: handle error (?) } } } }; } impl_share!(RudpReceiver); impl_share!(RudpSender); impl ops::Deref for RudpReceiver { type Target = mpsc::UnboundedReceiver; fn deref(&self) -> &Self::Target { &self.pkt_rx } } impl ops::DerefMut for RudpReceiver { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.pkt_rx } } async fn example(tx: &RudpSender, rx: &mut RudpReceiver) -> io::Result<()> { // send hello packet let mut mtpkt = vec![]; mtpkt.write_u16::(2)?; // high level type mtpkt.write_u8(29)?; // serialize ver mtpkt.write_u16::(0)?; // compression modes mtpkt.write_u16::(40)?; // MinProtoVer mtpkt.write_u16::(40)?; // MaxProtoVer mtpkt.write_u16::(6)?; // player name length mtpkt.write(b"foobar")?; // player name tx.send(Pkt { unrel: true, chan: 1, data: &mtpkt, }) .await?; // handle incoming packets while let Some(result) = rx.recv().await { match result { Ok(pkt) => { println!("{}", pkt.data.hex_dump()); } Err(err) => eprintln!("Error: {}", err), } } Ok(()) } #[tokio::main] async fn main() -> io::Result<()> { let (tx, mut rx) = connect("127.0.0.1:30000").await?; tokio::select! { _ = tokio::signal::ctrl_c() => println!("canceled"), res = example(&tx, &mut rx) => { res?; println!("disconnected"); } } // close either the receiver or the sender // this shuts down associated tasks rx.close().await; Ok(()) }