diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg.rs b/coprocessor/fhevm-engine/scheduler/src/dfg.rs index e2823c59fd..a4199531e7 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg.rs @@ -505,7 +505,10 @@ impl DFComponentGraph { .node_weight_mut(dependent_tx_index) .ok_or(SchedulerError::DataflowGraphError)?; dependent_tx.inputs.entry(handle.to_vec()).and_modify(|v| { - *v = Some(DFGTxInput::Value((result.ct.clone(), result.is_allowed))) + *v = Some(DFGTxInput::Compressed(( + result.compressed_ct.clone(), + result.is_allowed, + ))) }); } } else { @@ -524,14 +527,7 @@ impl DFComponentGraph { self.results.push(DFGTxResult { transaction_id: producer_tx.transaction_id.clone(), handle: handle.to_vec(), - compressed_ct: result.and_then(|rok| { - rok.compressed_ct - .map(|cct| (cct.0, cct.1)) - .ok_or_else(|| { - error!(target: "scheduler", {handle = ?hex::encode(handle) }, "Missing compressed ciphertext in task result"); - SchedulerError::SchedulerError.into() - }) - }), + compressed_ct: result.map(|rok| rok.compressed_ct), }); } } @@ -615,7 +611,7 @@ impl std::fmt::Debug for DFComponentGraph { pub struct DFGResult { pub handle: Handle, - pub result: Result)>>, + pub result: Result>, pub work_index: usize, } pub type OpEdge = u8; @@ -636,14 +632,18 @@ impl std::fmt::Debug for OpNode { impl OpNode { fn check_ready_inputs(&mut self, ct_map: &mut HashMap>) -> bool { for i in self.inputs.iter_mut() { - if !matches!(i, DFGTaskInput::Value(_)) { - let DFGTaskInput::Dependence(d) = i else { - return false; - }; - let Some(Some(DFGTxInput::Value((val, _)))) = ct_map.get(d) else { - return false; - }; - *i = DFGTaskInput::Value(val.clone()); + match i { + DFGTaskInput::Value(_) | DFGTaskInput::Compressed(_) => continue, + DFGTaskInput::Dependence(d) => { + let resolved = match ct_map.get(d) { + Some(Some(DFGTxInput::Value((val, _)))) => DFGTaskInput::Value(val.clone()), + Some(Some(DFGTxInput::Compressed((cct, _)))) => { + DFGTaskInput::Compressed(cct.clone()) + } + _ => return false, + }; + *i = resolved; + } } } true diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs index 25bc0a6ebb..43790e548e 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs @@ -251,21 +251,6 @@ impl<'a> Scheduler<'a> { } } -fn decompress_transaction_inputs( - inputs: &mut HashMap>, - gpu_idx: usize, -) -> Result { - let mut count = 0; - for txinput in inputs.values_mut() { - if let Some(DFGTxInput::Compressed(((t, c), allowed))) = txinput { - let decomp = SupportedFheCiphertexts::decompress(*t, c, gpu_idx)?; - *txinput = Some(DFGTxInput::Value((decomp, *allowed))); - count += 1; - } - } - Ok(count) -} - fn re_randomise_operation_inputs( cts: &mut [SupportedFheCiphertexts], opcode: i32, @@ -327,41 +312,10 @@ fn execute_partition( } continue 'tx; }; - *i = Some(DFGTxInput::Value((ct.ct.clone(), ct.is_allowed))); - } - } - - // Decompress ciphertexts - { - let _guard = tracing::info_span!( - "decompress_ciphertexts", - txn_id = %txn_id_short, - count = tracing::field::Empty, - ) - .entered(); - - match decompress_transaction_inputs(tx_inputs, gpu_idx) { - Ok(count) => { - tracing::Span::current().record("count", count as i64); - } - Err(e) => { - error!(target: "scheduler", {transaction_id = ?hex::encode(&tid), error = ?e }, - "Error while decompressing inputs"); - telemetry::set_current_span_error(&e); - for nidx in dfg.graph.node_identifiers() { - let Some(node) = dfg.graph.node_weight_mut(nidx) else { - error!(target: "scheduler", {index = ?nidx.index() }, "Wrong dataflow graph index"); - continue; - }; - if node.is_allowed { - res.insert( - node.result_handle.clone(), - Err(SchedulerError::DecompressionError.into()), - ); - } - } - continue 'tx; - } + *i = Some(DFGTxInput::Compressed(( + ct.compressed_ct.clone(), + ct.is_allowed, + ))); } } @@ -410,7 +364,7 @@ fn execute_partition( // Update input of consumers if let Ok(ref res) = result.1 { child_node.inputs[*edge.weight() as usize] = - DFGTaskInput::Value(res.0.clone()); + DFGTaskInput::Compressed(res.clone()); } } } @@ -422,8 +376,7 @@ fn execute_partition( res.insert( node.result_handle.clone(), result.1.map(|v| TaskResult { - ct: v.0, - compressed_ct: if node.is_allowed { v.1 } else { None }, + compressed_ct: v, is_allowed: node.is_allowed, transaction_id: tid.clone(), }), @@ -460,12 +413,35 @@ fn try_execute_node( } let mut cts = Vec::with_capacity(node.inputs.len()); for i in std::mem::take(&mut node.inputs) { - if let DFGTaskInput::Value(i) = i { - cts.push(i); - } else { - // That should not be possible as we called the checker. - error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs"); - return Err(SchedulerError::MissingInputs.into()); + match i { + DFGTaskInput::Value(v) => { + if !matches!(v, SupportedFheCiphertexts::Scalar(_)) { + error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, + "Consensus risk: non-scalar uncompressed ciphertext"); + } + cts.push(v); + } + DFGTaskInput::Compressed(cct) => { + let decompressed = SupportedFheCiphertexts::decompress( + cct.ct_type, + &cct.ct_bytes, + gpu_idx, + ) + .map_err(|e| { + error!( + target: "scheduler", + { handle = ?hex::encode(&node.result_handle), ct_type = cct.ct_type, error = ?e }, + "Error while decompressing op input" + ); + telemetry::set_current_span_error(&e); + SchedulerError::DecompressionError + })?; + cts.push(decompressed); + } + DFGTaskInput::Dependence(_) => { + error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs"); + return Err(SchedulerError::MissingInputs.into()); + } } } // Re-randomize inputs for this operation @@ -482,24 +458,20 @@ fn try_execute_node( RERAND_LATENCY_BATCH_HISTOGRAM.observe(elapsed.as_secs_f64()); } let opcode = node.opcode; - let is_allowed = node.is_allowed; Ok(run_computation( opcode, cts, node_index, - is_allowed, gpu_idx, transaction_id, )) } -type OpResult = Result<(SupportedFheCiphertexts, Option<(i16, Vec)>)>; - +type OpResult = Result; fn run_computation( operation: i32, inputs: Vec, graph_node_index: usize, - is_allowed: bool, gpu_idx: usize, transaction_id: &Handle, ) -> (usize, OpResult) { @@ -524,7 +496,7 @@ fn run_computation( tracing::Span::current().record("compressed_size", ct_bytes.len() as i64); ( graph_node_index, - Ok((inputs[0].clone(), Some((ct_type, ct_bytes)))), + Ok(CompressedCiphertext { ct_type, ct_bytes }), ) } Err(error) => { @@ -553,31 +525,30 @@ fn run_computation( match result { Ok(result) => { - if is_allowed { - // Compression span - let _guard = tracing::info_span!( - "compress_ciphertext", - txn_id = %txn_id_short, - ct_type = result.type_name(), - operation = op_name, - compressed_size = tracing::field::Empty, - ) - .entered(); - let ct_type = result.type_num(); - let compressed = result.compress(); - match compressed { - Ok(ct_bytes) => { - tracing::Span::current() - .record("compressed_size", ct_bytes.len() as i64); - (graph_node_index, Ok((result, Some((ct_type, ct_bytes))))) - } - Err(error) => { - telemetry::set_current_span_error(&error); - (graph_node_index, Err(error.into())) - } + // Compression span + let _guard = tracing::info_span!( + "compress_ciphertext", + txn_id = %txn_id_short, + ct_type = result.type_name(), + operation = op_name, + compressed_size = tracing::field::Empty, + ) + .entered(); + let ct_type = result.type_num(); + let compressed = result.compress(); + match compressed { + Ok(ct_bytes) => { + tracing::Span::current() + .record("compressed_size", ct_bytes.len() as i64); + ( + graph_node_index, + Ok(CompressedCiphertext { ct_type, ct_bytes }), + ) + } + Err(error) => { + telemetry::set_current_span_error(&error); + (graph_node_index, Err(error.into())) } - } else { - (graph_node_index, Ok((result, None))) } } Err(e) => { diff --git a/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs b/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs index da897021bb..968486dded 100644 --- a/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs +++ b/coprocessor/fhevm-engine/scheduler/src/dfg/types.rs @@ -1,16 +1,21 @@ use anyhow::Result; use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts}; +#[derive(Clone)] +pub struct CompressedCiphertext { + pub ct_type: i16, + pub ct_bytes: Vec, +} + pub struct TaskResult { - pub ct: SupportedFheCiphertexts, - pub compressed_ct: Option<(i16, Vec)>, + pub compressed_ct: CompressedCiphertext, pub is_allowed: bool, pub transaction_id: Handle, } pub struct DFGTxResult { pub handle: Handle, pub transaction_id: Handle, - pub compressed_ct: Result<(i16, Vec)>, + pub compressed_ct: Result, } impl std::fmt::Debug for DFGTxResult { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -30,7 +35,7 @@ impl std::fmt::Debug for DFGTxResult { #[derive(Clone)] pub enum DFGTxInput { Value((SupportedFheCiphertexts, bool)), - Compressed(((i16, Vec), bool)), + Compressed((CompressedCiphertext, bool)), } impl std::fmt::Debug for DFGTxInput { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -44,7 +49,7 @@ impl std::fmt::Debug for DFGTxInput { #[derive(Clone)] pub enum DFGTaskInput { Value(SupportedFheCiphertexts), - Compressed((i16, Vec)), + Compressed(CompressedCiphertext), Dependence(Handle), } impl std::fmt::Debug for DFGTaskInput { diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs index 6d3b987b5b..cff08dbe15 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs @@ -8,7 +8,7 @@ use fhevm_engine_common::{tfhe_ops::current_ciphertext_version, types::Supported use itertools::Itertools; use lazy_static::lazy_static; use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter}; -use scheduler::dfg::types::{DFGTxInput, SchedulerError}; +use scheduler::dfg::types::{CompressedCiphertext, DFGTxInput, SchedulerError}; use scheduler::dfg::{build_component_nodes, ComponentNode, DFComponentGraph, DFGOp}; use scheduler::dfg::{scheduler::Scheduler, types::DFGTaskInput}; use sqlx::types::Uuid; @@ -473,7 +473,13 @@ async fn build_transaction_graph_and_execute<'a>( for (handle, (ct_type, mut ct)) in ciphertext_map.into_iter() { tx_graph.add_input( &handle, - &DFGTxInput::Compressed(((ct_type, std::mem::take(&mut ct)), true)), + &DFGTxInput::Compressed(( + CompressedCiphertext { + ct_type, + ct_bytes: std::mem::take(&mut ct), + }, + true, + )), )?; } // Resolve deferred cross-transaction dependences: edges whose @@ -535,10 +541,10 @@ async fn upload_transaction_graph_results<'a>( let mut cts_to_insert = vec![]; for result in graph_results.into_iter() { match result.compressed_ct { - Ok((db_type, db_bytes)) => { + Ok(cct) => { cts_to_insert.push(( result.handle.clone(), - (db_bytes, (current_ciphertext_version(), db_type)), + (cct.ct_bytes, (current_ciphertext_version(), cct.ct_type)), )); handles_to_update.push((result.handle.clone(), result.transaction_id.clone())); WORK_ITEMS_PROCESSED_COUNTER.inc();