Skip to content

Commit cbbf4a2

Browse files
committed
feat(corprocessor): update is_allowed
1 parent aa8ec89 commit cbbf4a2

File tree

3 files changed

+89
-23
lines changed

3 files changed

+89
-23
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
ALTER TABLE computations
2+
ADD COLUMN IF NOT EXISTS is_allowed BOOL NOT NULL DEFAULT FALSE;
3+
4+
UPDATE computations SET is_allowed = true WHERE (output_handle, tenant_id) IN (
5+
SELECT handle, tenant_id FROM allowed_handles WHERE is_computed=false
6+
);
7+
8+
CREATE INDEX IF NOT EXISTS idx_computations_is_allowed
9+
ON computations (is_allowed)
10+
WHERE is_computed=false;

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use anyhow::{anyhow, Result};
88
use futures_util::stream::StreamExt;
99
use sqlx::types::Uuid;
1010

11-
use std::collections::VecDeque;
11+
use std::collections::{HashSet, VecDeque};
1212
use std::str::FromStr;
1313
use std::sync::Arc;
1414
use std::time::Duration;
@@ -22,11 +22,11 @@ use rustls;
2222
use tokio_util::sync::CancellationToken;
2323

2424
use fhevm_engine_common::healthz_server::HttpServer as HealthHttpServer;
25-
use fhevm_engine_common::types::BlockchainProvider;
25+
use fhevm_engine_common::types::{BlockchainProvider, Handle};
2626
use fhevm_engine_common::utils::HeartBeat;
2727

2828
use crate::contracts::{AclContract, TfheContract};
29-
use crate::database::tfhe_event_propagate::{ChainId, Database, LogTfhe};
29+
use crate::database::tfhe_event_propagate::{tfhe_result_handle, ChainId, Database, LogTfhe};
3030
use crate::health_check::HealthCheck;
3131

3232
pub mod block_history;
@@ -819,35 +819,34 @@ async fn db_insert_block_no_retry(
819819
tfhe_contract_address: &Option<Address>,
820820
) -> std::result::Result<(), sqlx::Error> {
821821
let mut tx = db.new_transaction().await?;
822+
let mut is_allowed = HashSet::<Handle>::new();
823+
let mut acl_events = vec![];
824+
let mut tfhe_event_log = vec![];
822825
for log in &block_logs.logs {
823-
info!(
824-
block = ?log.block_number,
825-
tx = ?log.transaction_hash,
826-
log_index = ?log.log_index,
827-
"Log",
828-
);
829826
let current_address = Some(log.inner.address);
827+
let is_acl_address = &current_address == acl_contract_address;
828+
if acl_contract_address.is_none() || is_acl_address {
829+
if let Ok(event) =
830+
AclContract::AclContractEvents::decode_log(&log.inner)
831+
{
832+
info!(acl_event = ?event, "ACL event");
833+
let handle = panic!();
834+
is_allowed.insert(handle);
835+
db.handle_acl_event(&mut tx, &event).await?;
836+
continue;
837+
}
838+
}
830839
let is_tfhe_address = &current_address == tfhe_contract_address;
831840
if tfhe_contract_address.is_none() || is_tfhe_address {
832841
if let Ok(event) =
833842
TfheContract::TfheContractEvents::decode_log(&log.inner)
834843
{
835-
info!(tfhe_event = ?event, "TFHE event");
836844
let log = LogTfhe {
837845
event,
838846
transaction_hash: log.transaction_hash,
847+
is_allowed: false, // updated in the next loop
839848
};
840-
db.insert_tfhe_event(&mut tx, &log).await?;
841-
continue;
842-
}
843-
}
844-
let is_acl_address = &current_address == acl_contract_address;
845-
if acl_contract_address.is_none() || is_acl_address {
846-
if let Ok(event) =
847-
AclContract::AclContractEvents::decode_log(&log.inner)
848-
{
849-
info!(acl_event = ?event, "ACL event");
850-
db.handle_acl_event(&mut tx, &event).await?;
849+
tfhe_event_log.push(log);
851850
continue;
852851
}
853852
}
@@ -856,10 +855,20 @@ async fn db_insert_block_no_retry(
856855
event_address = ?log.inner.address,
857856
acl_contract_address = ?acl_contract_address,
858857
tfhe_contract_address = ?tfhe_contract_address,
858+
log = ?log,
859859
"Cannot decode event",
860860
);
861861
}
862862
}
863+
for tfhe_log in &tfhe_event_log {
864+
info!(tfhe_log = ?tfhe_log, "TFHE event");
865+
let result_handle = tfhe_result_handle(&tfhe_log.event);
866+
let tfhe_log = LogTfhe {
867+
is_allowed: is_allowed.contains(&result_handle),
868+
..tfhe_log
869+
};
870+
db.insert_tfhe_event(&mut tx, &tfhe_log).await?;
871+
}
863872
db.mark_block_as_valid(&mut tx, &block_logs.summary).await?;
864873
tx.commit().await
865874
}

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,11 @@ pub struct Database {
7777
pub tick: HeartBeat,
7878
}
7979

