Skip to content

Commit 50f58fa

Browse files
committed
feat(coprocessor): add log level configurations
Add the `log-level` cmd line option to configure the log level. Also, make sure gw-listener and transaction-sender retry connecting to the node infinitely at startup as the `WsConnect` options don't apply to connect during provider creation. Finally, add structured logging to fhevm-listener instead of println!s. Also, added some TODOs to remove panics and, instead, propagate errors. Will be done in a separate PR. As a side note, using structured logging in all other services where it is applicable will be done in a separate PR as well (as we usually only utilize the `message` field in the log macros).
1 parent f64b19b commit 50f58fa

File tree

19 files changed

+302
-134
lines changed

19 files changed

+302
-134
lines changed

.github/workflows/coprocessor-gpu-tests.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ jobs:
8282
runs-on: ${{ needs.setup-instance.outputs.runner-name }}
8383
permissions:
8484
contents: read
85+
packages: read
8586
strategy:
8687
fail-fast: false
8788
# explicit include-based build matrix, of known valid options
@@ -140,6 +141,13 @@ jobs:
140141
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
141142
restore-keys: ${{ runner.os }}-cargo-
142143

144+
- name: Login to GitHub Container Registry
145+
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
146+
with:
147+
registry: ghcr.io
148+
username: ${{ github.actor }}
149+
password: ${{ secrets.GITHUB_TOKEN }}
150+
143151
- name: Init database
144152
run: make init_db
145153
working-directory: coprocessor/fhevm-engine/coprocessor

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/coprocessor/src/daemon_cli.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use clap::Parser;
2+
use tracing::Level;
23

34
#[derive(Parser, Debug, Clone)]
45
#[command(version, about, long_about = None)]
@@ -75,6 +76,13 @@ pub struct Args {
7576
/// Coprocessor service name in OTLP traces
7677
#[arg(long, default_value = "coprocessor")]
7778
pub service_name: String,
79+
80+
/// Log level for the application
81+
#[arg(
82+
long,
83+
value_parser = clap::value_parser!(Level),
84+
default_value_t = Level::INFO)]
85+
pub log_level: Level,
7886
}
7987

