From b3633a4c2537f0a53ab2541b71d589e9d2903438 Mon Sep 17 00:00:00 2001
From: Rafael RL
Date: Tue, 24 Sep 2024 18:33:25 +0200
Subject: [PATCH] More meaningful global output

---
 .vscode/launch.json                       |  45 +++
 README.md                                 |   4 +-
 .../data_group_1.json                     |   0
 .../data_group_10.json                    |   0
 .../data_group_2.json                     |   0
 .../data_group_3.json                     |   0
 src/main.rs                               |  16 +-
 src/measure.rs                            |  56 ++--
 src/utils.rs                              | 288 ++++++++++++------
 9 files changed, 269 insertions(+), 140 deletions(-)
 create mode 100644 .vscode/launch.json
 rename configs/config_group_1.json => data/data_group_1.json (100%)
 rename configs/config_group_10.json => data/data_group_10.json (100%)
 rename configs/config_group_2.json => data/data_group_2.json (100%)
 rename configs/config_group_3.json => data/data_group_3.json (100%)

diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 0000000..cb4b2a1
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,45 @@
+{
+    // Use IntelliSense to learn about possible attributes.
+    // Hover to view descriptions of existing attributes.
+    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+    "version": "0.2.0",
+    "configurations": [
+        {
+            "type": "lldb",
+            "request": "launch",
+            "name": "Debug executable 'databroker-perf'",
+            "cargo": {
+                "args": [
+                    "build",
+                    "--bin=databroker-perf",
+                    "--package=databroker-perf"
+                ],
+                "filter": {
+                    "name": "databroker-perf",
+                    "kind": "bin"
+                }
+            },
+            "args": ["--api", "kuksa.val.v2", "--run-seconds", "5", "--skip-seconds", "2", "--test-data-file", "data/data_group_10.json"],
+            "cwd": "${workspaceFolder}"
+        },
+        {
+            "type": "lldb",
+            "request": "launch",
+            "name": "Debug unit tests in executable 'databroker-perf'",
+            "cargo": {
+                "args": [
+                    "test",
+                    "--no-run",
+                    "--bin=databroker-perf",
+                    "--package=databroker-perf"
+                ],
+                "filter": {
+                    "name": "databroker-perf",
+                    "kind": "bin"
+                }
+            },
+            "args": [],
+            "cwd": "${workspaceFolder}"
+        }
+    ]
+}
\ No newline at end of file
diff --git a/README.md b/README.md
index 0e34002..310dc7d 100644
--- a/README.md
+++ b/README.md
@@ -90,7 +90,7 @@ Options:
       --host               Host address of databroker [default: http://127.0.0.1]
       --port               Port of databroker [default: 55555]
      --skip-seconds       Seconds to run (skip) before measuring the latency [default: 4]
-      --detail-output      Print more details in the summary result
+      --detailed-output    Print more details in the summary result
       --config             Path to configuration file
       --run-forever        Run the measurements forever (until receiving a shutdown signal)
   -v, --verbosity          Verbosity level. Can be one of ERROR, WARN, INFO, DEBUG, TRACE [default: WARN]
@@ -134,7 +134,7 @@ Group: Frame C | Cycle time(ms): 30
 
 For a detailed output of the results, please enable the corresponding flag like:
 
 ```
-./target/release/databroker-perf --detail-output
+./target/release/databroker-perf --detailed-output
 ```
 
 ## Group config file
diff --git a/configs/config_group_1.json b/data/data_group_1.json
similarity index 100%
rename from configs/config_group_1.json
rename to data/data_group_1.json
diff --git a/configs/config_group_10.json b/data/data_group_10.json
similarity index 100%
rename from configs/config_group_10.json
rename to data/data_group_10.json
diff --git a/configs/config_group_2.json b/data/data_group_2.json
similarity index 100%
rename from configs/config_group_2.json
rename to data/data_group_2.json
diff --git a/configs/config_group_3.json b/data/data_group_3.json
similarity index 100%
rename from configs/config_group_3.json
rename to data/data_group_3.json
diff --git a/src/main.rs b/src/main.rs
index 0125b08..d3f224f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -55,7 +55,7 @@ struct Args {
     #[clap(
         long,
         display_order = 5,
-        value_name = "ITERATIONS",
+        value_name = "RUN_SECONDS",
         default_value_t = 4
     )]
     skip_seconds: u64,
