Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions coprocessor/fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions coprocessor/fhevm-engine/zkproof-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ bincode = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
humantime = { workspace = true }

# local dependencies
fhevm-engine-common = { path = "../fhevm-engine-common" }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use clap::{command, Parser};
use fhevm_engine_common::healthz_server::HttpServer;
use fhevm_engine_common::telemetry;
use std::sync::Arc;
use humantime::parse_duration;
use std::{sync::Arc, time::Duration};
use tokio::{join, task};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, Level};
Expand All @@ -26,6 +27,15 @@ pub struct Args {
#[arg(long, default_value_t = 5)]
pub pg_pool_connections: u32,

/// Postgres acquire timeout
/// A longer timeout could affect the healthz/liveness updates
#[arg(long, default_value = "15s", value_parser = parse_duration)]
pub pg_timeout: Duration,

/// Postgres diagnostics: enable auto_explain extension
#[arg(long, value_parser = parse_duration)]
pub pg_auto_explain_with_min_duration: Option<Duration>,

/// Postgres database url. If unspecified DATABASE_URL environment variable
/// is used
#[arg(long)]
Expand Down Expand Up @@ -60,6 +70,8 @@ async fn main() {
let args = parse_args();
tracing_subscriber::fmt()
.json()
.with_current_span(true)
.with_span_list(false)
.with_level(true)
.with_max_level(args.log_level)
.init();
Expand All @@ -76,6 +88,8 @@ async fn main() {
pg_pool_connections: args.pg_pool_connections,
pg_polling_interval: args.pg_polling_interval,
worker_thread_count: args.worker_thread_count,
pg_timeout: args.pg_timeout,
pg_auto_explain_with_min_duration: args.pg_auto_explain_with_min_duration,
};

if let Err(err) = telemetry::setup_otlp(&args.service_name) {
Expand All @@ -84,7 +98,11 @@ async fn main() {
}

let cancel_token = CancellationToken::new();
let service = ZkProofService::create(conf, cancel_token.child_token()).await;
let Some(service) = ZkProofService::create(conf, cancel_token.child_token()).await else {
error!("Failed to create zkproof service");
std::process::exit(1);
};

let service = Arc::new(service);

let http_server = HttpServer::new(
Expand Down
17 changes: 15 additions & 2 deletions coprocessor/fhevm-engine/zkproof-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ pub mod auxiliary;
mod tests;

pub mod verifier;
use std::io;
use std::{io, time::Duration};

use fhevm_engine_common::types::FhevmError;
use fhevm_engine_common::{pg_pool::ServiceError, types::FhevmError};
use thiserror::Error;

/// The highest index of an input is 254,
Expand Down Expand Up @@ -55,13 +55,26 @@ pub enum ExecutionError {
TooManyInputs(usize),
}

impl From<ExecutionError> for ServiceError {
fn from(err: ExecutionError) -> Self {
match err {
ExecutionError::DbError(e) => ServiceError::Database(e),

// collapse everything else into InternalError
other => ServiceError::InternalError(other.to_string()),
}
}
}

#[derive(Default, Debug, Clone)]
pub struct Config {
pub database_url: String,
pub listen_database_channel: String,
pub notify_database_channel: String,
pub pg_pool_connections: u32,
pub pg_polling_interval: u32,
pub pg_timeout: Duration,
pub pg_auto_explain_with_min_duration: Option<Duration>,

pub worker_thread_count: u32,
}
21 changes: 12 additions & 9 deletions coprocessor/fhevm-engine/zkproof-worker/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ mod utils;
#[tokio::test]
#[serial(db)]
async fn test_verify_proof() {
let (pool, _instance) = utils::setup().await.expect("valid setup");
let (pool_mngr, _instance) = utils::setup().await.expect("valid setup");
let pool = pool_mngr.pool();

// Generate Valid ZkPok
let aux: (crate::auxiliary::ZkData, [u8; 92]) =
Expand Down Expand Up @@ -42,7 +43,8 @@ async fn test_verify_proof() {
#[tokio::test]
#[serial(db)]
async fn test_verify_empty_input_list() {
let (pool, _instance) = utils::setup().await.expect("valid setup");
let (pool_mngr, _instance) = utils::setup().await.expect("valid setup");
let pool = pool_mngr.pool();

let aux: (crate::auxiliary::ZkData, [u8; 92]) =
utils::aux_fixture(ACL_CONTRACT_ADDR.to_owned());
Expand All @@ -61,7 +63,8 @@ async fn test_verify_empty_input_list() {
#[tokio::test]
#[serial(db)]
async fn test_max_input_index() {
let (db, _instance) = utils::setup().await.expect("valid setup");
let (pool_mngr, _instance) = utils::setup().await.expect("valid setup");
let pool = pool_mngr.pool();

let aux: (crate::auxiliary::ZkData, [u8; 92]) =
utils::aux_fixture(ACL_CONTRACT_ADDR.to_owned());
Expand All @@ -70,11 +73,11 @@ async fn test_max_input_index() {
let inputs = vec![utils::ZkInput::U8(1); MAX_INPUT_INDEX as usize + 2];

assert!(!utils::is_valid(
&db,
&pool,
utils::insert_proof(
&db,
&pool,
101,
&utils::generate_zk_pok_with_inputs(&db, &aux.1, &inputs).await,
&utils::generate_zk_pok_with_inputs(&pool, &aux.1, &inputs).await,
&aux.0
)
.await
Expand All @@ -87,11 +90,11 @@ async fn test_max_input_index() {
// Test with highest number of inputs - 255
let inputs = vec![utils::ZkInput::U64(2); MAX_INPUT_INDEX as usize + 1];
assert!(utils::is_valid(
&db,
&pool,
utils::insert_proof(
&db,
&pool,
102,
&utils::generate_zk_pok_with_inputs(&db, &aux.1, &inputs).await,
&utils::generate_zk_pok_with_inputs(&pool, &aux.1, &inputs).await,
&aux.0
)
.await
Expand Down
29 changes: 20 additions & 9 deletions coprocessor/fhevm-engine/zkproof-worker/src/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use fhevm_engine_common::pg_pool::PostgresPoolManager;
use fhevm_engine_common::{tenant_keys, utils::safe_serialize};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
Expand All @@ -7,7 +8,7 @@ use tokio::time::sleep;

use crate::auxiliary::ZkData;

pub async fn setup() -> anyhow::Result<(sqlx::PgPool, DBInstance)> {
pub async fn setup() -> anyhow::Result<(PostgresPoolManager, DBInstance)> {
let _ = tracing_subscriber::fmt().json().with_level(true).try_init();
let test_instance = test_harness::instance::setup_test_db(ImportMode::WithKeysNoSns)
.await
Expand All @@ -20,29 +21,39 @@ pub async fn setup() -> anyhow::Result<(sqlx::PgPool, DBInstance)> {
pg_pool_connections: 10,
pg_polling_interval: 60,
worker_thread_count: 1,
pg_timeout: Duration::from_secs(15),
pg_auto_explain_with_min_duration: None,
};

let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(10)
.connect(&conf.database_url)
.await?;
let pool_mngr = PostgresPoolManager::connect_pool(
test_instance.parent_token.child_token(),
conf.database_url.as_str(),
conf.pg_timeout,
conf.pg_pool_connections,
Duration::from_secs(2),
conf.pg_auto_explain_with_min_duration,
)
.await
.unwrap();

let pmngr = pool_mngr.clone();

sqlx::query("TRUNCATE TABLE verify_proofs")
.execute(&pool)
.execute(&pmngr.pool())
.await
.unwrap();

let last_active_at = Arc::new(RwLock::new(SystemTime::now()));
let db_pool = pool.clone();

tokio::spawn(async move {
crate::verifier::execute_verify_proofs_loop(db_pool, conf.clone(), last_active_at.clone())
crate::verifier::execute_verify_proofs_loop(pmngr, conf.clone(), last_active_at.clone())
.await
.unwrap();
});

sleep(Duration::from_secs(2)).await;

Ok((pool, test_instance))
Ok((pool_mngr, test_instance))
}

/// Checks if the proof is valid by querying the database continuously.
Expand Down
Loading
Loading