From ae452880460c4e0361d87dbd7312205c3cd39392 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Sun, 21 Jul 2024 13:41:16 +0800 Subject: [PATCH] bulk benchmark now available literally Signed-off-by: iGxnon --- .config/nextest.toml | 2 +- .github/workflows/ci.yml | 1 + Cargo.toml | 8 +++--- benches/bulk.rs | 46 +++++++++++++++++------------------ src/guard.rs | 2 ++ src/link.rs | 2 +- src/resend_map.rs | 4 +-- src/server/handler/offline.rs | 8 +++--- 8 files changed, 39 insertions(+), 34 deletions(-) diff --git a/.config/nextest.toml b/.config/nextest.toml index f5afc16..d23cfb1 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,5 +1,5 @@ [profile.default] retries = 0 -slow-timeout = { period = "5s" } +slow-timeout = { period = "5s", terminate-after = 3 } status-level = "all" final-status-level = "slow" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9a1efd7..899dabd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -54,6 +54,7 @@ jobs: - name: Run bench with codecov run: | cargo llvm-cov --no-report --bench micro --features="micro-bench" + cargo llvm-cov --no-report --bench bulk - name: Generate codecov report run: | diff --git a/Cargo.toml b/Cargo.toml index a96bb28..c654c75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,10 +47,10 @@ micro-bench = [] name = "micro" harness = false required-features = ["micro-bench"] -# TODO: Not available currently -# [[bench]] -# name = "bulk" -# harness = false + +[[bench]] +name = "bulk" +harness = false [profile.bench] opt-level = 3 diff --git a/benches/bulk.rs b/benches/bulk.rs index 29dd747..89dc9f2 100644 --- a/benches/bulk.rs +++ b/benches/bulk.rs @@ -5,6 +5,7 @@ use std::time::Duration; use bytes::Bytes; use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion, Throughput}; use futures::{SinkExt, StreamExt}; +use log::debug; use raknet_rs::client::{self, ConnectTo}; use raknet_rs::server::{self, MakeIncoming}; use tokio::net::UdpSocket as TokioUdpSocket; @@ -90,29 +91,21 @@ fn configure_bencher( || { std::iter::repeat_with(mk_client) .take(clients_num) - .map(|handshake| { - futures::executor::block_on(async move { - let io = handshake.await; - std::thread::sleep(Duration::from_millis(100)); - io - }) - }) + .map(futures::executor::block_on) }, |clients| async move { let mut join = vec![]; - for client in clients { + for (i, client) in clients.enumerate() { let handle = tokio::spawn(async move { - let mut ticker = tokio::time::interval(Duration::from_millis(10)); tokio::pin!(client); - loop { - tokio::select! { - _ = client.send(Bytes::from_static(data)) => break, - _ = ticker.tick() => { - assert!(client.flush().await.is_ok()); - } - } - } - assert!(client.close().await.is_ok()); + client.feed(Bytes::from_static(data)).await.unwrap(); + debug!("client {} finished feeding", i); + // TODO: This is the culprit that currently causes the benchmark to be very + // slow. The current implementation avoids spinning in close check by waiting + // for an RTO each time before starting the check, which usually takes a long + // time. + client.close().await.unwrap(); // make sure all data is sent + debug!("client {} closed", i); }); join.push(handle); } @@ -140,19 +133,26 @@ fn spawn_server() -> SocketAddr { while let Some(io) = incoming.next().await { tokio::spawn(async move { tokio::pin!(io); - let mut ticker = tokio::time::interval(Duration::from_millis(10)); + // 20ms, one tick, from Minecraft + let mut ticker = tokio::time::interval(Duration::from_millis(20)); loop { tokio::select! { - res = io.next() => { - if res.is_none() { - break; - } + None = io.next() => { + break; } _ = ticker.tick() => { assert!(io.flush().await.is_ok()); } }; } + // No 2MSL for benchmark, so just wait for a while + // On the server side, it may never receive the final acknowledgment of FIN + // (Usually, the peer that initiates the closure first will wait for + // 2MSL to ensure.). It is necessary to ensure that when the server + // closes, all previously sent data packets are sent. Therefore, the + // client will receive an acknowledgment of the FIN it sends and can + // proceed with a normal closure. + let _ = tokio::time::timeout(Duration::from_millis(100), io.close()).await; }); } }); diff --git a/src/guard.rs b/src/guard.rs index 69cd064..41c7f9e 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -241,6 +241,8 @@ where break; } // wait for the next resend + // TODO: When receiving an ack, we should immediately stop waiting and check if it can + // be terminated. trace!( "[{}] poll_wait for next timeout, resend map size: {}", self.role, diff --git a/src/link.rs b/src/link.rs index 29a6bf4..5fad6cd 100644 --- a/src/link.rs +++ b/src/link.rs @@ -205,7 +205,7 @@ impl Router { } } - self.router_tx.try_send(frames).unwrap(); + return self.router_tx.try_send(frames).is_ok(); } connected::Packet::Ack(ack) => self.link.incoming_ack(ack), connected::Packet::Nack(nack) => self.link.incoming_nack(nack), diff --git a/src/resend_map.rs b/src/resend_map.rs index 174c101..d349814 100644 --- a/src/resend_map.rs +++ b/src/resend_map.rs @@ -6,7 +6,7 @@ use crate::packet::connected::{AckOrNack, Frame, Frames, Record}; use crate::utils::u24; // TODO: use RTTEstimator to get adaptive RTO -const RTO: Duration = Duration::from_millis(144); +const RTO: Duration = Duration::from_secs(1); struct ResendEntry { frames: Option, @@ -235,7 +235,7 @@ mod test { use crate::tests::test_trace_log_setup; use crate::Reliability; - const TEST_RTO: Duration = Duration::from_millis(500); + const TEST_RTO: Duration = Duration::from_millis(1200); #[test] fn test_resend_map_works() { diff --git a/src/server/handler/offline.rs b/src/server/handler/offline.rs index a77744b..d29c21c 100644 --- a/src/server/handler/offline.rs +++ b/src/server/handler/offline.rs @@ -6,7 +6,7 @@ use std::task::{Context, Poll}; use bytes::Bytes; use futures::{ready, Sink, Stream}; -use log::{debug, error, warn}; +use log::{debug, error, trace, warn}; use minitrace::collector::SpanContext; use minitrace::Span; use pin_project_lite::pin_project; @@ -204,7 +204,7 @@ where this.role ); } else { - debug!( + trace!( "[{}] received open connection request 1 from {addr}", this.role, ); @@ -227,7 +227,7 @@ where ))); continue; } - debug!( + trace!( "[{}] received open connection request 2 from {addr}", this.role ); @@ -236,12 +236,14 @@ where || mtu > this.config.max_mtu || this.connected.contains_key(&addr) { + debug!("[{}] received unexpected mtu({mtu}) from {addr}", this.role); *this.state = OfflineState::SendingPrepare(Some(( Self::make_already_connected(this.config), addr, ))); continue; } + debug!("[{}] client {addr} connected with mtu {mtu}", this.role); this.connected.insert(addr, PeerContext { addr, mtu }); unconnected::Packet::OpenConnectionReply2 { magic: (),