Skip to content

Commit 9053b19

Browse files
authored
fix(coprocessor): maximise dependence fetch from DB (#1950) (#1961)
* fix(coprocessor): maximise dependence fetch from DB (#1950) fix(coprocessor): maximise fetched dependence ciphertexts from DB * chore(coprocessor): update time and bytes package versions
1 parent 40a9870 commit 9053b19

File tree

3 files changed

+85
-47
lines changed

3 files changed

+85
-47
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 12 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/scheduler/src/dfg.rs

Lines changed: 63 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ pub struct DFComponentGraph {
312312
pub needed_map: HashMap<Handle, Vec<NodeIndex>>,
313313
pub produced: HashMap<Handle, Vec<(NodeIndex, Handle)>>,
314314
pub results: Vec<DFGTxResult>,
315+
deferred_dependences: Vec<(NodeIndex, NodeIndex, Handle)>,
315316
}
316317
impl DFComponentGraph {
317318
pub fn build(&mut self, nodes: &mut Vec<ComponentNode>) -> Result<()> {
@@ -350,7 +351,16 @@ impl DFComponentGraph {
350351
error!(target: "scheduler", { output_handle = ?hex::encode(i.clone()) },
351352
"Missing producer for handle");
352353
} else {
353-
dependence_pairs.push((producer[0].0, consumer));
354+
// Cross-transaction dependence: defer until
355+
// after DB fetch. If the handle is found in
356+
// DB, we use the fetched value and skip the
357+
// dependence edge.
358+
self.deferred_dependences
359+
.push((producer[0].0, consumer, i.clone()));
360+
self.needed_map
361+
.entry(i.clone())
362+
.and_modify(|uses| uses.push(consumer))
363+
.or_insert(vec![consumer]);
354364
}
355365
} else {
356366
self.needed_map
@@ -361,19 +371,52 @@ impl DFComponentGraph {
361371
}
362372
}
363373

364-
// We build a replica of the graph and map it to the
365-
// underlying DiGraph so we can identify cycles.
366-
let mut digraph = self.graph.map(|idx, _| idx, |_, _| ()).graph().clone();
367-
// Add transaction dependence edges
374+
// Same-transaction dependences are always acyclic (they
375+
// derive from the transaction's internal DAG). Add them
376+
// directly; cycle detection runs once in
377+
// resolve_dependences() over the full edge set.
368378
for (producer, consumer) in dependence_pairs.iter() {
379+
if self.graph.add_edge(*producer, *consumer, ()).is_err() {
380+
let prod = self
381+
.graph
382+
.node_weight(*producer)
383+
.ok_or(SchedulerError::DataflowGraphError)?;
384+
let cons = self
385+
.graph
386+
.node_weight(*consumer)
387+
.ok_or(SchedulerError::DataflowGraphError)?;
388+
error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
389+
"Unexpected cycle in same-transaction dependence");
390+
return Err(SchedulerError::CyclicDependence.into());
391+
}
392+
}
393+
Ok(())
394+
}
395+
396+
// Resolve deferred cross-transaction dependences after DB fetch.
397+
// Dependences whose handle was successfully fetched are dropped
398+
// (the consumer already has the data). Remaining dependences are
399+
// added as graph edges after cycle detection.
400+
pub fn resolve_dependences(&mut self, fetched_handles: &HashSet<Handle>) -> Result<()> {
401+
let remaining: Vec<(NodeIndex, NodeIndex)> = self
402+
.deferred_dependences
403+
.drain(..)
404+
.filter(|(_, _, handle)| !fetched_handles.contains(handle))
405+
.map(|(prod, cons, _)| (prod, cons))
406+
.collect();
407+
if remaining.is_empty() {
408+
return Ok(());
409+
}
410+
// Build a digraph replica including existing edges +
411+
// remaining deferred edges and check for cycles
412+
let mut digraph = self.graph.map(|idx, _| idx, |_, _| ()).graph().clone();
413+
for (producer, consumer) in remaining.iter() {
369414
digraph.add_edge(*producer, *consumer, ());
370415
}
371416
let mut tarjan = daggy::petgraph::algo::TarjanScc::new();
372417
let mut sccs = Vec::new();
373418
tarjan.run(&digraph, |scc| {
374419
if scc.len() > 1 {
375-
// All non-singleton SCCs in a directed graph are
376-
// dependence cycles
377420
sccs.push(scc.to_vec());
378421
}
379422
});
@@ -389,9 +432,6 @@ impl DFComponentGraph {
389432
.graph
390433
.node_weight_mut(*idx)
391434
.ok_or(SchedulerError::DataflowGraphError)?;
392-
// Mark the node as uncomputable so we don't go
393-
// and mark as completed operations that are in
394-
// error.
395435
tx.is_uncomputable = true;
396436
error!(target: "scheduler", { transaction_id = ?hex::encode(tx.transaction_id.clone()) },
397437
"Transaction is part of a dependence cycle");
@@ -405,26 +445,20 @@ impl DFComponentGraph {
405445
}
406446
}
407447
return Err(SchedulerError::CyclicDependence.into());
408-
} else {
409-
// If no dependence cycles were found, then we can
410-
// complete the graph and proceed to execution
411-
for (producer, consumer) in dependence_pairs.iter() {
412-
// The error case here should not happen as we've
413-
// already covered it by testing for SCCs in the graph
414-
// first
415-
if self.graph.add_edge(*producer, *consumer, ()).is_err() {
416-
let prod = self
417-
.graph
418-
.node_weight(*producer)
419-
.ok_or(SchedulerError::DataflowGraphError)?;
420-
let cons = self
421-
.graph
422-
.node_weight(*consumer)
423-
.ok_or(SchedulerError::DataflowGraphError)?;
424-
error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
448+
}
449+
for (producer, consumer) in remaining.iter() {
450+
if self.graph.add_edge(*producer, *consumer, ()).is_err() {
451+
let prod = self
452+
.graph
453+
.node_weight(*producer)
454+
.ok_or(SchedulerError::DataflowGraphError)?;
455+
let cons = self
456+
.graph
457+
.node_weight(*consumer)
458+
.ok_or(SchedulerError::DataflowGraphError)?;
459+
error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
425460
"Dependence cycle when adding dependence - initial cycle detection failed");
426-
return Err(SchedulerError::CyclicDependence.into());
427-
}
461+
return Err(SchedulerError::CyclicDependence.into());
428462
}
429463
}
430464
Ok(())

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -531,23 +531,26 @@ async fn build_transaction_graph_and_execute<'a>(
531531
let cts_to_query = tx_graph.needed_map.keys().cloned().collect::<Vec<_>>();
532532
let ciphertext_map =
533533
query_ciphertexts(&cts_to_query, *tenant_id, trx, tracer, loop_ctx).await?;
534-
// Check if we retrieved all needed CTs - if not, we may not want to proceed to execution
535-
if cts_to_query.len() != ciphertext_map.len() {
534+
let fetched_handles: std::collections::HashSet<_> = ciphertext_map.keys().cloned().collect();
535+
if cts_to_query.len() != fetched_handles.len() {
536536
if let Some(dcid_lock) = dcid_mngr.get_current_lock() {
537-
warn!(target: "tfhe_worker", { missing_inputs = ?(cts_to_query.len() - ciphertext_map.len()), dcid = %hex::encode(dcid_lock.dependence_chain_id) },
537+
warn!(target: "tfhe_worker", { missing_inputs = ?(cts_to_query.len() - fetched_handles.len()), dcid = %hex::encode(dcid_lock.dependence_chain_id) },
538538
"some inputs are missing to execute the dependence chain");
539539
}
540-
// Do not stop execution, we will allow the scheduler to run
541-
// and complete as many operations as can be computed given
542-
// the inputs fetched
543540
}
544-
545541
for (handle, (ct_type, mut ct)) in ciphertext_map.into_iter() {
546542
tx_graph.add_input(
547543
&handle,
548544
&DFGTxInput::Compressed(((ct_type, std::mem::take(&mut ct)), true)),
549545
)?;
550546
}
547+
// Resolve deferred cross-transaction dependences: edges whose
548+
// handle was fetched from DB are dropped (data already available),
549+
// remaining edges are added after cycle detection.
550+
if let Err(e) = tx_graph.resolve_dependences(&fetched_handles) {
551+
warn!(target: "tfhe_worker", { error = %e }, "error resolving cross-transaction dependences");
552+
return Ok(tx_graph);
553+
}
551554
// Execute the DFG with the current tenant's keys
552555
let mut s_compute = tracer.start_with_context("compute_fhe_ops", loop_ctx);
553556
{

0 commit comments

Comments
 (0)