80+
#[derive(Debug)]
8081
pub struct LogTfhe {
8182
pub event: Log<TfheContractEvents>,
8283
pub transaction_hash: Option<TransactionHash>,
84+
pub is_allowed: bool,
8385
}
8486

8587
pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>;
@@ -277,7 +279,8 @@ impl Database {
277279
fhe_operation,
278280
is_scalar,
279281
dependence_chain_id,
280-
transaction_id
282+
transaction_id,
283+
is_allowed,
281284
)
282285
VALUES ($1, $2, $3, $4, $5, $6, $7)
283286
ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING
@@ -288,7 +291,8 @@ impl Database {
288291
fhe_operation as i16,
289292
is_scalar,
290293
bucket.to_vec(),
291-
log.transaction_hash.map(|txh| txh.to_vec())
294+
log.transaction_hash.map(|txh| txh.to_vec()),
295+
log.is_allowed,
292296
);
293297
query.execute(tx.deref_mut()).await.map(|_| ())
294298
}
@@ -674,3 +678,46 @@ pub fn event_name(op: &TfheContractEvents) -> &'static str {
674678
E::VerifyCiphertext(_) => "VerifyCiphertext",
675679
}
676680
}
681+
682+
683+
pub fn tfhe_result_handle(op: &TfheContractEvents) -> Option<Handle> {
684+
use TfheContract as C;
685+
use TfheContractEvents as E;
686+
match op {
687+
E::Cast(C::Cast {result, ..})
688+
| E::FheAdd(C::FheAdd {result, ..})
689+
| E::FheBitAnd(C::FheBitAnd {result, ..})
690+
| E::FheBitOr(C::FheBitOr {result, ..})
691+
| E::FheBitXor(C::FheBitXor {result, ..} )
692+
| E::FheDiv(C::FheDiv {result, ..})
693+
| E::FheMax(C::FheMax {result, ..})
694+
| E::FheMin(C::FheMin {result, ..})
695+
| E::FheMul(C::FheMul {result, ..})
696+
| E::FheRem(C::FheRem {result, ..})
697+
| E::FheRotl(C::FheRotl {result, ..})
698+
| E::FheRotr(C::FheRotr {result, ..})
699+
| E::FheShl(C::FheShl {result, ..})
700+
| E::FheShr(C::FheShr {result, ..})
701+
| E::FheSub(C::FheSub {result, ..})
702+
| E::FheIfThenElse(C::FheIfThenElse {result, ..})
703+
| E::FheEq(C::FheEq {result, ..})
704+
| E::FheGe(C::FheGe {result, ..})
705+
| E::FheGt(C::FheGt {result, ..})
706+
| E::FheLe(C::FheLe {result, ..})
707+
| E::FheLt(C::FheLt {result, ..})
708+
| E::FheNe(C::FheNe {result, ..})
709+
| E::FheNeg(C::FheNeg {result, ..})
710+
| E::FheNot(C::FheNot {result, ..})
711+
| E::FheRand(C::FheRand {result, ..})
712+
| E::FheRandBounded(C::FheRandBounded {result, ..})
713+
| E::TrivialEncrypt(C::TrivialEncrypt {result, ..})
714+
=> Some(result.clone()),
715+
716+
| E::Initialized(_)
717+
| E::OwnershipTransferStarted(_)
718+
| E::OwnershipTransferred(_)
719+
| E::Upgraded(_)
720+
| E::VerifyCiphertext(_)
721+
=> None,
722+
}
723+
}

0 commit comments

Comments
 (0)