From 13d5ab06735fe70c82d676d02451b821a8a2d7ec Mon Sep 17 00:00:00 2001 From: iGxnon Date: Sun, 15 Sep 2024 09:44:57 +0800 Subject: [PATCH] add controller example Signed-off-by: iGxnon --- .github/workflows/ci.yml | 1 + Cargo.toml | 1 - examples/controller.rs | 99 +++++++++++++++++++++++++++++++++++ src/codec/encoder/fragment.rs | 6 +-- src/guard.rs | 4 +- src/link.rs | 30 +++++------ src/utils/reactor.rs | 2 +- src/utils/seq_num.rs | 1 - 8 files changed, 120 insertions(+), 24 deletions(-) create mode 100644 examples/controller.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9eeb4b4..8ac83e1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,6 +49,7 @@ jobs: - name: Run example with codecov run: | cargo llvm-cov --no-report run --example proxy + cargo llvm-cov --no-report run --example controller cargo llvm-cov --no-report run --example tracing --features fastrace/enable - name: Run bench with codecov diff --git a/Cargo.toml b/Cargo.toml index 9a2e1c7..f5591dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,6 @@ required-features = ["micro-bench"] [profile.bench] opt-level = 3 lto = true -## WORKSPACE [lints] workspace = true diff --git a/examples/controller.rs b/examples/controller.rs new file mode 100644 index 0000000..293ab2a --- /dev/null +++ b/examples/controller.rs @@ -0,0 +1,99 @@ +#![feature(local_waker)] +#![feature(context_ext)] +#![allow(clippy::print_stdout)] + +use std::error::Error; +use std::pin::Pin; +use std::task::ContextBuilder; +use std::{cmp, io}; + +use futures::future::poll_fn; +use futures::{Sink, SinkExt, StreamExt}; +use raknet_rs::opts::FlushStrategy; +use raknet_rs::server::MakeIncoming; +use raknet_rs::{server, Message}; +use tokio::net::UdpSocket; + +/// Self-balancing flush controller +struct FlushController { + write: Pin + Send + Sync + 'static>>, + next_flush: Option, + delay: u64, // us +} + +impl FlushController { + fn new(write: Pin + Send + Sync + 'static>>) -> Self { + Self { + write, + next_flush: None, + delay: 5_000, // 5ms + } + } + + async fn _flush0(&mut self) -> io::Result<()> { + self.write.flush().await + } + + async fn wait(&self) { + if let Some(next_flush) = self.next_flush { + tokio::time::sleep_until(next_flush).await; + } + } + + async fn flush(&mut self) -> io::Result<()> { + let mut strategy = FlushStrategy::new(true, true, true); + poll_fn(|cx| { + let mut cx = ContextBuilder::from(cx).ext(&mut strategy).build(); + self.write.as_mut().poll_flush(&mut cx) + }) + .await?; + + // Adjust delay + if strategy.flushed_ack() + strategy.flushed_nack() + strategy.flushed_pack() > 0 { + self.delay = cmp::max(self.delay / 2, 5_000); + } else { + self.delay = cmp::min(self.delay * 2, 100_000); + } + self.next_flush = + Some(tokio::time::Instant::now() + tokio::time::Duration::from_micros(self.delay)); + Ok(()) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let local_addr = socket.local_addr()?; + println!("[server] server listening on {local_addr} with flush controller"); + let mut incoming = socket.make_incoming( + server::Config::new() + .send_buf_cap(1024) + .sever_guid(114514) + .advertisement(&b"Hello, I am proxy server"[..]) + .min_mtu(500) + .max_mtu(1400) + .support_version(vec![9, 11, 13]) + .max_pending(64), + ); + + tokio::spawn(async move { + loop { + let (src, dst) = incoming.next().await.unwrap(); + tokio::spawn(async move { + tokio::pin!(src); + let mut controller = FlushController::new(Box::pin(dst)); + loop { + tokio::select! { + Some(_data) = src.next() => { + // handle data + } + _ = controller.wait() => { + controller.flush().await.unwrap(); + } + } + } + }); + } + }); + Ok(()) +} diff --git a/src/codec/encoder/fragment.rs b/src/codec/encoder/fragment.rs index 6dcd6a6..c91acad 100644 --- a/src/codec/encoder/fragment.rs +++ b/src/codec/encoder/fragment.rs @@ -140,10 +140,8 @@ where frame.body.len() <= max_len, "split failed, the frame body is too large" ); - // FIXME: poll_ready is not ensured before send. But it is ok because the next - // layer has buffer(ie. next_frame.start_send will always return Ok, and never mess up - // data) - this.frame.as_mut().start_send(frame)?; + // We rely on the underlying sink to handle backpressure + this.frame.as_mut().start_send(frame).expect("send fragmented frame failed"); } if reliability.is_sequenced_or_ordered() { diff --git a/src/guard.rs b/src/guard.rs index 16947f4..a06e206 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -281,11 +281,11 @@ struct ResendMap { role: Role, peer: Peer, last_record_expired_at: Instant, - estimator: Box, + estimator: Box, } impl ResendMap { - fn new(role: Role, peer: Peer, estimator: Box) -> Self { + fn new(role: Role, peer: Peer, estimator: Box) -> Self { Self { map: HashMap::new(), role, diff --git a/src/link.rs b/src/link.rs index d6b05f1..bb2db37 100644 --- a/src/link.rs +++ b/src/link.rs @@ -1,6 +1,6 @@ use std::cmp::Reverse; use std::collections::{BTreeSet, BinaryHeap}; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{self, AtomicBool}; use std::sync::Arc; use std::time::Instant; @@ -72,18 +72,15 @@ impl TransferLink { } pub(crate) fn turn_on_waking(&self) { - self.forward_waking - .store(true, std::sync::atomic::Ordering::Relaxed); + self.forward_waking.store(true, atomic::Ordering::Relaxed); } fn should_waking(&self) -> bool { - self.forward_waking - .load(std::sync::atomic::Ordering::Relaxed) + self.forward_waking.load(atomic::Ordering::Relaxed) } pub(crate) fn turn_off_waking(&self) { - self.forward_waking - .store(false, std::sync::atomic::Ordering::Relaxed); + self.forward_waking.store(false, atomic::Ordering::Relaxed); } pub(crate) fn incoming_ack(&self, records: AckOrNack) { @@ -180,6 +177,7 @@ impl TransferLink { pub(crate) struct Route { router_tx: Sender>, link: SharedLink, + // the next expected sequence number seq_read: u24, } @@ -208,14 +206,16 @@ impl Route { self.link.outgoing_ack.lock().push(Reverse(frames.seq_num)); - let mut nack = self.link.outgoing_nack.lock(); - let seq_num = frames.seq_num; - nack.remove(&Reverse(seq_num)); - let pre_read = self.seq_read; - if pre_read <= seq_num { - self.seq_read = seq_num + 1; - for n in pre_read.to_u32()..seq_num.to_u32() { - nack.insert(Reverse(n.into())); + { + let mut nack = self.link.outgoing_nack.lock(); + let seq_num = frames.seq_num; + nack.remove(&Reverse(seq_num)); + let pre_read = self.seq_read; + if pre_read <= seq_num { + self.seq_read = seq_num + 1; + for n in pre_read.to_u32()..seq_num.to_u32() { + nack.insert(Reverse(n.into())); + } } } diff --git a/src/utils/reactor.rs b/src/utils/reactor.rs index 06e94db..fafec3f 100644 --- a/src/utils/reactor.rs +++ b/src/utils/reactor.rs @@ -41,7 +41,7 @@ impl Reactor { REACTOR.get_or_init(|| { // Spawn the daemon thread to motivate the reactor. thread::Builder::new() - .name("timer-reactor".to_string()) + .name("raknet-timer-reactor".to_string()) .spawn(main_loop) .expect("cannot spawn timer-reactor thread"); diff --git a/src/utils/seq_num.rs b/src/utils/seq_num.rs index 369895e..af4c44d 100644 --- a/src/utils/seq_num.rs +++ b/src/utils/seq_num.rs @@ -1,7 +1,6 @@ use bytes::{Buf, BufMut}; /// Unsigned 24bits integer (actually occupied 32 bits) with litter endian and wrapping checking -/// TODO: Can the sequence number wrap around? #[allow(non_camel_case_types)] #[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Default)] pub(crate) struct u24(u32);