Skip to content

Commit 0db19f0

Browse files
authored
Merge of #2119
2 parents bfc89a5 + f49b9c5 commit 0db19f0

File tree

5 files changed

+370
-0
lines changed

5 files changed

+370
-0
lines changed

coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ impl HeartBeat {
115115
let elapsed = self.now_timestamp() - self.timestamp.load(Ordering::Relaxed);
116116
elapsed <= freshness.as_secs()
117117
}
118+
119+
pub fn with_elapsed_secs(elapsed_secs: u64) -> Self {
120+
let now = std::time::Instant::now();
121+
let timestamp_origin = now
122+
.checked_sub(Duration::from_secs(elapsed_secs))
123+
.unwrap_or(now);
124+
Self {
125+
timestamp_origin,
126+
timestamp: Arc::new(AtomicU64::new(0)),
127+
}
128+
}
118129
}
119130

120131
impl Default for HeartBeat {

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,3 +1136,141 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
11361136
cancel_token.cancel();
11371137
anyhow::Result::Ok(())
11381138
}
1139+
1140+
#[cfg(test)]
1141+
mod tests {
1142+
use std::collections::VecDeque;
1143+
use std::sync::Arc;
1144+
1145+
use alloy::node_bindings::Anvil;
1146+
use alloy::providers::ext::AnvilApi;
1147+
use alloy::providers::{Provider, ProviderBuilder, WsConnect};
1148+
use tokio::sync::RwLock;
1149+
1150+
use fhevm_engine_common::utils::HeartBeat;
1151+
1152+
use super::*;
1153+
1154+
fn new_test_iter(reorg_max: u64) -> InfiniteLogIter {
1155+
InfiniteLogIter {
1156+
url: String::new(),
1157+
block_time: 12,
1158+
contract_addresses: vec![],
1159+
catchup_blocks: None,
1160+
next_blocklogs: VecDeque::new(),
1161+
stream: None,
1162+
provider: Arc::new(RwLock::new(None)),
1163+
last_valid_block: None,
1164+
start_at_block: None,
1165+
end_at_block: None,
1166+
absolute_end_at_block: None,
1167+
catchup_margin: 5,
1168+
catchup_paging: 100,
1169+
tick_timeout: HeartBeat::new(),
1170+
tick_block: HeartBeat::new(),
1171+
reorg_maximum_duration_in_blocks: reorg_max,
1172+
block_history: BlockHistory::new(reorg_max as usize),
1173+
catchup_finalization_in_blocks: 20,
1174+
timeout_request_websocket: 15,
1175+
}
1176+
}
1177+
1178+
async fn setup_iter_with_chain(
1179+
num_blocks: u64,
1180+
reorg_max: u64,
1181+
known: std::ops::RangeInclusive<usize>,
1182+
) -> (
1183+
alloy::node_bindings::AnvilInstance,
1184+
InfiniteLogIter,
1185+
Vec<BlockSummary>,
1186+
) {
1187+
let anvil = Anvil::new().spawn();
1188+
let ws = WsConnect::new(anvil.ws_endpoint());
1189+
let provider = ProviderBuilder::new().connect_ws(ws).await.unwrap();
1190+
provider.anvil_mine(Some(num_blocks), None).await.unwrap();
1191+
let mut blocks = Vec::with_capacity((num_blocks + 1) as usize);
1192+
for i in 0..=num_blocks {
1193+
let b = provider
1194+
.get_block_by_number(i.into())
1195+
.await
1196+
.unwrap()
1197+
.unwrap();
1198+
blocks.push(BlockSummary::from(b));
1199+
}
1200+
let mut iter = new_test_iter(reorg_max);
1201+
for b in &blocks[known] {
1202+
iter.block_history.add_block(*b);
1203+
}
1204+
*iter.provider.write().await = Some(provider);
1205+
(anvil, iter, blocks)
1206+
}
1207+
1208+
// Walks back 2 blocks before finding a known ancestor in history.
1209+
// Tests the common case where only a few blocks were missed.
1210+
#[tokio::test]
1211+
async fn test_get_missing_ancestors_shallow_reorg() {
1212+
let (_anvil, iter, blocks) = setup_iter_with_chain(5, 50, 0..=2).await;
1213+
1214+
let missing = iter.get_missing_ancestors(blocks[5]).await;
1215+
1216+
assert_eq!(missing.len(), 2);
1217+
assert_eq!(missing[0].number, 3);
1218+
assert_eq!(missing[1].number, 4);
1219+
assert_eq!(missing[0].parent_hash, blocks[2].hash);
1220+
}
1221+
1222+
// Walks back 13 blocks through a long gap before hitting a known ancestor.
1223+
// Tests that the walk handles a long gap correctly.
1224+
#[tokio::test]
1225+
async fn test_get_missing_ancestors_deep_reorg() {
1226+
let (_anvil, iter, blocks) = setup_iter_with_chain(15, 50, 0..=1).await;
1227+
1228+
let missing = iter.get_missing_ancestors(blocks[15]).await;
1229+
1230+
assert_eq!(missing.len(), 13);
1231+
assert_eq!(missing[0].number, 2);
1232+
assert_eq!(missing.last().unwrap().number, 14);
1233+
}
1234+
1235+
// Stops walking at reorg_maximum_duration_in_blocks even if more unknown ancestors remain.
1236+
// Tests that the function doesn't walk forever and respects the configured max depth.
1237+
#[tokio::test]
1238+
async fn test_get_missing_ancestors_beyond_max_depth() {
1239+
let (_anvil, iter, blocks) = setup_iter_with_chain(10, 3, 0..=0).await;
1240+
1241+
let missing = iter.get_missing_ancestors(blocks[10]).await;
1242+
1243+
assert_eq!(missing.len(), 3);
1244+
assert_eq!(missing[0].number, 7);
1245+
assert_eq!(missing[1].number, 8);
1246+
assert_eq!(missing[2].number, 9);
1247+
}
1248+
1249+
// Skips reorg detection when history has fewer than 2 blocks and just adds the block.
1250+
// Tests that the guard condition prevents false reorg detection.
1251+
#[tokio::test]
1252+
async fn test_check_missing_ancestors_not_ready() {
1253+
let mut iter = new_test_iter(50);
1254+
assert!(!iter.block_history.is_ready_to_detect_reorg());
1255+
1256+
let block_a = BlockSummary {
1257+
number: 100,
1258+
hash: BlockHash::with_last_byte(0xAA),
1259+
parent_hash: BlockHash::with_last_byte(0x99),
1260+
timestamp: 1000,
1261+
};
1262+
iter.check_missing_ancestors(block_a).await;
1263+
assert!(iter.block_history.is_known(&block_a.hash));
1264+
assert!(!iter.block_history.is_ready_to_detect_reorg());
1265+
1266+
let block_b = BlockSummary {
1267+
number: 101,
1268+
hash: BlockHash::with_last_byte(0xBB),
1269+
parent_hash: BlockHash::with_last_byte(0xAA),
1270+
timestamp: 1012,
1271+
};
1272+
iter.check_missing_ancestors(block_b).await;
1273+
assert!(iter.block_history.is_known(&block_b.hash));
1274+
assert!(iter.block_history.is_ready_to_detect_reorg());
1275+
}
1276+
}

coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,4 +1048,66 @@ mod tests {
10481048
assert!(logs[2].dependence_chain == tx3);
10491049
assert_eq!(cache.read().await.len(), 3);
10501050
}
1051+
1052+
#[tokio::test]
1053+
async fn test_dependence_chains_empty_logs() {
1054+
let cache = ChainCache::new(lru::LruCache::new(
1055+
std::num::NonZeroUsize::new(100).unwrap(),
1056+
));
1057+
let mut logs: Vec<LogTfhe> = vec![];
1058+
1059+
let chains = dependence_chains(&mut logs, &cache, false, true).await;
1060+
1061+
assert!(chains.is_empty());
1062+
assert_eq!(cache.read().await.len(), 0);
1063+
}
1064+
1065+
// Known past handle with across_blocks=false should not extend a past chain.
1066+
// This verifies that cross-block dependency tracking is disabled when the flag is off.
1067+
#[tokio::test]
1068+
async fn test_dependence_chains_across_blocks_false() {
1069+
let cache = ChainCache::new(lru::LruCache::new(
1070+
std::num::NonZeroUsize::new(100).unwrap(),
1071+
));
1072+
let past_handle = new_handle();
1073+
let past_chain_hash = past_chain(0).hash;
1074+
cache.write().await.put(past_handle, past_chain_hash);
1075+
1076+
let mut logs = vec![];
1077+
let tx1 = TransactionHash::with_last_byte(1);
1078+
let _v = op1(past_handle, &mut logs, tx1);
1079+
1080+
let chains = dependence_chains(&mut logs, &cache, false, false).await;
1081+
1082+
assert_eq!(chains.len(), 1);
1083+
// Chain is local (tx1), not the past chain
1084+
assert_eq!(chains[0].hash, tx1);
1085+
assert!(logs.iter().all(|log| log.dependence_chain == tx1));
1086+
// Cache not updated when across_blocks is false
1087+
assert_eq!(cache.read().await.len(), 1);
1088+
}
1089+
1090+
// Connex mode: 2 past chains feed into 1 tx, producing a single component.
1091+
#[tokio::test]
1092+
async fn test_dependence_chains_connex_two_past_chains_merge() {
1093+
let cache = ChainCache::new(lru::LruCache::new(
1094+
std::num::NonZeroUsize::new(100).unwrap(),
1095+
));
1096+
let past_handle1 = new_handle();
1097+
let past_handle2 = new_handle();
1098+
let past_chain_hash1 = past_chain(100).hash;
1099+
let past_chain_hash2 = past_chain(101).hash;
1100+
cache.write().await.put(past_handle1, past_chain_hash1);
1101+
cache.write().await.put(past_handle2, past_chain_hash2);
1102+
1103+
let mut logs = vec![];
1104+
let tx1 = TransactionHash::with_last_byte(2);
1105+
let _v = op2(past_handle1, past_handle2, &mut logs, tx1);
1106+
1107+
let chains = dependence_chains(&mut logs, &cache, true, true).await;
1108+
1109+
assert_eq!(chains.len(), 1);
1110+
assert_eq!(chains[0].hash, tx1);
1111+
assert_eq!(cache.read().await.len(), 3);
1112+
}
10511113
}

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,4 +465,90 @@ mod tests {
465465
assert!(slow_dep_chain_ids.contains(&chains[2].hash));
466466
assert!(!slow_dep_chain_ids.contains(&chains[3].hash));
467467
}
468+
469+
// 4 independent chains each with exactly max_per_chain ops.
470+
// Since they are disconnected, each represents its own component.
471+
#[test]
472+
fn classify_slow_disconnected_components_at_threshold_are_fast() {
473+
let chains = vec![
474+
fixture_chain(1, &[]),
475+
fixture_chain(2, &[]),
476+
fixture_chain(3, &[]),
477+
fixture_chain(4, &[]),
478+
];
479+
let max = 64_u64;
480+
let dependent_ops_by_chain = HashMap::from([
481+
(chains[0].hash, max),
482+
(chains[1].hash, max),
483+
(chains[2].hash, max),
484+
(chains[3].hash, max),
485+
]);
486+
487+
let slow = classify_slow_by_split_dependency_closure(
488+
&chains,
489+
&dependent_ops_by_chain,
490+
max,
491+
);
492+
493+
assert!(
494+
slow.is_empty(),
495+
"no chain should be slow at exactly the threshold"
496+
);
497+
}
498+
499+
// Single chain with exactly max_per_chain ops is not slow.
500+
// One more dependent op makes it slow.
501+
#[test]
502+
fn classify_slow_single_chain_at_boundary() {
503+
let chains = vec![fixture_chain(1, &[])];
504+
let max = 64_u64;
505+
506+
let at_boundary = classify_slow_by_split_dependency_closure(
507+
&chains,
508+
&HashMap::from([(chains[0].hash, max)]),
509+
max,
510+
);
511+
assert!(
512+
at_boundary.is_empty(),
513+
"exactly at threshold should be fast"
514+
);
515+
516+
let over_boundary = classify_slow_by_split_dependency_closure(
517+
&chains,
518+
&HashMap::from([(chains[0].hash, max + 1)]),
519+
max,
520+
);
521+
assert!(
522+
over_boundary.contains(&chains[0].hash),
523+
"one over threshold should be slow"
524+
);
525+
}
526+
527+
// Non-linear dependency graph: A -> B, A -> C, B -> D, C -> D
528+
// Mark A slow, verify B, C, D all become slow via propagate_slow_lane_to_dependents.
529+
#[test]
530+
fn propagate_slow_lane_non_linear_dependency() {
531+
let chain_a = fixture_chain(1, &[]);
532+
let chain_b = fixture_chain(2, &[1]);
533+
let chain_c = fixture_chain(3, &[1]);
534+
let chain_d = fixture_chain(4, &[2, 3]);
535+
let chains = vec![chain_a, chain_b, chain_c, chain_d];
536+
537+
let mut slow = HashSet::from([chains[0].hash]);
538+
propagate_slow_lane_to_dependents(&chains, &mut slow);
539+
540+
assert!(slow.contains(&chains[0].hash), "A should be slow");
541+
assert!(
542+
slow.contains(&chains[1].hash),
543+
"B should be slow (depends on A)"
544+
);
545+
assert!(
546+
slow.contains(&chains[2].hash),
547+
"C should be slow (depends on A)"
548+
);
549+
assert!(
550+
slow.contains(&chains[3].hash),
551+
"D should be slow (depends on B and C)"
552+
);
553+
}
468554
}

