Skip to content
11 changes: 11 additions & 0 deletions coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ impl HeartBeat {
let elapsed = self.now_timestamp() - self.timestamp.load(Ordering::Relaxed);
elapsed <= freshness.as_secs()
}

/// Creates a `HeartBeat` whose tick reads as `elapsed_secs` seconds old.
///
/// Intended for tests that need a pre-aged (stale) heartbeat. The origin
/// instant is shifted `elapsed_secs` into the past while the stored tick
/// stays at 0, so elapsed-time computations against the origin presumably
/// report `elapsed_secs` — confirm against `now_timestamp`.
///
/// NOTE(review): if the subtraction underflows the platform's `Instant`
/// range (very early after boot on some OSes), the origin falls back to
/// `now` and the heartbeat silently reads as fresh instead of stale.
pub fn with_elapsed_secs(elapsed_secs: u64) -> Self {
    let now = std::time::Instant::now();
    let age = Duration::from_secs(elapsed_secs);
    let timestamp_origin = match now.checked_sub(age) {
        Some(past) => past,
        None => now, // underflow fallback: behave as a brand-new heartbeat
    };
    Self {
        timestamp: Arc::new(AtomicU64::new(0)),
        timestamp_origin,
    }
}
}

impl Default for HeartBeat {
Expand Down
138 changes: 138 additions & 0 deletions coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,3 +1136,141 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
cancel_token.cancel();
anyhow::Result::Ok(())
}

#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use std::sync::Arc;

use alloy::node_bindings::Anvil;
use alloy::providers::ext::AnvilApi;
use alloy::providers::{Provider, ProviderBuilder, WsConnect};
use tokio::sync::RwLock;

use fhevm_engine_common::utils::HeartBeat;

use super::*;

// Builds an `InfiniteLogIter` with inert defaults: no URL, no stream, no
// provider, empty contract list. Only the reorg-related knobs matter to
// these tests: `reorg_maximum_duration_in_blocks` and the capacity of
// `block_history`, both driven by `reorg_max`.
fn new_test_iter(reorg_max: u64) -> InfiniteLogIter {
InfiniteLogIter {
url: String::new(),
block_time: 12,
contract_addresses: vec![],
catchup_blocks: None,
next_blocklogs: VecDeque::new(),
stream: None,
provider: Arc::new(RwLock::new(None)),
last_valid_block: None,
start_at_block: None,
end_at_block: None,
absolute_end_at_block: None,
catchup_margin: 5,
catchup_paging: 100,
tick_timeout: HeartBeat::new(),
tick_block: HeartBeat::new(),
reorg_maximum_duration_in_blocks: reorg_max,
block_history: BlockHistory::new(reorg_max as usize),
catchup_finalization_in_blocks: 20,
timeout_request_websocket: 15,
}
}

// Spawns a local Anvil node, mines `num_blocks` blocks, and fetches
// summaries for blocks 0..=num_blocks (genesis included). The iterator's
// `block_history` is seeded with only the `known` index range, so the
// remaining blocks look like missing ancestors to the reorg walker.
// Returns the Anvil handle (must be kept alive by the caller, or the node
// shuts down), the prepared iterator, and all block summaries.
async fn setup_iter_with_chain(
num_blocks: u64,
reorg_max: u64,
known: std::ops::RangeInclusive<usize>,
) -> (
alloy::node_bindings::AnvilInstance,
InfiniteLogIter,
Vec<BlockSummary>,
) {
let anvil = Anvil::new().spawn();
let ws = WsConnect::new(anvil.ws_endpoint());
let provider = ProviderBuilder::new().connect_ws(ws).await.unwrap();
provider.anvil_mine(Some(num_blocks), None).await.unwrap();
let mut blocks = Vec::with_capacity((num_blocks + 1) as usize);
for i in 0..=num_blocks {
let b = provider
.get_block_by_number(i.into())
.await
.unwrap()
.unwrap();
blocks.push(BlockSummary::from(b));
}
let mut iter = new_test_iter(reorg_max);
for b in &blocks[known] {
iter.block_history.add_block(*b);
}
// Install the live provider so the iterator can query the chain.
*iter.provider.write().await = Some(provider);
(anvil, iter, blocks)
}

