aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client.rs30
-rw-r--r--src/common.rs104
-rw-r--r--src/error.rs2
-rw-r--r--src/lib.rs172
-rw-r--r--src/recv.rs24
-rw-r--r--src/send.rs4
-rw-r--r--src/share.rs (renamed from src/new.rs)25
7 files changed, 177 insertions, 184 deletions
diff --git a/src/client.rs b/src/client.rs
index 6785a1f..c4922ec 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,48 +1,44 @@
-use crate::prelude::*;
+use super::*;
use async_trait::async_trait;
use std::{io, sync::Arc};
use tokio::net;
-pub struct Sender {
- sock: Arc<net::UdpSocket>,
-}
+#[derive(Debug)]
+pub struct ToSrv(Arc<net::UdpSocket>);
+
+#[derive(Debug)]
+pub struct FromSrv(Arc<net::UdpSocket>);
#[async_trait]
-impl UdpSender for Sender {
+impl UdpSender for ToSrv {
async fn send(&self, data: &[u8]) -> io::Result<()> {
- self.sock.send(data).await?;
+ self.0.send(data).await?;
Ok(())
}
}
-pub struct Receiver {
- sock: Arc<net::UdpSocket>,
-}
-
#[async_trait]
-impl UdpReceiver for Receiver {
+impl UdpReceiver for FromSrv {
async fn recv(&self) -> io::Result<Vec<u8>> {
let mut buffer = Vec::new();
buffer.resize(UDP_PKT_SIZE, 0);
- let len = self.sock.recv(&mut buffer).await?;
+ let len = self.0.recv(&mut buffer).await?;
buffer.truncate(len);
Ok(buffer)
}
}
-pub async fn connect(addr: &str) -> io::Result<(RudpSender<Sender>, RudpReceiver<Sender>)> {
+pub async fn connect(addr: &str) -> io::Result<(RudpSender<ToSrv>, RudpReceiver<ToSrv>)> {
let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0").await?);
sock.connect(addr).await?;
new(
PeerID::Srv as u16,
PeerID::Nil as u16,
- Sender {
- sock: Arc::clone(&sock),
- },
- Receiver { sock },
+ ToSrv(Arc::clone(&sock)),
+ FromSrv(sock),
)
.await
}
diff --git a/src/common.rs b/src/common.rs
new file mode 100644
index 0000000..797ccd1
--- /dev/null
+++ b/src/common.rs
@@ -0,0 +1,104 @@
+use super::*;
+use async_trait::async_trait;
+use delegate::delegate;
+use num_enum::TryFromPrimitive;
+use std::{io, sync::Arc};
+use tokio::sync::mpsc;
+
+pub const PROTO_ID: u32 = 0x4f457403;
+pub const UDP_PKT_SIZE: usize = 512;
+pub const NUM_CHANS: usize = 3;
+pub const REL_BUFFER: usize = 0x8000;
+pub const INIT_SEQNUM: u16 = 65500;
+pub const TIMEOUT: u64 = 30;
+pub const PING_TIMEOUT: u64 = 5;
+
+#[async_trait]
+pub trait UdpSender: Send + Sync + 'static {
+ async fn send(&self, data: &[u8]) -> io::Result<()>;
+}
+
+#[async_trait]
+pub trait UdpReceiver: Send + Sync + 'static {
+ async fn recv(&self) -> io::Result<Vec<u8>>;
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+#[repr(u16)]
+pub enum PeerID {
+ Nil = 0,
+ Srv,
+ CltMin,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
+#[repr(u8)]
+pub enum PktType {
+ Ctl = 0,
+ Orig,
+ Split,
+ Rel,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
+#[repr(u8)]
+pub enum CtlType {
+ Ack = 0,
+ SetPeerID,
+ Ping,
+ Disco,
+}
+
+#[derive(Debug)]
+pub struct Pkt<T> {
+ pub unrel: bool,
+ pub chan: u8,
+ pub data: T,
+}
+
+pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
+
+#[derive(Debug)]
+pub struct RudpReceiver<S: UdpSender> {
+ pub(crate) share: Arc<RudpShare<S>>,
+ pub(crate) pkt_rx: mpsc::UnboundedReceiver<InPkt>,
+}
+
+#[derive(Debug)]
+pub struct RudpSender<S: UdpSender> {
+ pub(crate) share: Arc<RudpShare<S>>,
+}
+
+macro_rules! impl_share {
+ ($T:ident) => {
+ impl<S: UdpSender> $T<S> {
+ pub async fn peer_id(&self) -> u16 {
+ self.share.id
+ }
+
+ pub async fn is_server(&self) -> bool {
+ self.share.id == PeerID::Srv as u16
+ }
+
+ pub async fn close(self) {
+ self.share.close_tx.send(true).ok();
+
+ let mut tasks = self.share.tasks.lock().await;
+ while let Some(res) = tasks.join_next().await {
+ res.ok(); // TODO: handle error (?)
+ }
+ }
+ }
+ };
+}
+
+impl_share!(RudpReceiver);
+impl_share!(RudpSender);
+
+impl<S: UdpSender> RudpReceiver<S> {
+ delegate! {
+ to self.pkt_rx {
+ pub async fn recv(&mut self) -> Option<InPkt>;
+ }
+ }
+}
diff --git a/src/error.rs b/src/error.rs
index bac843a..7cfc057 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,4 +1,4 @@
-use crate::prelude::*;
+use super::*;
use num_enum::TryFromPrimitiveError;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
diff --git a/src/lib.rs b/src/lib.rs
index 76f0311..a02eb20 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,173 +2,21 @@
#![feature(hash_drain_filter)]
#![feature(once_cell)]
mod client;
+mod common;
mod error;
-mod new;
mod recv;
mod send;
+mod share;
-pub use prelude::*;
-
-use async_trait::async_trait;
-use delegate::delegate;
-use num_enum::TryFromPrimitive;
-use std::{cell::OnceCell, collections::HashMap, io, sync::Arc, time::Instant};
-use tokio::{
- sync::{mpsc, watch, Mutex, RwLock},
- task::JoinSet,
-};
-
-#[async_trait]
-pub trait UdpSender: Send + Sync + 'static {
- async fn send(&self, data: &[u8]) -> io::Result<()>;
-}
-
-#[async_trait]
-pub trait UdpReceiver: Send + Sync + 'static {
- async fn recv(&self) -> io::Result<Vec<u8>>;
-}
-
-#[derive(Debug, Copy, Clone, PartialEq)]
-#[repr(u16)]
-pub enum PeerID {
- Nil = 0,
- Srv,
- CltMin,
-}
-
-#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
-#[repr(u8)]
-pub enum PktType {
- Ctl = 0,
- Orig,
- Split,
- Rel,
-}
-
-#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
-#[repr(u8)]
-pub enum CtlType {
- Ack = 0,
- SetPeerID,
- Ping,
- Disco,
-}
-
-#[derive(Debug)]
-pub struct Pkt<T> {
- pub unrel: bool,
- pub chan: u8,
- pub data: T,
-}
-
-pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
-
-#[derive(Debug)]
-struct Ack {
- tx: watch::Sender<bool>,
- rx: watch::Receiver<bool>,
- data: Vec<u8>,
-}
-
-#[derive(Debug)]
-struct Chan {
- acks: HashMap<u16, Ack>,
- seqnum: u16,
-}
-
-#[derive(Debug)]
-struct RudpShare<S: UdpSender> {
- id: u16,
- remote_id: RwLock<u16>,
- chans: Vec<Mutex<Chan>>,
- udp_tx: S,
- close_tx: watch::Sender<bool>,
- tasks: Mutex<JoinSet<()>>,
-}
-
-#[derive(Debug)]
-pub struct RudpReceiver<S: UdpSender> {
- share: Arc<RudpShare<S>>,
- pkt_rx: mpsc::UnboundedReceiver<InPkt>,
-}
-
-#[derive(Debug)]
-pub struct RudpSender<S: UdpSender> {
- share: Arc<RudpShare<S>>,
-}
-
-macro_rules! impl_share {
- ($T:ident) => {
- impl<S: UdpSender> $T<S> {
- pub async fn peer_id(&self) -> u16 {
- self.share.id
- }
-
- pub async fn is_server(&self) -> bool {
- self.share.id == PeerID::Srv as u16
- }
-
- pub async fn close(self) {
- self.share.close_tx.send(true).ok();
-
- let mut tasks = self.share.tasks.lock().await;
- while let Some(res) = tasks.join_next().await {
- res.ok(); // TODO: handle error (?)
- }
- }
- }
- };
-}
-
-impl_share!(RudpReceiver);
-impl_share!(RudpSender);
-
-impl<S: UdpSender> RudpReceiver<S> {
- delegate! {
- to self.pkt_rx {
- pub async fn recv(&mut self) -> Option<InPkt>;
- }
- }
-}
-
-#[derive(Debug)]
-struct Split {
- timestamp: Option<Instant>,
- chunks: Vec<OnceCell<Vec<u8>>>,
- got: usize,
-}
-
-struct RecvChan {
- packets: Vec<Option<Vec<u8>>>, // char ** 😛
- splits: HashMap<u16, Split>,
- seqnum: u16,
- num: u8,
-}
-
-struct RecvWorker<R: UdpReceiver, S: UdpSender> {
- share: Arc<RudpShare<S>>,
- close: watch::Receiver<bool>,
- chans: Arc<Vec<Mutex<RecvChan>>>,
- pkt_tx: mpsc::UnboundedSender<InPkt>,
- udp_rx: R,
-}
-
-mod prelude {
- pub const PROTO_ID: u32 = 0x4f457403;
- pub const UDP_PKT_SIZE: usize = 512;
- pub const NUM_CHANS: usize = 3;
- pub const REL_BUFFER: usize = 0x8000;
- pub const INIT_SEQNUM: u16 = 65500;
- pub const TIMEOUT: u64 = 30;
- pub const PING_TIMEOUT: u64 = 5;
-
- pub use super::{
- client::{connect, Sender as Client},
- error::Error,
- new::new,
- CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
- };
+pub use client::*;
+pub use common::*;
+pub use error::*;
+use recv::*;
+pub use send::*;
+pub use share::*;
+pub use ticker_mod::*;
+mod ticker_mod {
#[macro_export]
macro_rules! ticker {
($duration:expr, $close:expr, $body:block) => {
diff --git a/src/recv.rs b/src/recv.rs
index a88426f..572b17e 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -1,4 +1,4 @@
-use crate::{prelude::*, ticker, RecvChan, RecvWorker, RudpShare, Split};
+use super::*;
use async_recursion::async_recursion;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
@@ -17,6 +17,28 @@ fn to_seqnum(seqnum: u16) -> usize {
type Result<T> = std::result::Result<T, Error>;
+#[derive(Debug)]
+struct Split {
+ timestamp: Option<Instant>,
+ chunks: Vec<OnceCell<Vec<u8>>>,
+ got: usize,
+}
+
+struct RecvChan {
+ packets: Vec<Option<Vec<u8>>>, // char ** 😛
+ splits: HashMap<u16, Split>,
+ seqnum: u16,
+ num: u8,
+}
+
+pub(crate) struct RecvWorker<R: UdpReceiver, S: UdpSender> {
+ share: Arc<RudpShare<S>>,
+ close: watch::Receiver<bool>,
+ chans: Arc<Vec<Mutex<RecvChan>>>,
+ pkt_tx: mpsc::UnboundedSender<InPkt>,
+ udp_rx: R,
+}
+
impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
pub fn new(
udp_rx: R,
diff --git a/src/send.rs b/src/send.rs
index 0bbce47..e0c2fa3 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -1,9 +1,9 @@
-use crate::{prelude::*, Ack, RudpShare};
+use super::*;
use byteorder::{BigEndian, WriteBytesExt};
use std::io::{self, Write};
use tokio::sync::watch;
-type AckResult = io::Result<Option<watch::Receiver<bool>>>;
+pub type AckResult = io::Result<Option<watch::Receiver<bool>>>;
impl<S: UdpSender> RudpSender<S> {
pub async fn send(&self, pkt: Pkt<&[u8]>) -> AckResult {
diff --git a/src/new.rs b/src/share.rs
index b17f518..e0d2d2b 100644
--- a/src/new.rs
+++ b/src/share.rs
@@ -1,10 +1,33 @@
-use crate::{prelude::*, ticker, Chan, RecvWorker, RudpShare};
+use super::*;
use std::{collections::HashMap, io, sync::Arc, time::Duration};
use tokio::{
sync::{mpsc, watch, Mutex, RwLock},
task::JoinSet,
};
+#[derive(Debug)]
+pub(crate) struct Ack {
+ pub(crate) tx: watch::Sender<bool>,
+ pub(crate) rx: watch::Receiver<bool>,
+ pub(crate) data: Vec<u8>,
+}
+
+#[derive(Debug)]
+pub(crate) struct Chan {
+ pub(crate) acks: HashMap<u16, Ack>,
+ pub(crate) seqnum: u16,
+}
+
+#[derive(Debug)]
+pub(crate) struct RudpShare<S: UdpSender> {
+ pub(crate) id: u16,
+ pub(crate) remote_id: RwLock<u16>,
+ pub(crate) chans: Vec<Mutex<Chan>>,
+ pub(crate) udp_tx: S,
+ pub(crate) close_tx: watch::Sender<bool>,
+ pub(crate) tasks: Mutex<JoinSet<()>>,
+}
+
pub async fn new<S: UdpSender, R: UdpReceiver>(
id: u16,
remote_id: u16,