Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ sol!(
use crate::ACLTest::ACLTestInstance;
use crate::FHEVMExecutorTest::FHEVMExecutorTestInstance;

const NB_EVENTS_PER_WALLET: i64 = 200;
const NB_EVENTS_PER_WALLET: i64 = 50;

async fn emit_events<P, N>(
wallets: &[EthereumWallet],
Expand Down Expand Up @@ -455,6 +455,66 @@ async fn dep_chain_id_for_output_handle(
Ok(dep_chain_id)
}

// Polls Anvil until the block number advances past `after_block`.
// If `after_block` is `None`, queries the current block first.
async fn wait_for_next_block(
url: &str,
after_block: Option<u64>,
timeout: tokio::time::Duration,
) -> Result<u64, anyhow::Error> {
let provider = ProviderBuilder::new()
.connect_ws(WsConnect::new(url))
.await?;
let current = match after_block {
Some(b) => b,
None => provider.get_block_number().await?,
};
let deadline = tokio::time::Instant::now() + timeout;
loop {
let block = provider.get_block_number().await?;
if block > current {
return Ok(block);
}
assert!(
tokio::time::Instant::now() < deadline,
"timeout waiting for block > {current}, still at {block}"
);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}

// Polls the database until both `computations` and `allowed_handles` counts
// satisfy `predicate`, returning the final `(tfhe_count, acl_count)`.
// Panics with `context` if `timeout` elapses before the condition is met.
async fn wait_for_event_counts(
db_pool: &sqlx::PgPool,
timeout: tokio::time::Duration,
context: &str,
predicate: impl Fn(i64, i64) -> bool,
) -> Result<(i64, i64), anyhow::Error> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
let tfhe = sqlx::query!("SELECT COUNT(*) FROM computations")
.fetch_one(db_pool)
.await?
.count
.unwrap_or(0);
let acl = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
.fetch_one(db_pool)
.await?
.count
.unwrap_or(0);
if predicate(tfhe, acl) {
return Ok((tfhe, acl));
}
assert!(
tokio::time::Instant::now() < deadline,
"timeout {context}: tfhe={tfhe}, acl={acl}"
);
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
}

#[tokio::test]
#[serial(db)]
async fn test_slow_lane_threshold_matrix_locally() -> Result<(), anyhow::Error>
Expand Down Expand Up @@ -584,7 +644,7 @@ async fn test_schedule_priority_migration_contract() -> Result<(), anyhow::Error
#[serial(db)]
async fn test_slow_lane_cross_block_sustained_below_cap_stays_fast_locally(
) -> Result<(), anyhow::Error> {
let setup = setup_with_block_time(None, 3.0).await?;
let setup = setup_with_block_time(None, 1.0).await?;
let mut db = Database::new(
&setup.args.database_url,
setup.chain_id,
Expand Down Expand Up @@ -628,7 +688,16 @@ async fn test_slow_lane_cross_block_sustained_below_cap_stays_fast_locally(
.await?;

current_handle = Some(last_output_handle);
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
let last_block = receipts
.last()
.and_then(|r| r.block_number)
.expect("receipt has block number");
wait_for_next_block(
&setup.args.url,
Some(last_block),
tokio::time::Duration::from_secs(10),
)
.await?;
}

assert!(
Expand Down Expand Up @@ -711,7 +780,7 @@ async fn test_slow_lane_cross_block_parent_lookup_finds_known_slow_parent_locall
#[serial(db)]
async fn test_slow_lane_priority_is_monotonic_across_blocks_locally(
) -> Result<(), anyhow::Error> {
let setup = setup_with_block_time(None, 3.0).await?;
let setup = setup_with_block_time(None, 1.0).await?;
let mut db = Database::new(
&setup.args.database_url,
setup.chain_id,
Expand All @@ -732,7 +801,12 @@ async fn test_slow_lane_priority_is_monotonic_across_blocks_locally(
.await?;
assert_eq!(initial_priority, 1, "first pass should mark chain slow");

tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
wait_for_next_block(
&setup.args.url,
None,
tokio::time::Duration::from_secs(10),
)
.await?;

let second_output = ingest_dependent_burst_seeded(
&mut db,
Expand Down Expand Up @@ -1079,10 +1153,10 @@ async fn test_listener_no_event_loss(
nb_wallets * NB_EVENTS_PER_WALLET
};
let expected_acl_events = nb_wallets * NB_EVENTS_PER_WALLET;
for _ in 1..120 {
// 10 mins max to avoid stalled CI
for _ in 1..40 {
// 4 mins max to avoid stalled CI
let listener_handle = tokio::spawn(main(args.clone()));
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
check_finalization_status(&setup).await;
let tfhe_new_count = sqlx::query!("SELECT COUNT(*) FROM computations")
.fetch_one(&setup.db_pool)
Expand All @@ -1108,7 +1182,7 @@ async fn test_listener_no_event_loss(
if kill {
listener_handle.abort();
while !listener_handle.is_finished() {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
nb_kill += 1;
}
Expand All @@ -1119,7 +1193,7 @@ async fn test_listener_no_event_loss(
acl_events_count,
nb_wallets * NB_EVENTS_PER_WALLET,
);
tokio::time::sleep(tokio::time::Duration::from_secs_f64(1.5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
assert_eq!(tfhe_events_count, expected_tfhe_events);
assert_eq!(acl_events_count, expected_acl_events);
Expand Down Expand Up @@ -1190,21 +1264,17 @@ async fn test_catchup_and_listen() -> Result<(), anyhow::Error> {
args.catchup_paging = 3;
let listener_handle = tokio::spawn(main(args.clone()));
assert!(health_check::wait_healthy(&setup.health_check_url, 60, 1).await);
tokio::time::sleep(tokio::time::Duration::from_secs(20)).await; // time to catchup

let tfhe_events_count = sqlx::query!("SELECT COUNT(*) FROM computations")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);
let acl_events_count = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);
let nb_wallets = setup.wallets.len() as i64;
assert_eq!(tfhe_events_count, nb_wallets * nb_event_per_wallet);
assert_eq!(acl_events_count, nb_wallets * nb_event_per_wallet);
let expected = nb_wallets * nb_event_per_wallet;
let (tfhe_events_count, acl_events_count) = wait_for_event_counts(
&setup.db_pool,
tokio::time::Duration::from_secs(30),
&format!("waiting for first catchup (expected {expected})"),
|tfhe, acl| tfhe >= expected && acl >= expected,
)
.await?;
assert_eq!(tfhe_events_count, expected);
assert_eq!(acl_events_count, expected);
assert!(!listener_handle.is_finished(), "Listener should continue");
let wallets_clone = setup.wallets.clone();
let url_clone = setup.args.url.clone();
Expand All @@ -1219,20 +1289,17 @@ async fn test_catchup_and_listen() -> Result<(), anyhow::Error> {
nb_event_per_wallet,
)
.await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

let tfhe_events_count = sqlx::query!("SELECT COUNT(*) FROM computations")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);
let acl_events_count = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);
assert_eq!(tfhe_events_count, 2 * nb_wallets * nb_event_per_wallet);
assert_eq!(acl_events_count, 2 * nb_wallets * nb_event_per_wallet);
let expected2 = 2 * nb_wallets * nb_event_per_wallet;
let (tfhe_events_count, acl_events_count) = wait_for_event_counts(
&setup.db_pool,
tokio::time::Duration::from_secs(30),
&format!("waiting for second batch (expected {expected2})"),
|tfhe, acl| tfhe >= expected2 && acl >= expected2,
)
.await?;
assert_eq!(tfhe_events_count, expected2);
assert_eq!(acl_events_count, expected2);
listener_handle.abort();
Ok(())
}
Expand Down Expand Up @@ -1265,23 +1332,28 @@ async fn test_catchup_only() -> Result<(), anyhow::Error> {
args.catchup_paging = 2;
let listener_handle = tokio::spawn(main(args.clone()));
assert!(health_check::wait_healthy(&setup.health_check_url, 60, 1).await);
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; // time to catchup

let tfhe_events_count = sqlx::query!("SELECT COUNT(*) FROM computations")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);
let acl_events_count = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);
let nb_wallets = setup.wallets.len() as i64;
let expected = nb_wallets * nb_event_per_wallet;
let (tfhe_events_count, acl_events_count) = wait_for_event_counts(
&setup.db_pool,
tokio::time::Duration::from_secs(30),
&format!("waiting for catchup (expected {expected})"),
|tfhe, acl| tfhe >= expected && acl >= expected,
)
.await?;
eprintln!("End block {:?}", args.end_at_block);
assert_eq!(tfhe_events_count, nb_wallets * nb_event_per_wallet);
assert_eq!(acl_events_count, nb_wallets * nb_event_per_wallet);
assert!(listener_handle.is_finished(), "Listener should stop");
assert_eq!(tfhe_events_count, expected);
assert_eq!(acl_events_count, expected);
// Allow the listener to finish after ingesting all events
let finish_deadline =
tokio::time::Instant::now() + tokio::time::Duration::from_secs(10);
while !listener_handle.is_finished() {
assert!(
tokio::time::Instant::now() < finish_deadline,
"Listener should stop"
);
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
Ok(())
}

Expand Down Expand Up @@ -1324,19 +1396,15 @@ where

let listener_handle = tokio::spawn(main(args.clone()));
assert!(health_check::wait_healthy(&setup.health_check_url, 60, 1).await);
tokio::time::sleep(tokio::time::Duration::from_secs(sleep_secs)).await;

let tfhe_events_count = sqlx::query!("SELECT COUNT(*) FROM computations")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);
let acl_events_count = sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);
let nb_wallets = setup.wallets.len() as i64;
let expected = nb_wallets * nb_event_per_wallet;
let (tfhe_events_count, acl_events_count) = wait_for_event_counts(
&setup.db_pool,
tokio::time::Duration::from_secs(sleep_secs.max(30)),
&format!("waiting for catchup in scenario (expected {expected})"),
|tfhe, acl| tfhe >= expected && acl >= expected,
)
.await?;

Ok(CatchupOutcome {
_setup: setup,
Expand Down Expand Up @@ -1427,30 +1495,16 @@ async fn test_catchup_only_relative_end() -> Result<(), anyhow::Error> {
)
.await;

// Wait enough time for another catchup iteration to complete
tokio::time::sleep(tokio::time::Duration::from_secs(20)).await;

let tfhe_events_count_after =
sqlx::query!("SELECT COUNT(*) FROM computations")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);
let acl_events_count_after =
sqlx::query!("SELECT COUNT(*) FROM allowed_handles")
.fetch_one(&setup.db_pool)
.await?
.count
.unwrap_or(0);

assert!(
tfhe_events_count_after > first_tfhe_events_count,
"Second catchup iteration should ingest additional TFHE events"
);
assert!(
acl_events_count_after > first_acl_events_count,
"Second catchup iteration should ingest additional ACL events"
);
// Poll until second catchup iteration ingests additional events
wait_for_event_counts(
&setup.db_pool,
tokio::time::Duration::from_secs(30),
"waiting for second catchup iteration",
|tfhe, acl| {
tfhe > first_tfhe_events_count && acl > first_acl_events_count
},
)
.await?;

// Listener should still be running
assert!(
Expand Down
Loading