Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re: Write URL of the successful requests to the sqlite database. #606 #612

Merged
merged 6 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ rustls-pki-types = { version = "1.7.0", optional = true }

base64 = "0.22.1"
rand = "0.8"
rand_core = "0.6.4"
hickory-resolver = "0.24.1"
rand_regex = "0.17.0"
regex-syntax = "0.8.4"
Expand Down
49 changes: 24 additions & 25 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use hyper::http;
use hyper_util::rt::{TokioExecutor, TokioIo};
use rand::prelude::*;
use std::{
borrow::Cow,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc,
Expand All @@ -14,6 +15,7 @@ use tokio::net::TcpStream;
use url::{ParseError, Url};

use crate::{
pcg64si::Pcg64Si,
url_generator::{UrlGenerator, UrlGeneratorError},
ConnectToEntry,
};
Expand All @@ -30,6 +32,7 @@ pub struct ConnectionTime {
#[derive(Debug, Clone)]
/// a result for a request
pub struct RequestResult {
pub rng: Pcg64Si,
// When the query should started
pub start_latency_correction: Option<std::time::Instant>,
/// When the query started
Expand Down Expand Up @@ -177,28 +180,28 @@ pub struct Client {
}

struct ClientStateHttp1 {
rng: StdRng,
rng: Pcg64Si,
send_request: Option<SendRequestHttp1>,
}

impl Default for ClientStateHttp1 {
    /// Fresh worker state: an OS-entropy-seeded RNG and no cached connection.
    fn default() -> Self {
        Self {
            // Diff artifact resolved: keep the post-merge line
            // (`SeedableRng::from_entropy()` inferring Pcg64Si) and drop the
            // removed `StdRng::from_entropy()` line the scrape also retained.
            rng: SeedableRng::from_entropy(),
            send_request: None,
        }
    }
}

struct ClientStateHttp2 {
rng: StdRng,
rng: Pcg64Si,
send_request: SendRequestHttp2,
}

impl Clone for ClientStateHttp2 {
fn clone(&self) -> Self {
Self {
rng: StdRng::from_entropy(),
rng: SeedableRng::from_entropy(),
send_request: self.send_request.clone(),
}
}
Expand Down Expand Up @@ -315,6 +318,11 @@ impl Client {
Ok(())
}

/// Generates the next URL and returns it together with a snapshot of the
/// RNG state taken *before* generation, so the exact same URL can later be
/// reproduced by replaying the snapshot (used when persisting results).
pub fn generate_url(&self, rng: &mut Pcg64Si) -> Result<(Cow<Url>, Pcg64Si), ClientError> {
    // Pcg64Si is Copy, so this captures the pre-generation state cheaply.
    let snapshot = *rng;
    let url = self.url_generator.generate(rng)?;
    Ok((url, snapshot))
}

async fn client<R: Rng>(
&self,
url: &Url,
Expand Down Expand Up @@ -467,7 +475,7 @@ impl Client {
client_state: &mut ClientStateHttp1,
) -> Result<RequestResult, ClientError> {
let do_req = async {
let url = self.url_generator.generate(&mut client_state.rng)?;
let (url, rng) = self.generate_url(&mut client_state.rng)?;
let mut start = std::time::Instant::now();
let mut connection_time: Option<ConnectionTime> = None;

Expand Down Expand Up @@ -523,6 +531,7 @@ impl Client {
let end = std::time::Instant::now();

let result = RequestResult {
rng,
start_latency_correction: None,
start,
end,
Expand Down Expand Up @@ -573,7 +582,7 @@ impl Client {
client_state: &mut ClientStateHttp2,
) -> Result<RequestResult, ClientError> {
let do_req = async {
let url = self.url_generator.generate(&mut client_state.rng)?;
let (url, rng) = self.generate_url(&mut client_state.rng)?;
let start = std::time::Instant::now();
let connection_time: Option<ConnectionTime> = None;

Expand All @@ -591,6 +600,7 @@ impl Client {
let end = std::time::Instant::now();

let result = RequestResult {
rng,
start_latency_correction: None,
start,
end,
Expand Down Expand Up @@ -760,7 +770,7 @@ fn is_hyper_error(res: &Result<RequestResult, ClientError>) -> bool {
}

async fn setup_http2(client: &Client) -> Result<(ConnectionTime, ClientStateHttp2), ClientError> {
let mut rng = StdRng::from_entropy();
let mut rng = SeedableRng::from_entropy();
let url = client.url_generator.generate(&mut rng)?;
let (connection_time, send_request) = client.connect_http2(&url, &mut rng).await?;

Expand Down Expand Up @@ -804,7 +814,7 @@ fn set_start_latency_correction<E>(

/// Run n tasks by m workers
pub async fn work_debug(
client: Client,
client: Arc<Client>,
_report_tx: flume::Sender<Result<RequestResult, ClientError>>,
) -> Result<(), ClientError> {
let mut rng = StdRng::from_entropy();
Expand Down Expand Up @@ -836,7 +846,7 @@ pub async fn work_debug(

/// Run n tasks by m workers
pub async fn work(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
n_tasks: usize,
n_connections: usize,
Expand All @@ -845,8 +855,6 @@ pub async fn work(
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = Arc::new(AtomicUsize::new(0));

let client = Arc::new(client);

if client.is_http2() {
let futures = (0..n_connections)
.map(|_| {
Expand Down Expand Up @@ -947,7 +955,7 @@ pub async fn work(

/// n tasks by m workers limit to qps works in a second
pub async fn work_with_qps(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
n_tasks: usize,
Expand Down Expand Up @@ -992,8 +1000,6 @@ pub async fn work_with_qps(
Ok::<(), flume::SendError<_>>(())
};

let client = Arc::new(client);

if client.is_http2() {
let futures = (0..n_connections)
.map(|_| {
Expand Down Expand Up @@ -1094,7 +1100,7 @@ pub async fn work_with_qps(

/// n tasks by m workers limit to qps works in a second with latency correction
pub async fn work_with_qps_latency_correction(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
n_tasks: usize,
Expand Down Expand Up @@ -1142,8 +1148,6 @@ pub async fn work_with_qps_latency_correction(
Ok::<(), flume::SendError<_>>(())
};

let client = Arc::new(client);

if client.is_http2() {
let futures = (0..n_connections)
.map(|_| {
Expand Down Expand Up @@ -1245,14 +1249,13 @@ pub async fn work_with_qps_latency_correction(

/// Run until dead_line by n workers
pub async fn work_until(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
dead_line: std::time::Instant,
n_connections: usize,
n_http2_parallel: usize,
wait_ongoing_requests_after_deadline: bool,
) {
let client = Arc::new(client);
if client.is_http2() {
// Using semaphore to control the deadline
// Maybe there is a better concurrent primitive to do this
Expand Down Expand Up @@ -1387,7 +1390,7 @@ pub async fn work_until(
/// Run until dead_line by n workers limit to qps works in a second
#[allow(clippy::too_many_arguments)]
pub async fn work_until_with_qps(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
start: std::time::Instant,
Expand Down Expand Up @@ -1434,8 +1437,6 @@ pub async fn work_until_with_qps(
}
};

let client = Arc::new(client);

if client.is_http2() {
let s = Arc::new(tokio::sync::Semaphore::new(0));

Expand Down Expand Up @@ -1572,7 +1573,7 @@ pub async fn work_until_with_qps(
/// Run until dead_line by n workers limit to qps works in a second with latency correction
#[allow(clippy::too_many_arguments)]
pub async fn work_until_with_qps_latency_correction(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
start: std::time::Instant,
Expand Down Expand Up @@ -1618,8 +1619,6 @@ pub async fn work_until_with_qps_latency_correction(
}
};

let client = Arc::new(client);

if client.is_http2() {
let s = Arc::new(tokio::sync::Semaphore::new(0));

Expand Down
46 changes: 43 additions & 3 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use rusqlite::Connection;

use crate::client::RequestResult;
use crate::client::{Client, RequestResult};

fn create_db(conn: &Connection) -> Result<usize, rusqlite::Error> {
conn.execute(
"CREATE TABLE oha (
url TEXT NOT NULL,
start REAL NOT NULL,
start_latency_correction REAL,
end REAL NOT NULL,
Expand All @@ -17,6 +18,7 @@ fn create_db(conn: &Connection) -> Result<usize, rusqlite::Error> {
}

pub fn store(
client: &Client,
db_url: &str,
start: std::time::Instant,
request_records: &[RequestResult],
Expand All @@ -28,9 +30,11 @@ pub fn store(
let mut affected_rows = 0;

for request in request_records {
let url = client.generate_url(&mut request.rng.clone()).unwrap().0;
affected_rows += t.execute(
"INSERT INTO oha (start, start_latency_correction, end, duration, status, len_bytes) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
"INSERT INTO oha (url, start, start_latency_correction, end, duration, status, len_bytes) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(
url.to_string(),
(request.start - start).as_secs_f64(),
request.start_latency_correction.map(|d| (d - start).as_secs_f64()),
(request.end - start).as_secs_f64(),
Expand All @@ -48,12 +52,18 @@ pub fn store(

#[cfg(test)]
mod test_db {
use hyper::{HeaderMap, Method, Version};
use rand::SeedableRng;

use crate::{client::Dns, url_generator::UrlGenerator};

use super::*;

#[test]
fn test_store() {
let start = std::time::Instant::now();
let test_val = RequestResult {
rng: SeedableRng::seed_from_u64(0),
status: hyper::StatusCode::OK,
len_bytes: 100,
start_latency_correction: None,
Expand All @@ -62,7 +72,37 @@ mod test_db {
end: std::time::Instant::now(),
};
let test_vec = vec![test_val.clone(), test_val.clone()];
let result = store(":memory:", start, &test_vec);
let client = Client {
http_version: Version::HTTP_11,
url_generator: UrlGenerator::new_static("http://example.com".parse().unwrap()),
method: Method::GET,
headers: HeaderMap::new(),
body: None,
dns: Dns {
resolver: hickory_resolver::AsyncResolver::tokio_from_system_conf().unwrap(),
connect_to: Vec::new(),
},
timeout: None,
redirect_limit: 0,
disable_keepalive: false,
insecure: false,
#[cfg(unix)]
unix_socket: None,
#[cfg(feature = "vsock")]
vsock_addr: None,
#[cfg(feature = "rustls")]
// Cache rustls_native_certs::load_native_certs() because it's expensive.
root_cert_store: {
let mut root_cert_store = rustls::RootCertStore::empty();
for cert in
rustls_native_certs::load_native_certs().expect("could not load platform certs")
{
root_cert_store.add(cert).unwrap();
}
std::sync::Arc::new(root_cert_store)
},
};
let result = store(&client, ":memory:", start, &test_vec);
assert_eq!(result.unwrap(), 2);
}
}
Loading