Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Migration: add block-provenance columns to `computations`.
-- The NOT NULL defaults ('\x00' hash, block 0) exist only so the constraint
-- can be applied to rows that predate this migration without a backfill;
-- new writes are expected to supply real values.
ALTER TABLE computations
ADD COLUMN IF NOT EXISTS block_hash BYTEA NOT NULL DEFAULT '\x00'::BYTEA,
ADD COLUMN IF NOT EXISTS block_number BIGINT NOT NULL DEFAULT 0;

-- For next release, we should remove the default values for block_hash and block_number.
-- ALTER TABLE computations ALTER COLUMN block_hash DROP DEFAULT;
-- ALTER TABLE computations ALTER COLUMN block_number DROP DEFAULT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- no-transaction
-- NOTE: the marker above is presumably a directive for the migration runner —
-- CREATE INDEX CONCURRENTLY cannot execute inside a transaction block in
-- PostgreSQL, so this file must run outside one. Verify the runner honors it.

-- Speeds up lookups of computations by originating transaction and block.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_computations_transaction
ON computations (transaction_id, block_hash);

-- Covers queries that additionally filter/join on the output handle.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_computations_handle_transaction
ON computations (output_handle, transaction_id, block_hash);
2 changes: 1 addition & 1 deletion coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
// TODO: check if we can avoid the cast from u256 to i64
sqlx::query!(
"WITH ins AS (
INSERT INTO verify_proofs (zk_proof_id, chain_id, contract_address, user_address, input, extra_data, transaction_id)
INSERT INTO verify_proofs (zk_proof_id, host_chain_id, contract_address, user_address, input, extra_data, transaction_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT(zk_proof_id) DO NOTHING
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,9 @@ mod tests {
use crate::contracts::TfheContract as C;
use crate::contracts::TfheContract::TfheContractEvents as E;
use crate::database::dependence_chains::dependence_chains;
use crate::database::tfhe_event_propagate::{Chain, ChainCache, LogTfhe};
use crate::database::tfhe_event_propagate::{
Chain, ChainCache, ChainHash, LogTfhe,
};
use crate::database::tfhe_event_propagate::{
ClearConst, Handle, TransactionHash,
};
Expand Down Expand Up @@ -502,6 +504,7 @@ mod tests {
event: tfhe_event(e),
is_allowed,
block_number: 0,
block_hash: ChainHash::ZERO,
block_timestamp: sqlx::types::time::PrimitiveDateTime::MIN,
transaction_hash: Some(tx),
dependence_chain: TransactionHash::ZERO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ pub async fn ingest_block_logs(
transaction_hash: log.transaction_hash,
block_number,
block_timestamp,
block_hash,
// updated in the next loop and dependence_chains
is_allowed: false,
dependence_chain: Default::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub struct LogTfhe {
pub transaction_hash: Option<TransactionHash>,
pub is_allowed: bool,
pub block_number: u64,
pub block_hash: BlockHash,
pub block_timestamp: PrimitiveDateTime,
pub tx_depth_size: u64,
pub dependence_chain: TransactionHash,
Expand Down Expand Up @@ -374,9 +375,11 @@ impl Database {
created_at,
schedule_order,
is_completed,
host_chain_id
host_chain_id,
block_hash,
block_number
)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), $8::timestamp, $9, $10)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), $8::timestamp, $9, $10, $11, $12)
ON CONFLICT (output_handle, transaction_id) DO NOTHING
"#,
output_handle,
Expand All @@ -391,7 +394,9 @@ impl Database {
log.tx_depth_size as i64
)),
!log.is_allowed,
self.chain_id.as_i64()
self.chain_id.as_i64(),
log.block_hash.as_slice(),
log.block_number as i64,
);
query
.execute(tx.deref_mut())
Expand Down
44 changes: 22 additions & 22 deletions coprocessor/fhevm-engine/scheduler/src/dfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct ComponentNode {
pub inputs: HashMap<Handle, Option<DFGTxInput>>,
pub results: Vec<Handle>,
pub intermediate_handles: Vec<Handle>,
pub transaction_id: Handle,
pub transaction: Transaction,
pub is_uncomputable: bool,
pub component_id: usize,
}
Expand Down Expand Up @@ -156,10 +156,10 @@ pub fn finalize(graph: &mut Dag<(bool, usize), OpEdge>) -> Vec<usize> {
unneeded_nodes
}

