From e49fbb12384142a167f8ee6ef3f234ca9d2066f0 Mon Sep 17 00:00:00 2001 From: Antoniu Pop Date: Mon, 2 Mar 2026 10:16:40 +0000 Subject: [PATCH 1/4] fix(coprocessor): decompress all ciphertexts per operation --- coprocessor/fhevm-engine/scheduler/src/dfg.rs | 34 ++--- .../scheduler/src/dfg/scheduler.rs | 130 +++++------------- .../fhevm-engine/scheduler/src/dfg/types.rs | 4 +- 3 files changed, 57 insertions(+), 111 deletions(-) diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg.rs b/coprocessor/fhevm-engine/scheduler/src/dfg.rs index e2823c59fd..ff6e6d87af 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg.rs @@ -505,7 +505,10 @@ impl DFComponentGraph { .node_weight_mut(dependent_tx_index) .ok_or(SchedulerError::DataflowGraphError)?; dependent_tx.inputs.entry(handle.to_vec()).and_modify(|v| { - *v = Some(DFGTxInput::Value((result.ct.clone(), result.is_allowed))) + *v = Some(DFGTxInput::Compressed(( + (result.ct_type, result.compressed_ct.clone()), + result.is_allowed, + ))) }); } } else { @@ -524,14 +527,7 @@ impl DFComponentGraph { self.results.push(DFGTxResult { transaction_id: producer_tx.transaction_id.clone(), handle: handle.to_vec(), - compressed_ct: result.and_then(|rok| { - rok.compressed_ct - .map(|cct| (cct.0, cct.1)) - .ok_or_else(|| { - error!(target: "scheduler", {handle = ?hex::encode(handle) }, "Missing compressed ciphertext in task result"); - SchedulerError::SchedulerError.into() - }) - }), + compressed_ct: result.map(|rok| (rok.ct_type, rok.compressed_ct)), }); } } @@ -636,14 +632,18 @@ impl std::fmt::Debug for OpNode { impl OpNode { fn check_ready_inputs(&mut self, ct_map: &mut HashMap>) -> bool { for i in self.inputs.iter_mut() { - if !matches!(i, DFGTaskInput::Value(_)) { - let DFGTaskInput::Dependence(d) = i else { - return false; - }; - let Some(Some(DFGTxInput::Value((val, _)))) = ct_map.get(d) else { - return false; - }; - *i = DFGTaskInput::Value(val.clone()); + match i { + DFGTaskInput::Value(_) | DFGTaskInput::Compressed(_) => continue, + DFGTaskInput::Dependence(d) => { + let resolved = match ct_map.get(d) { + Some(Some(DFGTxInput::Value((val, _)))) => DFGTaskInput::Value(val.clone()), + Some(Some(DFGTxInput::Compressed(((t, c), _)))) => { + DFGTaskInput::Compressed((*t, c.clone())) + } + _ => return false, + }; + *i = resolved; + } } } true diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs index 25bc0a6ebb..4a19af3ff5 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs @@ -251,21 +251,6 @@ impl<'a> Scheduler<'a> { } } -fn decompress_transaction_inputs( - inputs: &mut HashMap>, - gpu_idx: usize, -) -> Result { - let mut count = 0; - for txinput in inputs.values_mut() { - if let Some(DFGTxInput::Compressed(((t, c), allowed))) = txinput { - let decomp = SupportedFheCiphertexts::decompress(*t, c, gpu_idx)?; - *txinput = Some(DFGTxInput::Value((decomp, *allowed))); - count += 1; - } - } - Ok(count) -} - fn re_randomise_operation_inputs( cts: &mut [SupportedFheCiphertexts], opcode: i32, @@ -327,41 +312,10 @@ fn execute_partition( } continue 'tx; }; - *i = Some(DFGTxInput::Value((ct.ct.clone(), ct.is_allowed))); - } - } - - // Decompress ciphertexts - { - let _guard = tracing::info_span!( - "decompress_ciphertexts", - txn_id = %txn_id_short, - count = tracing::field::Empty, - ) - .entered(); - - match decompress_transaction_inputs(tx_inputs, gpu_idx) { - Ok(count) => { - tracing::Span::current().record("count", count as i64); - } - Err(e) => { - error!(target: "scheduler", {transaction_id = ?hex::encode(&tid), error = ?e }, - "Error while decompressing inputs"); - telemetry::set_current_span_error(&e); - for nidx in dfg.graph.node_identifiers() { - let Some(node) = dfg.graph.node_weight_mut(nidx) else { - error!(target: "scheduler", {index = ?nidx.index() }, "Wrong dataflow graph index"); - continue; - }; - if node.is_allowed { - res.insert( - node.result_handle.clone(), - Err(SchedulerError::DecompressionError.into()), - ); - } - } - continue 'tx; - } + *i = Some(DFGTxInput::Compressed(( + (ct.ct_type, ct.compressed_ct.clone()), + ct.is_allowed, + ))); } } @@ -410,7 +364,7 @@ fn execute_partition( // Update input of consumers if let Ok(ref res) = result.1 { child_node.inputs[*edge.weight() as usize] = - DFGTaskInput::Value(res.0.clone()); + DFGTaskInput::Compressed((res.0, res.1.clone())); } } } @@ -422,8 +376,8 @@ fn execute_partition( res.insert( node.result_handle.clone(), result.1.map(|v| TaskResult { - ct: v.0, - compressed_ct: if node.is_allowed { v.1 } else { None }, + ct_type: v.0, + compressed_ct: v.1, is_allowed: node.is_allowed, transaction_id: tid.clone(), }), @@ -460,12 +414,15 @@ fn try_execute_node( } let mut cts = Vec::with_capacity(node.inputs.len()); for i in std::mem::take(&mut node.inputs) { - if let DFGTaskInput::Value(i) = i { - cts.push(i); - } else { - // That should not be possible as we called the checker. - error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs"); - return Err(SchedulerError::MissingInputs.into()); + match i { + DFGTaskInput::Value(v) => cts.push(v), + DFGTaskInput::Compressed((t, c)) => { + cts.push(SupportedFheCiphertexts::decompress(t, &c, gpu_idx)?); + } + DFGTaskInput::Dependence(_) => { + error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs"); + return Err(SchedulerError::MissingInputs.into()); + } } } // Re-randomize inputs for this operation @@ -482,24 +439,20 @@ fn try_execute_node( RERAND_LATENCY_BATCH_HISTOGRAM.observe(elapsed.as_secs_f64()); } let opcode = node.opcode; - let is_allowed = node.is_allowed; Ok(run_computation( opcode, cts, node_index, - is_allowed, gpu_idx, transaction_id, )) } -type OpResult = Result<(SupportedFheCiphertexts, Option<(i16, Vec)>)>; - +type OpResult = Result<(i16, Vec)>; fn run_computation( operation: i32, inputs: Vec, graph_node_index: usize, - is_allowed: bool, gpu_idx: usize, transaction_id: &Handle, ) -> (usize, OpResult) { @@ -522,10 +475,7 @@ fn run_computation( match compressed { Ok(ct_bytes) => { tracing::Span::current().record("compressed_size", ct_bytes.len() as i64); - ( - graph_node_index, - Ok((inputs[0].clone(), Some((ct_type, ct_bytes)))), - ) + (graph_node_index, Ok((ct_type, ct_bytes))) } Err(error) => { telemetry::set_current_span_error(&error); @@ -553,31 +503,27 @@ fn run_computation( match result { Ok(result) => { - if is_allowed { - // Compression span - let _guard = tracing::info_span!( - "compress_ciphertext", - txn_id = %txn_id_short, - ct_type = result.type_name(), - operation = op_name, - compressed_size = tracing::field::Empty, - ) - .entered(); - let ct_type = result.type_num(); - let compressed = result.compress(); - match compressed { - Ok(ct_bytes) => { - tracing::Span::current() - .record("compressed_size", ct_bytes.len() as i64); - (graph_node_index, Ok((result, Some((ct_type, ct_bytes))))) - } - Err(error) => { - telemetry::set_current_span_error(&error); - (graph_node_index, Err(error.into())) - } + // Compression span + let _guard = tracing::info_span!( + "compress_ciphertext", + txn_id = %txn_id_short, + ct_type = result.type_name(), + operation = op_name, + compressed_size = tracing::field::Empty, + ) + .entered(); + let ct_type = result.type_num(); + let compressed = result.compress(); + match compressed { + Ok(ct_bytes) => { + tracing::Span::current() + .record("compressed_size", ct_bytes.len() as i64); + (graph_node_index, Ok((ct_type, ct_bytes))) + } + Err(error) => { + telemetry::set_current_span_error(&error); + (graph_node_index, Err(error.into())) } - } else { - (graph_node_index, Ok((result, None))) } } Err(e) => { diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs b/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs index da897021bb..8cc58a5b1c 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs @@ -2,8 +2,8 @@ use anyhow::Result; use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts}; pub struct TaskResult { - pub ct: SupportedFheCiphertexts, - pub compressed_ct: Option<(i16, Vec)>, + pub ct_type: i16, + pub compressed_ct: Vec, pub is_allowed: bool, pub transaction_id: Handle, } From a1cb0bee39c3b4ad1a737a59a3e1ccb1f0a6ab8f Mon Sep 17 00:00:00 2001 From: Antoniu Pop Date: Mon, 2 Mar 2026 12:04:52 +0000 Subject: [PATCH 2/4] fix(coprocessor): sanity-check that only scalars are uncompressed --- coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs index 4a19af3ff5..d41d722fed 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs @@ -415,7 +415,13 @@ fn try_execute_node( let mut cts = Vec::with_capacity(node.inputs.len()); for i in std::mem::take(&mut node.inputs) { match i { - DFGTaskInput::Value(v) => cts.push(v), + DFGTaskInput::Value(v) => { + if !matches!(v, SupportedFheCiphertexts::Scalar(_)) { + error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, + "Consensus risk: non-scalar uncompressed ciphertext"); + } + cts.push(v); + } DFGTaskInput::Compressed((t, c)) => { cts.push(SupportedFheCiphertexts::decompress(t, &c, gpu_idx)?); } From 77081d06579ced408a7bdf68cfa4c71c41da5d15 Mon Sep 17 00:00:00 2001 From: Antoniu Pop Date: Mon, 2 Mar 2026 12:19:10 +0000 Subject: [PATCH 3/4] fix(coprocessor): add compressed ct type --- coprocessor/fhevm-engine/scheduler/src/dfg.rs | 10 +++---- .../scheduler/src/dfg/scheduler.rs | 27 ++++++++++++------- .../fhevm-engine/scheduler/src/dfg/types.rs | 15 +++++++---- .../tfhe-worker/src/tfhe_worker.rs | 14 +++++++--- 4 files changed, 43 insertions(+), 23 deletions(-) diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg.rs b/coprocessor/fhevm-engine/scheduler/src/dfg.rs index ff6e6d87af..a4199531e7 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg.rs @@ -506,7 +506,7 @@ impl DFComponentGraph { .ok_or(SchedulerError::DataflowGraphError)?; dependent_tx.inputs.entry(handle.to_vec()).and_modify(|v| { *v = Some(DFGTxInput::Compressed(( - (result.ct_type, result.compressed_ct.clone()), + result.compressed_ct.clone(), result.is_allowed, ))) }); @@ -527,7 +527,7 @@ impl DFComponentGraph { self.results.push(DFGTxResult { transaction_id: producer_tx.transaction_id.clone(), handle: handle.to_vec(), - compressed_ct: result.map(|rok| (rok.ct_type, rok.compressed_ct)), + compressed_ct: result.map(|rok| rok.compressed_ct), }); } } @@ -611,7 +611,7 @@ impl std::fmt::Debug for DFComponentGraph { pub struct DFGResult { pub handle: Handle, - pub result: Result)>>, + pub result: Result>, pub work_index: usize, } pub type OpEdge = u8; @@ -637,8 +637,8 @@ impl OpNode { DFGTaskInput::Dependence(d) => { let resolved = match ct_map.get(d) { Some(Some(DFGTxInput::Value((val, _)))) => DFGTaskInput::Value(val.clone()), - Some(Some(DFGTxInput::Compressed(((t, c), _)))) => { - DFGTaskInput::Compressed((*t, c.clone())) + Some(Some(DFGTxInput::Compressed((cct, _)))) => { + DFGTaskInput::Compressed(cct.clone()) } _ => return false, }; diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs index d41d722fed..7f472ab0dd 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs @@ -313,7 +313,7 @@ fn execute_partition( continue 'tx; }; *i = Some(DFGTxInput::Compressed(( - (ct.ct_type, ct.compressed_ct.clone()), + ct.compressed_ct.clone(), ct.is_allowed, ))); } @@ -364,7 +364,7 @@ fn execute_partition( // Update input of consumers if let Ok(ref res) = result.1 { child_node.inputs[*edge.weight() as usize] = - DFGTaskInput::Compressed((res.0, res.1.clone())); + DFGTaskInput::Compressed(res.clone()); } } } @@ -376,8 +376,7 @@ fn execute_partition( res.insert( node.result_handle.clone(), result.1.map(|v| TaskResult { - ct_type: v.0, - compressed_ct: v.1, + compressed_ct: v, is_allowed: node.is_allowed, transaction_id: tid.clone(), }), @@ -422,8 +421,12 @@ fn try_execute_node( } cts.push(v); } - DFGTaskInput::Compressed((t, c)) => { - cts.push(SupportedFheCiphertexts::decompress(t, &c, gpu_idx)?); + DFGTaskInput::Compressed(cct) => { + cts.push(SupportedFheCiphertexts::decompress( + cct.ct_type, + &cct.ct_bytes, + gpu_idx, + )?); } DFGTaskInput::Dependence(_) => { error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs"); @@ -454,7 +457,7 @@ fn try_execute_node( )) } -type OpResult = Result<(i16, Vec)>; +type OpResult = Result; fn run_computation( operation: i32, inputs: Vec, @@ -481,7 +484,10 @@ fn run_computation( match compressed { Ok(ct_bytes) => { tracing::Span::current().record("compressed_size", ct_bytes.len() as i64); - (graph_node_index, Ok((ct_type, ct_bytes))) + ( + graph_node_index, + Ok(CompressedCiphertext { ct_type, ct_bytes }), + ) } Err(error) => { telemetry::set_current_span_error(&error); @@ -524,7 +530,10 @@ fn run_computation( Ok(ct_bytes) => { tracing::Span::current() .record("compressed_size", ct_bytes.len() as i64); - (graph_node_index, Ok((ct_type, ct_bytes))) + ( + graph_node_index, + Ok(CompressedCiphertext { ct_type, ct_bytes }), + ) } Err(error) => { telemetry::set_current_span_error(&error); diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs b/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs index 8cc58a5b1c..968486dded 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs @@ -1,16 +1,21 @@ use anyhow::Result; use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts}; -pub struct TaskResult { +#[derive(Clone)] +pub struct CompressedCiphertext { pub ct_type: i16, - pub compressed_ct: Vec, + pub ct_bytes: Vec, +} + +pub struct TaskResult { + pub compressed_ct: CompressedCiphertext, pub is_allowed: bool, pub transaction_id: Handle, } pub struct DFGTxResult { pub handle: Handle, pub transaction_id: Handle, - pub compressed_ct: Result<(i16, Vec)>, + pub compressed_ct: Result, } impl std::fmt::Debug for DFGTxResult { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -30,7 +35,7 @@ impl std::fmt::Debug for DFGTxResult { #[derive(Clone)] pub enum DFGTxInput { Value((SupportedFheCiphertexts, bool)), - Compressed(((i16, Vec), bool)), + Compressed((CompressedCiphertext, bool)), } impl std::fmt::Debug for DFGTxInput { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -44,7 +49,7 @@ impl std::fmt::Debug for DFGTxInput { #[derive(Clone)] pub enum DFGTaskInput { Value(SupportedFheCiphertexts), - Compressed((i16, Vec)), + Compressed(CompressedCiphertext), Dependence(Handle), } impl std::fmt::Debug for DFGTaskInput { diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs index 6d3b987b5b..cff08dbe15 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs @@ -8,7 +8,7 @@ use fhevm_engine_common::{tfhe_ops::current_ciphertext_version, types::Supported use itertools::Itertools; use lazy_static::lazy_static; use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter}; -use scheduler::dfg::types::{DFGTxInput, SchedulerError}; +use scheduler::dfg::types::{CompressedCiphertext, DFGTxInput, SchedulerError}; use scheduler::dfg::{build_component_nodes, ComponentNode, DFComponentGraph, DFGOp}; use scheduler::dfg::{scheduler::Scheduler, types::DFGTaskInput}; use sqlx::types::Uuid; @@ -473,7 +473,13 @@ async fn build_transaction_graph_and_execute<'a>( for (handle, (ct_type, mut ct)) in ciphertext_map.into_iter() { tx_graph.add_input( &handle, - &DFGTxInput::Compressed(((ct_type, std::mem::take(&mut ct)), true)), + &DFGTxInput::Compressed(( + CompressedCiphertext { + ct_type, + ct_bytes: std::mem::take(&mut ct), + }, + true, + )), )?; } // Resolve deferred cross-transaction dependences: edges whose @@ -535,10 +541,10 @@ async fn upload_transaction_graph_results<'a>( let mut cts_to_insert = vec![]; for result in graph_results.into_iter() { match result.compressed_ct { - Ok((db_type, db_bytes)) => { + Ok(cct) => { cts_to_insert.push(( result.handle.clone(), - (db_bytes, (current_ciphertext_version(), db_type)), + (cct.ct_bytes, (current_ciphertext_version(), cct.ct_type)), )); handles_to_update.push((result.handle.clone(), result.transaction_id.clone())); WORK_ITEMS_PROCESSED_COUNTER.inc(); From 42a2e6fbd5323359f3b6f90e0e45a5d1289879a9 Mon Sep 17 00:00:00 2001 From: Antoniu Pop Date: Wed, 4 Mar 2026 19:50:25 +0000 Subject: [PATCH 4/4] fix(coprocessor): propagate DecompressionError --- .../scheduler/src/dfg/scheduler.rs | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs index 7f472ab0dd..43790e548e 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs @@ -422,11 +422,21 @@ fn try_execute_node( cts.push(v); } DFGTaskInput::Compressed(cct) => { - cts.push(SupportedFheCiphertexts::decompress( - cct.ct_type, - &cct.ct_bytes, - gpu_idx, - )?); + let decompressed = SupportedFheCiphertexts::decompress( + cct.ct_type, + &cct.ct_bytes, + gpu_idx, + ) + .map_err(|e| { + error!( + target: "scheduler", + { handle = ?hex::encode(&node.result_handle), ct_type = cct.ct_type, error = ?e }, + "Error while decompressing op input" + ); + telemetry::set_current_span_error(&e); + SchedulerError::DecompressionError + })?; + cts.push(decompressed); } DFGTaskInput::Dependence(_) => { error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs");