diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 219 |
1 files changed, 0 insertions, 219 deletions
diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 9a85183..0000000 --- a/src/main.rs +++ /dev/null @@ -1,219 +0,0 @@ -#![feature(cursor_remaining)] -#![feature(hash_drain_filter)] -#![feature(once_cell)] -mod client; -pub mod error; -mod new; -mod recv; -mod send; - -use async_trait::async_trait; -use byteorder::{BigEndian, WriteBytesExt}; -pub use client::{connect, Sender as Client}; -pub use new::new; -use num_enum::TryFromPrimitive; -use pretty_hex::PrettyHex; -use std::{ - collections::HashMap, - io::{self, Write}, - ops, - sync::Arc, - time::Duration, -}; -use tokio::{ - sync::{mpsc, watch, Mutex, RwLock}, - task::JoinSet, -}; - -pub const PROTO_ID: u32 = 0x4f457403; -pub const UDP_PKT_SIZE: usize = 512; -pub const NUM_CHANS: usize = 3; -pub const REL_BUFFER: usize = 0x8000; -pub const INIT_SEQNUM: u16 = 65500; -pub const TIMEOUT: u64 = 30; -pub const PING_TIMEOUT: u64 = 5; - -mod ticker_mod { - #[macro_export] - macro_rules! ticker { - ($duration:expr, $close:expr, $body:block) => { - let mut interval = tokio::time::interval($duration); - - while tokio::select!{ - _ = interval.tick() => true, - _ = $close.changed() => false, - } $body - }; - } - - //pub(crate) use ticker; -} - -#[async_trait] -pub trait UdpSender: Send + Sync + 'static { - async fn send(&self, data: &[u8]) -> io::Result<()>; -} - -#[async_trait] -pub trait UdpReceiver: Send + Sync + 'static { - async fn recv(&self) -> io::Result<Vec<u8>>; -} - -#[derive(Debug, Copy, Clone, PartialEq)] -#[repr(u16)] -pub enum PeerID { - Nil = 0, - Srv, - CltMin, -} - -#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] -#[repr(u8)] -pub enum PktType { - Ctl = 0, - Orig, - Split, - Rel, -} - -#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] -#[repr(u8)] -pub enum CtlType { - Ack = 0, - SetPeerID, - Ping, - Disco, -} - -#[derive(Debug)] -pub struct Pkt<T> { - unrel: bool, - chan: u8, - data: T, -} - -pub type Error = error::Error; -pub type InPkt = Result<Pkt<Vec<u8>>, Error>; - -#[derive(Debug)] -struct Ack { - tx: watch::Sender<bool>, - rx: watch::Receiver<bool>, - data: Vec<u8>, -} - -#[derive(Debug)] -struct Chan { - acks: HashMap<u16, Ack>, - seqnum: u16, -} - -#[derive(Debug)] -pub struct RudpShare<S: UdpSender> { - id: u16, - remote_id: RwLock<u16>, - chans: Vec<Mutex<Chan>>, - udp_tx: S, - close_tx: watch::Sender<bool>, - tasks: Mutex<JoinSet<()>>, -} - -#[derive(Debug)] -pub struct RudpReceiver<S: UdpSender> { - share: Arc<RudpShare<S>>, - pkt_rx: mpsc::UnboundedReceiver<InPkt>, -} - -#[derive(Debug)] -pub struct RudpSender<S: UdpSender> { - share: Arc<RudpShare<S>>, -} - -macro_rules! impl_share { - ($T:ident) => { - impl<S: UdpSender> $T<S> { - pub async fn peer_id(&self) -> u16 { - self.share.id - } - - pub async fn is_server(&self) -> bool { - self.share.id == PeerID::Srv as u16 - } - - pub async fn close(self) { - self.share.close_tx.send(true).ok(); - - let mut tasks = self.share.tasks.lock().await; - while let Some(res) = tasks.join_next().await { - res.ok(); // TODO: handle error (?) - } - } - } - }; -} - -impl_share!(RudpReceiver); -impl_share!(RudpSender); - -impl<S: UdpSender> ops::Deref for RudpReceiver<S> { - type Target = mpsc::UnboundedReceiver<InPkt>; - - fn deref(&self) -> &Self::Target { - &self.pkt_rx - } -} - -impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.pkt_rx - } -} - -async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::Result<()> { - // send hello packet - let mut mtpkt = vec![]; - mtpkt.write_u16::<BigEndian>(2)?; // high level type - mtpkt.write_u8(29)?; // serialize ver - mtpkt.write_u16::<BigEndian>(0)?; // compression modes - mtpkt.write_u16::<BigEndian>(40)?; // MinProtoVer - mtpkt.write_u16::<BigEndian>(40)?; // MaxProtoVer - mtpkt.write_u16::<BigEndian>(6)?; // player name length - mtpkt.write(b"foobar")?; // player name - - tx.send(Pkt { - unrel: true, - chan: 1, - data: &mtpkt, - }) - .await?; - // handle incoming packets - while let Some(result) = rx.recv().await { - match result { - Ok(pkt) => { - println!("{}", pkt.data.hex_dump()); - } - Err(err) => eprintln!("Error: {}", err), - } - } - - Ok(()) -} - -#[tokio::main] -async fn main() -> io::Result<()> { - let (tx, mut rx) = connect("127.0.0.1:30000").await?; - - tokio::select! { - _ = tokio::signal::ctrl_c() => println!("canceled"), - res = example(&tx, &mut rx) => { - res?; - println!("disconnected"); - } - } - - // close either the receiver or the sender - // this shuts down associated tasks - rx.close().await; - - Ok(()) -} |