5252use crate :: ACLTest :: ACLTestInstance ;
5353use crate :: FHEVMExecutorTest :: FHEVMExecutorTestInstance ;
5454
55- const NB_EVENTS_PER_WALLET : i64 = 200 ;
55+ const NB_EVENTS_PER_WALLET : i64 = 50 ;
5656
5757async fn emit_events < P , N > (
5858 wallets : & [ EthereumWallet ] ,
@@ -455,6 +455,66 @@ async fn dep_chain_id_for_output_handle(
455455 Ok ( dep_chain_id)
456456}
457457
458+ // Polls Anvil until the block number advances past `after_block`.
459+ // If `after_block` is `None`, queries the current block first.
460+ async fn wait_for_next_block (
461+ url : & str ,
462+ after_block : Option < u64 > ,
463+ timeout : tokio:: time:: Duration ,
464+ ) -> Result < u64 , anyhow:: Error > {
465+ let provider = ProviderBuilder :: new ( )
466+ . connect_ws ( WsConnect :: new ( url) )
467+ . await ?;
468+ let current = match after_block {
469+ Some ( b) => b,
470+ None => provider. get_block_number ( ) . await ?,
471+ } ;
472+ let deadline = tokio:: time:: Instant :: now ( ) + timeout;
473+ loop {
474+ let block = provider. get_block_number ( ) . await ?;
475+ if block > current {
476+ return Ok ( block) ;
477+ }
478+ assert ! (
479+ tokio:: time:: Instant :: now( ) < deadline,
480+ "timeout waiting for block > {current}, still at {block}"
481+ ) ;
482+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 100 ) ) . await ;
483+ }
484+ }
485+
486+ // Polls the database until both `computations` and `allowed_handles` counts
487+ // satisfy `predicate`, returning the final `(tfhe_count, acl_count)`.
488+ // Panics with `context` if `timeout` elapses before the condition is met.
489+ async fn wait_for_event_counts (
490+ db_pool : & sqlx:: PgPool ,
491+ timeout : tokio:: time:: Duration ,
492+ context : & str ,
493+ predicate : impl Fn ( i64 , i64 ) -> bool ,
494+ ) -> Result < ( i64 , i64 ) , anyhow:: Error > {
495+ let deadline = tokio:: time:: Instant :: now ( ) + timeout;
496+ loop {
497+ let tfhe = sqlx:: query!( "SELECT COUNT(*) FROM computations" )
498+ . fetch_one ( db_pool)
499+ . await ?
500+ . count
501+ . unwrap_or ( 0 ) ;
502+ let acl = sqlx:: query!( "SELECT COUNT(*) FROM allowed_handles" )
503+ . fetch_one ( db_pool)
504+ . await ?
505+ . count
506+ . unwrap_or ( 0 ) ;
507+ if predicate ( tfhe, acl) {
508+ return Ok ( ( tfhe, acl) ) ;
509+ }
510+ assert ! (
511+ tokio:: time:: Instant :: now( ) < deadline,
512+ "timeout {context}: tfhe={tfhe}, acl={acl}"
513+ ) ;
514+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 500 ) ) . await ;
515+ }
516+ }
517+
458518#[ tokio:: test]
459519#[ serial( db) ]
460520async fn test_slow_lane_threshold_matrix_locally ( ) -> Result < ( ) , anyhow:: Error >
@@ -584,7 +644,7 @@ async fn test_schedule_priority_migration_contract() -> Result<(), anyhow::Error
584644#[ serial( db) ]
585645async fn test_slow_lane_cross_block_sustained_below_cap_stays_fast_locally (
586646) -> Result < ( ) , anyhow:: Error > {
587- let setup = setup_with_block_time ( None , 3 .0) . await ?;
647+ let setup = setup_with_block_time ( None , 1 .0) . await ?;
588648 let mut db = Database :: new (
589649 & setup. args . database_url ,
590650 setup. chain_id ,
@@ -628,7 +688,16 @@ async fn test_slow_lane_cross_block_sustained_below_cap_stays_fast_locally(
628688 . await ?;
629689
630690 current_handle = Some ( last_output_handle) ;
631- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 4 ) ) . await ;
691+ let last_block = receipts
692+ . last ( )
693+ . and_then ( |r| r. block_number )
694+ . expect ( "receipt has block number" ) ;
695+ wait_for_next_block (
696+ & setup. args . url ,
697+ Some ( last_block) ,
698+ tokio:: time:: Duration :: from_secs ( 10 ) ,
699+ )
700+ . await ?;
632701 }
633702
634703 assert ! (
@@ -711,7 +780,7 @@ async fn test_slow_lane_cross_block_parent_lookup_finds_known_slow_parent_locall
711780#[ serial( db) ]
712781async fn test_slow_lane_priority_is_monotonic_across_blocks_locally (
713782) -> Result < ( ) , anyhow:: Error > {
714- let setup = setup_with_block_time ( None , 3 .0) . await ?;
783+ let setup = setup_with_block_time ( None , 1 .0) . await ?;
715784 let mut db = Database :: new (
716785 & setup. args . database_url ,
717786 setup. chain_id ,
@@ -732,7 +801,12 @@ async fn test_slow_lane_priority_is_monotonic_across_blocks_locally(
732801 . await ?;
733802 assert_eq ! ( initial_priority, 1 , "first pass should mark chain slow" ) ;
734803
735- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 4 ) ) . await ;
804+ wait_for_next_block (
805+ & setup. args . url ,
806+ None ,
807+ tokio:: time:: Duration :: from_secs ( 10 ) ,
808+ )
809+ . await ?;
736810
737811 let second_output = ingest_dependent_burst_seeded (
738812 & mut db,
@@ -1079,10 +1153,10 @@ async fn test_listener_no_event_loss(
10791153 nb_wallets * NB_EVENTS_PER_WALLET
10801154 } ;
10811155 let expected_acl_events = nb_wallets * NB_EVENTS_PER_WALLET ;
1082- for _ in 1 ..120 {
1083- // 10 mins max to avoid stalled CI
1156+ for _ in 1 ..40 {
1157+ // 4 mins max to avoid stalled CI
10841158 let listener_handle = tokio:: spawn ( main ( args. clone ( ) ) ) ;
1085- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 10 ) ) . await ;
1159+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 5 ) ) . await ;
10861160 check_finalization_status ( & setup) . await ;
10871161 let tfhe_new_count = sqlx:: query!( "SELECT COUNT(*) FROM computations" )
10881162 . fetch_one ( & setup. db_pool )
@@ -1108,7 +1182,7 @@ async fn test_listener_no_event_loss(
11081182 if kill {
11091183 listener_handle. abort ( ) ;
11101184 while !listener_handle. is_finished ( ) {
1111- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 10 ) ) . await ;
1185+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 2 ) ) . await ;
11121186 }
11131187 nb_kill += 1 ;
11141188 }
@@ -1119,7 +1193,7 @@ async fn test_listener_no_event_loss(
11191193 acl_events_count,
11201194 nb_wallets * NB_EVENTS_PER_WALLET ,
11211195 ) ;
1122- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs_f64 ( 1.5 ) ) . await ;
1196+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 1 ) ) . await ;
11231197 }
11241198 assert_eq ! ( tfhe_events_count, expected_tfhe_events) ;
11251199 assert_eq ! ( acl_events_count, expected_acl_events) ;
@@ -1190,21 +1264,17 @@ async fn test_catchup_and_listen() -> Result<(), anyhow::Error> {
11901264 args. catchup_paging = 3 ;
11911265 let listener_handle = tokio:: spawn ( main ( args. clone ( ) ) ) ;
11921266 assert ! ( health_check:: wait_healthy( & setup. health_check_url, 60 , 1 ) . await ) ;
1193- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 20 ) ) . await ; // time to catchup
1194-
1195- let tfhe_events_count = sqlx:: query!( "SELECT COUNT(*) FROM computations" )
1196- . fetch_one ( & setup. db_pool )
1197- . await ?
1198- . count
1199- . unwrap_or ( 0 ) ;
1200- let acl_events_count = sqlx:: query!( "SELECT COUNT(*) FROM allowed_handles" )
1201- . fetch_one ( & setup. db_pool )
1202- . await ?
1203- . count
1204- . unwrap_or ( 0 ) ;
12051267 let nb_wallets = setup. wallets . len ( ) as i64 ;
1206- assert_eq ! ( tfhe_events_count, nb_wallets * nb_event_per_wallet) ;
1207- assert_eq ! ( acl_events_count, nb_wallets * nb_event_per_wallet) ;
1268+ let expected = nb_wallets * nb_event_per_wallet;
1269+ let ( tfhe_events_count, acl_events_count) = wait_for_event_counts (
1270+ & setup. db_pool ,
1271+ tokio:: time:: Duration :: from_secs ( 30 ) ,
1272+ & format ! ( "waiting for first catchup (expected {expected})" ) ,
1273+ |tfhe, acl| tfhe >= expected && acl >= expected,
1274+ )
1275+ . await ?;
1276+ assert_eq ! ( tfhe_events_count, expected) ;
1277+ assert_eq ! ( acl_events_count, expected) ;
12081278 assert ! ( !listener_handle. is_finished( ) , "Listener should continue" ) ;
12091279 let wallets_clone = setup. wallets . clone ( ) ;
12101280 let url_clone = setup. args . url . clone ( ) ;
@@ -1219,20 +1289,17 @@ async fn test_catchup_and_listen() -> Result<(), anyhow::Error> {
12191289 nb_event_per_wallet,
12201290 )
12211291 . await ;
1222- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 5 ) ) . await ;
12231292
1224- let tfhe_events_count = sqlx:: query!( "SELECT COUNT(*) FROM computations" )
1225- . fetch_one ( & setup. db_pool )
1226- . await ?
1227- . count
1228- . unwrap_or ( 0 ) ;
1229- let acl_events_count = sqlx:: query!( "SELECT COUNT(*) FROM allowed_handles" )
1230- . fetch_one ( & setup. db_pool )
1231- . await ?
1232- . count
1233- . unwrap_or ( 0 ) ;
1234- assert_eq ! ( tfhe_events_count, 2 * nb_wallets * nb_event_per_wallet) ;
1235- assert_eq ! ( acl_events_count, 2 * nb_wallets * nb_event_per_wallet) ;
1293+ let expected2 = 2 * nb_wallets * nb_event_per_wallet;
1294+ let ( tfhe_events_count, acl_events_count) = wait_for_event_counts (
1295+ & setup. db_pool ,
1296+ tokio:: time:: Duration :: from_secs ( 30 ) ,
1297+ & format ! ( "waiting for second batch (expected {expected2})" ) ,
1298+ |tfhe, acl| tfhe >= expected2 && acl >= expected2,
1299+ )
1300+ . await ?;
1301+ assert_eq ! ( tfhe_events_count, expected2) ;
1302+ assert_eq ! ( acl_events_count, expected2) ;
12361303 listener_handle. abort ( ) ;
12371304 Ok ( ( ) )
12381305}
@@ -1265,23 +1332,28 @@ async fn test_catchup_only() -> Result<(), anyhow::Error> {
12651332 args. catchup_paging = 2 ;
12661333 let listener_handle = tokio:: spawn ( main ( args. clone ( ) ) ) ;
12671334 assert ! ( health_check:: wait_healthy( & setup. health_check_url, 60 , 1 ) . await ) ;
1268- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 30 ) ) . await ; // time to catchup
1269-
1270- let tfhe_events_count = sqlx:: query!( "SELECT COUNT(*) FROM computations" )
1271- . fetch_one ( & setup. db_pool )
1272- . await ?
1273- . count
1274- . unwrap_or ( 0 ) ;
1275- let acl_events_count = sqlx:: query!( "SELECT COUNT(*) FROM allowed_handles" )
1276- . fetch_one ( & setup. db_pool )
1277- . await ?
1278- . count
1279- . unwrap_or ( 0 ) ;
12801335 let nb_wallets = setup. wallets . len ( ) as i64 ;
1336+ let expected = nb_wallets * nb_event_per_wallet;
1337+ let ( tfhe_events_count, acl_events_count) = wait_for_event_counts (
1338+ & setup. db_pool ,
1339+ tokio:: time:: Duration :: from_secs ( 30 ) ,
1340+ & format ! ( "waiting for catchup (expected {expected})" ) ,
1341+ |tfhe, acl| tfhe >= expected && acl >= expected,
1342+ )
1343+ . await ?;
12811344 eprintln ! ( "End block {:?}" , args. end_at_block) ;
1282- assert_eq ! ( tfhe_events_count, nb_wallets * nb_event_per_wallet) ;
1283- assert_eq ! ( acl_events_count, nb_wallets * nb_event_per_wallet) ;
1284- assert ! ( listener_handle. is_finished( ) , "Listener should stop" ) ;
1345+ assert_eq ! ( tfhe_events_count, expected) ;
1346+ assert_eq ! ( acl_events_count, expected) ;
1347+ // Allow the listener to finish after ingesting all events
1348+ let finish_deadline =
1349+ tokio:: time:: Instant :: now ( ) + tokio:: time:: Duration :: from_secs ( 10 ) ;
1350+ while !listener_handle. is_finished ( ) {
1351+ assert ! (
1352+ tokio:: time:: Instant :: now( ) < finish_deadline,
1353+ "Listener should stop"
1354+ ) ;
1355+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 500 ) ) . await ;
1356+ }
12851357 Ok ( ( ) )
12861358}
12871359
@@ -1324,19 +1396,15 @@ where
13241396
13251397 let listener_handle = tokio:: spawn ( main ( args. clone ( ) ) ) ;
13261398 assert ! ( health_check:: wait_healthy( & setup. health_check_url, 60 , 1 ) . await ) ;
1327- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( sleep_secs) ) . await ;
1328-
1329- let tfhe_events_count = sqlx:: query!( "SELECT COUNT(*) FROM computations" )
1330- . fetch_one ( & setup. db_pool )
1331- . await ?
1332- . count
1333- . unwrap_or ( 0 ) ;
1334- let acl_events_count = sqlx:: query!( "SELECT COUNT(*) FROM allowed_handles" )
1335- . fetch_one ( & setup. db_pool )
1336- . await ?
1337- . count
1338- . unwrap_or ( 0 ) ;
13391399 let nb_wallets = setup. wallets . len ( ) as i64 ;
1400+ let expected = nb_wallets * nb_event_per_wallet;
1401+ let ( tfhe_events_count, acl_events_count) = wait_for_event_counts (
1402+ & setup. db_pool ,
1403+ tokio:: time:: Duration :: from_secs ( sleep_secs. max ( 30 ) ) ,
1404+ & format ! ( "waiting for catchup in scenario (expected {expected})" ) ,
1405+ |tfhe, acl| tfhe >= expected && acl >= expected,
1406+ )
1407+ . await ?;
13401408
13411409 Ok ( CatchupOutcome {
13421410 _setup : setup,
@@ -1427,30 +1495,16 @@ async fn test_catchup_only_relative_end() -> Result<(), anyhow::Error> {
14271495 )
14281496 . await ;
14291497
1430- // Wait enough time for another catchup iteration to complete
1431- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 20 ) ) . await ;
1432-
1433- let tfhe_events_count_after =
1434- sqlx:: query!( "SELECT COUNT(*) FROM computations" )
1435- . fetch_one ( & setup. db_pool )
1436- . await ?
1437- . count
1438- . unwrap_or ( 0 ) ;
1439- let acl_events_count_after =
1440- sqlx:: query!( "SELECT COUNT(*) FROM allowed_handles" )
1441- . fetch_one ( & setup. db_pool )
1442- . await ?
1443- . count
1444- . unwrap_or ( 0 ) ;
1445-
1446- assert ! (
1447- tfhe_events_count_after > first_tfhe_events_count,
1448- "Second catchup iteration should ingest additional TFHE events"
1449- ) ;
1450- assert ! (
1451- acl_events_count_after > first_acl_events_count,
1452- "Second catchup iteration should ingest additional ACL events"
1453- ) ;
1498+ // Poll until second catchup iteration ingests additional events
1499+ wait_for_event_counts (
1500+ & setup. db_pool ,
1501+ tokio:: time:: Duration :: from_secs ( 30 ) ,
1502+ "waiting for second catchup iteration" ,
1503+ |tfhe, acl| {
1504+ tfhe > first_tfhe_events_count && acl > first_acl_events_count
1505+ } ,
1506+ )
1507+ . await ?;
14541508
14551509 // Listener should still be running
14561510 assert ! (
0 commit comments