From 9ee7b7ddc5c3611bf515dbdee4134b6df9efed92 Mon Sep 17 00:00:00 2001 From: Lizzy Fleckenstein Date: Fri, 6 Jan 2023 21:20:09 +0100 Subject: cleanup; readme --- src/client.rs | 3 +- src/error.rs | 2 +- src/lib.rs | 194 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 219 ---------------------------------------------------------- src/new.rs | 9 ++- src/recv.rs | 28 +------- src/send.rs | 5 +- 7 files changed, 210 insertions(+), 250 deletions(-) create mode 100644 src/lib.rs delete mode 100644 src/main.rs (limited to 'src') diff --git a/src/client.rs b/src/client.rs index 172aa96..e6d7e75 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,5 @@ -use crate::*; +use crate::prelude::*; +use async_trait::async_trait; use std::{io, sync::Arc}; use tokio::net; diff --git a/src/error.rs b/src/error.rs index 7853f59..6dec6c8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ -use crate::{CtlType, InPkt, PktType}; +use crate::prelude::*; use num_enum::TryFromPrimitiveError; use std::{fmt, io}; use tokio::sync::mpsc::error::SendError; diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..f0a91fe --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,194 @@ +#![feature(cursor_remaining)] +#![feature(hash_drain_filter)] +#![feature(once_cell)] +mod client; +mod error; +mod new; +mod recv; +mod send; + +pub use prelude::*; + +use async_trait::async_trait; +use num_enum::TryFromPrimitive; +use std::{ + cell::{Cell, OnceCell}, + collections::HashMap, + io, ops, + sync::Arc, + time::Instant, +}; +use tokio::{ + sync::{mpsc, watch, Mutex, RwLock}, + task::JoinSet, +}; + +#[async_trait] +pub trait UdpSender: Send + Sync + 'static { + async fn send(&self, data: &[u8]) -> io::Result<()>; +} + +#[async_trait] +pub trait UdpReceiver: Send + Sync + 'static { + async fn recv(&self) -> io::Result>; +} + +#[derive(Debug, Copy, Clone, PartialEq)] +#[repr(u16)] +pub enum PeerID { + Nil = 0, + Srv, + CltMin, +} + +#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] +#[repr(u8)] +pub enum PktType { + Ctl = 0, + Orig, + Split, + Rel, +} + +#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] +#[repr(u8)] +pub enum CtlType { + Ack = 0, + SetPeerID, + Ping, + Disco, +} + +#[derive(Debug)] +pub struct Pkt { + pub unrel: bool, + pub chan: u8, + pub data: T, +} + +pub type InPkt = Result>, error::Error>; + +#[derive(Debug)] +struct Ack { + tx: watch::Sender, + rx: watch::Receiver, + data: Vec, +} + +#[derive(Debug)] +struct Chan { + acks: HashMap, + seqnum: u16, +} + +#[derive(Debug)] +struct RudpShare { + id: u16, + remote_id: RwLock, + chans: Vec>, + udp_tx: S, + close_tx: watch::Sender, + tasks: Mutex>, +} + +#[derive(Debug)] +pub struct RudpReceiver { + share: Arc>, + pkt_rx: mpsc::UnboundedReceiver, +} + +#[derive(Debug)] +pub struct RudpSender { + share: Arc>, +} + +macro_rules! impl_share { + ($T:ident) => { + impl $T { + pub async fn peer_id(&self) -> u16 { + self.share.id + } + + pub async fn is_server(&self) -> bool { + self.share.id == PeerID::Srv as u16 + } + + pub async fn close(self) { + self.share.close_tx.send(true).ok(); + + let mut tasks = self.share.tasks.lock().await; + while let Some(res) = tasks.join_next().await { + res.ok(); // TODO: handle error (?) + } + } + } + }; +} + +impl_share!(RudpReceiver); +impl_share!(RudpSender); + +impl ops::Deref for RudpReceiver { + type Target = mpsc::UnboundedReceiver; + + fn deref(&self) -> &Self::Target { + &self.pkt_rx + } +} + +impl ops::DerefMut for RudpReceiver { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.pkt_rx + } +} + +#[derive(Debug)] +struct Split { + timestamp: Option, + chunks: Vec>>, + got: usize, +} + +struct RecvChan { + packets: Vec>>>, // char ** 😛 + splits: HashMap, + seqnum: u16, + num: u8, +} + +struct RecvWorker { + share: Arc>, + close: watch::Receiver, + chans: Arc>>, + pkt_tx: mpsc::UnboundedSender, + udp_rx: R, +} + +mod prelude { + pub const PROTO_ID: u32 = 0x4f457403; + pub const UDP_PKT_SIZE: usize = 512; + pub const NUM_CHANS: usize = 3; + pub const REL_BUFFER: usize = 0x8000; + pub const INIT_SEQNUM: u16 = 65500; + pub const TIMEOUT: u64 = 30; + pub const PING_TIMEOUT: u64 = 5; + + pub use super::{ + client::{connect, Sender as Client}, + error::Error, + new::new, + CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender, + }; + + #[macro_export] + macro_rules! ticker { + ($duration:expr, $close:expr, $body:block) => { + let mut interval = tokio::time::interval($duration); + + while tokio::select!{ + _ = interval.tick() => true, + _ = $close.changed() => false, + } $body + }; + } +} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 9a85183..0000000 --- a/src/main.rs +++ /dev/null @@ -1,219 +0,0 @@ -#![feature(cursor_remaining)] -#![feature(hash_drain_filter)] -#![feature(once_cell)] -mod client; -pub mod error; -mod new; -mod recv; -mod send; - -use async_trait::async_trait; -use byteorder::{BigEndian, WriteBytesExt}; -pub use client::{connect, Sender as Client}; -pub use new::new; -use num_enum::TryFromPrimitive; -use pretty_hex::PrettyHex; -use std::{ - collections::HashMap, - io::{self, Write}, - ops, - sync::Arc, - time::Duration, -}; -use tokio::{ - sync::{mpsc, watch, Mutex, RwLock}, - task::JoinSet, -}; - -pub const PROTO_ID: u32 = 0x4f457403; -pub const UDP_PKT_SIZE: usize = 512; -pub const NUM_CHANS: usize = 3; -pub const REL_BUFFER: usize = 0x8000; -pub const INIT_SEQNUM: u16 = 65500; -pub const TIMEOUT: u64 = 30; -pub const PING_TIMEOUT: u64 = 5; - -mod ticker_mod { - #[macro_export] - macro_rules! ticker { - ($duration:expr, $close:expr, $body:block) => { - let mut interval = tokio::time::interval($duration); - - while tokio::select!{ - _ = interval.tick() => true, - _ = $close.changed() => false, - } $body - }; - } - - //pub(crate) use ticker; -} - -#[async_trait] -pub trait UdpSender: Send + Sync + 'static { - async fn send(&self, data: &[u8]) -> io::Result<()>; -} - -#[async_trait] -pub trait UdpReceiver: Send + Sync + 'static { - async fn recv(&self) -> io::Result>; -} - -#[derive(Debug, Copy, Clone, PartialEq)] -#[repr(u16)] -pub enum PeerID { - Nil = 0, - Srv, - CltMin, -} - -#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] -#[repr(u8)] -pub enum PktType { - Ctl = 0, - Orig, - Split, - Rel, -} - -#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] -#[repr(u8)] -pub enum CtlType { - Ack = 0, - SetPeerID, - Ping, - Disco, -} - -#[derive(Debug)] -pub struct Pkt { - unrel: bool, - chan: u8, - data: T, -} - -pub type Error = error::Error; -pub type InPkt = Result>, Error>; - -#[derive(Debug)] -struct Ack { - tx: watch::Sender, - rx: watch::Receiver, - data: Vec, -} - -#[derive(Debug)] -struct Chan { - acks: HashMap, - seqnum: u16, -} - -#[derive(Debug)] -pub struct RudpShare { - id: u16, - remote_id: RwLock, - chans: Vec>, - udp_tx: S, - close_tx: watch::Sender, - tasks: Mutex>, -} - -#[derive(Debug)] -pub struct RudpReceiver { - share: Arc>, - pkt_rx: mpsc::UnboundedReceiver, -} - -#[derive(Debug)] -pub struct RudpSender { - share: Arc>, -} - -macro_rules! impl_share { - ($T:ident) => { - impl $T { - pub async fn peer_id(&self) -> u16 { - self.share.id - } - - pub async fn is_server(&self) -> bool { - self.share.id == PeerID::Srv as u16 - } - - pub async fn close(self) { - self.share.close_tx.send(true).ok(); - - let mut tasks = self.share.tasks.lock().await; - while let Some(res) = tasks.join_next().await { - res.ok(); // TODO: handle error (?) - } - } - } - }; -} - -impl_share!(RudpReceiver); -impl_share!(RudpSender); - -impl ops::Deref for RudpReceiver { - type Target = mpsc::UnboundedReceiver; - - fn deref(&self) -> &Self::Target { - &self.pkt_rx - } -} - -impl ops::DerefMut for RudpReceiver { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.pkt_rx - } -} - -async fn example(tx: &RudpSender, rx: &mut RudpReceiver) -> io::Result<()> { - // send hello packet - let mut mtpkt = vec![]; - mtpkt.write_u16::(2)?; // high level type - mtpkt.write_u8(29)?; // serialize ver - mtpkt.write_u16::(0)?; // compression modes - mtpkt.write_u16::(40)?; // MinProtoVer - mtpkt.write_u16::(40)?; // MaxProtoVer - mtpkt.write_u16::(6)?; // player name length - mtpkt.write(b"foobar")?; // player name - - tx.send(Pkt { - unrel: true, - chan: 1, - data: &mtpkt, - }) - .await?; - // handle incoming packets - while let Some(result) = rx.recv().await { - match result { - Ok(pkt) => { - println!("{}", pkt.data.hex_dump()); - } - Err(err) => eprintln!("Error: {}", err), - } - } - - Ok(()) -} - -#[tokio::main] -async fn main() -> io::Result<()> { - let (tx, mut rx) = connect("127.0.0.1:30000").await?; - - tokio::select! { - _ = tokio::signal::ctrl_c() => println!("canceled"), - res = example(&tx, &mut rx) => { - res?; - println!("disconnected"); - } - } - - // close either the receiver or the sender - // this shuts down associated tasks - rx.close().await; - - Ok(()) -} diff --git a/src/new.rs b/src/new.rs index 9f820a9..b17f518 100644 --- a/src/new.rs +++ b/src/new.rs @@ -1,4 +1,9 @@ -use crate::*; +use crate::{prelude::*, ticker, Chan, RecvWorker, RudpShare}; +use std::{collections::HashMap, io, sync::Arc, time::Duration}; +use tokio::{ + sync::{mpsc, watch, Mutex, RwLock}, + task::JoinSet, +}; pub async fn new( id: u16, @@ -33,7 +38,7 @@ pub async fn new( /*.build_task() .name("recv")*/ .spawn(async move { - let worker = recv::RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx); + let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx); worker.run().await; }); diff --git a/src/recv.rs b/src/recv.rs index 2fabe3a..e3d3e6b 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -1,6 +1,6 @@ -use crate::{error::Error, *}; +use crate::{prelude::*, ticker, RecvChan, RecvWorker, RudpShare, Split}; use async_recursion::async_recursion; -use byteorder::{BigEndian, ReadBytesExt}; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::{ cell::{Cell, OnceCell}, collections::HashMap, @@ -9,7 +9,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, watch, Mutex}; fn to_seqnum(seqnum: u16) -> usize { (seqnum as usize) & (REL_BUFFER - 1) @@ -17,27 +17,6 @@ fn to_seqnum(seqnum: u16) -> usize { type Result = std::result::Result; -struct Split { - timestamp: Option, - chunks: Vec>>, - got: usize, -} - -struct RecvChan { - packets: Vec>>>, // char ** 😛 - splits: HashMap, - seqnum: u16, - num: u8, -} - -pub struct RecvWorker { - share: Arc>, - close: watch::Receiver, - chans: Arc>>, - pkt_tx: mpsc::UnboundedSender, - udp_rx: R, -} - impl RecvWorker { pub fn new( udp_rx: R, @@ -133,7 +112,6 @@ impl RecvWorker { ) -> Result<()> { use Error::*; - // TODO: reset timeout let mut cursor = io::Cursor::new(tokio::select! { pkt = self.udp_rx.recv() => pkt?, _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)), diff --git a/src/send.rs b/src/send.rs index 20308e6..c41ad80 100644 --- a/src/send.rs +++ b/src/send.rs @@ -1,4 +1,6 @@ -use crate::*; +use crate::{prelude::*, Ack, RudpShare}; +use byteorder::{BigEndian, WriteBytesExt}; +use std::io::{self, Write}; use tokio::sync::watch; type AckResult = io::Result>>; @@ -50,6 +52,5 @@ impl RudpShare { pub async fn send_raw(&self, data: &[u8]) -> io::Result<()> { self.udp_tx.send(data).await - // TODO: reset ping timeout } } -- cgit v1.2.3