Skip to content

Commit 3c912c2

Browse files
committed
feat(host-listener): account slow-lane over split dependency closures
Keep no_fork parallelism, but aggregate inserted-op pressure over split dependency closures for slow-lane classification.
1 parent 4e27262 commit 3c912c2

File tree

4 files changed

+105
-21
lines changed

4 files changed

+105
-21
lines changed

coprocessor/fhevm-engine/host-listener/README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@ If you want to disable TFHE operation events propagation, you can provide an emp
3838
`--dependent-ops-max-per-chain` enables slow-lane assignment (`0` disables).
3939

4040
Current behavior:
41-
- Counter is **per dependence chain, per ingest pass** (block-scoped in normal flow).
42-
- Count is **unweighted**: `+1` for each newly inserted TFHE op in the chain.
41+
- Count is **per ingest pass** (block-scoped in normal flow).
42+
- Count unit is **unweighted**: `+1` for each newly inserted TFHE op.
43+
- Slow-lane threshold is evaluated on split-dependency closures (connected dcids),
44+
then applied to all chains in the over-cap closure.
4345
- `is_allowed` is **not** part of the counter (a non-allowed op can still be required producer work).
4446
- It is **not** dependency depth and **not** cumulative across past blocks.
4547

46-
If a chain exceeds the cap in that ingest pass, host-listener marks it slow by
48+
If a closure exceeds the cap in that ingest pass, host-listener marks its chains slow by
4749
setting `dependence_chain.schedule_priority = 1` (monotonic via `GREATEST` on
4850
upsert). tfhe-worker picks fast first (`0`) and processes slow when fast is empty.
4951

coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ async fn grouping_to_chains_connex(
415415
size: tx.size,
416416
before_size: 0,
417417
dependencies: vec![],
418-
inheritance_parents: vec![],
418+
split_dependencies: vec![],
419419
dependents: vec![],
420420
allowed_handle: tx.allowed_handle.clone(),
421421
new_chain,
@@ -523,7 +523,7 @@ fn grouping_to_chains_no_fork(
523523
size: 0,
524524
before_size: 0,
525525
dependencies: vec![],
526-
inheritance_parents: vec![],
526+
split_dependencies: vec![],
527527
dependents: vec![],
528528
allowed_handle: tx.allowed_handle.clone(), // needed to publish in cache
529529
new_chain: false,
@@ -544,15 +544,15 @@ fn grouping_to_chains_no_fork(
544544
}
545545
debug!("Creating new chain for tx {:?} with block dependencies {:?}, outer dependencies {:?}, before_size {}",
546546
tx, dependencies_block, dependencies_outer, before_size);
547-
let inheritance_parents =
547+
let split_dependencies =
548548
[dependencies_block.clone(), dependencies_outer.clone()]
549549
.concat();
550550
let new_chain = Chain {
551551
hash: tx.tx_hash,
552552
size: tx.size,
553553
before_size,
554554
dependencies: dependencies_block,
555-
inheritance_parents,
555+
split_dependencies,
556556
dependents: vec![],
557557
allowed_handle: tx.allowed_handle.clone(),
558558
new_chain: true,
@@ -1022,7 +1022,7 @@ mod tests {
10221022
Chain {
10231023
hash: TransactionHash::with_last_byte(last_byte),
10241024
dependencies: vec![],
1025-
inheritance_parents: vec![],
1025+
split_dependencies: vec![],
10261026
dependents: vec![],
10271027
size: 1,
10281028
before_size: 0,

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 93 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ fn propagate_slow_lane_to_dependents(
5151
let mut dependents_by_dependency: HashMap<ChainHash, Vec<ChainHash>> =
5252
HashMap::new();
5353
for chain in chains {
54-
for dependency in &chain.inheritance_parents {
54+
for dependency in &chain.split_dependencies {
5555
dependents_by_dependency
5656
.entry(*dependency)
5757
.or_default()
@@ -74,6 +74,64 @@ fn propagate_slow_lane_to_dependents(
7474
}
7575
}
7676

77+
fn classify_slow_by_split_dependency_closure(
78+
chains: &[Chain],
79+
dependent_ops_by_chain: &HashMap<ChainHash, u64>,
80+
max_per_chain: u64,
81+
) -> HashSet<ChainHash> {
82+
let chain_ids = chains
83+
.iter()
84+
.map(|chain| chain.hash)
85+
.collect::<HashSet<_>>();
86+
let mut neighbors: HashMap<ChainHash, HashSet<ChainHash>> =
87+
HashMap::with_capacity(chains.len());
88+
for chain in chains {
89+
neighbors.entry(chain.hash).or_default();
90+
for dependency in &chain.split_dependencies {
91+
if !chain_ids.contains(dependency) {
92+
continue;
93+
}
94+
neighbors.entry(chain.hash).or_default().insert(*dependency);
95+
neighbors.entry(*dependency).or_default().insert(chain.hash);
96+
}
97+
}
98+
99+
let mut visited = HashSet::with_capacity(chains.len());
100+
let mut slow_dep_chain_ids = HashSet::new();
101+
for chain in chains {
102+
if visited.contains(&chain.hash) {
103+
continue;
104+
}
105+
let mut component = Vec::new();
106+
let mut stack = vec![chain.hash];
107+
visited.insert(chain.hash);
108+
while let Some(current) = stack.pop() {
109+
component.push(current);
110+
if let Some(next_neighbors) = neighbors.get(&current) {
111+
for next in next_neighbors {
112+
if visited.insert(*next) {
113+
stack.push(*next);
114+
}
115+
}
116+
}
117+
}
118+
119+
let component_ops =
120+
component.iter().fold(0_u64, |sum, dep_chain_id| {
121+
sum.saturating_add(
122+
dependent_ops_by_chain
123+
.get(dep_chain_id)
124+
.copied()
125+
.unwrap_or(0),
126+
)
127+
});
128+
if component_ops > max_per_chain {
129+
slow_dep_chain_ids.extend(component);
130+
}
131+
}
132+
slow_dep_chain_ids
133+
}
134+
77135
pub async fn ingest_block_logs(
78136
chain_id: ChainId,
79137
db: &mut Database,
@@ -205,20 +263,17 @@ pub async fn ingest_block_logs(
205263
let mut slow_dep_chain_ids: HashSet<ChainHash> = HashSet::new();
206264
if slow_lane_enabled {
207265
let max_per_chain = u64::from(options.dependent_ops_max_per_chain);
208-
for chain in &chains {
209-
if let Some(chain_dep_ops) = dependent_ops_by_chain.get(&chain.hash)
210-
{
211-
if *chain_dep_ops > max_per_chain {
212-
slow_dep_chain_ids.insert(chain.hash);
213-
}
214-
}
215-
}
266+
slow_dep_chain_ids = classify_slow_by_split_dependency_closure(
267+
&chains,
268+
&dependent_ops_by_chain,
269+
max_per_chain,
270+
);
216271

217272
let parent_dep_chain_ids = chains
218273
.iter()
219274
.flat_map(|chain| {
220275
chain
221-
.inheritance_parents
276+
.split_dependencies
222277
.iter()
223278
.map(|dependency| dependency.to_vec())
224279
})
@@ -276,7 +331,7 @@ mod tests {
276331
.iter()
277332
.map(|dep| FixedBytes::<32>::from([*dep; 32]))
278333
.collect(),
279-
inheritance_parents: dependencies
334+
split_dependencies: dependencies
280335
.iter()
281336
.map(|dep| FixedBytes::<32>::from([*dep; 32]))
282337
.collect(),
@@ -305,4 +360,31 @@ mod tests {
305360
assert!(slow_dep_chain_ids.contains(&chains[2].hash));
306361
assert!(!slow_dep_chain_ids.contains(&chains[3].hash));
307362
}
363+
364+
#[test]
365+
fn classifies_slow_by_split_dependency_closure_sum() {
366+
let chains = vec![
367+
fixture_chain(1, &[]),
368+
fixture_chain(2, &[1]),
369+
fixture_chain(3, &[2]),
370+
fixture_chain(4, &[]),
371+
];
372+
let dependent_ops_by_chain = HashMap::from([
373+
(chains[0].hash, 30_u64),
374+
(chains[1].hash, 20_u64),
375+
(chains[2].hash, 20_u64),
376+
(chains[3].hash, 10_u64),
377+
]);
378+
379+
let slow_dep_chain_ids = classify_slow_by_split_dependency_closure(
380+
&chains,
381+
&dependent_ops_by_chain,
382+
64,
383+
);
384+
385+
assert!(slow_dep_chain_ids.contains(&chains[0].hash));
386+
assert!(slow_dep_chain_ids.contains(&chains[1].hash));
387+
assert!(slow_dep_chain_ids.contains(&chains[2].hash));
388+
assert!(!slow_dep_chain_ids.contains(&chains[3].hash));
389+
}
308390
}

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ static SLOW_LANE_MARKED_CHAINS_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(
5555
pub struct Chain {
5656
pub hash: ChainHash,
5757
pub dependencies: Vec<ChainHash>,
58-
// Ingest-only metadata for slow-lane inheritance propagation.
58+
// Ingest-only metadata for dependency links split by no_fork grouping.
5959
// Not used by scheduler execution ordering.
60-
pub inheritance_parents: Vec<ChainHash>,
60+
pub split_dependencies: Vec<ChainHash>,
6161
pub dependents: Vec<ChainHash>,
6262
pub allowed_handle: Vec<Handle>,
6363
pub size: u64,

0 commit comments

Comments
 (0)