@@ -343,17 +343,6 @@ async fn ingest_blocks_for_receipts(
343343 Ok ( ( ) )
344344}
345345
346- async fn emit_dependent_burst (
347- setup : & Setup ,
348- input_handle : Option < FixedBytes < 32 > > ,
349- depth : usize ,
350- ) -> Result <
351- ( Vec < alloy:: rpc:: types:: TransactionReceipt > , FixedBytes < 32 > ) ,
352- anyhow:: Error ,
353- > {
354- emit_dependent_burst_seeded ( setup, input_handle, depth, 1_u64 ) . await
355- }
356-
357346async fn ingest_dependent_burst_seeded (
358347 db : & mut Database ,
359348 setup : & Setup ,
@@ -480,7 +469,7 @@ async fn test_bad_chain_id() {
480469
481470#[ tokio:: test]
482471#[ serial( db) ]
483- async fn test_slow_lane_marks_heavy_chain_locally ( ) -> Result < ( ) , anyhow:: Error >
472+ async fn test_slow_lane_threshold_matrix_locally ( ) -> Result < ( ) , anyhow:: Error >
484473{
485474 let setup = setup_with_block_time ( None , 3.0 ) . await ?;
486475 let mut db = Database :: new (
@@ -490,27 +479,111 @@ async fn test_slow_lane_marks_heavy_chain_locally() -> Result<(), anyhow::Error>
490479 )
491480 . await ?;
492481
493- let ( receipts, _) = emit_dependent_burst ( & setup, None , 4 ) . await ?;
494- ingest_blocks_for_receipts (
495- & mut db,
496- & setup,
497- & receipts,
498- IngestOptions {
499- dependence_by_connexity : false ,
500- dependence_cross_block : true ,
501- dependent_ops_max_per_chain : 1 ,
502- } ,
482+ let cases = [
483+ ( "below_cap" , 63_usize , 64_u32 , 0_i16 , 11_u64 ) ,
484+ ( "at_cap" , 64_usize , 64_u32 , 0_i16 , 12_u64 ) ,
485+ ( "above_cap" , 65_usize , 64_u32 , 1_i16 , 13_u64 ) ,
486+ ] ;
487+
488+ let mut seen_chains = HashSet :: new ( ) ;
489+ for ( name, depth, cap, expected_priority, seed) in cases {
490+ let last_handle = ingest_dependent_burst_seeded (
491+ & mut db, & setup, None , depth, seed, cap,
492+ )
493+ . await ?;
494+ let dep_chain_id =
495+ dep_chain_id_for_output_handle ( & setup, last_handle) . await ?;
496+ assert ! (
497+ seen_chains. insert( dep_chain_id. clone( ) ) ,
498+ "matrix case {name} reused an existing dependence chain"
499+ ) ;
500+
501+ let schedule_priority = sqlx:: query_scalar :: < _ , i16 > (
502+ "SELECT schedule_priority FROM dependence_chain WHERE dependence_chain_id = $1" ,
503+ )
504+ . bind ( & dep_chain_id)
505+ . fetch_one ( & setup. db_pool )
506+ . await ?;
507+ assert_eq ! (
508+ schedule_priority, expected_priority,
509+ "case={name} depth={depth} cap={cap}"
510+ ) ;
511+ }
512+
513+ Ok ( ( ) )
514+ }
515+
516+ #[ tokio:: test]
517+ #[ serial( db) ]
518+ async fn test_slow_lane_cross_block_sustained_below_cap_stays_fast_locally (
519+ ) -> Result < ( ) , anyhow:: Error > {
520+ let setup = setup_with_block_time ( None , 3.0 ) . await ?;
521+ let mut db = Database :: new (
522+ & setup. args . database_url ,
523+ & setup. args . coprocessor_api_key . unwrap ( ) ,
524+ setup. args . dependence_cache_size ,
503525 )
504526 . await ?;
505527
506- let slow_chain_count = sqlx:: query_scalar :: < _ , i64 > (
507- "SELECT COUNT(*) FROM dependence_chain WHERE schedule_priority = 1" ,
528+ let cap = 64_u32 ;
529+ let burst_depth = 8_usize ;
530+ let rounds = 4_u64 ;
531+
532+ let mut current_handle: Option < FixedBytes < 32 > > = None ;
533+ let mut seen_block_numbers = HashSet :: new ( ) ;
534+
535+ for round in 0 ..rounds {
536+ let seed = 101_u64 + round;
537+ let ( receipts, last_output_handle) = emit_dependent_burst_seeded (
538+ & setup,
539+ current_handle,
540+ burst_depth,
541+ seed,
542+ )
543+ . await ?;
544+
545+ for receipt in & receipts {
546+ let block_number =
547+ receipt. block_number . expect ( "receipt has block number" ) ;
548+ seen_block_numbers. insert ( block_number) ;
549+ }
550+
551+ ingest_blocks_for_receipts (
552+ & mut db,
553+ & setup,
554+ & receipts,
555+ IngestOptions {
556+ dependence_by_connexity : false ,
557+ dependence_cross_block : true ,
558+ dependent_ops_max_per_chain : cap,
559+ } ,
560+ )
561+ . await ?;
562+
563+ current_handle = Some ( last_output_handle) ;
564+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 4 ) ) . await ;
565+ }
566+
567+ assert ! (
568+ seen_block_numbers. len( ) > 1 ,
569+ "test must span multiple blocks"
570+ ) ;
571+
572+ let dep_chain_id = dep_chain_id_for_output_handle (
573+ & setup,
574+ current_handle. expect ( "final output handle exists" ) ,
575+ )
576+ . await ?;
577+ let schedule_priority = sqlx:: query_scalar :: < _ , i16 > (
578+ "SELECT schedule_priority FROM dependence_chain WHERE dependence_chain_id = $1" ,
508579 )
580+ . bind ( & dep_chain_id)
509581 . fetch_one ( & setup. db_pool )
510582 . await ?;
511- assert ! (
512- slow_chain_count > 0 ,
513- "heavy dependent chain should be assigned to slow lane"
583+
584+ assert_eq ! (
585+ schedule_priority, 0 ,
586+ "current behavior: below-cap batches do not accumulate into slow lane across blocks"
514587 ) ;
515588 Ok ( ( ) )
516589}
0 commit comments