bulk benchmark now available literally
Signed-off-by: iGxnon <igxnon@gmail.com>
iGxnon committed Jul 21, 2024
1 parent 3c9f543 commit ae45288
Showing 8 changed files with 39 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .config/nextest.toml
@@ -1,5 +1,5 @@
[profile.default]
retries = 0
slow-timeout = { period = "5s" }
slow-timeout = { period = "5s", terminate-after = 3 }
status-level = "all"
final-status-level = "slow"
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -54,6 +54,7 @@ jobs:
- name: Run bench with codecov
run: |
cargo llvm-cov --no-report --bench micro --features="micro-bench"
cargo llvm-cov --no-report --bench bulk
- name: Generate codecov report
run: |
8 changes: 4 additions & 4 deletions Cargo.toml
@@ -47,10 +47,10 @@ micro-bench = []
name = "micro"
harness = false
required-features = ["micro-bench"]
# TODO: Not available currently
# [[bench]]
# name = "bulk"
# harness = false

[[bench]]
name = "bulk"
harness = false

[profile.bench]
opt-level = 3
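With `harness = false`, Cargo does not generate a libtest harness for the bench target, so `benches/bulk.rs` has to supply its own entry point. A minimal Criterion skeleton of that shape (the body below is a placeholder, not the actual contents of `benches/bulk.rs`):

use criterion::{criterion_group, criterion_main, Criterion};

// Placeholder benchmark body; the real bulk bench drives raknet_rs clients
// against a spawned server. This only shows the entry point that
// `harness = false` requires.
fn bulk(c: &mut Criterion) {
    c.bench_function("bulk_placeholder", |b| b.iter(|| 2 + 2));
}

criterion_group!(benches, bulk);
criterion_main!(benches);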
46 changes: 23 additions & 23 deletions benches/bulk.rs
@@ -5,6 +5,7 @@ use std::time::Duration;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion, Throughput};
use futures::{SinkExt, StreamExt};
use log::debug;
use raknet_rs::client::{self, ConnectTo};
use raknet_rs::server::{self, MakeIncoming};
use tokio::net::UdpSocket as TokioUdpSocket;
@@ -90,29 +91,21 @@ fn configure_bencher(
|| {
std::iter::repeat_with(mk_client)
.take(clients_num)
.map(|handshake| {
futures::executor::block_on(async move {
let io = handshake.await;
std::thread::sleep(Duration::from_millis(100));
io
})
})
.map(futures::executor::block_on)
},
|clients| async move {
let mut join = vec![];
for client in clients {
for (i, client) in clients.enumerate() {
let handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_millis(10));
tokio::pin!(client);
loop {
tokio::select! {
_ = client.send(Bytes::from_static(data)) => break,
_ = ticker.tick() => {
assert!(client.flush().await.is_ok());
}
}
}
assert!(client.close().await.is_ok());
client.feed(Bytes::from_static(data)).await.unwrap();
debug!("client {} finished feeding", i);
// TODO: This is the culprit behind the current benchmark slowness. To avoid
// busy-spinning in the close check, the implementation waits a full RTO before
// each check, and that wait usually dominates the runtime.
client.close().await.unwrap(); // make sure all data is sent
debug!("client {} closed", i);
});
join.push(handle);
}
@@ -140,19 +133,26 @@ fn spawn_server() -> SocketAddr {
while let Some(io) = incoming.next().await {
tokio::spawn(async move {
tokio::pin!(io);
let mut ticker = tokio::time::interval(Duration::from_millis(10));
// 20ms, one tick, from Minecraft
let mut ticker = tokio::time::interval(Duration::from_millis(20));
loop {
tokio::select! {
res = io.next() => {
if res.is_none() {
break;
}
None = io.next() => {
break;
}
_ = ticker.tick() => {
assert!(io.flush().await.is_ok());
}
};
}
// No 2MSL wait in the benchmark, so just wait for a short while.
// The server side may never receive the final acknowledgment of its FIN
// (normally the peer that initiates the close waits 2MSL to guarantee this).
// What matters is that every packet the server has queued, including the
// acknowledgment of the client's FIN, is sent before the server closes, so
// the client can complete a normal closure.
let _ = tokio::time::timeout(Duration::from_millis(100), io.close()).await;
});
}
});
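The client side of the new benchmark reduces to a feed-then-close pattern over the connection's `Sink` interface: `feed` queues each payload without forcing a flush, and `close` flushes everything and drives the shutdown. A hypothetical, self-contained sketch of that pattern, assuming the connection implements `Sink<Bytes>` (the helper itself is not part of the commit):

use bytes::Bytes;
use futures::{Sink, SinkExt};

// Queue every payload, then close; `close` flushes the remaining frames and
// waits for the connection to shut down cleanly.
async fn send_all<S>(mut io: S, payloads: Vec<Bytes>) -> Result<(), S::Error>
where
    S: Sink<Bytes> + Unpin,
{
    for payload in payloads {
        io.feed(payload).await?;
    }
    io.close().await
}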
2 changes: 2 additions & 0 deletions src/guard.rs
@@ -241,6 +241,8 @@ where
break;
}
// wait for the next resend
// TODO: When an ack is received, stop waiting immediately and check whether
// the wait can be terminated.
trace!(
"[{}] poll_wait for next timeout, resend map size: {}",
self.role,
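One way to realize that TODO is to remember the waker of the task parked in `poll_wait` and wake it as soon as an ack arrives, so the close check runs immediately instead of sleeping out the full RTO. A hypothetical illustration, not how guard.rs is actually structured:

use std::task::{Context, Waker};

// Store the waker while waiting; wake it on ack so the poll loop re-checks
// whether it can finish without waiting for the resend timeout.
struct CloseWait {
    waker: Option<Waker>,
}

impl CloseWait {
    fn park(&mut self, cx: &mut Context<'_>) {
        self.waker = Some(cx.waker().clone());
    }

    fn on_ack(&mut self) {
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}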
2 changes: 1 addition & 1 deletion src/link.rs
@@ -205,7 +205,7 @@ impl Router {
}
}

self.router_tx.try_send(frames).unwrap();
return self.router_tx.try_send(frames).is_ok();
}
connected::Packet::Ack(ack) => self.link.incoming_ack(ack),
connected::Packet::Nack(nack) => self.link.incoming_nack(nack),
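This change stops panicking when frames cannot be routed: `try_send` fails when the bounded queue is full or the receiver has been dropped, and returning `is_ok()` lets the caller report whether the packet actually reached the connection. A generic illustration of the pattern, using a tokio bounded channel as a stand-in (the real type of `router_tx` may differ):

use tokio::sync::mpsc;

// Returns false instead of panicking when the queue is full or closed.
fn route(router_tx: &mpsc::Sender<Vec<u8>>, frames: Vec<u8>) -> bool {
    router_tx.try_send(frames).is_ok()
}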
4 changes: 2 additions & 2 deletions src/resend_map.rs
@@ -6,7 +6,7 @@ use crate::packet::connected::{AckOrNack, Frame, Frames, Record};
use crate::utils::u24;

// TODO: use RTTEstimator to get adaptive RTO
const RTO: Duration = Duration::from_millis(144);
const RTO: Duration = Duration::from_secs(1);

struct ResendEntry {
frames: Option<Frames>,
@@ -235,7 +235,7 @@ mod test {
use crate::tests::test_trace_log_setup;
use crate::Reliability;

const TEST_RTO: Duration = Duration::from_millis(500);
const TEST_RTO: Duration = Duration::from_millis(1200);

#[test]
fn test_resend_map_works() {
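The TODO about an adaptive RTO usually means estimating SRTT and RTTVAR from ack round-trip samples, as in RFC 6298 (RTO = SRTT + 4 * RTTVAR). A hypothetical estimator sketch, not part of this commit; the constants and clamping are illustrative:

use std::time::Duration;

// RFC 6298-style smoothing: alpha = 1/8 for SRTT, beta = 1/4 for RTTVAR.
struct RttEstimator {
    srtt: Option<Duration>,
    rttvar: Duration,
}

impl RttEstimator {
    fn new() -> Self {
        Self { srtt: None, rttvar: Duration::ZERO }
    }

    fn on_sample(&mut self, rtt: Duration) {
        match self.srtt {
            None => {
                self.srtt = Some(rtt);
                self.rttvar = rtt / 2;
            }
            Some(srtt) => {
                let delta = if srtt > rtt { srtt - rtt } else { rtt - srtt };
                self.rttvar = (self.rttvar * 3 + delta) / 4;
                self.srtt = Some((srtt * 7 + rtt) / 8);
            }
        }
    }

    fn rto(&self) -> Duration {
        // Before the first sample, fall back to the fixed 1s RTO above;
        // afterwards clamp to a 100ms floor so the RTO never collapses to zero.
        self.srtt
            .map(|srtt| (srtt + self.rttvar * 4).max(Duration::from_millis(100)))
            .unwrap_or(Duration::from_secs(1))
    }
}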
8 changes: 5 additions & 3 deletions src/server/handler/offline.rs
@@ -6,7 +6,7 @@ use std::task::{Context, Poll};

use bytes::Bytes;
use futures::{ready, Sink, Stream};
use log::{debug, error, warn};
use log::{debug, error, trace, warn};
use minitrace::collector::SpanContext;
use minitrace::Span;
use pin_project_lite::pin_project;
@@ -204,7 +204,7 @@
this.role
);
} else {
debug!(
trace!(
"[{}] received open connection request 1 from {addr}",
this.role,
);
@@ -227,7 +227,7 @@
)));
continue;
}
debug!(
trace!(
"[{}] received open connection request 2 from {addr}",
this.role
);
@@ -236,12 +236,14 @@
|| mtu > this.config.max_mtu
|| this.connected.contains_key(&addr)
{
debug!("[{}] received unexpected mtu({mtu}) from {addr}", this.role);
*this.state = OfflineState::SendingPrepare(Some((
Self::make_already_connected(this.config),
addr,
)));
continue;
}
debug!("[{}] client {addr} connected with mtu {mtu}", this.role);
this.connected.insert(addr, PeerContext { addr, mtu });
unconnected::Packet::OpenConnectionReply2 {
magic: (),
