diff --git a/coprocessor/fhevm-engine/db-migration/migrations/20250917084403_add_is_allowed_in_computations.sql b/coprocessor/fhevm-engine/db-migration/migrations/20250917084403_add_is_allowed_in_computations.sql new file mode 100644 index 0000000000..03d6e5d6a8 --- /dev/null +++ b/coprocessor/fhevm-engine/db-migration/migrations/20250917084403_add_is_allowed_in_computations.sql @@ -0,0 +1,10 @@ +ALTER TABLE computations + ADD COLUMN IF NOT EXISTS is_allowed BOOL NOT NULL DEFAULT FALSE; + +UPDATE computations SET is_allowed = true WHERE (output_handle, tenant_id) IN ( + SELECT handle, tenant_id FROM allowed_handles WHERE is_computed=false +); + +CREATE INDEX IF NOT EXISTS idx_computations_is_allowed + ON computations (is_allowed) + WHERE is_computed=false; diff --git a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs index ea86603d6e..3d293b14ea 100644 --- a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs @@ -8,7 +8,7 @@ use anyhow::{anyhow, Result}; use futures_util::stream::StreamExt; use sqlx::types::Uuid; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -22,11 +22,11 @@ use rustls; use tokio_util::sync::CancellationToken; use fhevm_engine_common::healthz_server::HttpServer as HealthHttpServer; -use fhevm_engine_common::types::BlockchainProvider; +use fhevm_engine_common::types::{BlockchainProvider, Handle}; use fhevm_engine_common::utils::HeartBeat; use crate::contracts::{AclContract, TfheContract}; -use crate::database::tfhe_event_propagate::{ChainId, Database, LogTfhe}; +use crate::database::tfhe_event_propagate::{tfhe_result_handle, ChainId, Database, LogTfhe}; use crate::health_check::HealthCheck; pub mod block_history; @@ -819,35 +819,34 @@ async fn db_insert_block_no_retry( tfhe_contract_address: &Option
, ) -> std::result::Result<(), sqlx::Error> { let mut tx = db.new_transaction().await?; + let mut is_allowed = HashSet::::new(); + let mut acl_events = vec![]; + let mut tfhe_event_log = vec![]; for log in &block_logs.logs { - info!( - block = ?log.block_number, - tx = ?log.transaction_hash, - log_index = ?log.log_index, - "Log", - ); let current_address = Some(log.inner.address); + let is_acl_address = ¤t_address == acl_contract_address; + if acl_contract_address.is_none() || is_acl_address { + if let Ok(event) = + AclContract::AclContractEvents::decode_log(&log.inner) + { + info!(acl_event = ?event, "ACL event"); + let handle = panic!(); + is_allowed.insert(handle); + db.handle_acl_event(&mut tx, &event).await?; + continue; + } + } let is_tfhe_address = ¤t_address == tfhe_contract_address; if tfhe_contract_address.is_none() || is_tfhe_address { if let Ok(event) = TfheContract::TfheContractEvents::decode_log(&log.inner) { - info!(tfhe_event = ?event, "TFHE event"); let log = LogTfhe { event, transaction_hash: log.transaction_hash, + is_allowed: false, // updated in the next loop }; - db.insert_tfhe_event(&mut tx, &log).await?; - continue; - } - } - let is_acl_address = ¤t_address == acl_contract_address; - if acl_contract_address.is_none() || is_acl_address { - if let Ok(event) = - AclContract::AclContractEvents::decode_log(&log.inner) - { - info!(acl_event = ?event, "ACL event"); - db.handle_acl_event(&mut tx, &event).await?; + tfhe_event_log.push(log); continue; } } @@ -856,10 +855,20 @@ async fn db_insert_block_no_retry( event_address = ?log.inner.address, acl_contract_address = ?acl_contract_address, tfhe_contract_address = ?tfhe_contract_address, + log = ?log, "Cannot decode event", ); } } + for tfhe_log in &tfhe_event_log { + info!(tfhe_log = ?tfhe_log, "TFHE event"); + let result_handle = tfhe_result_handle(&tfhe_log.event); + let tfhe_log = LogTfhe { + is_allowed: is_allowed.contains(&result_handle), + ..tfhe_log + }; + db.insert_tfhe_event(&mut tx, &tfhe_log).await?; + } db.mark_block_as_valid(&mut tx, &block_logs.summary).await?; tx.commit().await } diff --git a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs index f5725ade24..39a514c094 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs @@ -77,9 +77,11 @@ pub struct Database { pub tick: HeartBeat, } +#[derive(Debug)] pub struct LogTfhe { pub event: Log, pub transaction_hash: Option, + pub is_allowed: bool, } pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>; @@ -277,7 +279,8 @@ impl Database { fhe_operation, is_scalar, dependence_chain_id, - transaction_id + transaction_id, + is_allowed, ) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING @@ -288,7 +291,8 @@ impl Database { fhe_operation as i16, is_scalar, bucket.to_vec(), - log.transaction_hash.map(|txh| txh.to_vec()) + log.transaction_hash.map(|txh| txh.to_vec()), + log.is_allowed, ); query.execute(tx.deref_mut()).await.map(|_| ()) } @@ -674,3 +678,46 @@ pub fn event_name(op: &TfheContractEvents) -> &'static str { E::VerifyCiphertext(_) => "VerifyCiphertext", } } + + +pub fn tfhe_result_handle(op: &TfheContractEvents) -> Option { + use TfheContract as C; + use TfheContractEvents as E; + match op { + E::Cast(C::Cast {result, ..}) + | E::FheAdd(C::FheAdd {result, ..}) + | E::FheBitAnd(C::FheBitAnd {result, ..}) + | E::FheBitOr(C::FheBitOr {result, ..}) + | E::FheBitXor(C::FheBitXor {result, ..} ) + | E::FheDiv(C::FheDiv {result, ..}) + | E::FheMax(C::FheMax {result, ..}) + | E::FheMin(C::FheMin {result, ..}) + | E::FheMul(C::FheMul {result, ..}) + | E::FheRem(C::FheRem {result, ..}) + | E::FheRotl(C::FheRotl {result, ..}) + | E::FheRotr(C::FheRotr {result, ..}) + | E::FheShl(C::FheShl {result, ..}) + | E::FheShr(C::FheShr {result, ..}) + | E::FheSub(C::FheSub {result, ..}) + | E::FheIfThenElse(C::FheIfThenElse {result, ..}) + | E::FheEq(C::FheEq {result, ..}) + | E::FheGe(C::FheGe {result, ..}) + | E::FheGt(C::FheGt {result, ..}) + | E::FheLe(C::FheLe {result, ..}) + | E::FheLt(C::FheLt {result, ..}) + | E::FheNe(C::FheNe {result, ..}) + | E::FheNeg(C::FheNeg {result, ..}) + | E::FheNot(C::FheNot {result, ..}) + | E::FheRand(C::FheRand {result, ..}) + | E::FheRandBounded(C::FheRandBounded {result, ..}) + | E::TrivialEncrypt(C::TrivialEncrypt {result, ..}) + => Some(result.clone()), + + | E::Initialized(_) + | E::OwnershipTransferStarted(_) + | E::OwnershipTransferred(_) + | E::Upgraded(_) + | E::VerifyCiphertext(_) + => None, + } +}