Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/coprocessor-gpu-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ jobs:
runs-on: ${{ needs.setup-instance.outputs.runner-name }}
permissions:
contents: read
packages: read
strategy:
fail-fast: false
# explicit include-based build matrix, of known valid options
Expand Down Expand Up @@ -140,6 +141,13 @@ jobs:
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
restore-keys: ${{ runner.os }}-cargo-

- name: Login to GitHub Container Registry
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Init database
run: make init_db
working-directory: coprocessor/fhevm-engine/coprocessor
Expand Down
2 changes: 2 additions & 0 deletions coprocessor/fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions coprocessor/fhevm-engine/coprocessor/src/daemon_cli.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use clap::Parser;
use tracing::Level;

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
Expand Down Expand Up @@ -75,6 +76,13 @@ pub struct Args {
/// Coprocessor service name in OTLP traces
#[arg(long, default_value = "coprocessor")]
pub service_name: String,

/// Log level for the application
#[arg(
long,
value_parser = clap::value_parser!(Level),
default_value_t = Level::INFO)]
pub log_level: Level,
}

pub fn parse_args() -> Args {
Expand Down
6 changes: 5 additions & 1 deletion coprocessor/fhevm-engine/coprocessor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ pub async fn async_main(
args: daemon_cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
TRACING_INIT.call_once(|| {
tracing_subscriber::fmt().json().with_level(true).init();
tracing_subscriber::fmt()
.json()
.with_level(true)
.with_max_level(args.log_level)
Comment thread
dartdart26 marked this conversation as resolved.
.init();
});

info!(target: "async_main", "Starting runtime with args: {:?}", args);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use alloy::primitives::{Bytes, FixedBytes, Log};
use alloy::primitives::{FixedBytes, Log};
use bigdecimal::num_bigint::BigInt;

use fhevm_listener::contracts::TfheContract;
use fhevm_listener::contracts::TfheContract::TfheContractEvents;
use fhevm_listener::database::tfhe_event_propagate::{ClearConst, Database as ListenerDatabase, Handle, ToType};

use fhevm_listener::database::tfhe_event_propagate::{
ClearConst, Database as ListenerDatabase, Handle, ToType,
};

use crate::tests::operators::{generate_binary_test_cases, generate_unary_test_cases};
use crate::tests::utils::{decrypt_ciphertexts, wait_until_all_ciphertexts_computed};
Expand Down Expand Up @@ -46,11 +47,6 @@ fn as_scalar_uint(big_int: &BigInt) -> ClearConst {
ClearConst::from_be_slice(&bytes)
}

fn to_bytes(big_int: &BigInt) -> Bytes {
let (_, bytes) = big_int.to_bytes_be();
Bytes::copy_from_slice(&bytes)
}

fn to_ty(ty: i32) -> ToType {
ToType::from(ty as u8)
}
Expand Down
2 changes: 2 additions & 0 deletions coprocessor/fhevm-engine/coprocessor/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU16, Ordering};
use testcontainers::{core::WaitFor, runners::AsyncRunner, GenericImage, ImageExt};
use tokio::sync::watch::Receiver;
use tracing::Level;

pub struct TestInstance {
// just to destroy container
Expand Down Expand Up @@ -102,6 +103,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
maximimum_compact_inputs_upload: 10,
coprocessor_private_key: "./coprocessor.key".to_string(),
service_name: "coprocessor".to_string(),
log_level: Level::INFO,
};

std::thread::spawn(move || {
Expand Down
2 changes: 2 additions & 0 deletions coprocessor/fhevm-engine/fhevm-listener/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ rustls = { workspace = true }
serde = { workspace = true }
sqlx = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

# local dependencies
fhevm-engine-common = { path = "../fhevm-engine-common" }
Expand Down
7 changes: 7 additions & 0 deletions coprocessor/fhevm-engine/fhevm-listener/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,12 @@ use clap::Parser;
#[tokio::main]
async fn main() {
let args = fhevm_listener::cmd::Args::parse();

tracing_subscriber::fmt()
.json()
.with_level(true)
.with_max_level(args.log_level)
.init();

fhevm_listener::cmd::main(args).await;
}
102 changes: 64 additions & 38 deletions coprocessor/fhevm-engine/fhevm-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use sqlx::types::Uuid;
use std::collections::VecDeque;
use std::str::FromStr;
use std::time::Duration;
use tracing::{error, info, warn, Level};

use alloy::primitives::Address;
use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect};
Expand Down Expand Up @@ -75,6 +76,12 @@ pub struct Args {
help = "Initial block time, refined on each block"
)]
pub initial_block_time: u64,

#[arg(
long,
value_parser = clap::value_parser!(Level),
default_value_t = Level::INFO)]
pub log_level: Level,
}

