Skip to content

Commit

Permalink
wake up poll_wait when an ack is delivered
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Jul 21, 2024
1 parent ae45288 commit 8a53ffa
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 84 deletions.
27 changes: 16 additions & 11 deletions benches/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use raknet_rs::server::{self, MakeIncoming};
use tokio::net::UdpSocket as TokioUdpSocket;
use tokio::runtime::Runtime;

const SEND_BUF_CAP: usize = 1024;
const MTU: u16 = 1500;

pub fn bulk_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("bulk_benchmark");
let server_addr = spawn_server();
Expand Down Expand Up @@ -44,6 +47,14 @@ pub fn bulk_benchmark(c: &mut Criterion) {
});
}

// The following benchmarks are not stable, for the following reason:
// an ack may fail to wake up a certain client (the client has already received the ack
// before falling asleep to wait for the RTO, so the ack cannot wake it up; this is
// almost impossible in real-life scenarios), and that client will then wait out a full
// RTO period before acting on the ack. This ultimately causes that round of benchmarking
// to stall for a while, affecting the benchmark results.

// TODO: find a way to make the benchmark stable
{
group.throughput(Throughput::Bytes(short_data.len() as u64 * 10));
group.bench_function("short_data_10_clients", |bencher| {
Expand Down Expand Up @@ -79,9 +90,8 @@ fn configure_bencher(
sock.connect_to(
server_addr,
client::Config::default()
.send_buf_cap(1024)
.mtu(1400)
.client_guid(1919810)
.send_buf_cap(SEND_BUF_CAP)
.mtu(MTU)
.protocol_version(11),
)
.await
Expand All @@ -100,10 +110,6 @@ fn configure_bencher(
tokio::pin!(client);
client.feed(Bytes::from_static(data)).await.unwrap();
debug!("client {} finished feeding", i);
// TODO: This is the culprit that currently causes the benchmark to be very
// slow. The current implementation avoids spinning in close check by waiting
// for an RTO each time before starting the check, which usually takes a long
// time.
client.close().await.unwrap(); // make sure all data is sent
debug!("client {} closed", i);
});
Expand All @@ -122,11 +128,10 @@ fn spawn_server() -> SocketAddr {
let server_addr = sock.local_addr().unwrap();
rt().spawn(async move {
let config = server::Config::new()
.send_buf_cap(1024)
.sever_guid(114514)
.advertisement(&b"Hello, I am proxy server"[..])
.send_buf_cap(SEND_BUF_CAP)
.advertisement(&b"Hello, I am server"[..])
.min_mtu(500)
.max_mtu(1400)
.max_mtu(MTU)
.support_version(vec![9, 11, 13])
.max_pending(1024);
let mut incoming = sock.make_incoming(config);
Expand Down
16 changes: 6 additions & 10 deletions src/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where
peer,
role,
cap,
resend: ResendMap::new(),
resend: ResendMap::new(role),
}
}
}
Expand Down Expand Up @@ -227,8 +227,10 @@ where
}

/// Close the outgoing guard, notice that it may resend infinitely if you do not cancel it.
/// Ensures all frames are received by the peer at the point of closing.
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// ensure all frames are received by the peer at the point of closing
// we may go to sleep while waiting, so turn on waker forwarding first
self.link.turn_on_waking();
loop {
ready!(self.as_mut().try_empty(cx))?;
debug_assert!(self.buf.is_empty() && self.link.flush_empty());
Expand All @@ -240,16 +242,10 @@ where
);
break;
}
// wait for the next resend
// TODO: When receiving an ack, we should immediately stop waiting and check if it can
// be terminated.
trace!(
"[{}] poll_wait for next timeout, resend map size: {}",
self.role,
self.resend.size()
);
ready!(self.resend.poll_wait(cx));
}
// nothing is waiting anymore, so stop forwarding wakeups
self.link.turn_off_waking();
self.project().frame.poll_close(cx)
}
}
Expand Down
55 changes: 52 additions & 3 deletions src/link.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use async_channel::Sender;
use concurrent_queue::ConcurrentQueue;
use futures::Stream;
use log::{trace, warn};
use log::{debug, trace, warn};

use crate::packet::connected::{self, AckOrNack, Frame, FrameBody, FrameSet, FramesMut};
use crate::packet::connected::{self, AckOrNack, Frame, FrameBody, FrameSet, FramesMut, Record};
use crate::packet::unconnected;
use crate::resend_map::ResendMap;
use crate::resend_map::{reactor, ResendMap};
use crate::utils::u24;
use crate::RoleContext;