type ComponentNodes = Result<(Vec<ComponentNode>, Vec<(Handle, Handle)>)>;
type ComponentNodes = Result<(Vec<ComponentNode>, Vec<(Handle, Transaction)>)>;
pub fn build_component_nodes(
mut operations: Vec<DFGOp>,
transaction_id: &Handle,
transaction: &Transaction,
) -> ComponentNodes {
operations.sort_by_key(|o| o.output_handle.clone());
let mut graph: Dag<(bool, usize), OpEdge> = Dag::default();
Expand Down Expand Up @@ -195,9 +195,9 @@ pub fn build_component_nodes(
.map_err(|_| SchedulerError::CyclicDependence)?;
}
// Prune unneeded branches from the graph
let unneeded: Vec<(Handle, Handle)> = finalize(&mut graph)
let unneeded: Vec<(Handle, Transaction)> = finalize(&mut graph)
.into_iter()
.map(|i| (operations[i].output_handle.clone(), transaction_id.clone()))
.map(|i| (operations[i].output_handle.clone(), transaction.clone()))
.collect();
// Partition the graph and extract sequential components
let mut execution_graph: Dag<ExecNode, ()> = Dag::default();
Expand All @@ -215,7 +215,7 @@ pub fn build_component_nodes(
.ok_or(SchedulerError::DataflowGraphError)?;
component_ops.push(std::mem::take(&mut operations[op_node.1]));
}
component.build(component_ops, transaction_id, idx)?;
component.build(component_ops, transaction, idx)?;
components.push(component);
}
Ok((components, unneeded))
Expand All @@ -225,10 +225,10 @@ impl ComponentNode {
pub fn build(
&mut self,
mut operations: Vec<DFGOp>,
transaction_id: &Handle,
transaction: &Transaction,
component_id: usize,
) -> Result<()> {
self.transaction_id = transaction_id.clone();
self.transaction = transaction.clone();
self.component_id = component_id;
self.is_uncomputable = false;
// Gather all handles produced within the transaction
Expand Down Expand Up @@ -286,7 +286,7 @@ impl ComponentNode {
}
impl std::fmt::Debug for ComponentNode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let _ = writeln!(f, "Transaction: [{:?}]", self.transaction_id);
let _ = writeln!(f, "Transaction: [{:?}]", self.transaction);
let _ = writeln!(
f,
"{:?}",
Expand All @@ -308,7 +308,7 @@ impl std::fmt::Debug for ComponentNode {
pub struct DFComponentGraph {
pub graph: Dag<ComponentNode, ComponentEdge>,
pub needed_map: HashMap<Handle, Vec<NodeIndex>>,
pub produced: HashMap<Handle, Vec<(NodeIndex, Handle)>>,
pub produced: HashMap<Handle, Vec<(NodeIndex, Transaction)>>,
pub results: Vec<DFGTxResult>,
deferred_dependences: Vec<(NodeIndex, NodeIndex, Handle)>,
}
Expand All @@ -322,8 +322,8 @@ impl DFComponentGraph {
for r in tx.results.iter() {
self.produced
.entry(r.clone())
.and_modify(|p| p.push((producer, tx.transaction_id.clone())))
.or_insert(vec![(producer, tx.transaction_id.clone())]);
.and_modify(|p| p.push((producer, tx.transaction.clone())))
.or_insert(vec![(producer, tx.transaction.clone())]);
}
}
// Identify all dependence pairs (producer, consumer)
Expand All @@ -333,7 +333,7 @@ impl DFComponentGraph {
if let Some(producer) = self.produced.get(i) {
// If this handle is produced within this same transaction
if let Some((prod_idx, _)) =
producer.iter().find(|(_, tid)| *tid == tx.transaction_id)
producer.iter().find(|(_, tid)| *tid == tx.transaction)
{
if *prod_idx == consumer {
warn!(target: "scheduler", { },
Expand Down Expand Up @@ -383,7 +383,7 @@ impl DFComponentGraph {
.graph
.node_weight(*consumer)
.ok_or(SchedulerError::DataflowGraphError)?;
error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
error!(target: "scheduler", { producer_id = ?prod.transaction.clone(), consumer_id = ?cons.transaction.clone()},
"Unexpected cycle in same-transaction dependence");
return Err(SchedulerError::CyclicDependence.into());
}
Expand Down Expand Up @@ -431,11 +431,11 @@ impl DFComponentGraph {
.node_weight_mut(*idx)
.ok_or(SchedulerError::DataflowGraphError)?;
tx.is_uncomputable = true;
error!(target: "scheduler", { transaction_id = ?hex::encode(tx.transaction_id.clone()) },
error!(target: "scheduler", { transaction = ?tx.transaction.clone() },
"Transaction is part of a dependence cycle");
for (_, op) in tx.graph.graph.node_references() {
self.results.push(DFGTxResult {
transaction_id: tx.transaction_id.clone(),
transaction: tx.transaction.clone(),
handle: op.result_handle.to_vec(),
compressed_ct: Err(SchedulerError::CyclicDependence.into()),
});
Expand All @@ -454,7 +454,7 @@ impl DFComponentGraph {
.graph
.node_weight(*consumer)
.ok_or(SchedulerError::DataflowGraphError)?;
error!(target: "scheduler", { producer_id = ?hex::encode(prod.transaction_id.clone()), consumer_id = ?hex::encode(cons.transaction_id.clone()) },
error!(target: "scheduler", { producer = ?prod.transaction.clone(), consumer_id = ?cons.transaction.clone() },
"Dependence cycle when adding dependence - initial cycle detection failed");
return Err(SchedulerError::CyclicDependence.into());
}
Expand Down Expand Up @@ -489,7 +489,7 @@ impl DFComponentGraph {
if let Ok(ref result) = result {
if let Some((pid, _)) = producer
.iter()
.find(|(_, tid)| *tid == result.transaction_id)
.find(|(_, tid)| tid == result.transaction)
{
prod_idx = *pid;
}
Expand Down Expand Up @@ -525,7 +525,7 @@ impl DFComponentGraph {
.node_weight_mut(prod_idx)
.ok_or(SchedulerError::DataflowGraphError)?;
self.results.push(DFGTxResult {
transaction_id: producer_tx.transaction_id.clone(),
transaction: producer_tx.transaction.clone(),
handle: handle.to_vec(),
compressed_ct: result.map(|rok| rok.compressed_ct),
});
Expand Down Expand Up @@ -558,7 +558,7 @@ impl DFComponentGraph {
// Add error results for all operations in this transaction
for (_idx, op) in tx_node.graph.graph.node_references() {
self.results.push(DFGTxResult {
transaction_id: tx_node.transaction_id.clone(),
transaction: tx_node.transaction.clone(),
handle: op.result_handle.to_vec(),
compressed_ct: Err(SchedulerError::MissingInputs.into()),
});
Expand All @@ -574,14 +574,14 @@ impl DFComponentGraph {
pub fn get_results(&mut self) -> Vec<DFGTxResult> {
std::mem::take(&mut self.results)
}
pub fn get_intermediate_handles(&mut self) -> Vec<(Handle, Handle)> {
pub fn get_intermediate_handles(&mut self) -> Vec<(Handle, Transaction)> {
let mut res = vec![];
for tx in self.graph.node_weights_mut() {
if !tx.is_uncomputable {
res.append(
&mut (std::mem::take(&mut tx.intermediate_handles))
.into_iter()
.map(|h| (h, tx.transaction_id.clone()))
.map(|h| (h, tx.transaction.clone()))
.collect::<Vec<_>>(),
);
}
Expand Down
27 changes: 16 additions & 11 deletions coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl<'a> Scheduler<'a> {
args.push((
std::mem::take(&mut tx.graph),
std::mem::take(&mut tx.inputs),
tx.transaction_id.clone(),
tx.transaction.clone(),
tx.component_id,
));
}
Expand Down Expand Up @@ -232,7 +232,7 @@ impl<'a> Scheduler<'a> {
args.push((
std::mem::take(&mut tx.graph),
std::mem::take(&mut tx.inputs),
tx.transaction_id.clone(),
tx.transaction.clone(),
tx.component_id,
));
}
Expand Down Expand Up @@ -273,7 +273,12 @@ fn re_randomise_operation_inputs(
Ok(())
}

type ComponentSet = Vec<(DFGraph, HashMap<Handle, Option<DFGTxInput>>, Handle, usize)>;
type ComponentSet = Vec<(
DFGraph,
HashMap<Handle, Option<DFGTxInput>>,
Transaction,
usize,
)>;
fn execute_partition(
transactions: ComponentSet,
task_id: NodeIndex,
Expand All @@ -287,7 +292,7 @@ fn execute_partition(
// Traverse transactions within the partition. The transactions
// are topologically sorted so the order is executable
'tx: for (ref mut dfg, ref mut tx_inputs, tid, _cid) in transactions {
let txn_id_short = telemetry::short_hex_id(&tid);
let txn_id_short = telemetry::short_hex_id(&tid.transaction_id);

// Update the transaction inputs based on allowed handles so
// far. If any input is still missing, and we cannot fill it
Expand All @@ -296,7 +301,7 @@ fn execute_partition(
for (h, i) in tx_inputs.iter_mut() {
if i.is_none() {
let Some(Ok(ct)) = res.get(h) else {
warn!(target: "scheduler", {transaction_id = ?hex::encode(tid) },
warn!(target: "scheduler", {transaction = ?tid},
"Missing input to compute transaction - skipping");
for nidx in dfg.graph.node_identifiers() {
let Some(node) = dfg.graph.node_weight_mut(nidx) else {
Expand Down Expand Up @@ -328,7 +333,7 @@ fn execute_partition(
let started_at = std::time::Instant::now();

let Ok(ts) = daggy::petgraph::algo::toposort(&dfg.graph, None) else {
error!(target: "scheduler", {transaction_id = ?tid },
error!(target: "scheduler", {transaction = ?tid },
"Cyclical dependence error in transaction");
for nidx in dfg.graph.node_identifiers() {
let Some(node) = dfg.graph.node_weight_mut(nidx) else {
Expand Down Expand Up @@ -378,7 +383,7 @@ fn execute_partition(
result.1.map(|v| TaskResult {
compressed_ct: v,
is_allowed: node.is_allowed,
transaction_id: tid.clone(),
transaction: tid.clone(),
}),
);
}
Expand All @@ -405,7 +410,7 @@ fn try_execute_node(
node_index: usize,
tx_inputs: &mut HashMap<Handle, Option<DFGTxInput>>,
gpu_idx: usize,
transaction_id: &Handle,
transaction: &Transaction,
cpk: &tfhe::CompactPublicKey,
) -> Result<(usize, OpResult)> {
if !node.check_ready_inputs(tx_inputs) {
Expand Down Expand Up @@ -463,7 +468,7 @@ fn try_execute_node(
cts,
node_index,
gpu_idx,
transaction_id,
transaction,
))
}

Expand All @@ -473,9 +478,9 @@ fn run_computation(
inputs: Vec<SupportedFheCiphertexts>,
graph_node_index: usize,
gpu_idx: usize,
transaction_id: &Handle,
transaction: &Transaction,
) -> (usize, OpResult) {
let txn_id_short = telemetry::short_hex_id(transaction_id);
let txn_id_short = telemetry::short_hex_id(&transaction.transaction_id);
let op = FheOperation::try_from(operation);
match op {
Ok(FheOperation::FheGetCiphertext) => {
Expand Down
Loading
Loading