Skip to content

Commit 5c3e33e

Browse files
committed
fix(coprocessor): useblock hash to identify transaction
fixes a bug where 2 transactions are swapped during a reorg butkeeping the sametx id and being dependent on each other the 4 tx are merged and create a cycle
1 parent 803f104 commit 5c3e33e

File tree

10 files changed

+128
-64
lines changed

10 files changed

+128
-64
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
ALTER TABLE computations
2+
ADD COLUMN IF NOT EXISTS block_hash BYTEA NOT NULL DEFAULT '\x00'::BYTEA,
3+
ADD COLUMN IF NOT EXISTS block_number BIGINT NOT NULL DEFAULT 0;
4+
5+
-- For next release, we should remove the default values for block_hash and block_number.
6+
-- ALTER TABLE computations ALTER COLUMN block_hash DROP DEFAULT;
7+
-- ALTER TABLE computations ALTER COLUMN block_number DROP DEFAULT;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- no-transaction
2+
3+
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_computations_transaction
4+
ON computations (transaction_id, block_hash);
5+
6+
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_computations_handle_transaction
7+
ON computations (output_handle, transaction_id, block_hash);

coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ mod tests {
474474
use crate::contracts::TfheContract as C;
475475
use crate::contracts::TfheContract::TfheContractEvents as E;
476476
use crate::database::dependence_chains::dependence_chains;
477-
use crate::database::tfhe_event_propagate::{Chain, ChainCache, LogTfhe};
477+
use crate::database::tfhe_event_propagate::{Chain, ChainCache, ChainHash, LogTfhe};
478478
use crate::database::tfhe_event_propagate::{
479479
ClearConst, Handle, TransactionHash,
480480
};
@@ -502,6 +502,7 @@ mod tests {
502502
event: tfhe_event(e),
503503
is_allowed,
504504
block_number: 0,
505+
block_hash: ChainHash::ZERO,
505506
block_timestamp: sqlx::types::time::PrimitiveDateTime::MIN,
506507
transaction_hash: Some(tx),
507508
dependence_chain: TransactionHash::ZERO,

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ pub async fn ingest_block_logs(
209209
transaction_hash: log.transaction_hash,
210210
block_number,
211211
block_timestamp,
212+
block_hash,
212213
// updated in the next loop and dependence_chains
213214
is_allowed: false,
214215
dependence_chain: Default::default(),

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ pub struct LogTfhe {
122122
pub transaction_hash: Option<TransactionHash>,
123123
pub is_allowed: bool,
124124
pub block_number: u64,
125+
pub block_hash: BlockHash,
125126
pub block_timestamp: PrimitiveDateTime,
126127
pub tx_depth_size: u64,
127128
pub dependence_chain: TransactionHash,
@@ -374,9 +375,11 @@ impl Database {
374375
created_at,
375376
schedule_order,
376377
is_completed,
377-
host_chain_id
378+
host_chain_id,
379+
block_hash,
380+
block_number
378381
)
379-
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), $8::timestamp, $9, $10)
382+
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), $8::timestamp, $9, $10, $11, $12)
380383
ON CONFLICT (output_handle, transaction_id) DO NOTHING
381384
"#,
382385
output_handle,
@@ -391,7 +394,9 @@ impl Database {
391394
log.tx_depth_size as i64
392395
)),
393396
!log.is_allowed,
394-
self.chain_id.as_i64()
397+
self.chain_id.as_i64(),
398+
log.block_hash.as_slice(),
399+
log.block_number as i64,
395400
);
396401
query
397402
.execute(tx.deref_mut())

coprocessor/fhevm-engine/scheduler/src/dfg.rs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub struct ComponentNode {
6969
pub inputs: HashMap<Handle, Option<DFGTxInput>>,
7070
pub results: Vec<Handle>,
7171
pub intermediate_handles: Vec<Handle>,
72-
pub transaction_id: Handle,
72+
pub transaction: Transaction,
7373
pub is_uncomputable: bool,
7474
pub component_id: usize,
7575
}
@@ -156,10 +156,10 @@ pub fn finalize(graph: &mut Dag<(bool, usize), OpEdge>) -> Vec<usize> {
156156
unneeded_nodes
157157
}
158158

