Skip to content

Commit 207fdf0

Browse files
antoniupopisaacdecoded
authored andcommitted
fix(coprocessor): force compress/decompress for all ciphertexts (#2036)
* fix(coprocessor): decompress all ciphertexts per operation * fix(coprocessor): sanity-check that only scalars are uncompressed * fix(coprocessor): add compressed ct type * fix(coprocessor): propagate DecompressionError
1 parent dab4d48 commit 207fdf0

File tree

4 files changed

+98
-116
lines changed

4 files changed

+98
-116
lines changed

coprocessor/fhevm-engine/scheduler/src/dfg.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,10 @@ impl DFComponentGraph {
505505
.node_weight_mut(dependent_tx_index)
506506
.ok_or(SchedulerError::DataflowGraphError)?;
507507
dependent_tx.inputs.entry(handle.to_vec()).and_modify(|v| {
508-
*v = Some(DFGTxInput::Value((result.ct.clone(), result.is_allowed)))
508+
*v = Some(DFGTxInput::Compressed((
509+
result.compressed_ct.clone(),
510+
result.is_allowed,
511+
)))
509512
});
510513
}
511514
} else {
@@ -524,14 +527,7 @@ impl DFComponentGraph {
524527
self.results.push(DFGTxResult {
525528
transaction_id: producer_tx.transaction_id.clone(),
526529
handle: handle.to_vec(),
527-
compressed_ct: result.and_then(|rok| {
528-
rok.compressed_ct
529-
.map(|cct| (cct.0, cct.1))
530-
.ok_or_else(|| {
531-
error!(target: "scheduler", {handle = ?hex::encode(handle) }, "Missing compressed ciphertext in task result");
532-
SchedulerError::SchedulerError.into()
533-
})
534-
}),
530+
compressed_ct: result.map(|rok| rok.compressed_ct),
535531
});
536532
}
537533
}
@@ -615,7 +611,7 @@ impl std::fmt::Debug for DFComponentGraph {
615611

616612
pub struct DFGResult {
617613
pub handle: Handle,
618-
pub result: Result<Option<(i16, Vec<u8>)>>,
614+
pub result: Result<Option<CompressedCiphertext>>,
619615
pub work_index: usize,
620616
}
621617
pub type OpEdge = u8;
@@ -636,14 +632,18 @@ impl std::fmt::Debug for OpNode {
636632
impl OpNode {
637633
fn check_ready_inputs(&mut self, ct_map: &mut HashMap<Handle, Option<DFGTxInput>>) -> bool {
638634
for i in self.inputs.iter_mut() {
639-
if !matches!(i, DFGTaskInput::Value(_)) {
640-
let DFGTaskInput::Dependence(d) = i else {
641-
return false;
642-
};
643-
let Some(Some(DFGTxInput::Value((val, _)))) = ct_map.get(d) else {
644-
return false;
645-
};
646-
*i = DFGTaskInput::Value(val.clone());
635+
match i {
636+
DFGTaskInput::Value(_) | DFGTaskInput::Compressed(_) => continue,
637+
DFGTaskInput::Dependence(d) => {
638+
let resolved = match ct_map.get(d) {
639+
Some(Some(DFGTxInput::Value((val, _)))) => DFGTaskInput::Value(val.clone()),
640+
Some(Some(DFGTxInput::Compressed((cct, _)))) => {
641+
DFGTaskInput::Compressed(cct.clone())
642+
}
643+
_ => return false,
644+
};
645+
*i = resolved;
646+
}
647647
}
648648
}
649649
true

coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs

Lines changed: 60 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -251,21 +251,6 @@ impl<'a> Scheduler<'a> {
251251
}
252252
}
253253

254-
fn decompress_transaction_inputs(
255-
inputs: &mut HashMap<Handle, Option<DFGTxInput>>,
256-
gpu_idx: usize,
257-
) -> Result<usize> {
258-
let mut count = 0;
259-
for txinput in inputs.values_mut() {
260-
if let Some(DFGTxInput::Compressed(((t, c), allowed))) = txinput {
261-
let decomp = SupportedFheCiphertexts::decompress(*t, c, gpu_idx)?;
262-
*txinput = Some(DFGTxInput::Value((decomp, *allowed)));
263-
count += 1;
264-
}
265-
}
266-
Ok(count)
267-
}
268-
269254
fn re_randomise_operation_inputs(
270255
cts: &mut [SupportedFheCiphertexts],
271256
opcode: i32,
@@ -327,41 +312,10 @@ fn execute_partition(
327312
}
328313
continue 'tx;
329314
};
330-
*i = Some(DFGTxInput::Value((ct.ct.clone(), ct.is_allowed)));
331-
}
332-
}
333-
334-
// Decompress ciphertexts
335-
{
336-
let _guard = tracing::info_span!(
337-
"decompress_ciphertexts",
338-
txn_id = %txn_id_short,
339-
count = tracing::field::Empty,
340-
)
341-
.entered();
342-
343-
match decompress_transaction_inputs(tx_inputs, gpu_idx) {
344-
Ok(count) => {
345-
tracing::Span::current().record("count", count as i64);
346-
}
347-
Err(e) => {
348-
error!(target: "scheduler", {transaction_id = ?hex::encode(&tid), error = ?e },
349-
"Error while decompressing inputs");
350-
telemetry::set_current_span_error(&e);
351-
for nidx in dfg.graph.node_identifiers() {
352-
let Some(node) = dfg.graph.node_weight_mut(nidx) else {
353-
error!(target: "scheduler", {index = ?nidx.index() }, "Wrong dataflow graph index");
354-
continue;
355-
};
356-
if node.is_allowed {
357-
res.insert(
358-
node.result_handle.clone(),
359-
Err(SchedulerError::DecompressionError.into()),
360-
);
361-
}
362-
}
363-
continue 'tx;
364-
}
315+
*i = Some(DFGTxInput::Compressed((
316+
ct.compressed_ct.clone(),
317+
ct.is_allowed,
318+
)));
365319
}
366320
}
367321

@@ -410,7 +364,7 @@ fn execute_partition(
410364
// Update input of consumers
411365
if let Ok(ref res) = result.1 {
412366
child_node.inputs[*edge.weight() as usize] =
413-
DFGTaskInput::Value(res.0.clone());
367+
DFGTaskInput::Compressed(res.clone());
414368
}
415369
}
416370
}
@@ -422,8 +376,7 @@ fn execute_partition(
422376
res.insert(
423377
node.result_handle.clone(),
424378
result.1.map(|v| TaskResult {
425-
ct: v.0,
426-
compressed_ct: if node.is_allowed { v.1 } else { None },
379+
compressed_ct: v,
427380
is_allowed: node.is_allowed,
428381
transaction_id: tid.clone(),
429382
}),
@@ -460,12 +413,35 @@ fn try_execute_node(
460413
}
461414
let mut cts = Vec::with_capacity(node.inputs.len());
462415
for i in std::mem::take(&mut node.inputs) {
463-
if let DFGTaskInput::Value(i) = i {
464-
cts.push(i);
465-
} else {
466-
// That should not be possible as we called the checker.
467-
error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs");
468-
return Err(SchedulerError::MissingInputs.into());
416+
match i {
417+
DFGTaskInput::Value(v) => {
418+
if !matches!(v, SupportedFheCiphertexts::Scalar(_)) {
419+
error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) },
420+
"Consensus risk: non-scalar uncompressed ciphertext");
421+
}
422+
cts.push(v);
423+
}
424+
DFGTaskInput::Compressed(cct) => {
425+
let decompressed = SupportedFheCiphertexts::decompress(
426+
cct.ct_type,
427+
&cct.ct_bytes,
428+
gpu_idx,
429+
)
430+
.map_err(|e| {
431+
error!(
432+
target: "scheduler",
433+
{ handle = ?hex::encode(&node.result_handle), ct_type = cct.ct_type, error = ?e },
434+
"Error while decompressing op input"
435+
);
436+
telemetry::set_current_span_error(&e);
437+
SchedulerError::DecompressionError
438+
})?;
439+
cts.push(decompressed);
440+
}
441+
DFGTaskInput::Dependence(_) => {
442+
error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs");
443+
return Err(SchedulerError::MissingInputs.into());
444+
}
469445
}
470446
}
471447
// Re-randomize inputs for this operation
@@ -482,24 +458,20 @@ fn try_execute_node(
482458
RERAND_LATENCY_BATCH_HISTOGRAM.observe(elapsed.as_secs_f64());
483459
}
484460
let opcode = node.opcode;
485-
let is_allowed = node.is_allowed;
486461
Ok(run_computation(
487462
opcode,
488463
cts,
489464
node_index,
490-
is_allowed,
491465
gpu_idx,
492466
transaction_id,
493467
))
494468
}
495469