@@ -67,18 +67,18 @@
         value_name = "Detailed ouput result",
         default_value_t = false
     )]
-    detail_output: bool,
+    detailed_output: bool,
 
-    /// Path to configuration file
-    #[clap(long = "config", display_order = 7, value_name = "FILE")]
-    config_file: Option<String>,
+    /// Path to test data file
+    #[clap(long = "test-data-file", display_order = 7, value_name = "FILE")]
+    test_data_file: Option<String>,
 
     /// Run the measurements forever (until receiving a shutdown signal).
     #[clap(
         long,
         action = clap::ArgAction::SetTrue,
         display_order = 8,
-        conflicts_with = "seconds",
+        conflicts_with = "run_seconds",
         default_value_t = false
     )]
     run_forever: bool,
@@ -117,7 +117,7 @@ async fn main() -> Result<()> {
         api = Api::KuksaValV2;
     }
 
-    let config_groups = read_config(args.config_file.as_ref())?;
+    let config_groups = read_config(args.test_data_file.as_ref())?;
 
     // Skip at most _iterations_ number of iterations
     let skip_seconds = max(0, min(args.run_seconds, args.skip_seconds));
@@ -130,7 +130,7 @@
         skip_seconds,
         api,
         run_forever: args.run_forever,
-        detail_output: args.detail_output,
+        detailed_output: args.detailed_output,
     };
 
     perform_measurement(measurement_config, config_groups, shutdown_handler).await?;
diff --git a/src/measure.rs b/src/measure.rs
index 4642c0c..3e529f0 100644
--- a/src/measure.rs
+++ b/src/measure.rs
@@ -20,7 +20,7 @@
 use crate::config::{Group, Signal};
 use crate::shutdown::ShutdownHandler;
 use crate::subscriber::{self, Subscriber};
-use crate::utils::write_output;
+use crate::utils::{write_global_output, write_output};
 use anyhow::{Context, Result};
 use hdrhistogram::Histogram;
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
@@ -55,7 +55,7 @@ pub struct MeasurementConfig {
     pub skip_seconds: u64,
     pub api: Api,
     pub run_forever: bool,
-    pub detail_output: bool,
+    pub detailed_output: bool,
 }
 
 pub struct MeasurementContext {
@@ -73,7 +73,7 @@
 pub struct MeasurementResult {
     pub measurement_context: MeasurementContext,
     pub iterations_executed: u64,
-    pub iterations_skipped: u64,
+    pub signals_skipped: u64,
     pub start_time: SystemTime,
 }
 
@@ -111,12 +111,14 @@ fn create_databroker_endpoint(host: String, port: u64) -> Result<Endpoint> {
     let endpoint =
         tonic::transport::Channel::from_shared(databroker_address.clone())
             .with_context(|| "Failed to parse server url")?;
-    let endpoint = endpoint
-        .initial_stream_window_size(1000 * 3 * 128 * 1024) // 20 MB stream window size
-        .initial_connection_window_size(1000 * 3 * 128 * 1024) // 20 MB connection window size
-        .keep_alive_timeout(Duration::from_secs(1)) // 60 seconds keepalive time
-        .keep_alive_timeout(Duration::from_secs(1)) // 20 seconds keepalive timeout
-        .timeout(Duration::from_secs(1));
+
+    // Leave out for now until we decide what and how to configure it
+    // let endpoint = endpoint
+    //     .initial_stream_window_size(1000 * 3 * 128 * 1024) // 20 MB stream window size
+    //     .initial_connection_window_size(1000 * 3 * 128 * 1024) // 20 MB connection window size
+    //     .keep_alive_timeout(Duration::from_secs(1)) // 60 seconds keepalive time
+    //     .keep_alive_timeout(Duration::from_secs(1)) // 20 seconds keepalive timeout
+    //     .timeout(Duration::from_secs(1));
 
     Ok(endpoint)
 }
@@ -226,7 +228,7 @@ pub async fn perform_measurement(
         };
 
         tasks.spawn(async move {
-            let (iterations_executed, iterations_skipped) =
+            let (iterations_executed, signals_skipped) =
                 measurement_loop(&mut measurement_context).await.unwrap();
 
             measurement_context.progress.finish();
@@ -234,7 +236,7 @@
             Ok(MeasurementResult {
                 measurement_context,
                 iterations_executed,
-                iterations_skipped,
+                signals_skipped,
                 start_time,
             })
        });
@@ -267,27 +269,17 @@
         }
     }
 