Expand All @@ -21,6 +22,7 @@ pub(crate) type SharedLink = Arc<TransferLink>;
pub(crate) struct TransferLink {
incoming_ack: ConcurrentQueue<AckOrNack>,
incoming_nack: ConcurrentQueue<AckOrNack>,
forward_waking: AtomicBool,

outgoing_ack: parking_lot::Mutex<BinaryHeap<Reverse<u24>>>,
// TODO: nack channel should always be in order according to [`DeFragment::poll_next`], replace
Expand Down Expand Up @@ -60,6 +62,7 @@ impl TransferLink {
Arc::new(Self {
incoming_ack: ConcurrentQueue::bounded(MAX_ACK_BUFFER),
incoming_nack: ConcurrentQueue::bounded(MAX_ACK_BUFFER),
forward_waking: AtomicBool::new(false),
outgoing_ack: parking_lot::Mutex::new(BinaryHeap::with_capacity(MAX_ACK_BUFFER)),
outgoing_nack: parking_lot::Mutex::new(BinaryHeap::with_capacity(MAX_ACK_BUFFER)),
unconnected: ConcurrentQueue::unbounded(),
Expand All @@ -68,14 +71,60 @@ impl TransferLink {
})
}

/// Enable forwarding of wakeups triggered by incoming acks.
///
/// Intended to be called before a task goes to sleep waiting for
/// outstanding frames to be acknowledged (see `poll_close` in guard.rs),
/// so a later ack can wake it immediately instead of waiting out the RTO.
pub(crate) fn turn_on_waking(&self) {
    use std::sync::atomic::Ordering;
    self.forward_waking.store(true, Ordering::Relaxed);
}

/// Whether incoming acks should cancel resend timers and forward wakeups.
fn should_waking(&self) -> bool {
    use std::sync::atomic::Ordering;
    self.forward_waking.load(Ordering::Relaxed)
}

/// Disable forwarding of wakeups once no task is waiting on acks anymore.
pub(crate) fn turn_off_waking(&self) {
    use std::sync::atomic::Ordering;
    self.forward_waking.store(false, Ordering::Relaxed);
}

/// Handle an ACK received from the peer.
///
/// The records are queued for later processing, and — when waker forwarding
/// is turned on (some task is sleeping until its frames are acknowledged) —
/// the resend timers covering the acked sequence numbers are cancelled and
/// their wakers collected, so those tasks can re-check completion right away
/// instead of waiting out a full RTO period.
pub(crate) fn incoming_ack(&self, records: AckOrNack) {
    // Collect the wakers first (under the reactor lock) but defer the actual
    // waking until after the ack has been pushed, so an awoken task observes
    // the ack in the queue.
    let to_wakes = if self.should_waking() {
        let mut wakers = Vec::new();
        let mut guard = reactor::Reactor::get().lock();
        for record in &records.records {
            match record {
                // A range record acknowledges every sequence number in
                // [start, end]; cancel each one's resend timer.
                Record::Range(start, end) => {
                    for seq_num in start.to_u32()..=end.to_u32() {
                        guard.cancel_timer(seq_num.into(), self.role.guid(), &mut wakers);
                    }
                }
                Record::Single(seq_num) => {
                    guard.cancel_timer(*seq_num, self.role.guid(), &mut wakers);
                }
            }
        }
        Some(wakers)
    } else {
        None
    };
    // Push into the bounded ack queue; when full, force_push displaces an
    // existing entry (presumably the oldest) — log it so the loss is visible.
    // NOTE(review): unwrap assumes the queue is never closed here — confirm.
    if let Some(dropped) = self.incoming_ack.force_push(records).unwrap() {
        warn!(
            "[{}] discard received ack {dropped:?}, total count: {}",
            self.role,
            dropped.total_cnt()
        );
    }
    // wake up only after the ack has been queued
    if let Some(wakers) = to_wakes {
        debug!(
            "[{}] wake up {} wakers after receives ack",
            self.role,
            wakers.len()
        );
        for waker in wakers {
            // safe to panic (original note — NOTE(review): Waker::wake itself
            // is infallible; presumably this refers to the awoken task, confirm)
            waker.wake();
        }
    }
}

pub(crate) fn incoming_nack(&self, records: AckOrNack) {
Expand Down
Loading

0 comments on commit 8a53ffa

Please sign in to comment.