496-
type OpResult = Result<(SupportedFheCiphertexts, Option<(i16, Vec<u8>)>)>;
497-
470+
type OpResult = Result<CompressedCiphertext>;
498471
fn run_computation(
499472
operation: i32,
500473
inputs: Vec<SupportedFheCiphertexts>,
501474
graph_node_index: usize,
502-
is_allowed: bool,
503475
gpu_idx: usize,
504476
transaction_id: &Handle,
505477
) -> (usize, OpResult) {
@@ -524,7 +496,7 @@ fn run_computation(
524496
tracing::Span::current().record("compressed_size", ct_bytes.len() as i64);
525497
(
526498
graph_node_index,
527-
Ok((inputs[0].clone(), Some((ct_type, ct_bytes)))),
499+
Ok(CompressedCiphertext { ct_type, ct_bytes }),
528500
)
529501
}
530502
Err(error) => {
@@ -553,31 +525,30 @@ fn run_computation(
553525

554526
match result {
555527
Ok(result) => {
556-
if is_allowed {
557-
// Compression span
558-
let _guard = tracing::info_span!(
559-
"compress_ciphertext",
560-
txn_id = %txn_id_short,
561-
ct_type = result.type_name(),
562-
operation = op_name,
563-
compressed_size = tracing::field::Empty,
564-
)
565-
.entered();
566-
let ct_type = result.type_num();
567-
let compressed = result.compress();
568-
match compressed {
569-
Ok(ct_bytes) => {
570-
tracing::Span::current()
571-
.record("compressed_size", ct_bytes.len() as i64);
572-
(graph_node_index, Ok((result, Some((ct_type, ct_bytes)))))
573-
}
574-
Err(error) => {
575-
telemetry::set_current_span_error(&error);
576-
(graph_node_index, Err(error.into()))
577-
}
528+
// Compression span
529+
let _guard = tracing::info_span!(
530+
"compress_ciphertext",
531+
txn_id = %txn_id_short,
532+
ct_type = result.type_name(),
533+
operation = op_name,
534+
compressed_size = tracing::field::Empty,
535+
)
536+
.entered();
537+
let ct_type = result.type_num();
538+
let compressed = result.compress();
539+
match compressed {
540+
Ok(ct_bytes) => {
541+
tracing::Span::current()
542+
.record("compressed_size", ct_bytes.len() as i64);
543+
(
544+
graph_node_index,
545+
Ok(CompressedCiphertext { ct_type, ct_bytes }),
546+
)
547+
}
548+
Err(error) => {
549+
telemetry::set_current_span_error(&error);
550+
(graph_node_index, Err(error.into()))
578551
}
579-
} else {
580-
(graph_node_index, Ok((result, None)))
581552
}
582553
}
583554
Err(e) => {

coprocessor/fhevm-engine/scheduler/src/dfg/types.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
use anyhow::Result;
22
use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts};
33

4+
#[derive(Clone)]
5+
pub struct CompressedCiphertext {
6+
pub ct_type: i16,
7+
pub ct_bytes: Vec<u8>,
8+
}
9+
410
pub struct TaskResult {
5-
pub ct: SupportedFheCiphertexts,
6-
pub compressed_ct: Option<(i16, Vec<u8>)>,
11+
pub compressed_ct: CompressedCiphertext,
712
pub is_allowed: bool,
813
pub transaction_id: Handle,
914
}
1015
pub struct DFGTxResult {
1116
pub handle: Handle,
1217
pub transaction_id: Handle,
13-
pub compressed_ct: Result<(i16, Vec<u8>)>,
18+
pub compressed_ct: Result<CompressedCiphertext>,
1419
}
1520
impl std::fmt::Debug for DFGTxResult {
1621
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -30,7 +35,7 @@ impl std::fmt::Debug for DFGTxResult {
3035
#[derive(Clone)]
3136
pub enum DFGTxInput {
3237
Value((SupportedFheCiphertexts, bool)),
33-
Compressed(((i16, Vec<u8>), bool)),
38+
Compressed((CompressedCiphertext, bool)),
3439
}
3540
impl std::fmt::Debug for DFGTxInput {
3641
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -44,7 +49,7 @@ impl std::fmt::Debug for DFGTxInput {
4449
#[derive(Clone)]
4550
pub enum DFGTaskInput {
4651
Value(SupportedFheCiphertexts),
47-
Compressed((i16, Vec<u8>)),
52+
Compressed(CompressedCiphertext),
4853
Dependence(Handle),
4954
}
5055
impl std::fmt::Debug for DFGTaskInput {

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use fhevm_engine_common::{tfhe_ops::current_ciphertext_version, types::Supported
88
use itertools::Itertools;
99
use lazy_static::lazy_static;
1010
use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};
11-
use scheduler::dfg::types::{DFGTxInput, SchedulerError};
11+
use scheduler::dfg::types::{CompressedCiphertext, DFGTxInput, SchedulerError};
1212
use scheduler::dfg::{build_component_nodes, ComponentNode, DFComponentGraph, DFGOp};
1313
use scheduler::dfg::{scheduler::Scheduler, types::DFGTaskInput};
1414
use sqlx::types::Uuid;
@@ -473,7 +473,13 @@ async fn build_transaction_graph_and_execute<'a>(
473473
for (handle, (ct_type, mut ct)) in ciphertext_map.into_iter() {
474474
tx_graph.add_input(
475475
&handle,
476-
&DFGTxInput::Compressed(((ct_type, std::mem::take(&mut ct)), true)),
476+
&DFGTxInput::Compressed((
477+
CompressedCiphertext {
478+
ct_type,
479+
ct_bytes: std::mem::take(&mut ct),
480+
},
481+
true,
482+
)),
477483
)?;
478484
}
479485
// Resolve deferred cross-transaction dependences: edges whose
@@ -535,10 +541,10 @@ async fn upload_transaction_graph_results<'a>(
535541
let mut cts_to_insert = vec![];
536542
for result in graph_results.into_iter() {
537543
match result.compressed_ct {
538-
Ok((db_type, db_bytes)) => {
544+
Ok(cct) => {
539545
cts_to_insert.push((
540546
result.handle.clone(),
541-
(db_bytes, (current_ciphertext_version(), db_type)),
547+
(cct.ct_bytes, (current_ciphertext_version(), cct.ct_type)),
542548
));
543549
handles_to_update.push((result.handle.clone(), result.transaction_id.clone()));
544550
WORK_ITEMS_PROCESSED_COUNTER.inc();

0 commit comments

Comments
 (0)