aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLizzy Fleckenstein <eliasfleckenstein@web.de>2023-01-06 21:20:09 +0100
committerLizzy Fleckenstein <eliasfleckenstein@web.de>2023-01-06 21:20:15 +0100
commit9ee7b7ddc5c3611bf515dbdee4134b6df9efed92 (patch)
tree09247c8cf80c19e1e005d3be8b03b97a30aaa6b4 /src
parent12bfebc06ed29fabbc4a4357e314b8fbde1b552d (diff)
downloadmt_rudp-9ee7b7ddc5c3611bf515dbdee4134b6df9efed92.tar.xz
cleanup; readme
Diffstat (limited to 'src')
-rw-r--r--src/client.rs3
-rw-r--r--src/error.rs2
-rw-r--r--src/lib.rs (renamed from src/main.rs)135
-rw-r--r--src/new.rs9
-rw-r--r--src/recv.rs28
-rw-r--r--src/send.rs5
6 files changed, 71 insertions, 111 deletions
diff --git a/src/client.rs b/src/client.rs
index 172aa96..e6d7e75 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,4 +1,5 @@
-use crate::*;
+use crate::prelude::*;
+use async_trait::async_trait;
use std::{io, sync::Arc};
use tokio::net;
diff --git a/src/error.rs b/src/error.rs
index 7853f59..6dec6c8 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,4 +1,4 @@
-use crate::{CtlType, InPkt, PktType};
+use crate::prelude::*;
use num_enum::TryFromPrimitiveError;
use std::{fmt, io};
use tokio::sync::mpsc::error::SendError;
diff --git a/src/main.rs b/src/lib.rs
index 9a85183..f0a91fe 100644
--- a/src/main.rs
+++ b/src/lib.rs
@@ -2,53 +2,27 @@
#![feature(hash_drain_filter)]
#![feature(once_cell)]
mod client;
-pub mod error;
+mod error;
mod new;
mod recv;
mod send;
+pub use prelude::*;
+
use async_trait::async_trait;
-use byteorder::{BigEndian, WriteBytesExt};
-pub use client::{connect, Sender as Client};
-pub use new::new;
use num_enum::TryFromPrimitive;
-use pretty_hex::PrettyHex;
use std::{
+ cell::{Cell, OnceCell},
collections::HashMap,
- io::{self, Write},
- ops,
+ io, ops,
sync::Arc,
- time::Duration,
+ time::Instant,
};
use tokio::{
sync::{mpsc, watch, Mutex, RwLock},
task::JoinSet,
};
-pub const PROTO_ID: u32 = 0x4f457403;
-pub const UDP_PKT_SIZE: usize = 512;
-pub const NUM_CHANS: usize = 3;
-pub const REL_BUFFER: usize = 0x8000;
-pub const INIT_SEQNUM: u16 = 65500;
-pub const TIMEOUT: u64 = 30;
-pub const PING_TIMEOUT: u64 = 5;
-
-mod ticker_mod {
- #[macro_export]
- macro_rules! ticker {
- ($duration:expr, $close:expr, $body:block) => {
- let mut interval = tokio::time::interval($duration);
-
- while tokio::select!{
- _ = interval.tick() => true,
- _ = $close.changed() => false,
- } $body
- };
- }
-
- //pub(crate) use ticker;
-}
-
#[async_trait]
pub trait UdpSender: Send + Sync + 'static {
async fn send(&self, data: &[u8]) -> io::Result<()>;
@@ -87,13 +61,12 @@ pub enum CtlType {
#[derive(Debug)]
pub struct Pkt<T> {
- unrel: bool,
- chan: u8,
- data: T,
+ pub unrel: bool,
+ pub chan: u8,
+ pub data: T,
}
-pub type Error = error::Error;
-pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
+pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
#[derive(Debug)]
struct Ack {
@@ -109,7 +82,7 @@ struct Chan {
}
#[derive(Debug)]
-pub struct RudpShare<S: UdpSender> {
+struct RudpShare<S: UdpSender> {
id: u16,
remote_id: RwLock<u16>,
chans: Vec<Mutex<Chan>>,
@@ -169,51 +142,53 @@ impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
}
}
-async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::Result<()> {
- // send hello packet
- let mut mtpkt = vec![];
- mtpkt.write_u16::<BigEndian>(2)?; // high level type
- mtpkt.write_u8(29)?; // serialize ver
- mtpkt.write_u16::<BigEndian>(0)?; // compression modes
- mtpkt.write_u16::<BigEndian>(40)?; // MinProtoVer
- mtpkt.write_u16::<BigEndian>(40)?; // MaxProtoVer
- mtpkt.write_u16::<BigEndian>(6)?; // player name length
- mtpkt.write(b"foobar")?; // player name
-
- tx.send(Pkt {
- unrel: true,
- chan: 1,
- data: &mtpkt,
- })
- .await?;
- // handle incoming packets
- while let Some(result) = rx.recv().await {
- match result {
- Ok(pkt) => {
- println!("{}", pkt.data.hex_dump());
- }
- Err(err) => eprintln!("Error: {}", err),
- }
- }
-
- Ok(())
+#[derive(Debug)]
+struct Split {
+ timestamp: Option<Instant>,
+ chunks: Vec<OnceCell<Vec<u8>>>,
+ got: usize,
}
-#[tokio::main]
-async fn main() -> io::Result<()> {
- let (tx, mut rx) = connect("127.0.0.1:30000").await?;
+struct RecvChan {
+ packets: Vec<Cell<Option<Vec<u8>>>>, // char ** 😛
+ splits: HashMap<u16, Split>,
+ seqnum: u16,
+ num: u8,
+}
- tokio::select! {
- _ = tokio::signal::ctrl_c() => println!("canceled"),
- res = example(&tx, &mut rx) => {
- res?;
- println!("disconnected");
- }
- }
+struct RecvWorker<R: UdpReceiver, S: UdpSender> {
+ share: Arc<RudpShare<S>>,
+ close: watch::Receiver<bool>,
+ chans: Arc<Vec<Mutex<RecvChan>>>,
+ pkt_tx: mpsc::UnboundedSender<InPkt>,
+ udp_rx: R,
+}
+
+mod prelude {
+ pub const PROTO_ID: u32 = 0x4f457403;
+ pub const UDP_PKT_SIZE: usize = 512;
+ pub const NUM_CHANS: usize = 3;
+ pub const REL_BUFFER: usize = 0x8000;
+ pub const INIT_SEQNUM: u16 = 65500;
+ pub const TIMEOUT: u64 = 30;
+ pub const PING_TIMEOUT: u64 = 5;
+
+ pub use super::{
+ client::{connect, Sender as Client},
+ error::Error,
+ new::new,
+ CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
+ };
- // close either the receiver or the sender
- // this shuts down associated tasks
- rx.close().await;
+ #[macro_export]
+ macro_rules! ticker {
+ ($duration:expr, $close:expr, $body:block) => {
+ let mut interval = tokio::time::interval($duration);
- Ok(())
+ while tokio::select!{
+ _ = interval.tick() => true,
+ _ = $close.changed() => false,
+ } $body
+ };
+ }
}
diff --git a/src/new.rs b/src/new.rs
index 9f820a9..b17f518 100644
--- a/src/new.rs
+++ b/src/new.rs
@@ -1,4 +1,9 @@
-use crate::*;
+use crate::{prelude::*, ticker, Chan, RecvWorker, RudpShare};
+use std::{collections::HashMap, io, sync::Arc, time::Duration};
+use tokio::{
+ sync::{mpsc, watch, Mutex, RwLock},
+ task::JoinSet,
+};
pub async fn new<S: UdpSender, R: UdpReceiver>(
id: u16,
@@ -33,7 +38,7 @@ pub async fn new<S: UdpSender, R: UdpReceiver>(
/*.build_task()
.name("recv")*/
.spawn(async move {
- let worker = recv::RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
+ let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
worker.run().await;
});
diff --git a/src/recv.rs b/src/recv.rs
index 2fabe3a..e3d3e6b 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -1,6 +1,6 @@
-use crate::{error::Error, *};
+use crate::{prelude::*, ticker, RecvChan, RecvWorker, RudpShare, Split};
use async_recursion::async_recursion;
-use byteorder::{BigEndian, ReadBytesExt};
+use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
cell::{Cell, OnceCell},
collections::HashMap,
@@ -9,7 +9,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, watch, Mutex};
fn to_seqnum(seqnum: u16) -> usize {
(seqnum as usize) & (REL_BUFFER - 1)
@@ -17,27 +17,6 @@ fn to_seqnum(seqnum: u16) -> usize {
type Result<T> = std::result::Result<T, Error>;
-struct Split {
- timestamp: Option<Instant>,
- chunks: Vec<OnceCell<Vec<u8>>>,
- got: usize,
-}
-
-struct RecvChan {
- packets: Vec<Cell<Option<Vec<u8>>>>, // char ** 😛
- splits: HashMap<u16, Split>,
- seqnum: u16,
- num: u8,
-}
-
-pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
- share: Arc<RudpShare<S>>,
- close: watch::Receiver<bool>,
- chans: Arc<Vec<Mutex<RecvChan>>>,
- pkt_tx: mpsc::UnboundedSender<InPkt>,
- udp_rx: R,
-}
-
impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
pub fn new(
udp_rx: R,
@@ -133,7 +112,6 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
) -> Result<()> {
use Error::*;
- // TODO: reset timeout
let mut cursor = io::Cursor::new(tokio::select! {
pkt = self.udp_rx.recv() => pkt?,
_ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)),
diff --git a/src/send.rs b/src/send.rs
index 20308e6..c41ad80 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -1,4 +1,6 @@
-use crate::*;
+use crate::{prelude::*, Ack, RudpShare};
+use byteorder::{BigEndian, WriteBytesExt};
+use std::io::{self, Write};
use tokio::sync::watch;
type AckResult = io::Result<Option<watch::Receiver<bool>>>;
@@ -50,6 +52,5 @@ impl<S: UdpSender> RudpShare<S> {
pub async fn send_raw(&self, data: &[u8]) -> io::Result<()> {
self.udp_tx.send(data).await
- // TODO: reset ping timeout
}
}