// Walks back 2 blocks before finding a known ancestor in history.
// Tests the common case where only a few blocks were missed.
#[tokio::test]
async fn test_get_missing_ancestors_shallow_reorg() {
// History knows blocks 0..=2; handing in block 5 leaves 3 and 4 missing.
let (_anvil, iter, blocks) = setup_iter_with_chain(5, 50, 0..=2).await;

let missing = iter.get_missing_ancestors(blocks[5]).await;

assert_eq!(missing.len(), 2);
assert_eq!(missing[0].number, 3);
assert_eq!(missing[1].number, 4);
// The oldest missing block must link back to the last known block.
assert_eq!(missing[0].parent_hash, blocks[2].hash);
}

// Walks back 13 blocks through a long gap before hitting a known ancestor.
// Tests that the walk handles a long gap correctly.
#[tokio::test]
async fn test_get_missing_ancestors_deep_reorg() {
// History knows only blocks 0..=1; blocks 2..=14 are the gap.
let (_anvil, iter, blocks) = setup_iter_with_chain(15, 50, 0..=1).await;

let missing = iter.get_missing_ancestors(blocks[15]).await;

assert_eq!(missing.len(), 13);
assert_eq!(missing[0].number, 2);
assert_eq!(missing.last().unwrap().number, 14);
}

// Stops walking at reorg_maximum_duration_in_blocks even if more unknown ancestors remain.
// Tests that the function doesn't walk forever and respects the configured max depth.
#[tokio::test]
async fn test_get_missing_ancestors_beyond_max_depth() {
// Only genesis is known; with max depth 3 the walk returns just the
// 3 ancestors closest to the tip (7, 8, 9) and gives up on the rest.
let (_anvil, iter, blocks) = setup_iter_with_chain(10, 3, 0..=0).await;

let missing = iter.get_missing_ancestors(blocks[10]).await;

assert_eq!(missing.len(), 3);
assert_eq!(missing[0].number, 7);
assert_eq!(missing[1].number, 8);
assert_eq!(missing[2].number, 9);
}

