Skip to content

Commit c5178f3

Browse files
authored
chore(coprocessor): improve host-listener tests performance (#2113)
* chore(coprocessor): improve performance of test_listener_restart_and_chain_reorg test from 430 to 126 seconds * chore(coprocessor): improve performance of host-listener integration tests from 565 to 480 seconds by replacing hardcoded sleeps with polling * chore(coprocessor): improve performance of host listener integration tests from 480 to 450 secs by replacing fixed sleeps with block polling in slow lane tests
1 parent 7c5dc38 commit c5178f3

File tree

1 file changed

+142
-88
lines changed

1 file changed

+142
-88
lines changed

coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs

Lines changed: 142 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ sol!(
5252
use crate::ACLTest::ACLTestInstance;
5353
use crate::FHEVMExecutorTest::FHEVMExecutorTestInstance;
5454

55-
const NB_EVENTS_PER_WALLET: i64 = 200;
55+
const NB_EVENTS_PER_WALLET: i64 = 50;
5656

5757
async fn emit_events<P, N>(
5858
wallets: &[EthereumWallet],
@@ -455,6 +455,66 @@ async fn dep_chain_id_for_output_handle(
455455
Ok(dep_chain_id)
456456
}
457457

458+
// Polls Anvil until the block number advances past `after_block`.
459+
// If `after_block` is `None`, queries the current block first.
460+
async fn wait_for_next_block(
461+
url: &str,
462+
after_block: Option<u64>,
463+
timeout: tokio::time::Duration,
464+
) -> Result<u64, anyhow::Error> {
465+
let provider = ProviderBuilder::new()
466+
.connect_ws(WsConnect::new(url))
467+
.await?;
468+
let current = match after_block {
469+
Some(b) => b,
470+
None => provider.get_block_number().await?,
471+
};
472+
let deadline = tokio::time::Instant::now() + timeout;
473+
loop {
474+
let block = provider.get_block_number().await?;
475+
if block > current {
476+
return Ok(block);
477+
}
478+
assert!(
479+
tokio::time::Instant::now() < deadline,
480+
"timeout waiting for block > {current}, still at {block}"
481+
);
482+
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
483+
}
484+
}
485+
486+
// Polls the database until both `computations` and `allowed_handles` counts
487+
// satisfy `predicate`, returning the final `(tfhe_count, acl_count)`.
488+
// Panics with `context` if `timeout` elapses before the condition is met.
489+
async fn wait_for_event_counts(
490+
db_pool: &sqlx::PgPool,
491+
timeout: tokio::time::Duration,
492+
context: &str,
493+
predicate: impl Fn(i64, i64) -> bool,
494+
) -> Result<(i64, i64), anyhow::Error> {
495+
let deadline = tokio::time::Instant::now() + timeout;
496+
loop {
497+
let tfhe = sqlx::query!("SELECT COUNT(*) FROM computations")
498+
.fetch_one(db_pool)
499+
.await?
500+
.count
501+
.unwrap_or(0);
502+
let acl = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
503+
.fetch_one(db_pool)
504+
.await?
505+
.count
506+
.unwrap_or(0);
507+
if predicate(tfhe, acl) {
508+
return Ok((tfhe, acl));
509+
}
510+
assert!(
511+
tokio::time::Instant::now() < deadline,
512+
"timeout {context}: tfhe={tfhe}, acl={acl}"
513+
);
514+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
515+
}
516+
}
517+
458518
#[tokio::test]
459519
#[serial(db)]
460520
async fn test_slow_lane_threshold_matrix_locally() -> Result<(), anyhow::Error>
@@ -584,7 +644,7 @@ async fn test_schedule_priority_migration_contract() -> Result<(), anyhow::Error
584644
#[serial(db)]
585645
async fn test_slow_lane_cross_block_sustained_below_cap_stays_fast_locally(
586646
) -> Result<(), anyhow::Error> {
587-
let setup = setup_with_block_time(None, 3.0).await?;
647+
let setup = setup_with_block_time(None, 1.0).await?;
588648
let mut db = Database::new(
589649
&setup.args.database_url,
590650
setup.chain_id,
@@ -628,7 +688,16 @@ async fn test_slow_lane_cross_block_sustained_below_cap_stays_fast_locally(
628688
.await?;
629689

630690
current_handle = Some(last_output_handle);
631-
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
691+
let last_block = receipts
692+
.last()
693+
.and_then(|r| r.block_number)
694+
.expect("receipt has block number");
695+
wait_for_next_block(
696+
&setup.args.url,
697+
Some(last_block),
698+
tokio::time::Duration::from_secs(10),
699+
)
700+
.await?;
632701
}
633702

634703
assert!(
@@ -711,7 +780,7 @@ async fn test_slow_lane_cross_block_parent_lookup_finds_known_slow_parent_locall
711780
#[serial(db)]
712781
async fn test_slow_lane_priority_is_monotonic_across_blocks_locally(
713782
) -> Result<(), anyhow::Error> {
714-
let setup = setup_with_block_time(None, 3.0).await?;
783+
let setup = setup_with_block_time(None, 1.0).await?;
715784
let mut db = Database::new(
716785
&setup.args.database_url,
717786
setup.chain_id,
@@ -732,7 +801,12 @@ async fn test_slow_lane_priority_is_monotonic_across_blocks_locally(
732801
.await?;
733802
assert_eq!(initial_priority, 1, "first pass should mark chain slow");
734803

735-
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
804+
wait_for_next_block(
805+
&setup.args.url,
806+
None,
807+
tokio::time::Duration::from_secs(10),
808+
)
809+
.await?;
736810

737811
let second_output = ingest_dependent_burst_seeded(
738812
&mut db,
@@ -1079,10 +1153,10 @@ async fn test_listener_no_event_loss(
10791153
nb_wallets * NB_EVENTS_PER_WALLET
10801154
};
10811155
let expected_acl_events = nb_wallets * NB_EVENTS_PER_WALLET;
1082-
for _ in 1..120 {
1083-
// 10 mins max to avoid stalled CI
1156+
for _ in 1..40 {
1157+
// 4 mins max to avoid stalled CI
10841158
let listener_handle = tokio::spawn(main(args.clone()));
1085-
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
1159+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
10861160
check_finalization_status(&setup).await;
10871161
let tfhe_new_count = sqlx::query!("SELECT COUNT(*) FROM computations")
10881162
.fetch_one(&setup.db_pool)
@@ -1108,7 +1182,7 @@ async fn test_listener_no_event_loss(
11081182
if kill {
11091183
listener_handle.abort();
11101184
while !listener_handle.is_finished() {
1111-
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
1185+
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
11121186
}
11131187
nb_kill += 1;
11141188
}
@@ -1119,7 +1193,7 @@ async fn test_listener_no_event_loss(
11191193
acl_events_count,
11201194
nb_wallets * NB_EVENTS_PER_WALLET,
11211195
);
1122-
tokio::time::sleep(tokio::time::Duration::from_secs_f64(1.5)).await;
1196+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
11231197
}
11241198
assert_eq!(tfhe_events_count, expected_tfhe_events);
11251199
assert_eq!(acl_events_count, expected_acl_events);
@@ -1190,21 +1264,17 @@ async fn test_catchup_and_listen() -> Result<(), anyhow::Error> {
11901264
args.catchup_paging = 3;
11911265
let listener_handle = tokio::spawn(main(args.clone()));
11921266
assert!(health_check::wait_healthy(&setup.health_check_url, 60, 1).await);
1193-
tokio::time::sleep(tokio::time::Duration::from_secs(20)).await; // time to catchup
1194-
1195-
let tfhe_events_count = sqlx::query!("SELECT COUNT(*) FROM computations")
1196-
.fetch_one(&setup.db_pool)
1197-
.await?
1198-
.count
1199-
.unwrap_or(0);
1200-
let acl_events_count = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
1201-
.fetch_one(&setup.db_pool)
1202-
.await?
1203-
.count
1204-
.unwrap_or(0);
12051267
let nb_wallets = setup.wallets.len() as i64;
1206-
assert_eq!(tfhe_events_count, nb_wallets * nb_event_per_wallet);
1207-
assert_eq!(acl_events_count, nb_wallets * nb_event_per_wallet);
1268+
let expected = nb_wallets * nb_event_per_wallet;
1269+
let (tfhe_events_count, acl_events_count) = wait_for_event_counts(
1270+
&setup.db_pool,
1271+
tokio::time::Duration::from_secs(30),
1272+
&format!("waiting for first catchup (expected {expected})"),
1273+
|tfhe, acl| tfhe >= expected && acl >= expected,
1274+
)
1275+
.await?;
1276+
assert_eq!(tfhe_events_count, expected);
1277+
assert_eq!(acl_events_count, expected);
12081278
assert!(!listener_handle.is_finished(), "Listener should continue");
12091279
let wallets_clone = setup.wallets.clone();
12101280
let url_clone = setup.args.url.clone();
@@ -1219,20 +1289,17 @@ async fn test_catchup_and_listen() -> Result<(), anyhow::Error> {
12191289
nb_event_per_wallet,
12201290
)
12211291
.await;
1222-
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
12231292

1224-
let tfhe_events_count = sqlx::query!("SELECT COUNT(*) FROM computations")
1225-
.fetch_one(&setup.db_pool)
1226-
.await?
1227-
.count
1228-
.unwrap_or(0);
1229-
let acl_events_count = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
1230-
.fetch_one(&setup.db_pool)
1231-
.await?
1232-
.count
1233-
.unwrap_or(0);
1234-
assert_eq!(tfhe_events_count, 2 * nb_wallets * nb_event_per_wallet);
1235-
assert_eq!(acl_events_count, 2 * nb_wallets * nb_event_per_wallet);
1293+
let expected2 = 2 * nb_wallets * nb_event_per_wallet;
1294+
let (tfhe_events_count, acl_events_count) = wait_for_event_counts(
1295+
&setup.db_pool,
1296+
tokio::time::Duration::from_secs(30),
1297+
&format!("waiting for second batch (expected {expected2})"),
1298+
|tfhe, acl| tfhe >= expected2 && acl >= expected2,
1299+
)
1300+
.await?;
1301+
assert_eq!(tfhe_events_count, expected2);
1302+
assert_eq!(acl_events_count, expected2);
12361303
listener_handle.abort();
12371304
Ok(())
12381305
}
@@ -1265,23 +1332,28 @@ async fn test_catchup_only() -> Result<(), anyhow::Error> {
12651332
args.catchup_paging = 2;
12661333
let listener_handle = tokio::spawn(main(args.clone()));
12671334
assert!(health_check::wait_healthy(&setup.health_check_url, 60, 1).await);
1268-
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; // time to catchup
1269-
1270-
let tfhe_events_count = sqlx::query!("SELECT COUNT(*) FROM computations")
1271-
.fetch_one(&setup.db_pool)
1272-
.await?
1273-
.count
1274-
.unwrap_or(0);
1275-
let acl_events_count = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
1276-
.fetch_one(&setup.db_pool)
1277-
.await?
1278-
.count
1279-
.unwrap_or(0);
12801335
let nb_wallets = setup.wallets.len() as i64;
1336+
let expected = nb_wallets * nb_event_per_wallet;
1337+
let (tfhe_events_count, acl_events_count) = wait_for_event_counts(
1338+
&setup.db_pool,
1339+
tokio::time::Duration::from_secs(30),
1340+
&format!("waiting for catchup (expected {expected})"),
1341+
|tfhe, acl| tfhe >= expected && acl >= expected,
1342+
)
1343+
.await?;
12811344
eprintln!("End block {:?}", args.end_at_block);
1282-
assert_eq!(tfhe_events_count, nb_wallets * nb_event_per_wallet);
1283-
assert_eq!(acl_events_count, nb_wallets * nb_event_per_wallet);
1284-
assert!(listener_handle.is_finished(), "Listener should stop");
1345+
assert_eq!(tfhe_events_count, expected);
1346+
assert_eq!(acl_events_count, expected);
1347+
// Allow the listener to finish after ingesting all events
1348+
let finish_deadline =
1349+
tokio::time::Instant::now() + tokio::time::Duration::from_secs(10);
1350+
while !listener_handle.is_finished() {
1351+
assert!(
1352+
tokio::time::Instant::now() < finish_deadline,
1353+
"Listener should stop"
1354+
);
1355+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1356+
}
12851357
Ok(())
12861358
}
12871359

@@ -1324,19 +1396,15 @@ where
13241396

13251397
let listener_handle = tokio::spawn(main(args.clone()));
13261398
assert!(health_check::wait_healthy(&setup.health_check_url, 60, 1).await);
1327-
tokio::time::sleep(tokio::time::Duration::from_secs(sleep_secs)).await;
1328-
1329-
let tfhe_events_count = sqlx::query!("SELECT COUNT(*) FROM computations")
1330-
.fetch_one(&setup.db_pool)
1331-
.await?
1332-
.count
1333-
.unwrap_or(0);
1334-
let acl_events_count = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
1335-
.fetch_one(&setup.db_pool)
1336-
.await?
1337-
.count
1338-
.unwrap_or(0);
13391399
let nb_wallets = setup.wallets.len() as i64;
1400+
let expected = nb_wallets * nb_event_per_wallet;
1401+
let (tfhe_events_count, acl_events_count) = wait_for_event_counts(
1402+
&setup.db_pool,
1403+
tokio::time::Duration::from_secs(sleep_secs.max(30)),
1404+
&format!("waiting for catchup in scenario (expected {expected})"),
1405+
|tfhe, acl| tfhe >= expected && acl >= expected,
1406+
)
1407+
.await?;
13401408

13411409
Ok(CatchupOutcome {
13421410
_setup: setup,
@@ -1427,30 +1495,16 @@ async fn test_catchup_only_relative_end() -> Result<(), anyhow::Error> {
14271495
)
14281496
.await;
14291497

1430-
// Wait enough time for another catchup iteration to complete
1431-
tokio::time::sleep(tokio::time::Duration::from_secs(20)).await;
1432-
1433-
let tfhe_events_count_after =
1434-
sqlx::query!("SELECT COUNT(*) FROM computations")
1435-
.fetch_one(&setup.db_pool)
1436-
.await?
1437-
.count
1438-
.unwrap_or(0);
1439-
let acl_events_count_after =
1440-
sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
1441-
.fetch_one(&setup.db_pool)
1442-
.await?
1443-
.count
1444-
.unwrap_or(0);
1445-
1446-
assert!(
1447-
tfhe_events_count_after > first_tfhe_events_count,
1448-
"Second catchup iteration should ingest additional TFHE events"
1449-
);
1450-
assert!(
1451-
acl_events_count_after > first_acl_events_count,
1452-
"Second catchup iteration should ingest additional ACL events"
1453-
);
1498+
// Poll until second catchup iteration ingests additional events
1499+
wait_for_event_counts(
1500+
&setup.db_pool,
1501+
tokio::time::Duration::from_secs(30),
1502+
"waiting for second catchup iteration",
1503+
|tfhe, acl| {
1504+
tfhe > first_tfhe_events_count && acl > first_acl_events_count
1505+
},
1506+
)
1507+
.await?;
14541508

14551509
// Listener should still be running
14561510
assert!(

0 commit comments

Comments
 (0)