Skip to content

Commit 3469a0c

Browse files
antoniupopo, obatirou and rudy-6-4
authored
fix(coprocessor): fix handling of cyclic dependence errors (port from #1421) (#1439)
* fix(coprocessor): fix handling of cyclic dependence errors (#1421) * fix(coprocessor): fix handling of dependence cycles * fix(coprocessor): log dependence cycle * fix(coprocessor): fix uncomputable flag and schedule order on missing input * chore(host-listener): update max message size websocket (#1410) --------- Co-authored-by: Oba <obatirou@gmail.com> * fix(coprocessor): host-listener, message size leftover --------- Co-authored-by: Oba <obatirou@gmail.com> Co-authored-by: rudy <rudy.sicard@zama.ai>
1 parent 360fc18 commit 3469a0c

File tree

3 files changed

+104
-14
lines changed

3 files changed

+104
-14
lines changed

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use alloy::primitives::Address;
33
use alloy::providers::{Provider, ProviderBuilder, WsConnect};
44
use alloy::pubsub::SubscriptionStream;
55
use alloy::rpc::types::{Block, BlockNumberOrTag, Filter, Header, Log};
6+
use alloy::transports::ws::WebSocketConfig;
67
use anyhow::{anyhow, Result};
78
use clap::Parser;
89
use futures_util::stream::StreamExt;
@@ -166,6 +167,10 @@ mod eth_rpc_err {
166167
}
167168
}
168169

170+
fn websocket_config() -> WebSocketConfig {
171+
WebSocketConfig::default().max_message_size(Some(256 * 1024 * 1024)) // 256MB
172+
}
173+
169174
impl InfiniteLogIter {
170175
fn new(args: &Args) -> Self {
171176
let mut contract_addresses = vec![];
@@ -202,7 +207,8 @@ impl InfiniteLogIter {
202207
}
203208

204209
async fn get_chain_id(&self) -> anyhow::Result<ChainId> {
205-
let ws = WsConnect::new(&self.url);
210+
let config = websocket_config();
211+
let ws = WsConnect::new(&self.url).with_config(config);
206212
let provider = ProviderBuilder::new().connect_ws(ws).await?;
207213
Ok(provider.get_chain_id().await?)
208214
}
@@ -245,7 +251,10 @@ impl InfiniteLogIter {
245251
filter = filter.address(self.contract_addresses.clone())
246252
}
247253
// we use a specific provider to not disturb the real-time one (no buffer shared)
248-
let ws = WsConnect::new(&self.url).with_max_retries(0); // disabled, alloy skips events
254+
let config = websocket_config();
255+
let ws = WsConnect::new(&self.url)
256+
.with_config(config)
257+
.with_max_retries(0); // disabled, alloy skips events
249258
let provider = match ProviderBuilder::new().connect_ws(ws).await {
250259
Ok(provider) => provider,
251260
Err(_) => anyhow::bail!("Cannot get a provider"),
@@ -622,9 +631,11 @@ impl InfiniteLogIter {
622631

623632
async fn new_log_stream(&mut self, not_initialized: bool) {
624633
let mut retry = 20;
634+
let config = websocket_config();
625635
loop {
626-
let ws = WsConnect::new(&self.url).with_max_retries(0); // disabled, alloy skips events
627-
636+
let ws = WsConnect::new(&self.url)
637+
.with_config(config)
638+
.with_max_retries(0); // disabled, alloy skips events
628639
match ProviderBuilder::new().connect_ws(ws).await {
629640
Ok(provider) => {
630641
let catch_up_from =

coprocessor/fhevm-engine/scheduler/src/dfg.rs

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ pub mod scheduler;
22
pub mod types;
33

44
use std::{collections::HashMap, sync::atomic::AtomicUsize};
5-
use tracing::error;
5+
use tracing::{error, warn};
66

77
use crate::dfg::types::*;
88
use anyhow::Result;
@@ -305,7 +305,12 @@ impl DFComponentGraph {
305305
for (consumer, tx) in self.graph.node_references() {
306306
for i in tx.inputs.keys() {
307307
if let Some(producer) = self.allowed_map.get(i) {
308-
dependence_pairs.push((producer, consumer));
308+
if *producer == consumer {
309+
warn!(target: "scheduler", { },
310+
"Self-dependence on node");
311+
} else {
312+
dependence_pairs.push((*producer, consumer));
313+
}
309314
} else {
310315
self.needed_map
311316
.entry(i.clone())
@@ -314,14 +319,72 @@ impl DFComponentGraph {
314319
}
315320
}
316321
}
322+
323+
// We build a replica of the graph and map it to the
324+
// underlying DiGraph so we can identify cycles.
325+
let mut digraph = self.graph.map(|idx, _| idx, |_, _| ()).graph().clone();
317326
// Add transaction dependence edges
318-
for (producer, consumer) in dependence_pairs {
319-
// Error only occurs in case of cyclic dependence which
320-
// shoud not be possible between transactions. In that
321-
// case, the whole cycle should be put in an error state.
322-
self.graph
323-
.add_edge(*producer, consumer, ())
324-
.map_err(|_| SchedulerError::CyclicDependence)?;
327+
for (producer, consumer) in dependence_pairs.iter() {
328+
digraph.add_edge(*producer, *consumer, ());
329+
}
330+
let mut tarjan = daggy::petgraph::algo::TarjanScc::new();
331+
let mut sccs = Vec::new();
332+
tarjan.run(&digraph, |scc| {
333+
if scc.len() > 1 {
334+
// All non-singleton SCCs in a directed graph are
335+
// dependence cycles
336+
sccs.push(scc.to_vec());
337+
}
338+
});
339+
if !sccs.is_empty() {
340+
for scc in sccs {
341+
error!(target: "scheduler", { cycle_size = ?scc.len() },
342+
"Dependence cycle detected");
343+
for idx in scc {
344+
let idx = digraph
345+
.node_weight(idx)
346+
.ok_or(SchedulerError::DataflowGraphError)?;
347+
let tx = self
348+
.graph
349+
.node_weight_mut(*idx)
350+
.ok_or(SchedulerError::DataflowGraphError)?;
351+
// Mark the node as uncomputable so we don't go
352+
// and mark as completed operations that are in
353+
// error.
354+
tx.is_uncomputable = true;
355+
error!(target: "scheduler", { transaction_id = ?hex::encode(tx.transaction_id.clone()) },
356+
"Transaction is part of a dependence cycle");
357+
for (_, op) in tx.graph.graph.node_references() {
358+
self.results.push(DFGTxResult {
359+
transaction_id: tx.transaction_id.clone(),
360+
handle: op.result_handle.to_vec(),
361+
compressed_ct: Err(SchedulerError::CyclicDependence.into()),
362+
});
363+
}
364+
}
365+
}
366+
return Err(SchedulerError::CyclicDependence.into());
367+
} else {
368+
// If no dependence cycles were found, then we can
369+
// complete the graph and proceed to execution
370+
for (producer, consumer) in dependence_pairs.iter() {
371+
// The error case here should not happen as we've
372+
// already covered it by testing for SCCs in the graph
373+
// first
374+
if self.graph.add_edge(*producer, *consumer, ()).is_err() {
375+
let prod = self
376+
.graph
377+
.node_weight(*producer)
378+
.ok_or(SchedulerError::DataflowGraphError)?;
379+
let cons = self
380+
.graph
381+
.node_weight(*consumer)
382+
.ok_or(SchedulerError::DataflowGraphError)?;
383+
error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
384+
"Dependence cycle when adding dependence - initial cycle detection failed");
385+
return Err(SchedulerError::CyclicDependence.into());
386+
}
387+
}
325388
}
326389
Ok(())
327390
}
@@ -402,7 +465,17 @@ impl DFComponentGraph {
402465
.graph
403466
.node_weight_mut(tx_node_index)
404467
.ok_or(SchedulerError::DataflowGraphError)?;
468+
if tx_node.is_uncomputable {
469+
return Ok(());
470+
}
405471
tx_node.is_uncomputable = true;
472+
for (_idx, op) in tx_node.graph.graph.node_references() {
473+
self.results.push(DFGTxResult {
474+
transaction_id: tx_node.transaction_id.clone(),
475+
handle: op.result_handle.to_vec(),
476+
compressed_ct: Err(SchedulerError::MissingInputs.into()),
477+
});
478+
}
406479
for edge in edges.edges_directed(tx_node_index, Direction::Outgoing) {
407480
let dependent_tx_index = edge.target();
408481
self.set_uncomputable(dependent_tx_index, edges)?;

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,13 @@ async fn build_transaction_graph_and_execute<'a>(
448448
loop_ctx: &opentelemetry::Context,
449449
) -> Result<DFComponentGraph, Box<dyn std::error::Error + Send + Sync>> {
450450
let mut tx_graph = DFComponentGraph::default();
451-
tx_graph.build(tenant_txs)?;
451+
if let Err(e) = tx_graph.build(tenant_txs) {
452+
// If we had an error while building the graph, we don't
453+
// execute anything and return to allow any set results
454+
// (essentially errors) to be set in DB.
455+
warn!(target: "tfhe_worker", { error = %e }, "error while building transaction graph");
456+
return Ok(tx_graph);
457+
}
452458
let cts_to_query = tx_graph.needed_map.keys().cloned().collect::<Vec<_>>();
453459
let ciphertext_map =
454460
query_ciphertexts(&cts_to_query, *tenant_id, trx, tracer, loop_ctx).await?;

0 commit comments

Comments (0)