Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions coprocessor/fhevm-engine/scheduler/src/dfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,10 @@ impl DFComponentGraph {
.node_weight_mut(dependent_tx_index)
.ok_or(SchedulerError::DataflowGraphError)?;
dependent_tx.inputs.entry(handle.to_vec()).and_modify(|v| {
*v = Some(DFGTxInput::Value((result.ct.clone(), result.is_allowed)))
*v = Some(DFGTxInput::Compressed((
result.compressed_ct.clone(),
result.is_allowed,
)))
});
}
} else {
Expand All @@ -524,14 +527,7 @@ impl DFComponentGraph {
self.results.push(DFGTxResult {
transaction_id: producer_tx.transaction_id.clone(),
handle: handle.to_vec(),
compressed_ct: result.and_then(|rok| {
rok.compressed_ct
.map(|cct| (cct.0, cct.1))
.ok_or_else(|| {
error!(target: "scheduler", {handle = ?hex::encode(handle) }, "Missing compressed ciphertext in task result");
SchedulerError::SchedulerError.into()
})
}),
compressed_ct: result.map(|rok| rok.compressed_ct),
});
}
}
Expand Down Expand Up @@ -615,7 +611,7 @@ impl std::fmt::Debug for DFComponentGraph {

pub struct DFGResult {
pub handle: Handle,
pub result: Result<Option<(i16, Vec<u8>)>>,
pub result: Result<Option<CompressedCiphertext>>,
pub work_index: usize,
}
pub type OpEdge = u8;
Expand All @@ -636,14 +632,18 @@ impl std::fmt::Debug for OpNode {
impl OpNode {
fn check_ready_inputs(&mut self, ct_map: &mut HashMap<Handle, Option<DFGTxInput>>) -> bool {
for i in self.inputs.iter_mut() {
if !matches!(i, DFGTaskInput::Value(_)) {
let DFGTaskInput::Dependence(d) = i else {
return false;
};
let Some(Some(DFGTxInput::Value((val, _)))) = ct_map.get(d) else {
return false;
};
*i = DFGTaskInput::Value(val.clone());
match i {
DFGTaskInput::Value(_) | DFGTaskInput::Compressed(_) => continue,
DFGTaskInput::Dependence(d) => {
let resolved = match ct_map.get(d) {
Some(Some(DFGTxInput::Value((val, _)))) => DFGTaskInput::Value(val.clone()),
Some(Some(DFGTxInput::Compressed((cct, _)))) => {
DFGTaskInput::Compressed(cct.clone())
}
_ => return false,
};
*i = resolved;
}
}
}
true
Expand Down
149 changes: 60 additions & 89 deletions coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,6 @@ impl<'a> Scheduler<'a> {
}
}

fn decompress_transaction_inputs(
inputs: &mut HashMap<Handle, Option<DFGTxInput>>,
gpu_idx: usize,
) -> Result<usize> {
let mut count = 0;
for txinput in inputs.values_mut() {
if let Some(DFGTxInput::Compressed(((t, c), allowed))) = txinput {
let decomp = SupportedFheCiphertexts::decompress(*t, c, gpu_idx)?;
*txinput = Some(DFGTxInput::Value((decomp, *allowed)));
count += 1;
}
}
Ok(count)
}

fn re_randomise_operation_inputs(
cts: &mut [SupportedFheCiphertexts],
opcode: i32,
Expand Down Expand Up @@ -327,41 +312,10 @@ fn execute_partition(
}
continue 'tx;
};
*i = Some(DFGTxInput::Value((ct.ct.clone(), ct.is_allowed)));
}
}

// Decompress ciphertexts
{
let _guard = tracing::info_span!(
"decompress_ciphertexts",
txn_id = %txn_id_short,
count = tracing::field::Empty,
)
.entered();

match decompress_transaction_inputs(tx_inputs, gpu_idx) {
Ok(count) => {
tracing::Span::current().record("count", count as i64);
}
Err(e) => {
error!(target: "scheduler", {transaction_id = ?hex::encode(&tid), error = ?e },
"Error while decompressing inputs");
telemetry::set_current_span_error(&e);
for nidx in dfg.graph.node_identifiers() {
let Some(node) = dfg.graph.node_weight_mut(nidx) else {
error!(target: "scheduler", {index = ?nidx.index() }, "Wrong dataflow graph index");
continue;
};
if node.is_allowed {
res.insert(
node.result_handle.clone(),
Err(SchedulerError::DecompressionError.into()),
);
}
}
continue 'tx;
}
*i = Some(DFGTxInput::Compressed((
ct.compressed_ct.clone(),
ct.is_allowed,
)));
}
}

Expand Down Expand Up @@ -410,7 +364,7 @@ fn execute_partition(
// Update input of consumers
if let Ok(ref res) = result.1 {
child_node.inputs[*edge.weight() as usize] =
DFGTaskInput::Value(res.0.clone());
DFGTaskInput::Compressed(res.clone());
}
}
}
Expand All @@ -422,8 +376,7 @@ fn execute_partition(
res.insert(
node.result_handle.clone(),
result.1.map(|v| TaskResult {
ct: v.0,
compressed_ct: if node.is_allowed { v.1 } else { None },
compressed_ct: v,
is_allowed: node.is_allowed,
transaction_id: tid.clone(),
}),
Expand Down Expand Up @@ -460,12 +413,35 @@ fn try_execute_node(
}
let mut cts = Vec::with_capacity(node.inputs.len());
for i in std::mem::take(&mut node.inputs) {
if let DFGTaskInput::Value(i) = i {
cts.push(i);
} else {
// That should not be possible as we called the checker.
error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs");
return Err(SchedulerError::MissingInputs.into());
match i {
DFGTaskInput::Value(v) => {
if !matches!(v, SupportedFheCiphertexts::Scalar(_)) {
error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) },
"Consensus risk: non-scalar uncompressed ciphertext");
}
cts.push(v);
}
DFGTaskInput::Compressed(cct) => {
let decompressed = SupportedFheCiphertexts::decompress(
cct.ct_type,
&cct.ct_bytes,
gpu_idx,
)
.map_err(|e| {
error!(
target: "scheduler",
{ handle = ?hex::encode(&node.result_handle), ct_type = cct.ct_type, error = ?e },
"Error while decompressing op input"
);
telemetry::set_current_span_error(&e);
SchedulerError::DecompressionError
})?;
cts.push(decompressed);
}
DFGTaskInput::Dependence(_) => {
error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs");
return Err(SchedulerError::MissingInputs.into());
}
}
}
// Re-randomize inputs for this operation
Expand All @@ -482,24 +458,20 @@ fn try_execute_node(
RERAND_LATENCY_BATCH_HISTOGRAM.observe(elapsed.as_secs_f64());
}
let opcode = node.opcode;
let is_allowed = node.is_allowed;
Ok(run_computation(
opcode,
cts,
node_index,
is_allowed,
gpu_idx,
transaction_id,
))
}