8088
pub fn parse_args() -> Args {

coprocessor/fhevm-engine/coprocessor/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ pub async fn async_main(
5151
args: daemon_cli::Args,
5252
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
5353
TRACING_INIT.call_once(|| {
54-
tracing_subscriber::fmt().json().with_level(true).init();
54+
tracing_subscriber::fmt()
55+
.json()
56+
.with_level(true)
57+
.with_max_level(args.log_level)
58+
.init();
5559
});
5660

5761
info!(target: "async_main", "Starting runtime with args: {:?}", args);

coprocessor/fhevm-engine/coprocessor/src/tests/operators_from_events.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
use alloy::primitives::{Bytes, FixedBytes, Log};
1+
use alloy::primitives::{FixedBytes, Log};
22
use bigdecimal::num_bigint::BigInt;
33

44
use fhevm_listener::contracts::TfheContract;
55
use fhevm_listener::contracts::TfheContract::TfheContractEvents;
6-
use fhevm_listener::database::tfhe_event_propagate::{ClearConst, Database as ListenerDatabase, Handle, ToType};
7-
6+
use fhevm_listener::database::tfhe_event_propagate::{
7+
ClearConst, Database as ListenerDatabase, Handle, ToType,
8+
};
89

910
use crate::tests::operators::{generate_binary_test_cases, generate_unary_test_cases};
1011
use crate::tests::utils::{decrypt_ciphertexts, wait_until_all_ciphertexts_computed};
@@ -46,11 +47,6 @@ fn as_scalar_uint(big_int: &BigInt) -> ClearConst {
4647
ClearConst::from_be_slice(&bytes)
4748
}
4849

49-
fn to_bytes(big_int: &BigInt) -> Bytes {
50-
let (_, bytes) = big_int.to_bytes_be();
51-
Bytes::copy_from_slice(&bytes)
52-
}
53-
5450
fn to_ty(ty: i32) -> ToType {
5551
ToType::from(ty as u8)
5652
}

coprocessor/fhevm-engine/coprocessor/src/tests/utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::collections::BTreeMap;
77
use std::sync::atomic::{AtomicU16, Ordering};
88
use testcontainers::{core::WaitFor, runners::AsyncRunner, GenericImage, ImageExt};
99
use tokio::sync::watch::Receiver;
10+
use tracing::Level;
1011

1112
pub struct TestInstance {
1213
// just to destroy container
@@ -102,6 +103,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
102103
maximimum_compact_inputs_upload: 10,
103104
coprocessor_private_key: "./coprocessor.key".to_string(),
104105
service_name: "coprocessor".to_string(),
106+
log_level: Level::INFO,
105107
};
106108

107109
std::thread::spawn(move || {

coprocessor/fhevm-engine/fhevm-listener/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ rustls = { workspace = true }
2626
serde = { workspace = true }
2727
sqlx = { workspace = true }
2828
tokio = { workspace = true }
29+
tracing = { workspace = true }
30+
tracing-subscriber = { workspace = true }
2931

3032
# local dependencies
3133
fhevm-engine-common = { path = "../fhevm-engine-common" }

coprocessor/fhevm-engine/fhevm-listener/src/bin/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,12 @@ use clap::Parser;
33
#[tokio::main]
44
async fn main() {
55
let args = fhevm_listener::cmd::Args::parse();
6+
7+
tracing_subscriber::fmt()
8+
.json()
9+
.with_level(true)
10+
.with_max_level(args.log_level)
11+
.init();
12+
613
fhevm_listener::cmd::main(args).await;
714
}

coprocessor/fhevm-engine/fhevm-listener/src/cmd/mod.rs

Lines changed: 64 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use sqlx::types::Uuid;
77
use std::collections::VecDeque;
88
use std::str::FromStr;
99
use std::time::Duration;
10+
use tracing::{error, info, warn, Level};
1011

1112
use alloy::primitives::Address;
1213
use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect};
@@ -75,6 +76,12 @@ pub struct Args {
7576
help = "Initial block time, refined on each block"
7677
)]
7778
pub initial_block_time: u64,
79+
80+
#[arg(
81+
long,
82+
value_parser = clap::value_parser!(Level),
83+
default_value_t = Level::INFO)]
84+
pub log_level: Level,
7885
}
7986

8087
type RProvider = FillProvider<
@@ -143,6 +150,7 @@ impl InfiniteLogIter {
143150
}
144151

