Skip to content

Commit 67e1a47

Browse files
committed
feat(coprocessor): add log level configurations
Add the `log-level` command-line option to configure the log level. Also, make sure gw-listener and transaction-sender retry connecting to the node indefinitely at startup, as the `WsConnect` retry options do not apply to the initial connection made during provider creation. Finally, replace the `println!`/`eprintln!` calls in fhevm-listener with structured logging. Also, add some TODOs to remove panics and propagate errors instead; that will be done in a separate PR. As a side note, using structured logging in all other services where it is applicable will be done in a separate PR as well (as we usually only use the `message` field in the log macros).
1 parent fa0170f commit 67e1a47

File tree

13 files changed

+212
-86
lines changed

13 files changed

+212
-86
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/coprocessor/src/daemon_cli.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use clap::Parser;
2+
use tracing::Level;
23

34
#[derive(Parser, Debug, Clone)]
45
#[command(version, about, long_about = None)]
@@ -75,6 +76,13 @@ pub struct Args {
7576
/// Coprocessor service name in OTLP traces
7677
#[arg(long, default_value = "coprocessor")]
7778
pub service_name: String,
79+
80+
/// Log level for the application
81+
#[arg(
82+
long,
83+
value_parser = clap::value_parser!(Level),
84+
default_value_t = Level::INFO)]
85+
pub log_level: Level,
7886
}
7987

8088
pub fn parse_args() -> Args {

coprocessor/fhevm-engine/coprocessor/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ pub async fn async_main(
5151
args: daemon_cli::Args,
5252
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
5353
TRACING_INIT.call_once(|| {
54-
tracing_subscriber::fmt().json().with_level(true).init();
54+
tracing_subscriber::fmt()
55+
.json()
56+
.with_level(true)
57+
.with_max_level(args.log_level)
58+
.init();
5559
});
5660

5761
info!(target: "async_main", "Starting runtime with args: {:?}", args);

coprocessor/fhevm-engine/fhevm-listener/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ rustls = { workspace = true }
2626
serde = { workspace = true }
2727
sqlx = { workspace = true }
2828
tokio = { workspace = true }
29+
tracing = { workspace = true }
30+
tracing-subscriber = { workspace = true }
2931

3032
# local dependencies
3133
fhevm-engine-common = { path = "../fhevm-engine-common" }

coprocessor/fhevm-engine/fhevm-listener/src/cmd/mod.rs

Lines changed: 70 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use sqlx::types::Uuid;
77
use std::collections::VecDeque;
88
use std::str::FromStr;
99
use std::time::Duration;
10+
use tracing::{error, info, warn, Level};
1011

1112
use alloy::primitives::Address;
1213
use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect};
@@ -75,6 +76,12 @@ pub struct Args {
7576
help = "Initial block time, refined on each block"
7677
)]
7778
pub initial_block_time: u64,
79+
80+
#[arg(
81+
long,
82+
value_parser = clap::value_parser!(Level),
83+
default_value_t = Level::INFO)]
84+
pub log_level: Level,
7885
}
7986

8087
type RProvider = FillProvider<
@@ -143,6 +150,7 @@ impl InfiniteLogIter {
143150
}
144151