159-
type ComponentNodes = Result<(Vec<ComponentNode>, Vec<(Handle, Handle)>)>;
159+
type ComponentNodes = Result<(Vec<ComponentNode>, Vec<(Handle, Transaction)>)>;
160160
pub fn build_component_nodes(
161161
mut operations: Vec<DFGOp>,
162-
transaction_id: &Handle,
162+
transaction: &Transaction,
163163
) -> ComponentNodes {
164164
operations.sort_by_key(|o| o.output_handle.clone());
165165
let mut graph: Dag<(bool, usize), OpEdge> = Dag::default();
@@ -195,9 +195,9 @@ pub fn build_component_nodes(
195195
.map_err(|_| SchedulerError::CyclicDependence)?;
196196
}
197197
// Prune unneeded branches from the graph
198-
let unneeded: Vec<(Handle, Handle)> = finalize(&mut graph)
198+
let unneeded: Vec<(Handle, Transaction)> = finalize(&mut graph)
199199
.into_iter()
200-
.map(|i| (operations[i].output_handle.clone(), transaction_id.clone()))
200+
.map(|i| (operations[i].output_handle.clone(), transaction.clone()))
201201
.collect();
202202
// Partition the graph and extract sequential components
203203
let mut execution_graph: Dag<ExecNode, ()> = Dag::default();
@@ -215,7 +215,7 @@ pub fn build_component_nodes(
215215
.ok_or(SchedulerError::DataflowGraphError)?;
216216
component_ops.push(std::mem::take(&mut operations[op_node.1]));
217217
}
218-
component.build(component_ops, transaction_id, idx)?;
218+
component.build(component_ops, transaction, idx)?;
219219
components.push(component);
220220
}
221221
Ok((components, unneeded))
@@ -225,10 +225,10 @@ impl ComponentNode {
225225
pub fn build(
226226
&mut self,
227227
mut operations: Vec<DFGOp>,
228-
transaction_id: &Handle,
228+
transaction: &Transaction,
229229
component_id: usize,
230230
) -> Result<()> {
231-
self.transaction_id = transaction_id.clone();
231+
self.transaction = transaction.clone();
232232
self.component_id = component_id;
233233
self.is_uncomputable = false;
234234
// Gather all handles produced within the transaction
@@ -286,7 +286,7 @@ impl ComponentNode {
286286
}
287287
impl std::fmt::Debug for ComponentNode {
288288
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289-
let _ = writeln!(f, "Transaction: [{:?}]", self.transaction_id);
289+
let _ = writeln!(f, "Transaction: [{:?}]", self.transaction);
290290
let _ = writeln!(
291291
f,
292292
"{:?}",
@@ -308,7 +308,7 @@ impl std::fmt::Debug for ComponentNode {
308308
pub struct DFComponentGraph {
309309
pub graph: Dag<ComponentNode, ComponentEdge>,
310310
pub needed_map: HashMap<Handle, Vec<NodeIndex>>,
311-
pub produced: HashMap<Handle, Vec<(NodeIndex, Handle)>>,
311+
pub produced: HashMap<Handle, Vec<(NodeIndex, Transaction)>>,
312312
pub results: Vec<DFGTxResult>,
313313
deferred_dependences: Vec<(NodeIndex, NodeIndex, Handle)>,
314314
}
@@ -322,8 +322,8 @@ impl DFComponentGraph {
322322
for r in tx.results.iter() {
323323
self.produced
324324
.entry(r.clone())
325-
.and_modify(|p| p.push((producer, tx.transaction_id.clone())))
326-
.or_insert(vec![(producer, tx.transaction_id.clone())]);
325+
.and_modify(|p| p.push((producer, tx.transaction.clone())))
326+
.or_insert(vec![(producer, tx.transaction.clone())]);
327327
}
328328
}
329329
// Identify all dependence pairs (producer, consumer)
@@ -333,7 +333,7 @@ impl DFComponentGraph {
333333
if let Some(producer) = self.produced.get(i) {
334334
// If this handle is produced within this same transaction
335335
if let Some((prod_idx, _)) =
336-
producer.iter().find(|(_, tid)| *tid == tx.transaction_id)
336+
producer.iter().find(|(_, tid)| *tid == tx.transaction)
337337
{
338338
if *prod_idx == consumer {
339339
warn!(target: "scheduler", { },
@@ -383,7 +383,7 @@ impl DFComponentGraph {
383383
.graph
384384
.node_weight(*consumer)
385385
.ok_or(SchedulerError::DataflowGraphError)?;
386-
error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
386+
error!(target: "scheduler", { producer_id = ?prod.transaction.clone(), consumer_id = ?cons.transaction.clone()},
387387
"Unexpected cycle in same-transaction dependence");
388388
return Err(SchedulerError::CyclicDependence.into());
389389
}
@@ -431,11 +431,11 @@ impl DFComponentGraph {
431431
.node_weight_mut(*idx)
432432
.ok_or(SchedulerError::DataflowGraphError)?;
433433
tx.is_uncomputable = true;
434-
error!(target: "scheduler", { transaction_id = ?hex::encode(tx.transaction_id.clone()) },
434+
error!(target: "scheduler", { transaction = ?tx.transaction.clone() },
435435
"Transaction is part of a dependence cycle");
436436
for (_, op) in tx.graph.graph.node_references() {
437437
self.results.push(DFGTxResult {
438-
transaction_id: tx.transaction_id.clone(),
438+
transaction: tx.transaction.clone(),
439439
handle: op.result_handle.to_vec(),
440440
compressed_ct: Err(SchedulerError::CyclicDependence.into()),
441441
});
@@ -454,7 +454,7 @@ impl DFComponentGraph {
454454
.graph
455455
.node_weight(*consumer)
456456
.ok_or(SchedulerError::DataflowGraphError)?;
457-
error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
457+
error!(target: "scheduler", { producer = ?prod.transaction.clone(), consumer_id = ?cons.transaction.clone() },
458458
"Dependence cycle when adding dependence - initial cycle detection failed");
459459
return Err(SchedulerError::CyclicDependence.into());
460460
}
@@ -489,7 +489,7 @@ impl DFComponentGraph {
489489
if let Ok(ref result) = result {
490490
if let Some((pid, _)) = producer
491491
.iter()
492-
.find(|(_, tid)| *tid == result.transaction_id)
492+
.find(|(_, tid)| tid == result.transaction)
493493
{
494494
prod_idx = *pid;
495495
}
@@ -525,7 +525,7 @@ impl DFComponentGraph {
525525
.node_weight_mut(prod_idx)
526526
.ok_or(SchedulerError::DataflowGraphError)?;
527527
self.results.push(DFGTxResult {
528-
transaction_id: producer_tx.transaction_id.clone(),
528+
transaction: producer_tx.transaction.clone(),
529529
handle: handle.to_vec(),
530530
compressed_ct: result.map(|rok| rok.compressed_ct),
531531
});
@@ -558,7 +558,7 @@ impl DFComponentGraph {
558558
// Add error results for all operations in this transaction
559559
for (_idx, op) in tx_node.graph.graph.node_references() {
560560
self.results.push(DFGTxResult {
561-
transaction_id: tx_node.transaction_id.clone(),
561+
transaction: tx_node.transaction.clone(),
562562
handle: op.result_handle.to_vec(),
563563
compressed_ct: Err(SchedulerError::MissingInputs.into()),
564564
});
@@ -574,14 +574,14 @@ impl DFComponentGraph {
574574
pub fn get_results(&mut self) -> Vec<DFGTxResult> {
575575
std::mem::take(&mut self.results)
576576
}
577-
pub fn get_intermediate_handles(&mut self) -> Vec<(Handle, Handle)> {
577+
pub fn get_intermediate_handles(&mut self) -> Vec<(Handle, Transaction)> {
578578
let mut res = vec![];
579579
for tx in self.graph.node_weights_mut() {
580580
if !tx.is_uncomputable {
581581
res.append(
582582
&mut (std::mem::take(&mut tx.intermediate_handles))
583583
.into_iter()
584-
.map(|h| (h, tx.transaction_id.clone()))
584+
.map(|h| (h, tx.transaction.clone()))
585585
.collect::<Vec<_>>(),
586586
);
587587
}

coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ impl<'a> Scheduler<'a> {
178178
args.push((
179179
std::mem::take(&mut tx.graph),
180180
std::mem::take(&mut tx.inputs),
181-
tx.transaction_id.clone(),
181+
tx.transaction.clone(),
182182
tx.component_id,
183183
));
184184
}
@@ -232,7 +232,7 @@ impl<'a> Scheduler<'a> {
232232
args.push((
233233
std::mem::take(&mut tx.graph),
234234
std::mem::take(&mut tx.inputs),
235-
tx.transaction_id.clone(),
235+
tx.transaction.clone(),
236236
tx.component_id,
237237
));
238238
}
@@ -273,7 +273,12 @@ fn re_randomise_operation_inputs(
273273
Ok(())
274274
}
275275

276-
type ComponentSet = Vec<(DFGraph, HashMap<Handle, Option<DFGTxInput>>, Handle, usize)>;
276+
type ComponentSet = Vec<(
277+
DFGraph,
278+
HashMap<Handle, Option<DFGTxInput>>,
279+
Transaction,
280+
usize,
281+
)>;
277282
fn execute_partition(
278283
transactions: ComponentSet,
279284
task_id: NodeIndex,
@@ -287,7 +292,7 @@ fn execute_partition(
287292
// Traverse transactions within the partition. The transactions
288293
// are topologically sorted so the order is executable
289294
'tx: for (ref mut dfg, ref mut tx_inputs, tid, _cid) in transactions {
290-
let txn_id_short = telemetry::short_hex_id(&tid);
295+
let txn_id_short = telemetry::short_hex_id(&tid.transaction_id);
291296

292297
// Update the transaction inputs based on allowed handles so
293298
// far. If any input is still missing, and we cannot fill it
@@ -296,7 +301,7 @@ fn execute_partition(
296301
for (h, i) in tx_inputs.iter_mut() {
297302
if i.is_none() {
298303
let Some(Ok(ct)) = res.get(h) else {
299-
warn!(target: "scheduler", {transaction_id = ?hex::encode(tid) },
304+
warn!(target: "scheduler", {transaction = ?tid},
300305
"Missing input to compute transaction - skipping");
301306
for nidx in dfg.graph.node_identifiers() {
302307
let Some(node) = dfg.graph.node_weight_mut(nidx) else {
@@ -328,7 +333,7 @@ fn execute_partition(
328333
let started_at = std::time::Instant::now();
329334

330335
let Ok(ts) = daggy::petgraph::algo::toposort(&dfg.graph, None) else {
331-
error!(target: "scheduler", {transaction_id = ?tid },
336+
error!(target: "scheduler", {transaction = ?tid },
332337
"Cyclical dependence error in transaction");
333338
for nidx in dfg.graph.node_identifiers() {
334339
let Some(node) = dfg.graph.node_weight_mut(nidx) else {
@@ -378,7 +383,7 @@ fn execute_partition(
378383
result.1.map(|v| TaskResult {
379384
compressed_ct: v,
380385
is_allowed: node.is_allowed,
381-
transaction_id: tid.clone(),
386+
transaction: tid.clone(),
382387
}),
383388
);
384389
}
@@ -405,7 +410,7 @@ fn try_execute_node(
405410
node_index: usize,
406411
tx_inputs: &mut HashMap<Handle, Option<DFGTxInput>>,
407412
gpu_idx: usize,
408-
transaction_id: &Handle,
413+
transaction: &Transaction,
409414
cpk: &tfhe::CompactPublicKey,
410415
) -> Result<(usize, OpResult)> {
411416
if !node.check_ready_inputs(tx_inputs) {
@@ -463,7 +468,7 @@ fn try_execute_node(
463468
cts,
464469
node_index,
465470
gpu_idx,
466-
transaction_id,
471+
transaction,
467472
))
468473
}
469474

@@ -473,9 +478,9 @@ fn run_computation(
473478
inputs: Vec<SupportedFheCiphertexts>,
474479
graph_node_index: usize,
475480
gpu_idx: usize,
476-
transaction_id: &Handle,
481+
transaction: &Transaction,
477482
) -> (usize, OpResult) {
478-
let txn_id_short = telemetry::short_hex_id(transaction_id);
483+
let txn_id_short = telemetry::short_hex_id(&transaction.transaction_id);
479484
let op = FheOperation::try_from(operation);
480485
match op {
481486
Ok(FheOperation::FheGetCiphertext) => {

coprocessor/fhevm-engine/scheduler/src/dfg/types.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,19 @@ pub struct CompressedCiphertext {
1010
pub struct TaskResult {
1111
pub compressed_ct: CompressedCiphertext,
1212
pub is_allowed: bool,
13-
pub transaction_id: Handle,
13+
pub transaction: Transaction,
1414
}
1515
pub struct DFGTxResult {
1616
pub handle: Handle,
17-
pub transaction_id: Handle,
17+
pub transaction: Transaction,
1818
pub compressed_ct: Result<CompressedCiphertext>,
1919
}
2020
impl std::fmt::Debug for DFGTxResult {
2121
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2222
let _ = writeln!(
2323
f,
2424
"Result: [{:?}] - tid [{:?}]",
25-
self.handle, self.transaction_id
25+
self.handle, self.transaction
2626
);
2727
if self.compressed_ct.is_err() {
2828
let _ = write!(f, "\t ERROR");
@@ -98,3 +98,29 @@ impl std::fmt::Display for SchedulerError {
9898
}
9999
}
100100
}
101+
102+
pub type TransactionId = Vec<u8>;
103+
pub type BlockHash = Vec<u8>;
104+
105+
#[derive(Clone, Default, Hash, PartialEq, Eq)]
106+
pub struct Transaction {
107+
pub transaction_id: TransactionId,
108+
pub block_hash: BlockHash,
109+
}
110+
111+
impl PartialEq<Transaction> for &Transaction {
112+
fn eq(&self, other: &Transaction) -> bool {
113+
self.transaction_id == other.transaction_id && self.block_hash == other.block_hash
114+
}
115+
}
116+
117+
impl std::fmt::Debug for Transaction {
118+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119+
write!(
120+
f,
121+
"Transaction: Tx[{:?}]:Block[{:?}]",
122+
hex::encode(&self.transaction_id),
123+
hex::encode(&self.block_hash)
124+
)
125+
}
126+
}

0 commit comments

Comments
 (0)