From 74dda0d5125ee7a9e0b6b922390133627ddf70de Mon Sep 17 00:00:00 2001 From: iGxnon Date: Sat, 3 Aug 2024 00:53:55 +0800 Subject: [PATCH] update micro benchmarks Signed-off-by: iGxnon --- benches/data/body-medium.txt | 4 - .../data/{body-large.txt => large_size.txt} | 0 benches/data/medium_size.txt | 4 + .../data/{body-short.txt => small_size.txt} | 0 benches/micro.rs | 259 +++++------------ src/codec/encoder/fragment.rs | 5 +- src/codec/mod.rs | 269 +++++++++--------- src/lib.rs | 1 + 8 files changed, 214 insertions(+), 328 deletions(-) delete mode 100644 benches/data/body-medium.txt rename benches/data/{body-large.txt => large_size.txt} (100%) create mode 100644 benches/data/medium_size.txt rename benches/data/{body-short.txt => small_size.txt} (100%) diff --git a/benches/data/body-medium.txt b/benches/data/body-medium.txt deleted file mode 100644 index e95a837..0000000 --- a/benches/data/body-medium.txt +++ /dev/null @@ -1,4 +0,0 @@ -In faith I do not love thee with mine eyes, -For they in thee a thousand errors note; -But `tis my heart that loves what they despise, -Who in despite of view is pleased to dote. diff --git a/benches/data/body-large.txt b/benches/data/large_size.txt similarity index 100% rename from benches/data/body-large.txt rename to benches/data/large_size.txt diff --git a/benches/data/medium_size.txt b/benches/data/medium_size.txt new file mode 100644 index 0000000..934904e --- /dev/null +++ b/benches/data/medium_size.txt @@ -0,0 +1,4 @@ +Strange about learning; the farther I go the more I see that I never knew even existed. +A short while ago I foolishly thought I could learn everything - all the knowledge in the world. +Now I hope only to be able to know of its existence, and to understand one grain of it. +Is there time? diff --git a/benches/data/body-short.txt b/benches/data/small_size.txt similarity index 100% rename from benches/data/body-short.txt rename to benches/data/small_size.txt diff --git a/benches/micro.rs b/benches/micro.rs index db7e67c..cdef9c5 100644 --- a/benches/micro.rs +++ b/benches/micro.rs @@ -1,197 +1,94 @@ //! Micro benches -use std::time::Duration; +use std::iter::repeat; -use bytes::BytesMut; +use bytes::Bytes; use criterion::async_executor::FuturesExecutor; -use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput}; +use criterion::measurement::WallTime; +use criterion::{ + black_box, criterion_group, criterion_main, BatchSize, BenchmarkGroup, Criterion, Throughput, +}; use raknet_rs::micro_bench; +use raknet_rs::micro_bench::codec::BenchOpts; pub fn codec_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("codec"); - let seed = 114514; - group.warm_up_time(Duration::from_secs(10)); - - // large packets, every frame set only contains one frame - { - let opts = micro_bench::codec::Options { - frame_per_set: 1, - frame_set_cnt: 14400, - duplicated_ratio: 0., - unordered: true, - parted_size: 4, - shuffle: false, - seed, - data: BytesMut::from_iter(include_bytes!("data/body-large.txt")), - }; - - // total data size: 16369200 bytes, data count: 3600, mtu: 1136 - println!( - "total data size: {} bytes, data count: {}, mtu: {}", - opts.input_data_size(), - opts.input_data_cnt(), - opts.input_mtu(), - ); - group.throughput(Throughput::Elements(opts.input_data_cnt() as u64)); - group.bench_function("decode_large_packets_same_data_cnt", |bencher| { - bencher.to_async(FuturesExecutor).iter_batched( - || micro_bench::codec::MicroBench::new(opts.clone()), - |bench| bench.bench_decoded(), - BatchSize::SmallInput, - ); - }); - } - - // medium packets, every frame set contains 6 frame - { - let opts = micro_bench::codec::Options { - frame_per_set: 6, - frame_set_cnt: 600, - duplicated_ratio: 0., - unordered: true, - parted_size: 1, - shuffle: false, - seed, - data: BytesMut::from_iter(include_bytes!("data/body-medium.txt")), - }; - - // total data size: 630000 bytes, data count: 3600, mtu: 1050 - println!( - "total data size: {} bytes, data count: {}, mtu: {}", - opts.input_data_size(), - opts.input_data_cnt(), - opts.input_mtu(), - ); - group.throughput(Throughput::Elements(opts.input_data_cnt() as u64)); - group.bench_function("decode_medium_packets_same_data_cnt", |bencher| { - bencher.to_async(FuturesExecutor).iter_batched( - || micro_bench::codec::MicroBench::new(opts.clone()), - |bench| bench.bench_decoded(), - BatchSize::SmallInput, - ); - }); - } - - // short packets, every frame set contains 36 frame - { - let opts = micro_bench::codec::Options { - frame_per_set: 36, - frame_set_cnt: 100, - duplicated_ratio: 0., - unordered: true, - parted_size: 1, - shuffle: false, - seed, - data: BytesMut::from_iter(include_bytes!("data/body-short.txt")), - }; - - // total data size: 118800 bytes, data count: 3600, mtu: 1188 - println!( - "total data size: {} bytes, data count: {}, mtu: {}", - opts.input_data_size(), - opts.input_data_cnt(), - opts.input_mtu(), - ); - group.throughput(Throughput::Elements(opts.input_data_cnt() as u64)); - group.bench_function("decode_short_packets_same_data_cnt", |bencher| { - bencher.to_async(FuturesExecutor).iter_batched( - || micro_bench::codec::MicroBench::new(opts.clone()), - |bench| bench.bench_decoded(), - BatchSize::SmallInput, - ); - }); - } - - // large packets, every frame set only contains one frame - { - let opts = micro_bench::codec::Options { - frame_per_set: 1, - frame_set_cnt: 1440, - duplicated_ratio: 0., - unordered: true, - parted_size: 4, - shuffle: false, - seed, - data: BytesMut::from_iter(include_bytes!("data/body-large.txt")), + fn decode( + group: &mut BenchmarkGroup, + datagram: &'static [u8], + cnt: usize, + throughput: impl Fn(&BenchOpts) -> Throughput, + ) { + let datagrams = repeat(Bytes::from_static(datagram)).take(cnt); + let opts = micro_bench::codec::BenchOpts { + datagrams: black_box(datagrams.collect()), + seed: 114514, + dup_ratio: 0., + shuffle_ratio: 0., + mtu: 1480, }; - - // total data size: 1,636,920 bytes, data count: 360, mtu: 1136 - println!( - "total data size: {} bytes, data count: {}, mtu: {}", - opts.input_data_size(), - opts.input_data_cnt(), - opts.input_mtu(), + group.throughput(throughput(&opts)); + group.bench_function( + format!("decode_cnt-{cnt}_size-{}", datagram.len()), + |bencher| { + bencher.to_async(FuturesExecutor).iter_batched( + || opts.clone(), + |opts| opts.run_bench(), + BatchSize::SmallInput, + ); + }, ); - group.throughput(Throughput::Bytes(opts.input_data_size() as u64)); - group.bench_function("decode_large_packets_same_data_size", |bencher| { - bencher.to_async(FuturesExecutor).iter_batched( - || micro_bench::codec::MicroBench::new(opts.clone()), - |bench| bench.bench_decoded(), - BatchSize::SmallInput, - ); - }); } - // medium packets, every frame set contains 6 frame - { - let opts = micro_bench::codec::Options { - frame_per_set: 6, - frame_set_cnt: 1550, - duplicated_ratio: 0., - unordered: true, - parted_size: 1, - shuffle: false, - seed, - data: BytesMut::from_iter(include_bytes!("data/body-medium.txt")), - }; - - // total data size: 1,636,800 bytes, data count: 9300, mtu: 1056 - println!( - "total data size: {} bytes, data count: {}, mtu: {}", - opts.input_data_size(), - opts.input_data_cnt(), - opts.input_mtu(), - ); - group.throughput(Throughput::Bytes(opts.input_data_size() as u64)); - group.bench_function("decode_medium_packets_same_data_size", |bencher| { - bencher.to_async(FuturesExecutor).iter_batched( - || micro_bench::codec::MicroBench::new(opts.clone()), - |bench| bench.bench_decoded(), - BatchSize::SmallInput, - ); - }); - } - - // short packets, every frame set contains 36 frame - { - let opts = micro_bench::codec::Options { - frame_per_set: 36, - frame_set_cnt: 1378, - duplicated_ratio: 0., - unordered: true, - parted_size: 1, - shuffle: false, - seed, - data: BytesMut::from_iter(include_bytes!("data/body-short.txt")), - }; - - // total data size: 1,637,064 bytes, data count: 49608, mtu: 1188 - println!( - "total data size: {} bytes, data count: {}, mtu: {}", - opts.input_data_size(), - opts.input_data_cnt(), - opts.input_mtu(), - ); - group.throughput(Throughput::Bytes(opts.input_data_size() as u64)); - group.bench_function("decode_short_packets_same_data_size", |bencher| { - bencher.to_async(FuturesExecutor).iter_batched( - || micro_bench::codec::MicroBench::new(opts.clone()), - |bench| bench.bench_decoded(), - BatchSize::SmallInput, - ); - }); - } + let el = |opts: &BenchOpts| Throughput::Elements(opts.elements()); + let by = |opts: &BenchOpts| Throughput::Bytes(opts.bytes()); + + let small_size = include_bytes!("data/small_size.txt"); + let medium_size = include_bytes!("data/medium_size.txt"); + let large_size = include_bytes!("data/large_size.txt"); + + decode(&mut group, small_size, 1, el); + decode(&mut group, small_size, 5, el); + decode(&mut group, small_size, 10, el); + decode(&mut group, small_size, 50, el); + decode(&mut group, small_size, 100, el); + decode(&mut group, small_size, 1000, el); + + decode(&mut group, small_size, 1, by); + decode(&mut group, small_size, 5, by); + decode(&mut group, small_size, 10, by); + decode(&mut group, small_size, 50, by); + decode(&mut group, small_size, 100, by); + decode(&mut group, small_size, 1000, by); + + decode(&mut group, medium_size, 1, el); + decode(&mut group, medium_size, 5, el); + decode(&mut group, medium_size, 10, el); + decode(&mut group, medium_size, 50, el); + decode(&mut group, medium_size, 100, el); + decode(&mut group, medium_size, 1000, el); + + decode(&mut group, medium_size, 1, by); + decode(&mut group, medium_size, 5, by); + decode(&mut group, medium_size, 10, by); + decode(&mut group, medium_size, 50, by); + decode(&mut group, medium_size, 100, by); + decode(&mut group, medium_size, 1000, by); + + decode(&mut group, large_size, 1, el); + decode(&mut group, large_size, 5, el); + decode(&mut group, large_size, 10, el); + decode(&mut group, large_size, 50, el); + decode(&mut group, large_size, 100, el); + decode(&mut group, large_size, 1000, el); + + decode(&mut group, large_size, 1, by); + decode(&mut group, large_size, 5, by); + decode(&mut group, large_size, 10, by); + decode(&mut group, large_size, 50, by); + decode(&mut group, large_size, 100, by); + decode(&mut group, large_size, 1000, by); group.finish(); } diff --git a/src/codec/encoder/fragment.rs b/src/codec/encoder/fragment.rs index 9f44e41..6dcd6a6 100644 --- a/src/codec/encoder/fragment.rs +++ b/src/codec/encoder/fragment.rs @@ -77,11 +77,10 @@ where Reliability::UnreliableWithAckReceipt => Reliability::ReliableWithAckReceipt, _ => reliability, }; + // calculate again as we may have adjusted reliability + max_len = *this.mtu - FRAME_SET_HEADER_SIZE - reliability.size(); } - // calculate again as we may have adjusted reliability - max_len = *this.mtu - FRAME_SET_HEADER_SIZE - reliability.size(); - // get reliable_frame_index and ordered part for each frame let mut indices_for_frame = || { // reliable_frame_index performs for each frame to ensure it is not duplicated diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 0797cbd..2c76b8a 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -117,181 +117,170 @@ where /// Micro bench helper #[cfg(feature = "micro-bench")] pub mod micro_bench { - use bytes::BytesMut; + use std::collections::VecDeque; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use bytes::{Bytes, BytesMut}; + use futures::{Sink, StreamExt}; use rand::rngs::StdRng; use rand::seq::SliceRandom; use rand::{Rng, SeedableRng}; - use super::{Config, Decoded, FrameSet, FramesMut, Stream}; - use crate::packet::connected::{Flags, Fragment, Frame, Ordered}; - use crate::Reliability; + use super::{Config, Decoded, Fragmented, FrameSet, FramesMut, Stream}; + use crate::packet::connected::Frame; + use crate::packet::FRAME_SET_HEADER_SIZE; + use crate::{Message, Reliability}; #[derive(Debug, Clone)] - pub struct Options { - pub frame_set_cnt: usize, - pub frame_per_set: usize, - pub duplicated_ratio: f32, - pub unordered: bool, - pub parted_size: usize, - pub shuffle: bool, + pub struct BenchOpts { + pub datagrams: Vec, pub seed: u64, - pub data: BytesMut, + pub dup_ratio: f32, + pub shuffle_ratio: f32, + pub mtu: usize, } - impl Options { - fn gen_inputs(&self) -> Vec> { - assert!(self.frame_per_set * self.frame_set_cnt % self.parted_size == 0); - assert!(self.data.len() > self.parted_size); - assert!(self.parted_size >= 1); - let mut rng = StdRng::seed_from_u64(self.seed); - let frames: FramesMut = std::iter::repeat(self.data.clone()) - .take(self.frame_per_set * self.frame_set_cnt) - .enumerate() - .map(|(idx, mut body)| { - let mut reliability = Reliability::Reliable; - let mut raw = 0; - let reliable_frame_index = Some(idx.into()); - let mut fragment = None; - let mut ordered = None; - if self.parted_size > 1 { - raw |= 0b0001_0000; - let parted_start = - (idx % self.parted_size) * (body.len() / self.parted_size); - let parted_end = if idx % self.parted_size == self.parted_size - 1 { - body.len() - } else { - parted_start + (body.len() / self.parted_size) - }; - let _ = body.split_to(parted_start); - let _ = body.split_off(parted_end - parted_start); - fragment = Some(Fragment { - parted_size: self.parted_size as u32, - parted_id: (idx / self.parted_size) as u16, - parted_index: (idx % self.parted_size) as u32, - }); - } - if self.unordered { - reliability = Reliability::ReliableOrdered; - ordered = Some(Ordered { - frame_index: (idx / self.parted_size).into(), - channel: 0, - }); - } - Frame { - flags: Flags::parse(((reliability as u8) << 5) | raw), - reliable_frame_index, - seq_frame_index: None, - ordered, - fragment, - body, - } - }) - .flat_map(|frame| { - if self.duplicated_ratio > 0. - && rng.gen_ratio((self.duplicated_ratio * 100.0) as u32, 100) - { - return vec![frame.clone(), frame]; - } - vec![frame] - }) - .collect(); - let mut sets = frames - .chunks(self.frame_per_set) - .enumerate() - .map(|(idx, chunk)| FrameSet { - seq_num: idx.into(), - set: chunk.to_vec(), - }) - .collect::>(); - if self.shuffle { - sets.shuffle(&mut rng); - } - sets - } + impl Sink for &mut VecDeque> { + type Error = io::Error; - pub fn input_data_cnt(&self) -> usize { - self.frame_per_set * self.frame_set_cnt / self.parted_size + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - pub fn input_data_size(&self) -> usize { - self.data.len() * self.input_data_cnt() + fn start_send(self: Pin<&mut Self>, frame: Frame) -> Result<(), Self::Error> { + self.get_mut().push_back(Frame { + body: BytesMut::from(frame.body), + ..frame + }); + Ok(()) } - pub fn input_mtu(&self) -> usize { - self.frame_per_set * self.data.len() / self.parted_size + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - } - #[derive(Debug)] - pub struct MicroBench { - config: Config, - #[cfg(test)] - data: BytesMut, - frame_sets: Vec>, + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } } - impl MicroBench { - pub fn new(option: Options) -> Self { - Self { - config: Config::default(), - #[cfg(test)] - data: option.data.clone(), - frame_sets: option.gen_inputs(), + impl BenchOpts { + fn gen_inputs(self) -> impl Stream> { + let mut frames: VecDeque> = VecDeque::new(); + let mut rng = StdRng::seed_from_u64(self.seed); + tokio::pin! { + let fragmented = (&mut frames).fragmented(self.mtu, 1); } - } + for datagram in self.datagrams { + fragmented + .as_mut() + .start_send(Message::new(Reliability::ReliableOrdered, 0, datagram)) // reliable ordered + .unwrap(); + } + let mut sets = Vec::new(); + let mut remain = self.mtu - FRAME_SET_HEADER_SIZE; + let mut set: Option = None; + while let Some(frame) = frames.front() { + if remain >= frame.size() { + remain -= frame.size(); + set.get_or_insert_default() + .push(frames.pop_front().unwrap()); + continue; + } + remain = self.mtu - FRAME_SET_HEADER_SIZE; - #[cfg(test)] - #[allow(clippy::semicolon_if_nothing_returned)] - async fn bench_decoded_checked(self) { - use bytes::Buf as _; - - let config = self.config; - let data = self.data.clone(); - - let stream = self.into_stream().frame_decoded(config); - #[futures_async_stream::for_await] - for res in stream { - let body = match res.unwrap() { - crate::packet::connected::FrameBody::User(body) => body, - _ => unreachable!("unexpected decoded result"), - }; - assert_eq!(body.chunk(), data.chunk()); + if self.dup_ratio > 0. && rng.gen_ratio((self.dup_ratio * 100.0) as u32, 100) { + sets.push(FrameSet { + seq_num: sets.len().into(), + set: set.clone().take().unwrap(), + }); + } + sets.push(FrameSet { + seq_num: sets.len().into(), + set: set.take().unwrap(), + }); + } + if let Some(set) = set { + sets.push(FrameSet { + seq_num: sets.len().into(), + set, + }); } - } - #[allow(clippy::semicolon_if_nothing_returned)] - pub async fn bench_decoded(self) { - let config = self.config; - let stream = self.into_stream().frame_decoded(config); - #[futures_async_stream::for_await] - for _r in stream {} - } + let len = sets.len(); + if self.shuffle_ratio > 0. { + sets.partial_shuffle(&mut rng, (len as f32 * self.shuffle_ratio) as usize); + } - fn into_stream(mut self) -> impl Stream> { #[futures_async_stream::stream] async move { - while let Some(frame_set) = self.frame_sets.pop() { + let mut sets = VecDeque::from(sets); + while let Some(frame_set) = sets.pop_front() { yield frame_set; } } } + + /// Run/Test codec benchmarks + #[allow(unused_variables)] // conditional compilation + #[allow(unused_mut)] + #[allow(clippy::missing_panics_doc)] + pub async fn run_bench(self) { + let mut len = self.datagrams.len(); + let mut datagrams = if cfg!(test) { + Some(VecDeque::from(self.datagrams.clone())) + } else { + None + }; + tokio::pin! { + let decoding = self.gen_inputs().frame_decoded(Config::default()); + } + while let Some(r) = decoding.next().await { + assert!(r.is_ok()); + len -= 1; + #[cfg(test)] + { + let body = match r.unwrap() { + crate::packet::connected::FrameBody::User(body) => body, + _ => unreachable!("unexpected decoded result"), + }; + log::debug!("decoded: {:?}", body); + assert_eq!(body, datagrams.as_mut().unwrap().pop_front().unwrap()); + } + } + assert_eq!(len, 0); + } + + pub fn bytes(&self) -> u64 { + self.datagrams.iter().map(|b| b.len() as u64).sum() + } + + pub fn elements(&self) -> u64 { + self.datagrams.len() as u64 + } } #[cfg(test)] #[tokio::test] async fn test_bench() { - let opts = Options { - frame_per_set: 8, - frame_set_cnt: 100, - duplicated_ratio: 0.1, - unordered: true, - parted_size: 4, - shuffle: true, + use crate::utils::tests::test_trace_log_setup; + + let _guard = test_trace_log_setup(); + let opts = BenchOpts { + datagrams: vec![ + Bytes::from_static(b"hello"), + Bytes::from_static(b"world"), + Bytes::from_static(b"!"), + ], seed: 114514, - data: BytesMut::from_iter(b"1145141919810"), + dup_ratio: 0.6, + shuffle_ratio: 0.6, + mtu: 30, }; - assert_eq!(opts.input_data_size(), 8 * 100 / 4 * "1145141919810".len()); - let bench = MicroBench::new(opts); - bench.bench_decoded_checked().await; + assert_eq!(opts.bytes(), 11); + assert_eq!(opts.elements(), 3); + opts.run_bench().await; } } diff --git a/src/lib.rs b/src/lib.rs index 69af9ba..a864aa0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ #![feature(let_chains)] #![feature(context_ext)] #![feature(local_waker)] +#![feature(option_get_or_insert_default)] /// Protocol codec mod codec;