diff --git a/runners/s3-benchrunner-rust/Cargo.lock b/runners/s3-benchrunner-rust/Cargo.lock index 166ad98..a8e3689 100644 --- a/runners/s3-benchrunner-rust/Cargo.lock +++ b/runners/s3-benchrunner-rust/Cargo.lock @@ -114,6 +114,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.83" @@ -234,7 +240,7 @@ dependencies = [ [[package]] name = "aws-s3-transfer-manager" version = "0.1.0" -source = "git+https://github.com/awslabs/aws-s3-transfer-manager-rs.git?rev=790ead476a104cf0b66fdd00b5b9c3636321b244#790ead476a104cf0b66fdd00b5b9c3636321b244" +source = "git+https://github.com/awslabs/aws-s3-transfer-manager-rs.git?rev=06c087a5d53676bb048f6c512b8eb1fda63f03d5#06c087a5d53676bb048f6c512b8eb1fda63f03d5" dependencies = [ "async-channel", "async-trait", @@ -245,13 +251,16 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", + "blocking", "bytes", + "bytes-utils", "futures-util", "path-clean", "pin-project-lite", "tokio", "tower 0.5.1", "tracing", + "walkdir", ] [[package]] @@ -664,6 +673,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -1095,6 +1117,22 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-lite" +version = "2.5.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef40d21ae2c515b51041df9ed313ed21e572df340ea58a922a0aefe7e8891a1" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -1940,6 +1978,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -2254,6 +2303,15 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.27" @@ -2851,6 +2909,16 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -2959,6 +3027,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +dependencies = [ + "windows-sys 0.52.0", 
+] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/runners/s3-benchrunner-rust/Cargo.toml b/runners/s3-benchrunner-rust/Cargo.toml index 84e16b2..f4e70b3 100644 --- a/runners/s3-benchrunner-rust/Cargo.toml +++ b/runners/s3-benchrunner-rust/Cargo.toml @@ -6,12 +6,15 @@ edition = "2021" [dependencies] # Swap which line is commented-out to use GitHub or local aws-s3-transfer-manager -aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "790ead476a104cf0b66fdd00b5b9c3636321b244" } +aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "06c087a5d53676bb048f6c512b8eb1fda63f03d5" } # aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" } tracing-opentelemetry = "0.27" opentelemetry = { version = "0.26", features = ["trace"] } -opentelemetry_sdk = { version = "0.26", default-features = false, features = ["trace", "rt-tokio"] } +opentelemetry_sdk = { version = "0.26", default-features = false, features = [ + "trace", + "rt-tokio", +] } opentelemetry-stdout = { version = "0.26", features = ["trace"] } opentelemetry-semantic-conventions = "0.26" diff --git a/runners/s3-benchrunner-rust/src/lib.rs b/runners/s3-benchrunner-rust/src/lib.rs index 37115ff..e3d4c1a 100644 --- a/runners/s3-benchrunner-rust/src/lib.rs +++ b/runners/s3-benchrunner-rust/src/lib.rs @@ -32,6 +32,7 @@ pub struct BenchmarkConfig { pub bucket: String, pub region: String, pub target_throughput_gigabits_per_sec: f64, + pub disable_directory: bool, } /// From the workload's JSON file @@ -79,6 +80,7 @@ impl BenchmarkConfig { bucket: &str, region: &str, target_throughput_gigabits_per_sec: f64, + disable_directory: bool, ) -> Result<Self> { let json_file = File::open(workload_path) .with_context(|| format!("Failed opening '{workload_path}'"))?; @@ -110,6 +112,7 @@ impl BenchmarkConfig { bucket: bucket.to_string(), region: region.to_string(), 
target_throughput_gigabits_per_sec, + disable_directory, }) } } @@ -135,7 +138,7 @@ pub fn prepare_run(workload: &WorkloadConfig) -> Result<()> { } else if let Some(dir) = filepath.parent() { // create directory if necessary if !dir.exists() { - std::fs::create_dir(dir) + std::fs::create_dir_all(dir) .with_context(|| format!("failed creating directory: {dir:?}"))?; } } diff --git a/runners/s3-benchrunner-rust/src/main.rs b/runners/s3-benchrunner-rust/src/main.rs index f382a44..67cd02b 100644 --- a/runners/s3-benchrunner-rust/src/main.rs +++ b/runners/s3-benchrunner-rust/src/main.rs @@ -22,6 +22,11 @@ struct Args { target_throughput: f64, #[arg(long, help = "Emit telemetry via OTLP/gRPC to http://localhost:4317")] telemetry: bool, + #[arg( + long, + help = "Instead of using 1 upload_objects()/download_objects() call for multiple files on disk, use N upload()/download() calls." + )] + disable_directory: bool, } #[derive(ValueEnum, Clone, Debug)] @@ -120,8 +125,8 @@ async fn new_runner(args: &Args) -> Result<Box<dyn RunBenchmark>> { &args.bucket, &args.region, args.target_throughput, + args.disable_directory, )?; - match args.s3_client { S3ClientId::TransferManager => { let transfer_manager = TransferManagerRunner::new(config).await; diff --git a/runners/s3-benchrunner-rust/src/transfer_manager.rs b/runners/s3-benchrunner-rust/src/transfer_manager.rs index 95052e9..180a427 100644 --- a/runners/s3-benchrunner-rust/src/transfer_manager.rs +++ b/runners/s3-benchrunner-rust/src/transfer_manager.rs @@ -1,4 +1,4 @@ -use std::{cmp::min, sync::Arc}; +use std::{cmp::min, path::PathBuf, sync::Arc}; use anyhow::Context; use async_trait::async_trait; @@ -26,6 +26,7 @@ struct Handle { config: BenchmarkConfig, transfer_manager: aws_s3_transfer_manager::Client, random_data_for_upload: Bytes, + transfer_path: Option<String>, } impl TransferManagerRunner { @@ -57,12 +58,13 @@ impl TransferManagerRunner { .await; let transfer_manager = aws_s3_transfer_manager::Client::new(tm_config); - + let transfer_path = 
find_common_parent_dir(&config); TransferManagerRunner { handle: Arc::new(Handle { config, transfer_manager, random_data_for_upload, + transfer_path, }), } } @@ -87,6 +89,37 @@ impl TransferManagerRunner { } } } + async fn download_objects(&self) -> Result<()> { + let path = self.handle.transfer_path.as_ref().unwrap(); + let dest = PathBuf::from(path); + + let download_objects_handle = self + .handle + .transfer_manager + .download_objects() + .bucket(&self.config().bucket) + .key_prefix(path) + .destination(&dest) + .send() + .await?; + download_objects_handle.join().await?; + Ok(()) + } + + async fn upload_objects(&self) -> Result<()> { + let path = self.handle.transfer_path.as_ref().unwrap(); + let upload_objects_handle = self + .handle + .transfer_manager + .upload_objects() + .bucket(&self.config().bucket) + .key_prefix(path) + .source(path) + .send() + .await?; + upload_objects_handle.join().await?; + Ok(()) + } async fn download(&self, task_config: &TaskConfig) -> Result<()> { let key = &task_config.key; @@ -97,8 +130,7 @@ impl TransferManagerRunner { .download() .bucket(&self.config().bucket) .key(key) - .send() - .await + .initiate() .with_context(|| format!("failed starting download: {key}"))?; // if files_on_disk: open file for writing @@ -120,8 +152,9 @@ impl TransferManagerRunner { .instrument(info_span!("next-chunk", seq, offset = total_size)) .await { - let mut chunk = + let output = chunk_result.with_context(|| format!("failed downloading next chunk of: {key}"))?; + let mut chunk = output.data; let chunk_size = chunk.remaining(); total_size += chunk_size as u64; @@ -175,20 +208,44 @@ impl TransferManagerRunner { #[async_trait] impl RunBenchmark for TransferManagerRunner { async fn run(&self) -> Result<()> { - // Spawn concurrent tasks for all uploads/downloads. - // We want the benchmark to fail fast if anything goes wrong, - // so we're using a JoinSet. 
- let mut task_set: JoinSet> = JoinSet::new(); - for i in 0..self.config().workload.tasks.len() { - let task = self.clone().run_task(i); - task_set.spawn(task.instrument(tracing::Span::current())); - } + let workload_config = &self.config().workload; - while let Some(join_result) = task_set.join_next().await { - let task_result = join_result.unwrap(); - task_result?; + if workload_config.checksum.is_some() { + return Err(SkipBenchmarkError("checksums not yet implemented".to_string()).into()); + } + match &self.handle.transfer_path { + Some(transfer_path) => { + // Use the objects API to download/upload directory directly + match workload_config.tasks[0].action { + TaskAction::Download => { + self.download_objects() + .instrument(info_span!("download-directory", directory = transfer_path)) + .await? + } + TaskAction::Upload => { + self.upload_objects() + .instrument(info_span!("upload-directory", directory = transfer_path)) + .await? + } + } + } + None => { + // Spawn concurrent tasks for all uploads/downloads. + // We want the benchmark to fail fast if anything goes wrong, + // so we're using a JoinSet. + let mut task_set: JoinSet> = JoinSet::new(); + // Iterate through all the tasks to download/upload each object. + for i in 0..workload_config.tasks.len() { + let task = self.clone().run_task(i); + task_set.spawn(task.instrument(tracing::Span::current())); + } + + while let Some(join_result) = task_set.join_next().await { + let task_result = join_result.unwrap(); + task_result?; + } + } } - Ok(()) } @@ -206,6 +263,33 @@ fn calculate_concurrency(target_throughput_gigabits_per_sec: f64) -> usize { (concurrency as usize).max(10) } +/// Find the common parent directory for all tasks. +/// Returns None if we shouldn't be doing upload/download on a whole directory. 
+fn find_common_parent_dir(config: &BenchmarkConfig) -> Option<String> { + if config.workload.files_on_disk && !config.disable_directory && config.workload.tasks.len() > 1 + { + let first_task = &config.workload.tasks[0]; + + // Find the common parent directory for all the tasks. + // If there is no common parent, we can't use the same directory for downloads. + let mut common_root = std::path::Path::new(&first_task.key).parent()?; + for task in &config.workload.tasks { + let task_path = std::path::Path::new(&task.key); + common_root = common_root.ancestors().find(|ancestor| { + task_path + .ancestors() + .any(|task_ancestor| task_ancestor == *ancestor) + })?; + if task.action != first_task.action { + panic!("Can't use directory for both download and upload"); + } + } + Some(common_root.to_str()?.to_string()) + } else { + None + } +} + // Quickly generate a buffer of random data. // This is fancy because a naive approach can add MINUTES to each debug run, // and we want devs to iterate quickly.