145152
async fn get_chain_id_or_panic(&self) -> ChainId {
153+
// TODO: remove expect and, instead, propagate the error
146154
let ws = WsConnect::new(&self.url);
147155
let provider = ProviderBuilder::new()
148156
.connect_ws(ws)
@@ -192,7 +200,7 @@ impl InfiniteLogIter {
192200

193201
async fn recheck_prev_block(&mut self) -> bool {
194202
let Some(provider) = &self.provider else {
195-
eprintln!("No provider, inconsistent state");
203+
error!("No provider, inconsistent state");
196204
return false;
197205
};
198206
let Some(event) = &self.prev_event else {
@@ -220,10 +228,11 @@ impl InfiniteLogIter {
220228
if logs.len() as u64 == last_block_event_count {
221229
return false;
222230
}
223-
eprintln!(
224-
"Replaying Block {block} with {} events (vs {})",
225-
logs.len(),
226-
last_block_event_count
231+
info!(
232+
block = block,
233+
events_count = logs.len(),
234+
last_block_event_count = last_block_event_count,
235+
"Replaying Block"
227236
);
228237
self.catchup_logs.extend(logs);
229238
if let Some(event) = self.current_event.take() {
@@ -236,8 +245,7 @@ impl InfiniteLogIter {
236245
async fn new_log_stream(&mut self, not_initialized: bool) {
237246
let mut retry = 20;
238247
loop {
239-
let ws = WsConnect::new(&self.url)
240-
.with_max_retries(0); // disabled, alloy skips events
248+
let ws = WsConnect::new(&self.url).with_max_retries(0); // disabled, alloy skips events
241249

242250
match ProviderBuilder::new().connect_ws(ws).await {
243251
Ok(provider) => {
@@ -252,8 +260,8 @@ impl InfiniteLogIter {
252260
if !self.contract_addresses.is_empty() {
253261
filter = filter.address(self.contract_addresses.clone())
254262
}
255-
eprintln!("Listening on {}", &self.url);
256-
eprintln!("Contracts {:?}", &self.contract_addresses);
263+
info!(url = %self.url, "Listening on");
264+
info!(contracts = ?self.contract_addresses, "Contracts addresses");
257265
// note: subscribing to real-time before reading catchup
258266
// events to have the minimal gap between the two
259267
// TODO: but it does not guarantee no gap for now
@@ -272,6 +280,12 @@ impl InfiniteLogIter {
272280
Err(err) => {
273281
let delay = if not_initialized {
274282
if retry == 0 {
283+
// TODO: remove panic and, instead, propagate the error
284+
error!(
285+
url = %self.url,
286+
error = %err,
287+
"Cannot connect",
288+
);
275289
panic!(
276290
"Cannot connect to {} due to {err}.",
277291
&self.url
@@ -282,14 +296,19 @@ impl InfiniteLogIter {
282296
1
283297
};
284298
if not_initialized {
285-
eprintln!(
286-
"Cannot connect to {} due to {err}. Will retry in {delay} secs, {retry} times.",
287-
&self.url
299+
warn!(
300+
url = %self.url,
301+
error = %err,
302+
delay_secs = delay,
303+
retry = retry,
304+
"Cannot connect. Will retry",
288305
);
289306
} else {
290-
eprintln!(
291-
"Cannot connect to {} due to {err}. Will retry in {delay} secs, indefinitively.",
292-
&self.url
307+
warn!(
308+
url = %self.url,
309+
error = %err,
310+
delay_secs = delay,
311+
"Cannot connect. Will retry infinitely",
293312
);
294313
}
295314
retry -= 1;
@@ -301,7 +320,7 @@ impl InfiniteLogIter {
301320

302321
async fn next_event_or_block_end(&mut self) -> LogOrBlockTimeout {
303322
let Some(stream) = &mut self.stream else {
304-
eprintln!("No stream, inconsistent state");
323+
error!("No stream, inconsistent state");
305324
return LogOrBlockTimeout::Log(None); // simulate a stream end to
306325
// force reinit
307326
};
@@ -331,7 +350,7 @@ impl InfiniteLogIter {
331350
};
332351
if let Some(log) = self.catchup_logs.pop_front() {
333352
if self.catchup_logs.is_empty() {
334-
eprintln!("Going back to real-time events");
353+
info!("Going back to real-time events");
335354
};
336355
self.current_event = Some(log);
337356
break;
@@ -348,7 +367,7 @@ impl InfiniteLogIter {
348367
return None;
349368
}
350369
}
351-
eprintln!("Nothing to read, retrying");
370+
info!("Nothing to read, retrying");
352371
tokio::time::sleep(Duration::from_secs(1)).await;
353372
continue;
354373
}
@@ -411,22 +430,32 @@ impl InfiniteLogIter {
411430
}
412431

413432
pub async fn main(args: Args) {
433+
tracing_subscriber::fmt()
434+
.json()
435+
.with_level(true)
436+
.with_max_level(args.log_level)
437+
.init();
438+
414439
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
415440

416441
if let Some(acl_contract_address) = &args.acl_contract_address {
417442
if let Err(err) = Address::from_str(acl_contract_address) {
443+
// TODO: remove panic and, instead, propagate the error
444+
error!(error = %err, "Invalid ACL contract address");
418445
panic!("Invalid acl contract address: {err}");
419446
};
420447
};
421448
if let Some(tfhe_contract_address) = &args.tfhe_contract_address {
422449
if let Err(err) = Address::from_str(tfhe_contract_address) {
423-
panic!("Invalid tfhe contract address: {err}");
450+
// TODO: remove panic and, instead, propagate the error
451+
error!(error = %err, "Invalid TFHE contract address");
452+
panic!("Invalid TFHE contract address: {err}");
424453
};
425454
}
426455

427456
let mut log_iter = InfiniteLogIter::new(&args);
428457
let chain_id = log_iter.get_chain_id_or_panic().await;
429-
eprintln!("Chain ID: {chain_id}");
458+
info!(chain_id = chain_id, "Chain ID");
430459

431460
let mut db = if !args.database_url.is_empty() {
432461
if let Some(coprocessor_api_key) = args.coprocessor_api_key {
@@ -444,6 +473,8 @@ pub async fn main(args: Args) {
444473
}
445474
Some(db)
446475
} else {
476+
// TODO: remove panic and, instead, propagate the error
477+
error!("A Coprocessor API key is required to access the database");
447478
panic!("A Coprocessor API key is required to access the database");
448479
}
449480
} else {
@@ -452,46 +483,47 @@ pub async fn main(args: Args) {
452483

453484
log_iter.new_log_stream(true).await;
454485

455-
let mut block_error_event_fthe = 0;
486+
let mut block_tfhe_errors = 0;
456487
while let Some(log) = log_iter.next().await {
457488
if log_iter.is_first_of_block() {
458489
log_iter.reestimated_block_time();
459490
if let Some(block_number) = log.block_number {
460-
if block_error_event_fthe == 0 {
491+
if block_tfhe_errors == 0 {
461492
if let Some(ref mut db) = db {
462-
let last_valid_block = db.mark_prev_block_as_valid(
463-
&log_iter.current_event,
464-
&log_iter.prev_event,
465-
)
466-
.await;
493+
let last_valid_block = db
494+
.mark_prev_block_as_valid(
495+
&log_iter.current_event,
496+
&log_iter.prev_event,
497+
)
498+
.await;
467499
if last_valid_block.is_some() {
468500
log_iter.last_valid_block = last_valid_block;
469501
}
470502
}
471503
} else {
472-
eprintln!(
473-
"Errors in tfhe events: {block_error_event_fthe}"
504+
error!(
505+
block_tfhe_errors = block_tfhe_errors,
506+
"Errors in tfhe events"
474507
);
475-
block_error_event_fthe = 0;
508+
block_tfhe_errors = 0;
476509
}
477-
eprintln!("\n--------------------");
478-
eprintln!("Block {block_number}");
510+
info!(block = block_number, "Block");
479511
}
480512
};
481-
if block_error_event_fthe > 0 {
482-
eprintln!("Errors in block {block_error_event_fthe}");
513+
if block_tfhe_errors > 0 {
514+
error!(block_tfhe_errors = block_tfhe_errors, "Errors in block");
483515
}
484516
if !args.ignore_tfhe_events {
485517
if let Ok(event) =
486518
TfheContract::TfheContractEvents::decode_log(&log.inner)
487519
{
488520
// TODO: filter on contract address if known
489-
println!("TFHE {event:#?}");
521+
info!(tfhe_event = ?event, "TFHE event");
490522
if let Some(ref mut db) = db {
491523
let res = db.insert_tfhe_event(&event).await;
492524
if let Err(err) = res {
493-
block_error_event_fthe += 1;
494-
eprintln!("Error inserting tfhe event: {err}");
525+
block_tfhe_errors += 1;
526+
error!(error = %err, "Error inserting tfhe event");
495527
}
496528
}
497529
continue;
@@ -501,7 +533,7 @@ pub async fn main(args: Args) {
501533
if let Ok(event) =
502534
AclContract::AclContractEvents::decode_log(&log.inner)
503535
{
504-
println!("ACL {event:#?}");
536+
info!(acl_event = ?event, "ACL event");
505537
if let Some(ref mut db) = db {
506538
let _ = db.handle_acl_event(&event).await;
507539
}

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use gw_listener::ConfigSettings;
99
use humantime::parse_duration;
1010
use tokio::signal::unix::{signal, SignalKind};
1111
use tokio_util::sync::CancellationToken;
12-
use tracing::{error, info};
12+
use tracing::{error, info, Level};
1313

1414
#[derive(Parser, Debug, Clone)]
1515
#[command(version, about, long_about = None)]
@@ -47,6 +47,12 @@ struct Conf {
4747

4848
#[arg(long, default_value = "4s", value_parser = parse_duration)]
4949
provider_retry_interval: Duration,
50+
51+
#[arg(
52+
long,
53+
value_parser = clap::value_parser!(Level),
54+
default_value_t = Level::INFO)]
55+
log_level: Level,
5056
}
5157

5258
fn install_signal_handlers(cancel_token: CancellationToken) -> anyhow::Result<()> {
@@ -64,24 +70,43 @@ fn install_signal_handlers(cancel_token: CancellationToken) -> anyhow::Result<()
6470

6571
#[tokio::main]
6672
async fn main() -> anyhow::Result<()> {
67-
tracing_subscriber::fmt().json().with_level(true).init();
68-
6973
let conf = Conf::parse();
7074

75+
tracing_subscriber::fmt()
76+
.json()
77+
.with_level(true)
78+
.with_max_level(conf.log_level)
79+
.init();
80+
7181
info!("Starting gw_listener with configuration: {:?}", conf);
7282

7383
let database_url = conf
7484
.database_url
7585
.clone()
7686
.unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined"));
7787

78-
let provider = ProviderBuilder::new()
79-
.connect_ws(
80-
WsConnect::new(conf.gw_url.clone())
81-
.with_max_retries(conf.provider_max_retries)
82-
.with_retry_interval(conf.provider_retry_interval),
83-
)
84-
.await?;
88+
let provider = loop {
89+
match ProviderBuilder::new()
90+
.connect_ws(
91+
WsConnect::new(conf.gw_url.clone())
92+
.with_max_retries(conf.provider_max_retries)
93+
.with_retry_interval(conf.provider_retry_interval),
94+
)
95+
.await
96+
{
97+
Ok(provider) => {
98+
info!("Connected to Gateway at {}", conf.gw_url);
99+
break provider;
100+
}
101+
Err(e) => {
102+
error!(
103+
"Failed to connect to Gateway at {} on startup: {}, retrying in {:?}",
104+
conf.gw_url, e, conf.provider_retry_interval
105+
);
106+
tokio::time::sleep(conf.provider_retry_interval).await;
107+
}
108+
}
109+
};
85110

86111
let cancel_token = CancellationToken::new();
87112

@@ -116,7 +141,10 @@ async fn main() -> anyhow::Result<()> {
116141
// Install signal handlers
117142
install_signal_handlers(cancel_token.clone())?;
118143

119-
info!("Starting HTTP server on port {}", conf.health_check_port);
144+
info!(
145+
"Starting HTTP health check server on port {}",
146+
conf.health_check_port
147+
);
120148

121149
// Run both services concurrently - note we now have to deref the Arc for run()
122150
let (listener_result, http_result) = tokio::join!(gw_listener.run(), http_server.start());

0 commit comments

Comments
 (0)