#![feature(cursor_remaining)] #![feature(hash_drain_filter)] #![feature(once_cell)] mod client; mod error; mod new; mod recv; mod send; pub use prelude::*; use async_trait::async_trait; use num_enum::TryFromPrimitive; use std::{ cell::{Cell, OnceCell}, collections::HashMap, io, ops, sync::Arc, time::Instant, }; use tokio::{ sync::{mpsc, watch, Mutex, RwLock}, task::JoinSet, }; #[async_trait] pub trait UdpSender: Send + Sync + 'static { async fn send(&self, data: &[u8]) -> io::Result<()>; } #[async_trait] pub trait UdpReceiver: Send + Sync + 'static { async fn recv(&self) -> io::Result>; } #[derive(Debug, Copy, Clone, PartialEq)] #[repr(u16)] pub enum PeerID { Nil = 0, Srv, CltMin, } #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] #[repr(u8)] pub enum PktType { Ctl = 0, Orig, Split, Rel, } #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] #[repr(u8)] pub enum CtlType { Ack = 0, SetPeerID, Ping, Disco, } #[derive(Debug)] pub struct Pkt { pub unrel: bool, pub chan: u8, pub data: T, } pub type InPkt = Result>, error::Error>; #[derive(Debug)] struct Ack { tx: watch::Sender, rx: watch::Receiver, data: Vec, } #[derive(Debug)] struct Chan { acks: HashMap, seqnum: u16, } #[derive(Debug)] struct RudpShare { id: u16, remote_id: RwLock, chans: Vec>, udp_tx: S, close_tx: watch::Sender, tasks: Mutex>, } #[derive(Debug)] pub struct RudpReceiver { share: Arc>, pkt_rx: mpsc::UnboundedReceiver, } #[derive(Debug)] pub struct RudpSender { share: Arc>, } macro_rules! impl_share { ($T:ident) => { impl $T { pub async fn peer_id(&self) -> u16 { self.share.id } pub async fn is_server(&self) -> bool { self.share.id == PeerID::Srv as u16 } pub async fn close(self) { self.share.close_tx.send(true).ok(); let mut tasks = self.share.tasks.lock().await; while let Some(res) = tasks.join_next().await { res.ok(); // TODO: handle error (?) } } } }; } impl_share!(RudpReceiver); impl_share!(RudpSender); impl ops::Deref for RudpReceiver { type Target = mpsc::UnboundedReceiver; fn deref(&self) -> &Self::Target { &self.pkt_rx } } impl ops::DerefMut for RudpReceiver { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.pkt_rx } } #[derive(Debug)] struct Split { timestamp: Option, chunks: Vec>>, got: usize, } struct RecvChan { packets: Vec>>>, // char ** 😛 splits: HashMap, seqnum: u16, num: u8, } struct RecvWorker { share: Arc>, close: watch::Receiver, chans: Arc>>, pkt_tx: mpsc::UnboundedSender, udp_rx: R, } mod prelude { pub const PROTO_ID: u32 = 0x4f457403; pub const UDP_PKT_SIZE: usize = 512; pub const NUM_CHANS: usize = 3; pub const REL_BUFFER: usize = 0x8000; pub const INIT_SEQNUM: u16 = 65500; pub const TIMEOUT: u64 = 30; pub const PING_TIMEOUT: u64 = 5; pub use super::{ client::{connect, Sender as Client}, error::Error, new::new, CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender, }; #[macro_export] macro_rules! ticker { ($duration:expr, $close:expr, $body:block) => { let mut interval = tokio::time::interval($duration); while tokio::select!{ _ = interval.tick() => true, _ = $close.changed() => false, } $body }; } }