@@ -282,6 +282,7 @@ fn blocking_executor(
282282fn bench_on_statements ( c : & mut Criterion ) {
283283 let statement_counts = [ 100 , 500 , 1000 , 2000 ] ;
284284 let thread_counts = [ 1 , 2 , 4 , 8 ] ;
285+ let peer_counts = [ 1 , 2 , 4 , 8 , 16 ] ;
285286 let max_runtime_instances = 8 ;
286287 let executor_types = [ ( "blocking" , true ) , ( "non_blocking" , false ) ] ;
287288
@@ -292,39 +293,45 @@ fn bench_on_statements(c: &mut Criterion) {
292293 for & num_statements in & statement_counts {
293294 for & num_threads in & thread_counts {
294295 for & ( executor_name, is_blocking) in & executor_types {
295- let statements: Vec < Statement > =
296- ( 0 ..num_statements) . map ( |i| create_signed_statement ( i, & keypair) ) . collect ( ) ;
297- let executor = if is_blocking {
298- blocking_executor ( & handle)
299- } else {
300- non_blocking_executor ( & handle)
301- } ;
302-
303- let benchmark_name = format ! (
304- "on_statements/statements_{}/threads_{}/{}" ,
305- num_statements, num_threads, executor_name
306- ) ;
307-
308- c. bench_function ( & benchmark_name, |b| {
309- b. iter_batched (
310- || build_handler ( executor. clone ( ) , num_threads, max_runtime_instances) ,
311- |( mut handler, peer_id, _temp_dir) | {
312- handler. on_statements ( peer_id, statements. clone ( ) ) ;
313-
314- runtime. block_on ( async {
315- while handler. pending_statements_mut ( ) . next ( ) . await . is_some ( ) { }
316- } ) ;
317-
318- let pending = handler. pending_statements_mut ( ) ;
319- assert ! (
320- pending. is_empty( ) ,
321- "Pending statements not empty: {}" ,
322- pending. len( )
323- ) ;
324- } ,
325- criterion:: BatchSize :: LargeInput ,
326- )
327- } ) ;
296+ for num_peers in & peer_counts {
297+ let statements: Vec < Statement > =
298+ ( 0 ..num_statements) . map ( |i| create_signed_statement ( i, & keypair) ) . collect ( ) ;
299+ let executor = if is_blocking {
300+ blocking_executor ( & handle)
301+ } else {
302+ non_blocking_executor ( & handle)
303+ } ;
304+
305+ let benchmark_name = format ! (
306+ "on_statements/statements_{}/peers_{}/threads_{}/{}" ,
307+ num_statements, num_peers, num_threads, executor_name
308+ ) ;
309+
310+ c. bench_function ( & benchmark_name, |b| {
311+ b. iter_batched (
312+ || build_handler ( executor. clone ( ) , num_threads, max_runtime_instances) ,
313+ |( mut handler, peer_id, _temp_dir) | {
314+ // The number of peers determines how many times we might receive a
315+ // statement.
316+ for _ in 0 ..* num_peers {
317+ handler. on_statements ( peer_id, statements. clone ( ) ) ;
318+ }
319+
320+ runtime. block_on ( async {
321+ while handler. pending_statements_mut ( ) . next ( ) . await . is_some ( ) { }
322+ } ) ;
323+
324+ let pending = handler. pending_statements_mut ( ) ;
325+ assert ! (
326+ pending. is_empty( ) ,
327+ "Pending statements not empty: {}" ,
328+ pending. len( )
329+ ) ;
330+ } ,
331+ criterion:: BatchSize :: LargeInput ,
332+ )
333+ } ) ;
334+ }
328335 }
329336 }
330337 }
0 commit comments