// Skips reorg detection when history has fewer than 2 blocks and just adds the block.
// Tests that the guard condition prevents false reorg detection.
#[tokio::test]
async fn test_check_missing_ancestors_not_ready() {
// No provider is installed here: the test relies on the not-ready guard
// returning before any chain access is attempted.
let mut iter = new_test_iter(50);
assert!(!iter.block_history.is_ready_to_detect_reorg());

let block_a = BlockSummary {
number: 100,
hash: BlockHash::with_last_byte(0xAA),
parent_hash: BlockHash::with_last_byte(0x99),
timestamp: 1000,
};
iter.check_missing_ancestors(block_a).await;
assert!(iter.block_history.is_known(&block_a.hash));
assert!(!iter.block_history.is_ready_to_detect_reorg());

// A second block (child of the first) makes the history ready.
let block_b = BlockSummary {
number: 101,
hash: BlockHash::with_last_byte(0xBB),
parent_hash: BlockHash::with_last_byte(0xAA),
timestamp: 1012,
};
iter.check_missing_ancestors(block_b).await;
assert!(iter.block_history.is_known(&block_b.hash));
assert!(iter.block_history.is_ready_to_detect_reorg());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1048,4 +1048,66 @@ mod tests {
assert!(logs[2].dependence_chain == tx3);
assert_eq!(cache.read().await.len(), 3);
}

#[tokio::test]
async fn test_dependence_chains_empty_logs() {
    // No input logs: expect no chains back and an untouched cache.
    let cache = ChainCache::new(lru::LruCache::new(
        std::num::NonZeroUsize::new(100).unwrap(),
    ));
    let mut logs = Vec::<LogTfhe>::new();

    let chains = dependence_chains(&mut logs, &cache, false, true).await;

    assert_eq!(chains.len(), 0);
    assert!(cache.read().await.is_empty());
}

// A known past handle with across_blocks=false should not extend a past chain.
// This verifies that cross-block dependency tracking is disabled when the flag is off.
#[tokio::test]
async fn test_dependence_chains_across_blocks_false() {
let cache = ChainCache::new(lru::LruCache::new(
std::num::NonZeroUsize::new(100).unwrap(),
));
// Pre-seed the cache: `past_handle` was produced by an earlier chain.
let past_handle = new_handle();
let past_chain_hash = past_chain(0).hash;
cache.write().await.put(past_handle, past_chain_hash);

let mut logs = vec![];
let tx1 = TransactionHash::with_last_byte(1);
let _v = op1(past_handle, &mut logs, tx1);

// NOTE(review): the two boolean flags appear to be (connex, across_blocks)
// based on sibling tests — confirm against the dependence_chains signature.
let chains = dependence_chains(&mut logs, &cache, false, false).await;

assert_eq!(chains.len(), 1);
// Chain is local (tx1), not the past chain
assert_eq!(chains[0].hash, tx1);
assert!(logs.iter().all(|log| log.dependence_chain == tx1));
// Cache not updated when across_blocks is false
assert_eq!(cache.read().await.len(), 1);
}

// Connex mode: 2 past chains feed into 1 tx, producing a single component.
#[tokio::test]
async fn test_dependence_chains_connex_two_past_chains_merge() {
    let cache = ChainCache::new(lru::LruCache::new(
        std::num::NonZeroUsize::new(100).unwrap(),
    ));
    // Seed the cache with two distinct past chains, one per handle.
    let handles = [new_handle(), new_handle()];
    {
        let mut guard = cache.write().await;
        guard.put(handles[0], past_chain(100).hash);
        guard.put(handles[1], past_chain(101).hash);
    }

    let tx1 = TransactionHash::with_last_byte(2);
    let mut logs = Vec::new();
    let _v = op2(handles[0], handles[1], &mut logs, tx1);

    let chains = dependence_chains(&mut logs, &cache, true, true).await;

    // Both past chains and the new tx collapse into one component,
    // and the cache now tracks all three entries.
    assert_eq!(chains.len(), 1);
    assert_eq!(chains[0].hash, tx1);
    assert_eq!(cache.read().await.len(), 3);
}
}
86 changes: 86 additions & 0 deletions coprocessor/fhevm-engine/host-listener/src/database/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,4 +465,90 @@ mod tests {
assert!(slow_dep_chain_ids.contains(&chains[2].hash));
assert!(!slow_dep_chain_ids.contains(&chains[3].hash));
}

// 4 independent chains each with exactly max_per_chain ops.
// Since they are disconnected, each represents its own component.
#[test]
fn classify_slow_disconnected_components_at_threshold_are_fast() {
let chains = vec![
fixture_chain(1, &[]),
fixture_chain(2, &[]),
fixture_chain(3, &[]),
fixture_chain(4, &[]),
];
let max = 64_u64;
let dependent_ops_by_chain = HashMap::from([
(chains[0].hash, max),
(chains[1].hash, max),
(chains[2].hash, max),
(chains[3].hash, max),
]);

let slow = classify_slow_by_split_dependency_closure(
&chains,
&dependent_ops_by_chain,
max,
);

assert!(
slow.is_empty(),
"no chain should be slow at exactly the threshold"
);
}

// Single chain with exactly max_per_chain ops is not slow.
// One more dep makes it slow.
#[test]
fn classify_slow_single_chain_at_boundary() {
let chains = vec![fixture_chain(1, &[])];
let max = 64_u64;

// Exactly `max` dependent ops: the threshold is inclusive on the fast side.
let at_boundary = classify_slow_by_split_dependency_closure(
&chains,
&HashMap::from([(chains[0].hash, max)]),
max,
);
assert!(
at_boundary.is_empty(),
"exactly at threshold should be fast"
);

// `max + 1` dependent ops: first value that tips the chain into slow.
let over_boundary = classify_slow_by_split_dependency_closure(
&chains,
&HashMap::from([(chains[0].hash, max + 1)]),
max,
);
assert!(
over_boundary.contains(&chains[0].hash),
"one over threshold should be slow"
);
}

