aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLizzy Fleckenstein <eliasfleckenstein@web.de>2023-02-18 03:03:40 +0100
committerLizzy Fleckenstein <eliasfleckenstein@web.de>2023-02-18 03:05:06 +0100
commit718e8618544c4cdde78138655305eee7c08058ee (patch)
treebb578698c985ebf8c9601f325d1fc90dc0153e46 /src
parent88ff69e7a8e90a4fa3929fa173ed93e99b60f37c (diff)
downloadmt_rudp-718e8618544c4cdde78138655305eee7c08058ee.tar.xz
Don't spawn tasks
Diffstat (limited to 'src')
-rw-r--r--src/client.rs10
-rw-r--r--src/common.rs73
-rw-r--r--src/error.rs7
-rw-r--r--src/lib.rs19
-rw-r--r--src/recv.rs293
-rw-r--r--src/send.rs22
-rw-r--r--src/share.rs116
7 files changed, 230 insertions, 310 deletions
diff --git a/src/client.rs b/src/client.rs
index c4922ec..56db92a 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -19,7 +19,7 @@ impl UdpSender for ToSrv {
#[async_trait]
impl UdpReceiver for FromSrv {
- async fn recv(&self) -> io::Result<Vec<u8>> {
+ async fn recv(&mut self) -> io::Result<Vec<u8>> {
let mut buffer = Vec::new();
buffer.resize(UDP_PKT_SIZE, 0);
@@ -30,7 +30,13 @@ impl UdpReceiver for FromSrv {
}
}
-pub async fn connect(addr: &str) -> io::Result<(RudpSender<ToSrv>, RudpReceiver<ToSrv>)> {
+pub struct RemoteSrv;
+impl UdpPeer for RemoteSrv {
+ type Sender = ToSrv;
+ type Receiver = FromSrv;
+}
+
+pub async fn connect(addr: &str) -> io::Result<(RudpSender<RemoteSrv>, RudpReceiver<RemoteSrv>)> {
let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0").await?);
sock.connect(addr).await?;
diff --git a/src/common.rs b/src/common.rs
index 4c0bc08..0ed08f3 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -1,9 +1,6 @@
-use super::*;
use async_trait::async_trait;
-use delegate::delegate;
use num_enum::TryFromPrimitive;
-use std::{borrow::Cow, io, sync::Arc};
-use tokio::sync::mpsc;
+use std::{borrow::Cow, fmt::Debug, io};
pub const PROTO_ID: u32 = 0x4f457403;
pub const UDP_PKT_SIZE: usize = 512;
@@ -14,13 +11,18 @@ pub const TIMEOUT: u64 = 30;
pub const PING_TIMEOUT: u64 = 5;
#[async_trait]
-pub trait UdpSender: Send + Sync + 'static {
+pub trait UdpSender: Send + Sync {
async fn send(&self, data: &[u8]) -> io::Result<()>;
}
#[async_trait]
-pub trait UdpReceiver: Send + Sync + 'static {
- async fn recv(&self) -> io::Result<Vec<u8>>;
+pub trait UdpReceiver: Send {
+ async fn recv(&mut self) -> io::Result<Vec<u8>>;
+}
+
+pub trait UdpPeer {
+ type Sender: UdpSender;
+ type Receiver: UdpReceiver;
}
#[derive(Debug, Copy, Clone, PartialEq)]
@@ -55,60 +57,3 @@ pub struct Pkt<'a> {
pub chan: u8,
pub data: Cow<'a, [u8]>,
}
-
-pub type InPkt = Result<Pkt<'static>, Error>;
-
-#[derive(Debug)]
-pub struct RudpReceiver<S: UdpSender> {
- pub(crate) share: Arc<RudpShare<S>>,
- pub(crate) pkt_rx: mpsc::UnboundedReceiver<InPkt>,
-}
-
-#[derive(Debug)]
-pub struct RudpSender<S: UdpSender> {
- pub(crate) share: Arc<RudpShare<S>>,
-}
-
-// derive(Clone) adds unwanted Clone trait bound to S parameter
-impl<S: UdpSender> Clone for RudpSender<S> {
- fn clone(&self) -> Self {
- Self {
- share: Arc::clone(&self.share),
- }
- }
-}
-
-macro_rules! impl_share {
- ($T:ident) => {
- impl<S: UdpSender> $T<S> {
- pub async fn peer_id(&self) -> u16 {
- self.share.id
- }
-
- pub async fn is_server(&self) -> bool {
- self.share.id == PeerID::Srv as u16
- }
-
- pub async fn close(self) {
- self.share.bomb.lock().await.defuse();
- self.share.close_tx.send(true).ok();
-
- let mut tasks = self.share.tasks.lock().await;
- while let Some(res) = tasks.join_next().await {
- res.ok(); // TODO: handle error (?)
- }
- }
- }
- };
-}
-
-impl_share!(RudpReceiver);
-impl_share!(RudpSender);
-
-impl<S: UdpSender> RudpReceiver<S> {
- delegate! {
- to self.pkt_rx {
- pub async fn recv(&mut self) -> Option<InPkt>;
- }
- }
-}
diff --git a/src/error.rs b/src/error.rs
index 7cfc057..28233e8 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,7 +1,6 @@
use super::*;
use num_enum::TryFromPrimitiveError;
use thiserror::Error;
-use tokio::sync::mpsc::error::SendError;
#[derive(Error, Debug)]
pub enum Error {
@@ -38,9 +37,3 @@ impl From<TryFromPrimitiveError<CtlType>> for Error {
Self::InvalidCtlType(err.number)
}
}
-
-impl From<SendError<InPkt>> for Error {
- fn from(_err: SendError<InPkt>) -> Self {
- Self::LocalDisco
- }
-}
diff --git a/src/lib.rs b/src/lib.rs
index a02eb20..e7a8ebe 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,6 @@
#![feature(cursor_remaining)]
#![feature(hash_drain_filter)]
-#![feature(once_cell)]
+
mod client;
mod common;
mod error;
@@ -11,21 +11,6 @@ mod share;
pub use client::*;
pub use common::*;
pub use error::*;
-use recv::*;
+pub use recv::*;
pub use send::*;
pub use share::*;
-pub use ticker_mod::*;
-
-mod ticker_mod {
- #[macro_export]
- macro_rules! ticker {
- ($duration:expr, $close:expr, $body:block) => {
- let mut interval = tokio::time::interval($duration);
-
- while tokio::select!{
- _ = interval.tick() => true,
- _ = $close.changed() => false,
- } $body
- };
- }
-}
diff --git a/src/recv.rs b/src/recv.rs
index fd6f299..34e273c 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -3,14 +3,16 @@ use async_recursion::async_recursion;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
borrow::Cow,
- cell::OnceCell,
- collections::HashMap,
+ collections::{HashMap, VecDeque},
io,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
-use tokio::sync::{mpsc, watch, Mutex};
+use tokio::{
+ sync::watch,
+ time::{interval, sleep, Interval, Sleep},
+};
fn to_seqnum(seqnum: u16) -> usize {
(seqnum as usize) & (REL_BUFFER - 1)
@@ -21,7 +23,7 @@ type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
struct Split {
timestamp: Option<Instant>,
- chunks: Vec<OnceCell<Vec<u8>>>,
+ chunks: Vec<Option<Vec<u8>>>,
got: usize,
}
@@ -29,61 +31,82 @@ struct RecvChan {
packets: Vec<Option<Vec<u8>>>, // char ** 😛
splits: HashMap<u16, Split>,
seqnum: u16,
- num: u8,
}
-pub(crate) struct RecvWorker<R: UdpReceiver, S: UdpSender> {
- share: Arc<RudpShare<S>>,
+pub struct RudpReceiver<P: UdpPeer> {
+ pub(crate) share: Arc<RudpShare<P>>,
+ chans: [RecvChan; NUM_CHANS],
+ udp: P::Receiver,
close: watch::Receiver<bool>,
- chans: Arc<Vec<Mutex<RecvChan>>>,
- pkt_tx: mpsc::UnboundedSender<InPkt>,
- udp_rx: R,
+ closed: bool,
+ resend: Interval,
+ ping: Interval,
+ cleanup: Interval,
+ timeout: Pin<Box<Sleep>>,
+ queue: VecDeque<Result<Pkt<'static>>>,
}
-impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
- pub fn new(
- udp_rx: R,
- share: Arc<RudpShare<S>>,
+impl<P: UdpPeer> RudpReceiver<P> {
+ pub(crate) fn new(
+ udp: P::Receiver,
+ share: Arc<RudpShare<P>>,
close: watch::Receiver<bool>,
- pkt_tx: mpsc::UnboundedSender<InPkt>,
) -> Self {
Self {
- udp_rx,
+ udp,
share,
close,
- pkt_tx,
- chans: Arc::new(
- (0..NUM_CHANS as u8)
- .map(|num| {
- Mutex::new(RecvChan {
- num,
- packets: (0..REL_BUFFER).map(|_| None).collect(),
- seqnum: INIT_SEQNUM,
- splits: HashMap::new(),
- })
- })
- .collect(),
- ),
+ closed: false,
+ resend: interval(Duration::from_millis(500)),
+ ping: interval(Duration::from_secs(PING_TIMEOUT)),
+ cleanup: interval(Duration::from_secs(TIMEOUT)),
+ timeout: Box::pin(sleep(Duration::from_secs(TIMEOUT))),
+ chans: std::array::from_fn(|_| RecvChan {
+ packets: (0..REL_BUFFER).map(|_| None).collect(),
+ seqnum: INIT_SEQNUM,
+ splits: HashMap::new(),
+ }),
+ queue: VecDeque::new(),
+ }
+ }
+
+ fn handle_err(&mut self, res: Result<()>) -> Result<()> {
+ use Error::*;
+
+ match res {
+ Err(RemoteDisco(_)) | Err(LocalDisco) => {
+ self.closed = true;
+ res
+ }
+ Ok(_) => res,
+ Err(e) => {
+ self.queue.push_back(Err(e));
+ Ok(())
+ }
}
}
- pub async fn run(&self) {
+ pub async fn recv(&mut self) -> Option<Result<Pkt<'static>>> {
use Error::*;
- let cleanup_chans = Arc::clone(&self.chans);
- let mut cleanup_close = self.close.clone();
- self.share
- .tasks
- .lock()
- .await
- /*.build_task()
- .name("cleanup_splits")*/
- .spawn(async move {
- let timeout = Duration::from_secs(TIMEOUT);
-
- ticker!(timeout, cleanup_close, {
- for chan_mtx in cleanup_chans.iter() {
- let mut chan = chan_mtx.lock().await;
+ if self.closed {
+ return None;
+ }
+
+ loop {
+ if let Some(x) = self.queue.pop_front() {
+ return Some(x);
+ }
+
+ tokio::select! {
+ _ = self.close.changed() => {
+ self.closed = true;
+ return Some(Err(LocalDisco));
+ },
+ _ = self.cleanup.tick() => {
+ let timeout = Duration::from_secs(TIMEOUT);
+
+ for chan in self.chans.iter_mut() {
chan.splits = chan
.splits
.drain_filter(
@@ -91,58 +114,48 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
)
.collect();
}
- });
- });
-
- let mut close = self.close.clone();
- let timeout = tokio::time::sleep(Duration::from_secs(TIMEOUT));
- tokio::pin!(timeout);
-
- loop {
- if let Err(e) = self.handle(self.recv_pkt(&mut close, timeout.as_mut()).await) {
- // TODO: figure out whether this is a good idea
- if let RemoteDisco(to) = e {
- self.pkt_tx.send(Err(RemoteDisco(to))).ok();
+ },
+ _ = self.resend.tick() => {
+ for chan in self.share.chans.iter() {
+ for (_, ack) in chan.lock().await.acks.iter() {
+ self.share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
+ }
+ }
+ },
+ _ = self.ping.tick() => {
+ self.share
+ .send(
+ PktType::Ctl,
+ Pkt {
+ chan: 0,
+ unrel: false,
+ data: Cow::Borrowed(&[CtlType::Ping as u8]),
+ },
+ )
+ .await
+ .ok();
+ }
+ _ = &mut self.timeout => {
+ self.closed = true;
+ return Some(Err(RemoteDisco(true)));
+ },
+ pkt = self.udp.recv() => {
+ if let Err(e) = self.handle_pkt(pkt).await {
+ return Some(Err(e));
+ }
}
-
- #[allow(clippy::single_match)]
- match e {
- // anon5's mt notifies the peer on timeout, C++ MT does not
- LocalDisco /*| RemoteDisco(true)*/ => drop(
- self.share
- .send(
- PktType::Ctl,
- Pkt {
- unrel: true,
- chan: 0,
- data: Cow::Borrowed(&[CtlType::Disco as u8]),
- },
- )
- .await
- .ok(),
- ),
- _ => {}
- }
-
- break;
}
}
}
- async fn recv_pkt(
- &self,
- close: &mut watch::Receiver<bool>,
- timeout: Pin<&mut tokio::time::Sleep>,
- ) -> Result<()> {
+ async fn handle_pkt(&mut self, pkt: io::Result<Vec<u8>>) -> Result<()> {
use Error::*;
- let mut cursor = io::Cursor::new(tokio::select! {
- pkt = self.udp_rx.recv() => pkt?,
- _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)),
- _ = close.changed() => return Err(LocalDisco),
- });
+ let mut cursor = io::Cursor::new(pkt?);
- timeout.reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
+ self.timeout
+ .as_mut()
+ .reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
let proto_id = cursor.read_u32::<BigEndian>()?;
if proto_id != PROTO_ID {
@@ -151,38 +164,34 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
let _peer_id = cursor.read_u16::<BigEndian>()?;
- let n_chan = cursor.read_u8()?;
- let mut chan = self
- .chans
- .get(n_chan as usize)
- .ok_or(InvalidChannel(n_chan))?
- .lock()
- .await;
+ let chan = cursor.read_u8()?;
+ if chan >= NUM_CHANS as u8 {
+ return Err(InvalidChannel(chan));
+ }
+
+ let res = self.process_pkt(cursor, true, chan).await;
+ self.handle_err(res)?;
- self.process_pkt(cursor, true, &mut chan).await
+ Ok(())
}
#[async_recursion]
async fn process_pkt(
- &self,
+ &mut self,
mut cursor: io::Cursor<Vec<u8>>,
unrel: bool,
- chan: &mut RecvChan,
+ chan: u8,
) -> Result<()> {
use Error::*;
+ let ch = chan as usize;
match cursor.read_u8()?.try_into()? {
PktType::Ctl => match cursor.read_u8()?.try_into()? {
CtlType::Ack => {
// println!("Ack");
let seqnum = cursor.read_u16::<BigEndian>()?;
- if let Some(ack) = self.share.chans[chan.num as usize]
- .lock()
- .await
- .acks
- .remove(&seqnum)
- {
+ if let Some(ack) = self.share.chans[ch].lock().await.acks.remove(&seqnum) {
ack.tx.send(true).ok();
}
}
@@ -208,11 +217,11 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
PktType::Orig => {
// println!("Orig");
- self.pkt_tx.send(Ok(Pkt {
- chan: chan.num,
+ self.queue.push_back(Ok(Pkt {
+ chan,
unrel,
data: Cow::Owned(cursor.remaining_slice().into()),
- }))?;
+ }));
}
PktType::Split => {
// println!("Split");
@@ -221,11 +230,14 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
let chunk_index = cursor.read_u16::<BigEndian>()? as usize;
let chunk_count = cursor.read_u16::<BigEndian>()? as usize;
- let mut split = chan.splits.entry(seqnum).or_insert_with(|| Split {
- got: 0,
- chunks: (0..chunk_count).map(|_| OnceCell::new()).collect(),
- timestamp: None,
- });
+ let mut split = self.chans[ch]
+ .splits
+ .entry(seqnum)
+ .or_insert_with(|| Split {
+ got: 0,
+ chunks: (0..chunk_count).map(|_| None).collect(),
+ timestamp: None,
+ });
if split.chunks.len() != chunk_count {
return Err(InvalidChunkCount(split.chunks.len(), chunk_count));
@@ -233,10 +245,10 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
if split
.chunks
- .get(chunk_index)
+ .get_mut(chunk_index)
.ok_or(InvalidChunkIndex(chunk_index, chunk_count))?
- .set(cursor.remaining_slice().into())
- .is_ok()
+ .replace(cursor.remaining_slice().into())
+ .is_none()
{
split.got += 1;
}
@@ -244,25 +256,29 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
split.timestamp = if unrel { Some(Instant::now()) } else { None };
if split.got == chunk_count {
- self.pkt_tx.send(Ok(Pkt {
- chan: chan.num,
+ let split = self.chans[ch].splits.remove(&seqnum).unwrap();
+
+ self.queue.push_back(Ok(Pkt {
+ chan,
unrel,
data: split
.chunks
- .iter()
- .flat_map(|chunk| chunk.get().unwrap().iter())
- .copied()
- .collect(),
- }))?;
-
- chan.splits.remove(&seqnum);
+ .into_iter()
+ .map(|x| x.unwrap())
+ .reduce(|mut a, mut b| {
+ a.append(&mut b);
+ a
+ })
+ .unwrap_or_default()
+ .into(),
+ }));
}
}
PktType::Rel => {
// println!("Rel");
let seqnum = cursor.read_u16::<BigEndian>()?;
- chan.packets[to_seqnum(seqnum)].replace(cursor.remaining_slice().into());
+ self.chans[ch].packets[to_seqnum(seqnum)].replace(cursor.remaining_slice().into());
let mut ack_data = Vec::with_capacity(3);
ack_data.write_u8(CtlType::Ack as u8)?;
@@ -272,35 +288,22 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
.send(
PktType::Ctl,
Pkt {
+ chan,
unrel: true,
- chan: chan.num,
- data: Cow::Borrowed(&ack_data),
+ data: ack_data.into(),
},
)
.await?;
- fn next_pkt(chan: &mut RecvChan) -> Option<Vec<u8>> {
- chan.packets[to_seqnum(chan.seqnum)].take()
- }
-
- while let Some(pkt) = next_pkt(chan) {
- self.handle(self.process_pkt(io::Cursor::new(pkt), false, chan).await)?;
- chan.seqnum = chan.seqnum.overflowing_add(1).0;
+ let next_pkt = |chan: &mut RecvChan| chan.packets[to_seqnum(chan.seqnum)].take();
+ while let Some(pkt) = next_pkt(&mut self.chans[ch]) {
+ let res = self.process_pkt(io::Cursor::new(pkt), false, chan).await;
+ self.handle_err(res)?;
+ self.chans[ch].seqnum = self.chans[ch].seqnum.overflowing_add(1).0;
}
}
}
Ok(())
}
-
- fn handle(&self, res: Result<()>) -> Result<()> {
- use Error::*;
-
- match res {
- Ok(v) => Ok(v),
- Err(RemoteDisco(to)) => Err(RemoteDisco(to)),
- Err(LocalDisco) => Err(LocalDisco),
- Err(e) => Ok(self.pkt_tx.send(Err(e))?),
- }
- }
}
diff --git a/src/send.rs b/src/send.rs
index a3a7f03..2c449e1 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -1,17 +1,33 @@
use super::*;
use byteorder::{BigEndian, WriteBytesExt};
-use std::io::{self, Write};
+use std::{
+ io::{self, Write},
+ sync::Arc,
+};
use tokio::sync::watch;
pub type AckResult = io::Result<Option<watch::Receiver<bool>>>;
-impl<S: UdpSender> RudpSender<S> {
+pub struct RudpSender<P: UdpPeer> {
+ pub(crate) share: Arc<RudpShare<P>>,
+}
+
+// derive(Clone) adds unwanted Clone trait bound to P parameter
+impl<P: UdpPeer> Clone for RudpSender<P> {
+ fn clone(&self) -> Self {
+ Self {
+ share: Arc::clone(&self.share),
+ }
+ }
+}
+
+impl<P: UdpPeer> RudpSender<P> {
pub async fn send(&self, pkt: Pkt<'_>) -> AckResult {
self.share.send(PktType::Orig, pkt).await // TODO: splits
}
}
-impl<S: UdpSender> RudpShare<S> {
+impl<P: UdpPeer> RudpShare<P> {
pub async fn send(&self, tp: PktType, pkt: Pkt<'_>) -> AckResult {
let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + 2 + 1 + pkt.data.len());
buf.write_u32::<BigEndian>(PROTO_ID)?;
diff --git a/src/share.rs b/src/share.rs
index a2afc4c..02e37b2 100644
--- a/src/share.rs
+++ b/src/share.rs
@@ -1,10 +1,6 @@
use super::*;
-use drop_bomb::DropBomb;
-use std::{borrow::Cow, collections::HashMap, io, sync::Arc, time::Duration};
-use tokio::{
- sync::{mpsc, watch, Mutex, RwLock},
- task::JoinSet,
-};
+use std::{borrow::Cow, collections::HashMap, io, sync::Arc};
+use tokio::sync::{watch, Mutex, RwLock};
#[derive(Debug)]
pub(crate) struct Ack {
@@ -20,96 +16,72 @@ pub(crate) struct Chan {
}
#[derive(Debug)]
-pub(crate) struct RudpShare<S: UdpSender> {
+pub(crate) struct RudpShare<P: UdpPeer> {
pub(crate) id: u16,
pub(crate) remote_id: RwLock<u16>,
- pub(crate) chans: Vec<Mutex<Chan>>,
- pub(crate) udp_tx: S,
- pub(crate) close_tx: watch::Sender<bool>,
- pub(crate) tasks: Mutex<JoinSet<()>>,
- pub(crate) bomb: Mutex<DropBomb>,
+ pub(crate) chans: [Mutex<Chan>; NUM_CHANS],
+ pub(crate) udp_tx: P::Sender,
+ pub(crate) close: watch::Sender<bool>,
}
-pub async fn new<S: UdpSender, R: UdpReceiver>(
+pub async fn new<P: UdpPeer>(
id: u16,
remote_id: u16,
- udp_tx: S,
- udp_rx: R,
-) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
- let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
+ udp_tx: P::Sender,
+ udp_rx: P::Receiver,
+) -> io::Result<(RudpSender<P>, RudpReceiver<P>)> {
let (close_tx, close_rx) = watch::channel(false);
let share = Arc::new(RudpShare {
id,
remote_id: RwLock::new(remote_id),
udp_tx,
- close_tx,
- chans: (0..NUM_CHANS)
- .map(|_| {
- Mutex::new(Chan {
- acks: HashMap::new(),
- seqnum: INIT_SEQNUM,
- })
+ close: close_tx,
+ chans: std::array::from_fn(|_| {
+ Mutex::new(Chan {
+ acks: HashMap::new(),
+ seqnum: INIT_SEQNUM,
})
- .collect(),
- tasks: Mutex::new(JoinSet::new()),
- bomb: Mutex::new(DropBomb::new("rudp connection must be explicitly closed")),
+ }),
});
- let mut tasks = share.tasks.lock().await;
+ Ok((
+ RudpSender {
+ share: Arc::clone(&share),
+ },
+ RudpReceiver::new(udp_rx, share, close_rx),
+ ))
+}
- let recv_share = Arc::clone(&share);
- let recv_close = close_rx.clone();
- tasks
- /*.build_task()
- .name("recv")*/
- .spawn(async move {
- let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
- worker.run().await;
- });
+macro_rules! impl_share {
+ ($T:ident) => {
+ impl<P: UdpPeer> $T<P> {
+ pub async fn peer_id(&self) -> u16 {
+ self.share.id
+ }
- let resend_share = Arc::clone(&share);
- let mut resend_close = close_rx.clone();
- tasks
- /*.build_task()
- .name("resend")*/
- .spawn(async move {
- ticker!(Duration::from_millis(500), resend_close, {
- for chan in resend_share.chans.iter() {
- for (_, ack) in chan.lock().await.acks.iter() {
- resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
- }
- }
- });
- });
+ pub async fn is_server(&self) -> bool {
+ self.share.id == PeerID::Srv as u16
+ }
- let ping_share = Arc::clone(&share);
- let mut ping_close = close_rx.clone();
- tasks
- /*.build_task()
- .name("ping")*/
- .spawn(async move {
- ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
- ping_share
+ pub async fn close(self) {
+ self.share.close.send(true).ok(); // FIXME: handle err?
+
+ self.share
.send(
PktType::Ctl,
Pkt {
+ unrel: true,
chan: 0,
- unrel: false,
- data: Cow::Borrowed(&[CtlType::Ping as u8]),
+ data: Cow::Borrowed(&[CtlType::Disco as u8]),
},
)
.await
.ok();
- });
- });
-
- drop(tasks);
-
- Ok((
- RudpSender {
- share: Arc::clone(&share),
- },
- RudpReceiver { share, pkt_rx },
- ))
+ }
+ }
+ };
}
+
+impl_share!(RudpReceiver);
+impl_share!(RudpSender);