type OpResult = Result<(SupportedFheCiphertexts, Option<(i16, Vec<u8>)>)>;

type OpResult = Result<CompressedCiphertext>;
fn run_computation(
operation: i32,
inputs: Vec<SupportedFheCiphertexts>,
graph_node_index: usize,
is_allowed: bool,
gpu_idx: usize,
transaction_id: &Handle,
) -> (usize, OpResult) {
Expand All @@ -524,7 +496,7 @@ fn run_computation(
tracing::Span::current().record("compressed_size", ct_bytes.len() as i64);
(
graph_node_index,
Ok((inputs[0].clone(), Some((ct_type, ct_bytes)))),
Ok(CompressedCiphertext { ct_type, ct_bytes }),
)
}
Err(error) => {
Expand Down Expand Up @@ -553,31 +525,30 @@ fn run_computation(

match result {
Ok(result) => {
if is_allowed {
// Compression span
let _guard = tracing::info_span!(
"compress_ciphertext",
txn_id = %txn_id_short,
ct_type = result.type_name(),
operation = op_name,
compressed_size = tracing::field::Empty,
)
.entered();
let ct_type = result.type_num();
let compressed = result.compress();
match compressed {
Ok(ct_bytes) => {
tracing::Span::current()
.record("compressed_size", ct_bytes.len() as i64);
(graph_node_index, Ok((result, Some((ct_type, ct_bytes)))))
}
Err(error) => {
telemetry::set_current_span_error(&error);
(graph_node_index, Err(error.into()))
}
// Compression span
let _guard = tracing::info_span!(
"compress_ciphertext",
txn_id = %txn_id_short,
ct_type = result.type_name(),
operation = op_name,
compressed_size = tracing::field::Empty,
)
.entered();
let ct_type = result.type_num();
let compressed = result.compress();
match compressed {
Ok(ct_bytes) => {
tracing::Span::current()
.record("compressed_size", ct_bytes.len() as i64);
(
graph_node_index,
Ok(CompressedCiphertext { ct_type, ct_bytes }),
)
}
Err(error) => {
telemetry::set_current_span_error(&error);
(graph_node_index, Err(error.into()))
}
} else {
(graph_node_index, Ok((result, None)))
}
}
Err(e) => {
Expand Down
15 changes: 10 additions & 5 deletions coprocessor/fhevm-engine/scheduler/src/dfg/types.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use anyhow::Result;
use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts};

#[derive(Clone)]
pub struct CompressedCiphertext {
pub ct_type: i16,
pub ct_bytes: Vec<u8>,
}

pub struct TaskResult {
pub ct: SupportedFheCiphertexts,
pub compressed_ct: Option<(i16, Vec<u8>)>,
pub compressed_ct: CompressedCiphertext,
pub is_allowed: bool,
pub transaction_id: Handle,
}
pub struct DFGTxResult {
pub handle: Handle,
pub transaction_id: Handle,
pub compressed_ct: Result<(i16, Vec<u8>)>,
pub compressed_ct: Result<CompressedCiphertext>,
}
impl std::fmt::Debug for DFGTxResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -30,7 +35,7 @@ impl std::fmt::Debug for DFGTxResult {
#[derive(Clone)]
pub enum DFGTxInput {
Value((SupportedFheCiphertexts, bool)),
Compressed(((i16, Vec<u8>), bool)),
Compressed((CompressedCiphertext, bool)),
}
impl std::fmt::Debug for DFGTxInput {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -44,7 +49,7 @@ impl std::fmt::Debug for DFGTxInput {
#[derive(Clone)]
pub enum DFGTaskInput {
Value(SupportedFheCiphertexts),
Compressed((i16, Vec<u8>)),
Compressed(CompressedCiphertext),
Dependence(Handle),
}
impl std::fmt::Debug for DFGTaskInput {
Expand Down
14 changes: 10 additions & 4 deletions coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use fhevm_engine_common::{tfhe_ops::current_ciphertext_version, types::Supported
use itertools::Itertools;
use lazy_static::lazy_static;
use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};
use scheduler::dfg::types::{DFGTxInput, SchedulerError};
use scheduler::dfg::types::{CompressedCiphertext, DFGTxInput, SchedulerError};
use scheduler::dfg::{build_component_nodes, ComponentNode, DFComponentGraph, DFGOp};
use scheduler::dfg::{scheduler::Scheduler, types::DFGTaskInput};
use sqlx::types::Uuid;
Expand Down Expand Up @@ -473,7 +473,13 @@ async fn build_transaction_graph_and_execute<'a>(
for (handle, (ct_type, mut ct)) in ciphertext_map.into_iter() {
tx_graph.add_input(
&handle,
&DFGTxInput::Compressed(((ct_type, std::mem::take(&mut ct)), true)),
&DFGTxInput::Compressed((
CompressedCiphertext {
ct_type,
ct_bytes: std::mem::take(&mut ct),
},
true,
)),
)?;
}
// Resolve deferred cross-transaction dependences: edges whose
Expand Down Expand Up @@ -535,10 +541,10 @@ async fn upload_transaction_graph_results<'a>(
let mut cts_to_insert = vec![];
for result in graph_results.into_iter() {
match result.compressed_ct {
Ok((db_type, db_bytes)) => {
Ok(cct) => {
cts_to_insert.push((
result.handle.clone(),
(db_bytes, (current_ciphertext_version(), db_type)),
(cct.ct_bytes, (current_ciphertext_version(), cct.ct_type)),
));
handles_to_update.push((result.handle.clone(), result.transaction_id.clone()));
WORK_ITEMS_PROCESSED_COUNTER.inc();
Expand Down
Loading