// Non linear: A -> B, A -> C, B -> D, C -> D
// Mark A slow, verify B, C, D all become slow via propagate_slow_lane_to_dependents.
#[test]
fn propagate_slow_lane_non_linear_dependency() {
    // Diamond-shaped dependency graph, built inline.
    let chains = vec![
        fixture_chain(1, &[]),     // A
        fixture_chain(2, &[1]),    // B depends on A
        fixture_chain(3, &[1]),    // C depends on A
        fixture_chain(4, &[2, 3]), // D depends on B and C
    ];

    // Only A starts in the slow set; propagation must reach the whole diamond.
    let mut slow = HashSet::from([chains[0].hash]);
    propagate_slow_lane_to_dependents(&chains, &mut slow);

    assert!(slow.contains(&chains[0].hash), "A should be slow");
    assert!(
        slow.contains(&chains[1].hash),
        "B should be slow (depends on A)"
    );
    assert!(
        slow.contains(&chains[2].hash),
        "C should be slow (depends on A)"
    );
    assert!(
        slow.contains(&chains[3].hash),
        "D should be slow (depends on B and C)"
    );
}
}
73 changes: 73 additions & 0 deletions coprocessor/fhevm-engine/host-listener/src/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,76 @@ impl HealthCheckService for HealthCheck {
default_get_version()
}
}

#[cfg(test)]
mod tests {
use super::*;

// Helper to build a HealthCheck without real DB/provider connections.
// `connect_lazy` defers the actual connection attempt, so no database
// needs to be running as long as the pool is never queried.
fn build_test_health_check(
blockchain_timeout_tick: HeartBeat,
blockchain_tick: HeartBeat,
database_tick: HeartBeat,
) -> HealthCheck {
let db_url = "postgres://test:test@localhost:5432/test";
let pool = sqlx::postgres::PgPoolOptions::new()
.connect_lazy(db_url)
.unwrap();
HealthCheck {
blockchain_timeout_tick,
blockchain_tick,
blockchain_provider: Arc::new(RwLock::new(None)),
database_pool: Arc::new(RwLock::new(pool)),
database_tick,
}
}

// A heartbeat that reads as 30 s old. Assumed to exceed the liveness
// freshness window used by `is_alive` — TODO confirm the threshold.
fn stale_tick() -> HeartBeat {
HeartBeat::with_elapsed_secs(30)
}

#[tokio::test]
async fn not_alive_when_all_ticks_stale() {
let health_check =
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
assert!(!health_check.is_alive().await);
}

// A fresh regular blockchain tick alone is enough to report alive.
#[tokio::test]
async fn is_alive_after_blockchain_tick_update() {
let health_check =
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
assert!(!health_check.is_alive().await);
health_check.blockchain_tick.update();
assert!(health_check.is_alive().await);
}

// A fresh timeout tick alone is also enough to report alive.
#[tokio::test]
async fn is_alive_after_timeout_tick_update() {
let health_check =
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
assert!(!health_check.is_alive().await);
health_check.blockchain_timeout_tick.update();
assert!(health_check.is_alive().await);
}

// A fresh database tick by itself does NOT make the service alive:
// liveness requires at least one fresh blockchain-side tick.
#[tokio::test]
async fn not_alive_after_only_database_tick_update() {
let health_check =
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
assert!(!health_check.is_alive().await);
health_check.database_tick.update();
assert!(!health_check.is_alive().await);
}

#[tokio::test]
async fn is_alive_after_all_ticks_update() {
let health_check =
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
assert!(!health_check.is_alive().await);
health_check.blockchain_tick.update();
health_check.blockchain_timeout_tick.update();
health_check.database_tick.update();
assert!(health_check.is_alive().await);
}
}
Loading