diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 38 |
1 files changed, 20 insertions, 18 deletions
diff --git a/src/main.rs b/src/main.rs index aadb5cc..241d324 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ #![feature(yeet_expr)] #![feature(cursor_remaining)] #![feature(hash_drain_filter)] +#![feature(async_fn_in_trait)] mod client; pub mod error; mod recv_worker; @@ -8,12 +9,13 @@ mod recv_worker; use byteorder::{BigEndian, WriteBytesExt}; pub use client::{connect, Sender as Client}; use num_enum::TryFromPrimitive; +use std::future::Future; use std::{ io::{self, Write}, ops, - sync::{mpsc, Arc}, - thread, + sync::Arc, }; +use tokio::sync::mpsc; pub const PROTO_ID: u32 = 0x4f457403; pub const UDP_PKT_SIZE: usize = 512; @@ -23,11 +25,11 @@ pub const INIT_SEQNUM: u16 = 65500; pub const TIMEOUT: u64 = 30; pub trait UdpSender: Send + Sync + 'static { - fn send(&self, data: Vec<u8>) -> io::Result<()>; + async fn send(&self, data: Vec<u8>) -> io::Result<()>; } pub trait UdpReceiver: Send + Sync + 'static { - fn recv(&self) -> io::Result<Vec<u8>>; + async fn recv(&self) -> io::Result<Vec<u8>>; } #[derive(Debug, Copy, Clone)] @@ -79,7 +81,7 @@ pub struct RudpShare<S: UdpSender> { #[derive(Debug)] pub struct RudpReceiver<S: UdpSender> { share: Arc<RudpShare<S>>, - pkt_rx: mpsc::Receiver<InPkt>, + pkt_rx: mpsc::UnboundedReceiver<InPkt>, } #[derive(Debug)] @@ -88,7 +90,7 @@ pub struct RudpSender<S: UdpSender> { } impl<S: UdpSender> RudpShare<S> { - pub fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> { + pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> { let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len()); buf.write_u32::<BigEndian>(PROTO_ID)?; buf.write_u16::<BigEndian>(self.remote_id)?; @@ -96,20 +98,20 @@ impl<S: UdpSender> RudpShare<S> { buf.write_u8(tp as u8)?; buf.write(pkt.data)?; - self.udp_tx.send(buf)?; + self.udp_tx.send(buf).await?; Ok(()) } } impl<S: UdpSender> RudpSender<S> { - pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> { - self.share.send(PktType::Orig, pkt) // TODO + pub async fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> { + self.share.send(PktType::Orig, pkt).await // TODO } } impl<S: UdpSender> ops::Deref for RudpReceiver<S> { - type Target = mpsc::Receiver<InPkt>; + type Target = mpsc::UnboundedReceiver<InPkt>; fn deref(&self) -> &Self::Target { &self.pkt_rx @@ -122,7 +124,7 @@ pub fn new<S: UdpSender, R: UdpReceiver>( udp_tx: S, udp_rx: R, ) -> (RudpSender<S>, RudpReceiver<S>) { - let (pkt_tx, pkt_rx) = mpsc::channel(); + let (pkt_tx, pkt_rx) = mpsc::unbounded_channel(); let share = Arc::new(RudpShare { id, @@ -132,9 +134,7 @@ pub fn new<S: UdpSender, R: UdpReceiver>( }); let recv_share = Arc::clone(&share); - thread::spawn(|| { - recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx).run(); - }); + tokio::spawn(async { recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx).await }); ( RudpSender { @@ -146,9 +146,10 @@ pub fn new<S: UdpSender, R: UdpReceiver>( // connect -fn main() -> io::Result<()> { +#[tokio::main] +async fn main() -> io::Result<()> { //println!("{}", x.deep_size_of()); - let (tx, rx) = connect("127.0.0.1:30000")?; + let (tx, rx) = connect("127.0.0.1:30000").await?; let mut mtpkt = vec![]; mtpkt.write_u16::<BigEndian>(2)?; // high level type @@ -163,9 +164,10 @@ fn main() -> io::Result<()> { unrel: true, chan: 1, data: &mtpkt, - })?; + }) + .await?; - while let Ok(result) = rx.recv() { + while let Some(result) = rx.recv().await { match result { Ok(pkt) => { io::stdout().write(pkt.data.as_slice())?; |