coprocessor/fhevm-engine/host-listener/src/health_check.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,76 @@ impl HealthCheckService for HealthCheck {
6262
default_get_version()
6363
}
6464
}
65+
66+
#[cfg(test)]
67+
mod tests {
68+
use super::*;
69+
70+
// Helper to build a HealthCheck without real DB/provider connections.
71+
fn build_test_health_check(
72+
blockchain_timeout_tick: HeartBeat,
73+
blockchain_tick: HeartBeat,
74+
database_tick: HeartBeat,
75+
) -> HealthCheck {
76+
let db_url = "postgres://test:test@localhost:5432/test";
77+
let pool = sqlx::postgres::PgPoolOptions::new()
78+
.connect_lazy(db_url)
79+
.unwrap();
80+
HealthCheck {
81+
blockchain_timeout_tick,
82+
blockchain_tick,
83+
blockchain_provider: Arc::new(RwLock::new(None)),
84+
database_pool: Arc::new(RwLock::new(pool)),
85+
database_tick,
86+
}
87+
}
88+
89+
fn stale_tick() -> HeartBeat {
90+
HeartBeat::with_elapsed_secs(30)
91+
}
92+
93+
#[tokio::test]
94+
async fn not_alive_when_all_ticks_stale() {
95+
let health_check =
96+
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
97+
assert!(!health_check.is_alive().await);
98+
}
99+
100+
#[tokio::test]
101+
async fn is_alive_after_blockchain_tick_update() {
102+
let health_check =
103+
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
104+
assert!(!health_check.is_alive().await);
105+
health_check.blockchain_tick.update();
106+
assert!(health_check.is_alive().await);
107+
}
108+
109+
#[tokio::test]
110+
async fn is_alive_after_timeout_tick_update() {
111+
let health_check =
112+
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
113+
assert!(!health_check.is_alive().await);
114+
health_check.blockchain_timeout_tick.update();
115+
assert!(health_check.is_alive().await);
116+
}
117+
118+
#[tokio::test]
119+
async fn not_alive_after_only_database_tick_update() {
120+
let health_check =
121+
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
122+
assert!(!health_check.is_alive().await);
123+
health_check.database_tick.update();
124+
assert!(!health_check.is_alive().await);
125+
}
126+
127+
#[tokio::test]
128+
async fn is_alive_after_all_ticks_update() {
129+
let health_check =
130+
build_test_health_check(stale_tick(), stale_tick(), stale_tick());
131+
assert!(!health_check.is_alive().await);
132+
health_check.blockchain_tick.update();
133+
health_check.blockchain_timeout_tick.update();
134+
health_check.database_tick.update();
135+
assert!(health_check.is_alive().await);
136+
}
137+
}

0 commit comments

Comments
 (0)