-    print!("\n\nSummary:");
-    print!("\n API: {}", measurement_config.api);
-    if measurement_config.run_forever {
-        print!("\n Run forever: Activated");
-    } else {
-        print!(
-            "\n Run seconds: {}",
-            measurement_config.run_seconds
-        );
-    }
-    print!(
-        "\n Skipped run seconds: {}",
-        measurement_config.skip_seconds
-    );
-
-    for group in config_groups {
-        let measurement_result = measurements_results
-            .iter()
-            .find(|result| result.measurement_context.group_name == group.group_name)
-            .unwrap();
-        write_output(measurement_result).unwrap();
+    let _ = write_global_output(&measurement_config, &measurements_results);
+
+    if measurement_config.detailed_output {
+        for group in config_groups {
+            let measurement_result = measurements_results
+                .iter()
+                .find(|result| result.measurement_context.group_name == group.group_name)
+                .unwrap();
+
+            write_output(measurement_result).unwrap();
+        }
     }
     Ok(())
 }
diff --git a/src/utils.rs b/src/utils.rs
index 9faa2c3..11a9839 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -13,13 +13,19 @@
 
 use anyhow::{anyhow, Context, Ok, Result};
 use console::Term;
+use hdrhistogram::Histogram;
 use log::debug;
 use serde_json::from_reader;
-use std::{cmp::max, fs::OpenOptions, io::Write, time::SystemTime};
+use std::{
+    cmp::max,
+    fs::OpenOptions,
+    io::Write,
+    time::{Duration, SystemTime},
+};
 
 use crate::{
     config::{Config, Group, Signal},
-    measure::MeasurementResult,
+    measure::{MeasurementConfig, MeasurementResult},
 };
 
 const MAX_NUMBER_OF_GROUPS: usize = 10;
@@ -72,124 +78,210 @@ pub fn read_config(config_file: Option<&String>) -> Result<Vec<Group>> {
     }
 }
 
