@@ -3,7 +3,7 @@ use std::pin::pin;
33use std:: sync:: Arc ;
44use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
55use std:: thread;
6- use std:: time:: Instant ;
6+ use std:: time:: { Duration , Instant } ;
77
88use clap:: { Parser , Subcommand } ;
99use futures:: StreamExt ;
@@ -34,25 +34,29 @@ fn init_tracing_subscriber() {
3434fn run_benchmark (
3535 client : impl ObjectClient + Clone + Send ,
3636 num_iterations : usize ,
37- num_downloads : usize ,
3837 bucket : & str ,
39- key : & str ,
38+ keys : & [ & str ] ,
4039 enable_backpressure : bool ,
4140 output_path : Option < & Path > ,
41+ max_duration : Option < Duration > ,
4242) {
4343 let mut total_bytes = 0 ;
4444 let total_start = Instant :: now ( ) ;
4545 let mut iter_results = Vec :: new ( ) ;
46+ let mut iteration = 0 ;
47+ let timeout: Instant = total_start
48+ . checked_add ( max_duration. unwrap_or ( Duration :: from_secs ( 86400 ) ) )
49+ . expect ( "Duration overflow error" ) ;
4650
47- for i in 0 .. num_iterations {
48- let start = Instant :: now ( ) ;
51+ while iteration < num_iterations && Instant :: now ( ) < timeout {
52+ let iter_start = Instant :: now ( ) ;
4953 let received_size = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
5054
5155 thread:: scope ( |scope| {
52- for _ in 0 ..num_downloads {
56+ for key in keys {
5357 let client = client. clone ( ) ;
5458 let received_size_clone = Arc :: clone ( & received_size) ;
55- scope. spawn ( || {
59+ scope. spawn ( move || {
5660 futures:: executor:: block_on ( async move {
5761 let mut received_obj_len = 0u64 ;
5862 let mut request = client
@@ -68,7 +72,7 @@ fn run_benchmark(
6872 }
6973
7074 let mut request = pin ! ( request) ;
71- loop {
75+ while Instant :: now ( ) < timeout {
7276 match request. next ( ) . await {
7377 Some ( Ok ( part) ) => {
7478 let part_len = part. data . len ( ) ;
@@ -107,32 +111,43 @@ fn run_benchmark(
107111 }
108112 } ) ;
109113
110- let elapsed = start . elapsed ( ) ;
114+ let elapsed = iter_start . elapsed ( ) ;
111115 let received_size = received_size. load ( Ordering :: SeqCst ) ;
112116 total_bytes += received_size;
113117 println ! (
114118 "{}: received {} bytes in {:.2}s: {:.2} Gib/s" ,
115- i ,
119+ iteration ,
116120 received_size,
117121 elapsed. as_secs_f64( ) ,
118122 ( received_size as f64 ) / elapsed. as_secs_f64( ) / ( 1024 * 1024 * 1024 / 8 ) as f64
119123 ) ;
120124
121125 iter_results. push ( json ! ( {
122- "iteration" : i ,
126+ "iteration" : iteration ,
123127 "bytes" : received_size,
124- "duration_seconds " : elapsed. as_secs_f64( ) ,
128+ "elapsed_seconds " : elapsed. as_secs_f64( ) ,
125129 } ) ) ;
130+
131+ iteration += 1 ;
126132 }
127133
128134 let total_elapsed = total_start. elapsed ( ) ;
135+ println ! (
136+ "Total: received {} bytes in {:.2}s across {} iterations: {:.2} Gib/s" ,
137+ total_bytes,
138+ total_elapsed. as_secs_f64( ) ,
139+ iter_results. len( ) ,
140+ ( total_bytes as f64 ) / total_elapsed. as_secs_f64( ) / ( 1024 * 1024 * 1024 / 8 ) as f64
141+ ) ;
142+
129143 if let Some ( output_path) = output_path {
130144 let ouput_file = std:: fs:: File :: create ( output_path) . expect ( "Failed to create output_file: {output_path}" ) ;
131145 let results = json ! ( {
132146 "summary" : {
133147 "total_bytes" : total_bytes,
134- "duration_seconds" : total_elapsed. as_secs_f64( ) ,
135- "iterations" : num_iterations,
148+ "total_elapsed_seconds" : total_elapsed. as_secs_f64( ) ,
149+ "max_duration_seconds" : max_duration,
150+ "iterations" : iter_results. len( ) ,
136151 } ,
137152 "iterations" : iter_results
138153 } ) ;
@@ -142,17 +157,22 @@ fn run_benchmark(
142157
143158#[ derive( Subcommand ) ]
144159enum Client {
145- #[ command( about = "Download a key from S3" ) ]
160+ #[ command( about = "Download keys from S3" ) ]
146161 Real {
147162 #[ arg( help = "Bucket name" ) ]
148163 bucket : String ,
149- #[ arg( help = "Key name" ) ]
150- key : String ,
164+ #[ arg(
165+ help = "Comma-separated list of key names" ,
166+ value_delimiter = ',' ,
167+ value_name = "KEYS"
168+ ) ]
169+ keys : Vec < String > ,
151170 #[ arg( long, help = "AWS region" , default_value = "us-east-1" ) ]
152171 region : String ,
153- #[ clap (
172+ #[ arg (
154173 long,
155174 help = "One or more network interfaces to use when accessing S3. Requires Linux 5.7+ or running as root." ,
175+ value_delimiter = ',' ,
156176 value_name = "NETWORK_INTERFACE"
157177 ) ]
158178 bind : Option < Vec < String > > ,
@@ -164,6 +184,12 @@ enum Client {
164184 } ,
165185}
166186
187+ fn parse_duration ( arg : & str ) -> Result < Duration , String > {
188+ arg. parse :: < u64 > ( )
189+ . map ( Duration :: from_secs)
190+ . map_err ( |e| format ! ( "Invalid duration: {e}" ) )
191+ }
192+
167193#[ derive( Parser ) ]
168194struct CliArgs {
169195 #[ command( subcommand) ]
@@ -186,8 +212,6 @@ struct CliArgs {
186212 part_size : usize ,
187213 #[ arg( long, help = "Number of benchmark iterations" , default_value = "1" ) ]
188214 iterations : usize ,
189- #[ arg( long, help = "Number of concurrent downloads" , default_value = "1" ) ]
190- downloads : usize ,
191215 #[ arg( long, help = "Enable CRT backpressure mode" ) ]
192216 enable_backpressure : bool ,
193217 #[ arg(
@@ -196,8 +220,35 @@ struct CliArgs {
196220 default_value = "0"
197221 ) ]
198222 initial_window_size : Option < usize > ,
199- #[ clap( long, help = "Output file to write the results to" , value_name = "OUTPUT_FILE" ) ]
200- pub output_file : Option < PathBuf > ,
223+ #[ arg( long, help = "Output file to write the results to" , value_name = "OUTPUT_FILE" ) ]
224+ output_file : Option < PathBuf > ,
225+ #[ arg(
226+ long,
227+ help = "Maximum duration (in seconds) to run the benchmark" ,
228+ value_name = "SECONDS" ,
229+ value_parser = parse_duration,
230+ ) ]
231+ max_duration : Option < Duration > ,
232+ }
233+
234+ fn create_s3_client_config ( region : & str , args : & CliArgs , nics : Vec < String > ) -> S3ClientConfig {
235+ let mut config = S3ClientConfig :: new ( ) . endpoint_config ( EndpointConfig :: new ( region) ) ;
236+
237+ config = config. throughput_target_gbps ( args. throughput_target_gbps ) ;
238+ config = config. memory_limit_in_bytes ( args. crt_memory_limit_gb * 1024 * 1024 * 1024 ) ;
239+ config = config. network_interface_names ( nics) ;
240+
241+ config = config. part_size ( args. part_size ) ;
242+
243+ if args. enable_backpressure {
244+ config = config. read_backpressure ( true ) ;
245+ config = config. initial_read_window (
246+ args. initial_window_size
247+ . expect ( "read window size is required when backpressure is enabled" ) ,
248+ ) ;
249+ }
250+
251+ config
201252}
202253
203254fn main ( ) {
@@ -207,40 +258,30 @@ fn main() {
207258
208259 match args. client {
209260 Client :: Real {
210- bucket,
211- key ,
212- region,
213- bind,
261+ ref bucket,
262+ ref keys ,
263+ ref region,
264+ ref bind,
214265 } => {
215- let mut config = S3ClientConfig :: new ( ) . endpoint_config ( EndpointConfig :: new ( & region) ) ;
216- config = config. throughput_target_gbps ( args. throughput_target_gbps ) ;
217- config = config. memory_limit_in_bytes ( args. crt_memory_limit_gb * 1024 * 1024 * 1024 ) ;
218- if let Some ( interfaces) = & bind {
219- config = config. network_interface_names ( interfaces. clone ( ) ) ;
220- }
221- config = config. part_size ( args. part_size ) ;
222- if args. enable_backpressure {
223- config = config. read_backpressure ( true ) ;
224- config = config. initial_read_window (
225- args. initial_window_size
226- . expect ( "read window size is required when backpressure is enabled" ) ,
227- ) ;
228- }
266+ let network_interfaces = bind. clone ( ) . unwrap_or_default ( ) ;
267+ let config = create_s3_client_config ( region, & args, network_interfaces) ;
229268 let client = S3CrtClient :: new ( config) . expect ( "couldn't create client" ) ;
269+ let key_refs: Vec < & str > = keys. iter ( ) . map ( |s| s. as_str ( ) ) . collect ( ) ;
230270
231271 run_benchmark (
232272 client,
233273 args. iterations ,
234- args. downloads ,
235- & bucket,
236- & key,
274+ bucket,
275+ & key_refs,
237276 args. enable_backpressure ,
238277 args. output_file . as_deref ( ) ,
278+ args. max_duration ,
239279 ) ;
240280 }
241281 Client :: Mock { object_size } => {
242282 const BUCKET : & str = "bucket" ;
243283 const KEY : & str = "key" ;
284+ let keys = & [ KEY ] ;
244285
245286 let config = MockClient :: config ( )
246287 . bucket ( BUCKET )
@@ -254,11 +295,11 @@ fn main() {
254295 run_benchmark (
255296 client,
256297 args. iterations ,
257- args. downloads ,
258298 BUCKET ,
259- KEY ,
299+ keys ,
260300 args. enable_backpressure ,
261301 args. output_file . as_deref ( ) ,
302+ args. max_duration ,
262303 ) ;
263304 }
264305 }
0 commit comments