Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
ALTER TABLE computations
ADD COLUMN IF NOT EXISTS is_allowed BOOL NOT NULL DEFAULT FALSE;

UPDATE computations SET is_allowed = true WHERE (output_handle, tenant_id) IN (
SELECT handle, tenant_id FROM allowed_handles WHERE is_computed=false
);

CREATE INDEX IF NOT EXISTS idx_computations_is_allowed
ON computations (is_allowed)
WHERE is_computed=false;
51 changes: 30 additions & 21 deletions coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anyhow::{anyhow, Result};
use futures_util::stream::StreamExt;
use sqlx::types::Uuid;

use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -22,11 +22,11 @@ use rustls;
use tokio_util::sync::CancellationToken;

use fhevm_engine_common::healthz_server::HttpServer as HealthHttpServer;
use fhevm_engine_common::types::BlockchainProvider;
use fhevm_engine_common::types::{BlockchainProvider, Handle};
use fhevm_engine_common::utils::HeartBeat;

use crate::contracts::{AclContract, TfheContract};
use crate::database::tfhe_event_propagate::{ChainId, Database, LogTfhe};
use crate::database::tfhe_event_propagate::{tfhe_result_handle, ChainId, Database, LogTfhe};
use crate::health_check::HealthCheck;

pub mod block_history;
Expand Down Expand Up @@ -819,35 +819,34 @@ async fn db_insert_block_no_retry(
tfhe_contract_address: &Option<Address>,
) -> std::result::Result<(), sqlx::Error> {
let mut tx = db.new_transaction().await?;
let mut is_allowed = HashSet::<Handle>::new();
let mut acl_events = vec![];
let mut tfhe_event_log = vec![];
for log in &block_logs.logs {
info!(
block = ?log.block_number,
tx = ?log.transaction_hash,
log_index = ?log.log_index,
"Log",
);
let current_address = Some(log.inner.address);
let is_acl_address = &current_address == acl_contract_address;
if acl_contract_address.is_none() || is_acl_address {
if let Ok(event) =
AclContract::AclContractEvents::decode_log(&log.inner)
{
info!(acl_event = ?event, "ACL event");
let handle = panic!();
is_allowed.insert(handle);
db.handle_acl_event(&mut tx, &event).await?;
continue;
}
}
let is_tfhe_address = &current_address == tfhe_contract_address;
if tfhe_contract_address.is_none() || is_tfhe_address {
if let Ok(event) =
TfheContract::TfheContractEvents::decode_log(&log.inner)
{
info!(tfhe_event = ?event, "TFHE event");
let log = LogTfhe {
event,
transaction_hash: log.transaction_hash,
is_allowed: false, // updated in the next loop
};
db.insert_tfhe_event(&mut tx, &log).await?;
continue;
}
}
let is_acl_address = &current_address == acl_contract_address;
if acl_contract_address.is_none() || is_acl_address {
if let Ok(event) =
AclContract::AclContractEvents::decode_log(&log.inner)
{
info!(acl_event = ?event, "ACL event");
db.handle_acl_event(&mut tx, &event).await?;
tfhe_event_log.push(log);
continue;
}
}
Expand All @@ -856,10 +855,20 @@ async fn db_insert_block_no_retry(
event_address = ?log.inner.address,
acl_contract_address = ?acl_contract_address,
tfhe_contract_address = ?tfhe_contract_address,
log = ?log,
"Cannot decode event",
);
}
}
for tfhe_log in &tfhe_event_log {
info!(tfhe_log = ?tfhe_log, "TFHE event");
let result_handle = tfhe_result_handle(&tfhe_log.event);
let tfhe_log = LogTfhe {
is_allowed: is_allowed.contains(&result_handle),
..tfhe_log
};
db.insert_tfhe_event(&mut tx, &tfhe_log).await?;
}
db.mark_block_as_valid(&mut tx, &block_logs.summary).await?;
tx.commit().await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ pub struct Database {
pub tick: HeartBeat,
}

#[derive(Debug)]
pub struct LogTfhe {
pub event: Log<TfheContractEvents>,
pub transaction_hash: Option<TransactionHash>,
pub is_allowed: bool,
}

pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>;
Expand Down Expand Up @@ -277,7 +279,8 @@ impl Database {
fhe_operation,
is_scalar,
dependence_chain_id,
transaction_id
transaction_id,
is_allowed,
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING
Expand All @@ -288,7 +291,8 @@ impl Database {
fhe_operation as i16,
is_scalar,
bucket.to_vec(),
log.transaction_hash.map(|txh| txh.to_vec())
log.transaction_hash.map(|txh| txh.to_vec()),
log.is_allowed,
);
query.execute(tx.deref_mut()).await.map(|_| ())
}
Expand Down Expand Up @@ -674,3 +678,46 @@ pub fn event_name(op: &TfheContractEvents) -> &'static str {
E::VerifyCiphertext(_) => "VerifyCiphertext",
}
}


pub fn tfhe_result_handle(op: &TfheContractEvents) -> Option<Handle> {
use TfheContract as C;
use TfheContractEvents as E;
match op {
E::Cast(C::Cast {result, ..})
| E::FheAdd(C::FheAdd {result, ..})
| E::FheBitAnd(C::FheBitAnd {result, ..})
| E::FheBitOr(C::FheBitOr {result, ..})
| E::FheBitXor(C::FheBitXor {result, ..} )
| E::FheDiv(C::FheDiv {result, ..})
| E::FheMax(C::FheMax {result, ..})
| E::FheMin(C::FheMin {result, ..})
| E::FheMul(C::FheMul {result, ..})
| E::FheRem(C::FheRem {result, ..})
| E::FheRotl(C::FheRotl {result, ..})
| E::FheRotr(C::FheRotr {result, ..})
| E::FheShl(C::FheShl {result, ..})
| E::FheShr(C::FheShr {result, ..})
| E::FheSub(C::FheSub {result, ..})
| E::FheIfThenElse(C::FheIfThenElse {result, ..})
| E::FheEq(C::FheEq {result, ..})
| E::FheGe(C::FheGe {result, ..})
| E::FheGt(C::FheGt {result, ..})
| E::FheLe(C::FheLe {result, ..})
| E::FheLt(C::FheLt {result, ..})
| E::FheNe(C::FheNe {result, ..})
| E::FheNeg(C::FheNeg {result, ..})
| E::FheNot(C::FheNot {result, ..})
| E::FheRand(C::FheRand {result, ..})
| E::FheRandBounded(C::FheRandBounded {result, ..})
| E::TrivialEncrypt(C::TrivialEncrypt {result, ..})
=> Some(result.clone()),

| E::Initialized(_)
| E::OwnershipTransferStarted(_)
| E::OwnershipTransferred(_)
| E::Upgraded(_)
| E::VerifyCiphertext(_)
=> None,
}
}
Loading