-pub fn write_output(measurement_result: &MeasurementResult) -> Result<()> {
-    let mut stdout = Term::stdout();
-    let end_time = SystemTime::now();
-    let total_duration = end_time.duration_since(measurement_result.start_time)?;
-    let measurement_context = &measurement_result.measurement_context;
-    let measurement_config = &measurement_context.measurement_config;
+fn print_latency_histogram(
+    stdout: &mut Term,
+    histogram: &Histogram<u64>,
+    signal_len: u64,
+) -> Result<()> {
+    let step_size = max(1, (histogram.max() - histogram.min()) / 11);
 
-    writeln!(
-        stdout,
-        "\n\nGroup: {} | Cycle time(ms): {}",
-        measurement_context.group_name, measurement_config.interval
-    )?;
+    let buckets = histogram.iter_linear(step_size);
+
+    // skip initial empty buckets
+    let buckets = buckets.skip_while(|v| v.count_since_last_iteration() == 0);
+
+    let mut histogram = Vec::with_capacity(11);
 
-    if !measurement_config.detail_output {
+    for v in buckets {
+        let mean = v.value_iterated_to() + 1 - step_size / 2; // +1 to make range inclusive
+        let count = v.count_since_last_iteration();
+        histogram.push((mean, count));
+    }
+
+    let (_, cols) = stdout.size();
+    debug!("Number of columns: {cols}");
+
+    let histogram_len = histogram.len() as u64;
+    for (mean, count) in histogram {
+        let bars = count as f64 / (histogram_len * signal_len) as f64 * (cols - 22) as f64;
+        let bar = "∎".repeat(bars as usize);
         writeln!(
             stdout,
-            " Average: {:>7.3} ms",
-            measurement_context.hist.mean() / 1000.0
+            " {:>7.3} ms [{:<5}] |{}",
+            mean as f64 / 1000.0,
+            count,
+            bar
         )?;
+    }
+    Ok(())
+}
+
+fn print_latency_distribution(stdout: &mut Term, histogram: &Histogram<u64>) -> Result<()> {
+    for q in &[10, 25, 50, 75, 90, 95, 99] {
         writeln!(
             stdout,
-            " 95% in under {:.3} ms",
-            measurement_context.hist.value_at_quantile(95_f64 / 100.0) as f64 / 1000.0
+            " {q}% in under {:.3} ms",
+            histogram.value_at_quantile(*q as f64 / 100.0) as f64 / 1000.0
         )?;
+    }
+    Ok(())
+}
+
+pub fn write_global_output(
+    measurement_config: &MeasurementConfig,
+    measurement_results: &Vec<MeasurementResult>,
+) -> Result<()> {
+    let mut stdout = Term::stdout();
+    let end_time = SystemTime::now();
+
+    let mut global_end_time = Duration::default();
+
+    let mut global_hist = Histogram::<u64>::new_with_bounds(1, 60 * 60 * 1000 * 1000, 3)?;
+
+    let mut global_signals_len = 0;
+
+    let mut global_signals_sent = 0;
+    let mut global_signals_skipped = 0;
+
+    for result in measurement_results {
+        global_hist += result.measurement_context.hist.clone();
+        global_signals_len += result.measurement_context.signals.len();
+        global_signals_sent +=
+            result.iterations_executed * result.measurement_context.signals.len() as u64;
+        global_signals_skipped += result.signals_skipped;
+        global_end_time += end_time.duration_since(result.start_time)?;
+    }
+
+    global_end_time /= measurement_results.len() as u32;
+
+    writeln!(stdout, "\n\nGlobal Summary:")?;
+    writeln!(stdout, " API: {}", measurement_config.api)?;
+
+    if measurement_config.run_forever {
+        writeln!(stdout, " Run forever: Activated")?;
+        writeln!(stdout, " Run seconds: {}", global_end_time.as_secs())?;
     } else {
+        writeln!(stdout, " Run seconds: {}", measurement_config.run_seconds)?;
+    }
+
+    writeln!(
+        stdout,
+        " Skipped run seconds: {}",
+        measurement_config.skip_seconds
+    )?;
+    writeln!(stdout, " Total signals: {} signals", global_signals_len,)?;
+    writeln!(stdout, " Sent: {} signal updates", global_signals_sent,)?;
+    writeln!(
+        stdout,
+        " Skipped: {} signal updates",
+        global_signals_skipped
+    )?;
+    writeln!(stdout, " Received: {} signal updates", global_hist.len())?;
+
+    if measurement_config.run_forever {
         writeln!(
             stdout,
-            " Elapsed time: {:.2} s",
-            total_duration.as_millis() as f64 / 1000.0
-        )?;
-        let rate_limit = match measurement_config.interval {
-            0 => "None".into(),
-            ms => format!("{} ms between iterations", ms),
-        };
-        writeln!(stdout, " Rate limit: {}", rate_limit)?;
-        writeln!(
-            stdout,
-            " Sent: {} iterations * {} signals = {} updates",
-            measurement_result.iterations_executed,
-            measurement_context.signals.len(),
-            measurement_result.iterations_executed * measurement_context.signals.len() as u64
-        )?;
-        writeln!(
-            stdout,
-            " Skipped: {} updates",
-            measurement_result.iterations_skipped
-        )?;
-        writeln!(
-            stdout,
-            " Received: {} updates",
-            measurement_context.hist.len()
-        )?;
-        writeln!(
-            stdout,
-            " Fastest: {:>7.3} ms",
-            measurement_context.hist.min() as f64 / 1000.0
-        )?;
-        writeln!(
-            stdout,
-            " Slowest: {:>7.3} ms",
-            measurement_context.hist.max() as f64 / 1000.0
+            " Signal/Second: {} signal/s",
+            global_hist.len() / (global_end_time.as_secs() - measurement_config.skip_seconds)
         )?;
+    } else {
         writeln!(
             stdout,
-            " Average: {:>7.3} ms",
-            measurement_context.hist.mean() / 1000.0
+            " Signal/Second: {} signal/s",
+            global_hist.len() / (measurement_config.run_seconds - measurement_config.skip_seconds)
         )?;
+    }
 
-        writeln!(stdout, "\nLatency histogram:")?;
+    writeln!(
+        stdout,
+        " Fastest: {:>7.3} ms",
+        global_hist.min() as f64 / 1000.0
+    )?;
+    writeln!(
+        stdout,
+        " Slowest: {:>7.3} ms",
+        global_hist.max() as f64 / 1000.0
+    )?;
+    writeln!(stdout, " Average: {:>7.3} ms", global_hist.mean() / 1000.0)?;
 
-        let step_size = max(
-            1,
-            (measurement_context.hist.max() - measurement_context.hist.min()) / 11,
-        );
+    writeln!(stdout, "\nLatency histogram:")?;
+    print_latency_histogram(stdout.by_ref(), &global_hist, global_hist.len()).unwrap();
 
-        let buckets = measurement_context.hist.iter_linear(step_size);
+    writeln!(stdout, "\nLatency distribution:")?;
+    print_latency_distribution(stdout.by_ref(), &global_hist).unwrap();
+    Ok(())
+}
 
-        // skip initial empty buckets
-        let buckets = buckets.skip_while(|v| v.count_since_last_iteration() == 0);
+pub fn write_output(measurement_result: &MeasurementResult) -> Result<()> {
+    let mut stdout = Term::stdout();
+    let end_time = SystemTime::now();
+    let total_duration = end_time.duration_since(measurement_result.start_time)?;
+    let measurement_context = &measurement_result.measurement_context;
+    let measurement_config = &measurement_context.measurement_config;
 
-        let mut histogram = Vec::with_capacity(11);
+    writeln!(
+        stdout,
+        "\n\nGroup: {} | Cycle time(ms): {}",
+        measurement_context.group_name, measurement_config.interval
+    )?;
+    writeln!(
+        stdout,
+        " API: {}",
+        measurement_context.measurement_config.api
+    )?;
+    writeln!(
+        stdout,
+        " Elapsed time: {:.2} s",
+        total_duration.as_millis() as f64 / 1000.0
+    )?;
+    let rate_limit = match measurement_config.interval {
+        0 => "None".into(),
+        ms => format!("{} ms between iterations", ms),
+    };
+    writeln!(stdout, " Rate limit: {}", rate_limit)?;
+    writeln!(
+        stdout,
+        " Sent: {} iterations * {} signals = {} updates",
+        measurement_result.iterations_executed,
+        measurement_context.signals.len(),
+        measurement_result.iterations_executed * measurement_context.signals.len() as u64
+    )?;
+    writeln!(
+        stdout,
+        " Skipped: {} updates",
+        measurement_result.signals_skipped
+    )?;
+    writeln!(
+        stdout,
+        " Received: {} updates",
+        measurement_context.hist.len()
+    )?;
+    writeln!(
+        stdout,
+        " Fastest: {:>7.3} ms",
+        measurement_context.hist.min() as f64 / 1000.0
+    )?;
+    writeln!(
+        stdout,
+        " Slowest: {:>7.3} ms",
+        measurement_context.hist.max() as f64 / 1000.0
+    )?;
+    writeln!(
+        stdout,
+        " Average: {:>7.3} ms",
+        measurement_context.hist.mean() / 1000.0
+    )?;
 
+    writeln!(stdout, "\nLatency histogram:")?;
histogram:")?; - let (_, cols) = stdout.size(); - debug!("Number of columns: {cols}"); - - for (mean, count) in histogram { - let bars = count as f64 - / (measurement_context.hist.len() * measurement_context.signals.len() as u64) - as f64 - * (cols - 22) as f64; - let bar = "∎".repeat(bars as usize); - writeln!( - stdout, - " {:>7.3} ms [{:<5}] |{}", - mean as f64 / 1000.0, - count, - bar - )?; - } + print_latency_histogram( + stdout.by_ref(), + &measurement_context.hist, + measurement_context.hist.len(), + ) + .unwrap(); - writeln!(stdout, "\nLatency distribution:")?; - - for q in &[10, 25, 50, 75, 90, 95, 99] { - writeln!( - stdout, - " {q}% in under {:.3} ms", - measurement_context - .hist - .value_at_quantile(*q as f64 / 100.0) as f64 - / 1000.0 - )?; - } - } + writeln!(stdout, "\nLatency distribution:")?; + print_latency_distribution(stdout.by_ref(), &measurement_context.hist).unwrap(); Ok(()) }