145152
async fn get_chain_id_or_panic(&self) -> ChainId {
153+
// TODO: remove expect and, instead, propagate the error
146154
let ws = WsConnect::new(&self.url);
147155
let provider = ProviderBuilder::new()
148156
.connect_ws(ws)
@@ -192,7 +200,7 @@ impl InfiniteLogIter {
192200

193201
async fn recheck_prev_block(&mut self) -> bool {
194202
let Some(provider) = &self.provider else {
195-
eprintln!("No provider, inconsistent state");
203+
error!("No provider, inconsistent state");
196204
return false;
197205
};
198206
let Some(event) = &self.prev_event else {
@@ -220,10 +228,11 @@ impl InfiniteLogIter {
220228
if logs.len() as u64 == last_block_event_count {
221229
return false;
222230
}
223-
eprintln!(
224-
"Replaying Block {block} with {} events (vs {})",
225-
logs.len(),
226-
last_block_event_count
231+
info!(
232+
block = block,
233+
events_count = logs.len(),
234+
last_block_event_count = last_block_event_count,
235+
"Replaying Block"
227236
);
228237
self.catchup_logs.extend(logs);
229238
if let Some(event) = self.current_event.take() {
@@ -236,8 +245,7 @@ impl InfiniteLogIter {
236245
async fn new_log_stream(&mut self, not_initialized: bool) {
237246
let mut retry = 20;
238247
loop {
239-
let ws = WsConnect::new(&self.url)
240-
.with_max_retries(0); // disabled, alloy skips events
248+
let ws = WsConnect::new(&self.url).with_max_retries(0); // disabled, alloy skips events
241249

242250
match ProviderBuilder::new().connect_ws(ws).await {
243251
Ok(provider) => {
@@ -252,8 +260,8 @@ impl InfiniteLogIter {
252260
if !self.contract_addresses.is_empty() {
253261
filter = filter.address(self.contract_addresses.clone())
254262
}
255-
eprintln!("Listening on {}", &self.url);
256-
eprintln!("Contracts {:?}", &self.contract_addresses);
263+
info!(url = %self.url, "Listening on");
264+
info!(contracts = ?self.contract_addresses, "Contracts addresses");
257265
// note subcribing to real-time before reading catchup
258266
// events to have the minimal gap between the two
259267
// TODO: but it does not guarantee no gap for now
@@ -272,6 +280,12 @@ impl InfiniteLogIter {
272280
Err(err) => {
273281
let delay = if not_initialized {
274282
if retry == 0 {
283+
// TODO: remove panic and, instead, propagate the error
284+
error!(
285+
url = %self.url,
286+
error = %err,
287+
"Cannot connect",
288+
);
275289
panic!(
276290
"Cannot connect to {} due to {err}.",
277291
&self.url
@@ -282,14 +296,19 @@ impl InfiniteLogIter {
282296
1
283297
};
284298
if not_initialized {
285-
eprintln!(
286-
"Cannot connect to {} due to {err}. Will retry in {delay} secs, {retry} times.",
287-
&self.url
299+
warn!(
300+
url = %self.url,
301+
error = %err,
302+
delay_secs = delay,
303+
retry = retry,
304+
"Cannot connect. Will retry",
288305
);
289306
} else {
290-
eprintln!(
291-
"Cannot connect to {} due to {err}. Will retry in {delay} secs, indefinitively.",
292-
&self.url
307+
warn!(
308+
url = %self.url,
309+
error = %err,
310+
delay_secs = delay,
311+
"Cannot connect. Will retry infinitely",
293312
);
294313
}
295314
retry -= 1;
@@ -301,7 +320,7 @@ impl InfiniteLogIter {
301320

302321
async fn next_event_or_block_end(&mut self) -> LogOrBlockTimeout {
303322
let Some(stream) = &mut self.stream else {
304-
eprintln!("No stream, inconsistent state");
323+
error!("No stream, inconsistent state");
305324
return LogOrBlockTimeout::Log(None); // simulate a stream end to
306325
// force reinit
307326
};
@@ -331,7 +350,7 @@ impl InfiniteLogIter {
331350
};
332351
if let Some(log) = self.catchup_logs.pop_front() {
333352
if self.catchup_logs.is_empty() {
334-
eprintln!("Going back to real-time events");
353+
info!("Going back to real-time events");
335354
};
336355
self.current_event = Some(log);
337356
break;
@@ -348,7 +367,7 @@ impl InfiniteLogIter {
348367
return None;
349368
}
350369
}
351-
eprintln!("Nothing to read, retrying");
370+
info!("Nothing to read, retrying");
352371
tokio::time::sleep(Duration::from_secs(1)).await;
353372
continue;
354373
}
@@ -415,18 +434,22 @@ pub async fn main(args: Args) {
415434

416435
if let Some(acl_contract_address) = &args.acl_contract_address {
417436
if let Err(err) = Address::from_str(acl_contract_address) {
437+
// TODO: remove panic and, instead, propagate the error
438+
error!(error = %err, "Invalid ACL contract address");
418439
panic!("Invalid acl contract address: {err}");
419440
};
420441
};
421442
if let Some(tfhe_contract_address) = &args.tfhe_contract_address {
422443
if let Err(err) = Address::from_str(tfhe_contract_address) {
423-
panic!("Invalid tfhe contract address: {err}");
444+
// TODO: remove panic and, instead, propagate the error
445+
error!(error = %err, "Invalid TFHE contract address");
446+
panic!("Invalid TFHE contract address: {err}");
424447
};
425448
}
426449

427450
let mut log_iter = InfiniteLogIter::new(&args);
428451
let chain_id = log_iter.get_chain_id_or_panic().await;
429-
eprintln!("Chain ID: {chain_id}");
452+
info!(chain_id = chain_id, "Chain ID");
430453

431454
let mut db = if !args.database_url.is_empty() {
432455
if let Some(coprocessor_api_key) = args.coprocessor_api_key {
@@ -444,6 +467,8 @@ pub async fn main(args: Args) {
444467
}
445468
Some(db)
446469
} else {
470+
// TODO: remove panic and, instead, propagate the error
471+
error!("A Coprocessor API key is required to access the database");
447472
panic!("A Coprocessor API key is required to access the database");
448473
}
449474
} else {
@@ -452,46 +477,47 @@ pub async fn main(args: Args) {
452477

453478
log_iter.new_log_stream(true).await;
454479

455-
let mut block_error_event_fthe = 0;
480+
let mut block_tfhe_errors = 0;
456481
while let Some(log) = log_iter.next().await {
457482
if log_iter.is_first_of_block() {
458483
log_iter.reestimated_block_time();
459484
if let Some(block_number) = log.block_number {
460-
if block_error_event_fthe == 0 {
485+
if block_tfhe_errors == 0 {
461486
if let Some(ref mut db) = db {
462-
let last_valid_block = db.mark_prev_block_as_valid(
463-
&log_iter.current_event,
464-
&log_iter.prev_event,
465-
)
466-
.await;
487+
let last_valid_block = db
488+
.mark_prev_block_as_valid(
489+
&log_iter.current_event,
490+
&log_iter.prev_event,
491+
)
492+
.await;
467493
if last_valid_block.is_some() {
468494
log_iter.last_valid_block = last_valid_block;
469495
}
470496
}
471497
} else {
472-
eprintln!(
473-
"Errors in tfhe events: {block_error_event_fthe}"
498+
error!(
499+
block_tfhe_errors = block_tfhe_errors,
500+
"Errors in tfhe events"
474501
);
475-
block_error_event_fthe = 0;
502+
block_tfhe_errors = 0;
476503
}
477-
eprintln!("\n--------------------");
478-
eprintln!("Block {block_number}");
504+
info!(block = block_number, "Block");
479505
}
480506
};
481-
if block_error_event_fthe > 0 {
482-
eprintln!("Errors in block {block_error_event_fthe}");
507+
if block_tfhe_errors > 0 {
508+
error!(block_tfhe_errors = block_tfhe_errors, "Errors in block");
483509
}
484510
if !args.ignore_tfhe_events {
485511
if let Ok(event) =
486512
TfheContract::TfheContractEvents::decode_log(&log.inner)
487513
{
488514
// TODO: filter on contract address if known
489-
println!("TFHE {event:#?}");
515+
info!(tfhe_event = ?event, "TFHE event");
490516
if let Some(ref mut db) = db {
491517
let res = db.insert_tfhe_event(&event).await;
492518
if let Err(err) = res {
493-
block_error_event_fthe += 1;
494-
eprintln!("Error inserting tfhe event: {err}");
519+
block_tfhe_errors += 1;
520+
error!(error = %err, "Error inserting tfhe event");
495521
}
496522
}
497523
continue;
@@ -501,7 +527,7 @@ pub async fn main(args: Args) {
501527
if let Ok(event) =
502528
AclContract::AclContractEvents::decode_log(&log.inner)
503529
{
504-
println!("ACL {event:#?}");
530+
info!(acl_event = ?event, "ACL event");
505531
if let Some(ref mut db) = db {
506532
let _ = db.handle_acl_event(&event).await;
507533
}

0 commit comments

Comments
 (0)