type RProvider = FillProvider<
Expand Down Expand Up @@ -143,6 +150,7 @@ impl InfiniteLogIter {
}

async fn get_chain_id_or_panic(&self) -> ChainId {
// TODO: remove expect and, instead, propagate the error
Comment thread
dartdart26 marked this conversation as resolved.
let ws = WsConnect::new(&self.url);
let provider = ProviderBuilder::new()
.connect_ws(ws)
Expand Down Expand Up @@ -192,7 +200,7 @@ impl InfiniteLogIter {

async fn recheck_prev_block(&mut self) -> bool {
let Some(provider) = &self.provider else {
eprintln!("No provider, inconsistent state");
error!("No provider, inconsistent state");
return false;
};
let Some(event) = &self.prev_event else {
Expand Down Expand Up @@ -220,10 +228,11 @@ impl InfiniteLogIter {
if logs.len() as u64 == last_block_event_count {
return false;
}
eprintln!(
"Replaying Block {block} with {} events (vs {})",
logs.len(),
last_block_event_count
info!(
block = block,
events_count = logs.len(),
last_block_event_count = last_block_event_count,
"Replaying Block"
);
self.catchup_logs.extend(logs);
if let Some(event) = self.current_event.take() {
Expand All @@ -236,8 +245,7 @@ impl InfiniteLogIter {
async fn new_log_stream(&mut self, not_initialized: bool) {
let mut retry = 20;
loop {
let ws = WsConnect::new(&self.url)
.with_max_retries(0); // disabled, alloy skips events
let ws = WsConnect::new(&self.url).with_max_retries(0); // disabled, alloy skips events

match ProviderBuilder::new().connect_ws(ws).await {
Ok(provider) => {
Expand All @@ -252,8 +260,8 @@ impl InfiniteLogIter {
if !self.contract_addresses.is_empty() {
filter = filter.address(self.contract_addresses.clone())
}
eprintln!("Listening on {}", &self.url);
eprintln!("Contracts {:?}", &self.contract_addresses);
info!(url = %self.url, "Listening on");
info!(contracts = ?self.contract_addresses, "Contracts addresses");
// note subcribing to real-time before reading catchup
// events to have the minimal gap between the two
// TODO: but it does not guarantee no gap for now
Expand All @@ -272,6 +280,12 @@ impl InfiniteLogIter {
Err(err) => {
let delay = if not_initialized {
if retry == 0 {
// TODO: remove panic and, instead, propagate the error
Comment thread
dartdart26 marked this conversation as resolved.
error!(
url = %self.url,
error = %err,
"Cannot connect",
);
panic!(
"Cannot connect to {} due to {err}.",
&self.url
Expand All @@ -282,14 +296,19 @@ impl InfiniteLogIter {
1
};
if not_initialized {
eprintln!(
"Cannot connect to {} due to {err}. Will retry in {delay} secs, {retry} times.",
&self.url
warn!(
url = %self.url,
error = %err,
delay_secs = delay,
retry = retry,
"Cannot connect. Will retry",
);
} else {
eprintln!(
"Cannot connect to {} due to {err}. Will retry in {delay} secs, indefinitively.",
&self.url
warn!(
url = %self.url,
error = %err,
delay_secs = delay,
"Cannot connect. Will retry infinitely",
);
}
retry -= 1;
Expand All @@ -301,7 +320,7 @@ impl InfiniteLogIter {

async fn next_event_or_block_end(&mut self) -> LogOrBlockTimeout {
let Some(stream) = &mut self.stream else {
eprintln!("No stream, inconsistent state");
error!("No stream, inconsistent state");
return LogOrBlockTimeout::Log(None); // simulate a stream end to
// force reinit
};
Expand Down Expand Up @@ -331,7 +350,7 @@ impl InfiniteLogIter {
};
if let Some(log) = self.catchup_logs.pop_front() {
if self.catchup_logs.is_empty() {
eprintln!("Going back to real-time events");
info!("Going back to real-time events");
};
self.current_event = Some(log);
break;
Expand All @@ -348,7 +367,7 @@ impl InfiniteLogIter {
return None;
}
}
eprintln!("Nothing to read, retrying");
info!("Nothing to read, retrying");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
Expand Down Expand Up @@ -415,18 +434,22 @@ pub async fn main(args: Args) {

if let Some(acl_contract_address) = &args.acl_contract_address {
if let Err(err) = Address::from_str(acl_contract_address) {
// TODO: remove panic and, instead, propagate the error
error!(error = %err, "Invalid ACL contract address");
panic!("Invalid acl contract address: {err}");
};
};
if let Some(tfhe_contract_address) = &args.tfhe_contract_address {
if let Err(err) = Address::from_str(tfhe_contract_address) {
panic!("Invalid tfhe contract address: {err}");
// TODO: remove panic and, instead, propagate the error
error!(error = %err, "Invalid TFHE contract address");
panic!("Invalid TFHE contract address: {err}");
};
}

let mut log_iter = InfiniteLogIter::new(&args);
let chain_id = log_iter.get_chain_id_or_panic().await;
eprintln!("Chain ID: {chain_id}");
info!(chain_id = chain_id, "Chain ID");

let mut db = if !args.database_url.is_empty() {
if let Some(coprocessor_api_key) = args.coprocessor_api_key {
Expand All @@ -444,6 +467,8 @@ pub async fn main(args: Args) {
}
Some(db)
} else {
// TODO: remove panic and, instead, propagate the error
error!("A Coprocessor API key is required to access the database");
panic!("A Coprocessor API key is required to access the database");
}
} else {
Expand All @@ -452,46 +477,47 @@ pub async fn main(args: Args) {

log_iter.new_log_stream(true).await;

let mut block_error_event_fthe = 0;
let mut block_tfhe_errors = 0;
while let Some(log) = log_iter.next().await {
if log_iter.is_first_of_block() {
log_iter.reestimated_block_time();
if let Some(block_number) = log.block_number {
if block_error_event_fthe == 0 {
if block_tfhe_errors == 0 {
if let Some(ref mut db) = db {
let last_valid_block = db.mark_prev_block_as_valid(
&log_iter.current_event,
&log_iter.prev_event,
)
.await;
let last_valid_block = db
.mark_prev_block_as_valid(
&log_iter.current_event,
&log_iter.prev_event,
)
.await;
if last_valid_block.is_some() {
log_iter.last_valid_block = last_valid_block;
}
}
} else {
eprintln!(
"Errors in tfhe events: {block_error_event_fthe}"
error!(
block_tfhe_errors = block_tfhe_errors,
"Errors in tfhe events"
);
block_error_event_fthe = 0;
block_tfhe_errors = 0;
}
eprintln!("\n--------------------");
eprintln!("Block {block_number}");
info!(block = block_number, "Block");
}
};
if block_error_event_fthe > 0 {
eprintln!("Errors in block {block_error_event_fthe}");
if block_tfhe_errors > 0 {
error!(block_tfhe_errors = block_tfhe_errors, "Errors in block");
}
if !args.ignore_tfhe_events {
if let Ok(event) =
TfheContract::TfheContractEvents::decode_log(&log.inner)
{
// TODO: filter on contract address if known
println!("TFHE {event:#?}");
info!(tfhe_event = ?event, "TFHE event");
if let Some(ref mut db) = db {
let res = db.insert_tfhe_event(&event).await;
if let Err(err) = res {
block_error_event_fthe += 1;
eprintln!("Error inserting tfhe event: {err}");
block_tfhe_errors += 1;
error!(error = %err, "Error inserting tfhe event");
}
}
continue;
Expand All @@ -501,7 +527,7 @@ pub async fn main(args: Args) {
if let Ok(event) =
AclContract::AclContractEvents::decode_log(&log.inner)
{
println!("ACL {event:#?}");
info!(acl_event = ?event, "ACL event");
if let Some(ref mut db) = db {
let _ = db.handle_acl_event(&event).await;
}
Expand Down
Loading
Loading