Skip to content

Commit

Permalink
add controller example
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Sep 15, 2024
1 parent bf52f3f commit 13d5ab0
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 24 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ jobs:
- name: Run example with codecov
run: |
cargo llvm-cov --no-report run --example proxy
cargo llvm-cov --no-report run --example controller
cargo llvm-cov --no-report run --example tracing --features fastrace/enable
- name: Run bench with codecov
Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ required-features = ["micro-bench"]
[profile.bench]
opt-level = 3
lto = true
## WORKSPACE

[lints]
workspace = true
Expand Down
99 changes: 99 additions & 0 deletions examples/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#![feature(local_waker)]
#![feature(context_ext)]
#![allow(clippy::print_stdout)]

use std::error::Error;
use std::pin::Pin;
use std::task::ContextBuilder;
use std::{cmp, io};

use futures::future::poll_fn;
use futures::{Sink, SinkExt, StreamExt};
use raknet_rs::opts::FlushStrategy;
use raknet_rs::server::MakeIncoming;
use raknet_rs::{server, Message};
use tokio::net::UdpSocket;

/// Self-balancing flush controller
struct FlushController {
write: Pin<Box<dyn Sink<Message, Error = io::Error> + Send + Sync + 'static>>,
next_flush: Option<tokio::time::Instant>,
delay: u64, // us
}

impl FlushController {
fn new(write: Pin<Box<dyn Sink<Message, Error = io::Error> + Send + Sync + 'static>>) -> Self {
Self {
write,
next_flush: None,
delay: 5_000, // 5ms
}
}

async fn _flush0(&mut self) -> io::Result<()> {
self.write.flush().await
}

async fn wait(&self) {
if let Some(next_flush) = self.next_flush {
tokio::time::sleep_until(next_flush).await;
}
}

async fn flush(&mut self) -> io::Result<()> {
let mut strategy = FlushStrategy::new(true, true, true);
poll_fn(|cx| {
let mut cx = ContextBuilder::from(cx).ext(&mut strategy).build();
self.write.as_mut().poll_flush(&mut cx)
})
.await?;

// Adjust delay
if strategy.flushed_ack() + strategy.flushed_nack() + strategy.flushed_pack() > 0 {
self.delay = cmp::max(self.delay / 2, 5_000);
} else {
self.delay = cmp::min(self.delay * 2, 100_000);
}
self.next_flush =
Some(tokio::time::Instant::now() + tokio::time::Duration::from_micros(self.delay));
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let local_addr = socket.local_addr()?;
println!("[server] server listening on {local_addr} with flush controller");
let mut incoming = socket.make_incoming(
server::Config::new()
.send_buf_cap(1024)
.sever_guid(114514)
.advertisement(&b"Hello, I am proxy server"[..])
.min_mtu(500)
.max_mtu(1400)
.support_version(vec![9, 11, 13])
.max_pending(64),
);

tokio::spawn(async move {
loop {
let (src, dst) = incoming.next().await.unwrap();
tokio::spawn(async move {
tokio::pin!(src);
let mut controller = FlushController::new(Box::pin(dst));
loop {
tokio::select! {
Some(_data) = src.next() => {
// handle data
}
_ = controller.wait() => {
controller.flush().await.unwrap();
}
}
}
});
}
});
Ok(())
}
6 changes: 2 additions & 4 deletions src/codec/encoder/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,8 @@ where
frame.body.len() <= max_len,
"split failed, the frame body is too large"
);
// FIXME: poll_ready is not ensured before send. But it is ok because the next
// layer has buffer(ie. next_frame.start_send will always return Ok, and never mess up
// data)
this.frame.as_mut().start_send(frame)?;
// We rely on the underlying sink to handle backpressure
this.frame.as_mut().start_send(frame).expect("send fragmented frame failed");
}

if reliability.is_sequenced_or_ordered() {
Expand Down
4 changes: 2 additions & 2 deletions src/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,11 @@ struct ResendMap {
role: Role,
peer: Peer,
last_record_expired_at: Instant,
estimator: Box<dyn Estimator + Send>,
estimator: Box<dyn Estimator + Send + Sync + 'static>,
}

impl ResendMap {
fn new(role: Role, peer: Peer, estimator: Box<dyn Estimator + Send>) -> Self {
fn new(role: Role, peer: Peer, estimator: Box<dyn Estimator + Send + Sync + 'static>) -> Self {
Self {
map: HashMap::new(),
role,
Expand Down
30 changes: 15 additions & 15 deletions src/link.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::cmp::Reverse;
use std::collections::{BTreeSet, BinaryHeap};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{self, AtomicBool};
use std::sync::Arc;
use std::time::Instant;

Expand Down Expand Up @@ -72,18 +72,15 @@ impl TransferLink {
}

pub(crate) fn turn_on_waking(&self) {
self.forward_waking
.store(true, std::sync::atomic::Ordering::Relaxed);
self.forward_waking.store(true, atomic::Ordering::Relaxed);
}

fn should_waking(&self) -> bool {
self.forward_waking
.load(std::sync::atomic::Ordering::Relaxed)
self.forward_waking.load(atomic::Ordering::Relaxed)
}

pub(crate) fn turn_off_waking(&self) {
self.forward_waking
.store(false, std::sync::atomic::Ordering::Relaxed);
self.forward_waking.store(false, atomic::Ordering::Relaxed);
}

pub(crate) fn incoming_ack(&self, records: AckOrNack) {
Expand Down Expand Up @@ -180,6 +177,7 @@ impl TransferLink {
pub(crate) struct Route {
router_tx: Sender<FrameSet<FramesMut>>,
link: SharedLink,
// the next expected sequence number
seq_read: u24,
}

Expand Down Expand Up @@ -208,14 +206,16 @@ impl Route {

self.link.outgoing_ack.lock().push(Reverse(frames.seq_num));

let mut nack = self.link.outgoing_nack.lock();
let seq_num = frames.seq_num;
nack.remove(&Reverse(seq_num));
let pre_read = self.seq_read;
if pre_read <= seq_num {
self.seq_read = seq_num + 1;
for n in pre_read.to_u32()..seq_num.to_u32() {
nack.insert(Reverse(n.into()));
{
let mut nack = self.link.outgoing_nack.lock();
let seq_num = frames.seq_num;
nack.remove(&Reverse(seq_num));
let pre_read = self.seq_read;
if pre_read <= seq_num {
self.seq_read = seq_num + 1;
for n in pre_read.to_u32()..seq_num.to_u32() {
nack.insert(Reverse(n.into()));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/utils/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Reactor {
REACTOR.get_or_init(|| {
// Spawn the daemon thread to motivate the reactor.
thread::Builder::new()
.name("timer-reactor".to_string())
.name("raknet-timer-reactor".to_string())
.spawn(main_loop)
.expect("cannot spawn timer-reactor thread");

Expand Down
1 change: 0 additions & 1 deletion src/utils/seq_num.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use bytes::{Buf, BufMut};

/// Unsigned 24bits integer (actually occupied 32 bits) with litter endian and wrapping checking
/// TODO: Can the sequence number wrap around?
#[allow(non_camel_case_types)]
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Default)]
pub(crate) struct u24(u32);
Expand Down

0 comments on commit 13d5ab0

Please sign in to comment.