@@ -7,6 +7,7 @@ use sqlx::types::Uuid;
77use std:: collections:: VecDeque ;
88use std:: str:: FromStr ;
99use std:: time:: Duration ;
10+ use tracing:: { error, info, warn, Level } ;
1011
1112use alloy:: primitives:: Address ;
1213use alloy:: providers:: { Provider , ProviderBuilder , RootProvider , WsConnect } ;
@@ -75,6 +76,12 @@ pub struct Args {
7576 help = "Initial block time, refined on each block"
7677 ) ]
7778 pub initial_block_time : u64 ,
79+
80+ #[ arg(
81+ long,
82+ value_parser = clap:: value_parser!( Level ) ,
83+ default_value_t = Level :: INFO ) ]
84+ pub log_level : Level ,
7885}
7986
8087type RProvider = FillProvider <
@@ -143,6 +150,7 @@ impl InfiniteLogIter {
143150 }
144151
145152 async fn get_chain_id_or_panic ( & self ) -> ChainId {
153+ // TODO: remove expect and, instead, propagate the error
146154 let ws = WsConnect :: new ( & self . url ) ;
147155 let provider = ProviderBuilder :: new ( )
148156 . connect_ws ( ws)
@@ -192,7 +200,7 @@ impl InfiniteLogIter {
192200
193201 async fn recheck_prev_block ( & mut self ) -> bool {
194202 let Some ( provider) = & self . provider else {
195- eprintln ! ( "No provider, inconsistent state" ) ;
203+ error ! ( "No provider, inconsistent state" ) ;
196204 return false ;
197205 } ;
198206 let Some ( event) = & self . prev_event else {
@@ -220,10 +228,11 @@ impl InfiniteLogIter {
220228 if logs. len ( ) as u64 == last_block_event_count {
221229 return false ;
222230 }
223- eprintln ! (
224- "Replaying Block {block} with {} events (vs {})" ,
225- logs. len( ) ,
226- last_block_event_count
231+ info ! (
232+ block = block,
233+ events_count = logs. len( ) ,
234+ last_block_event_count = last_block_event_count,
235+ "Replaying Block"
227236 ) ;
228237 self . catchup_logs . extend ( logs) ;
229238 if let Some ( event) = self . current_event . take ( ) {
@@ -236,8 +245,7 @@ impl InfiniteLogIter {
236245 async fn new_log_stream ( & mut self , not_initialized : bool ) {
237246 let mut retry = 20 ;
238247 loop {
239- let ws = WsConnect :: new ( & self . url )
240- . with_max_retries ( 0 ) ; // disabled, alloy skips events
248+ let ws = WsConnect :: new ( & self . url ) . with_max_retries ( 0 ) ; // disabled, alloy skips events
241249
242250 match ProviderBuilder :: new ( ) . connect_ws ( ws) . await {
243251 Ok ( provider) => {
@@ -252,8 +260,8 @@ impl InfiniteLogIter {
252260 if !self . contract_addresses . is_empty ( ) {
253261 filter = filter. address ( self . contract_addresses . clone ( ) )
254262 }
255- eprintln ! ( "Listening on {}" , & self . url) ;
256- eprintln ! ( "Contracts {:?}" , & self . contract_addresses) ;
263+ info ! ( url = % self . url, "Listening on" ) ;
264+ info ! ( contracts = ? self . contract_addresses, "Contracts addresses" ) ;
257265 // note subcribing to real-time before reading catchup
258266 // events to have the minimal gap between the two
259267 // TODO: but it does not guarantee no gap for now
@@ -272,6 +280,12 @@ impl InfiniteLogIter {
272280 Err ( err) => {
273281 let delay = if not_initialized {
274282 if retry == 0 {
283+ // TODO: remove panic and, instead, propagate the error
284+ error ! (
285+ url = %self . url,
286+ error = %err,
287+ "Cannot connect" ,
288+ ) ;
275289 panic ! (
276290 "Cannot connect to {} due to {err}." ,
277291 & self . url
@@ -282,14 +296,19 @@ impl InfiniteLogIter {
282296 1
283297 } ;
284298 if not_initialized {
285- eprintln ! (
286- "Cannot connect to {} due to {err}. Will retry in {delay} secs, {retry} times." ,
287- & self . url
299+ warn ! (
300+ url = %self . url,
301+ error = %err,
302+ delay_secs = delay,
303+ retry = retry,
304+ "Cannot connect. Will retry" ,
288305 ) ;
289306 } else {
290- eprintln ! (
291- "Cannot connect to {} due to {err}. Will retry in {delay} secs, indefinitively." ,
292- & self . url
307+ warn ! (
308+ url = %self . url,
309+ error = %err,
310+ delay_secs = delay,
311+ "Cannot connect. Will retry infinitely" ,
293312 ) ;
294313 }
295314 retry -= 1 ;
@@ -301,7 +320,7 @@ impl InfiniteLogIter {
301320
302321 async fn next_event_or_block_end ( & mut self ) -> LogOrBlockTimeout {
303322 let Some ( stream) = & mut self . stream else {
304- eprintln ! ( "No stream, inconsistent state" ) ;
323+ error ! ( "No stream, inconsistent state" ) ;
305324 return LogOrBlockTimeout :: Log ( None ) ; // simulate a stream end to
306325 // force reinit
307326 } ;
@@ -331,7 +350,7 @@ impl InfiniteLogIter {
331350 } ;
332351 if let Some ( log) = self . catchup_logs . pop_front ( ) {
333352 if self . catchup_logs . is_empty ( ) {
334- eprintln ! ( "Going back to real-time events" ) ;
353+ info ! ( "Going back to real-time events" ) ;
335354 } ;
336355 self . current_event = Some ( log) ;
337356 break ;
@@ -348,7 +367,7 @@ impl InfiniteLogIter {
348367 return None ;
349368 }
350369 }
351- eprintln ! ( "Nothing to read, retrying" ) ;
370+ info ! ( "Nothing to read, retrying" ) ;
352371 tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
353372 continue ;
354373 }
@@ -411,22 +430,32 @@ impl InfiniteLogIter {
411430}
412431
413432pub async fn main ( args : Args ) {
433+ tracing_subscriber:: fmt ( )
434+ . json ( )
435+ . with_level ( true )
436+ . with_max_level ( args. log_level )
437+ . init ( ) ;
438+
414439 let _ = rustls:: crypto:: aws_lc_rs:: default_provider ( ) . install_default ( ) ;
415440
416441 if let Some ( acl_contract_address) = & args. acl_contract_address {
417442 if let Err ( err) = Address :: from_str ( acl_contract_address) {
443+ // TODO: remove panic and, instead, propagate the error
444+ error ! ( error = %err, "Invalid ACL contract address" ) ;
418445 panic ! ( "Invalid acl contract address: {err}" ) ;
419446 } ;
420447 } ;
421448 if let Some ( tfhe_contract_address) = & args. tfhe_contract_address {
422449 if let Err ( err) = Address :: from_str ( tfhe_contract_address) {
423- panic ! ( "Invalid tfhe contract address: {err}" ) ;
450+ // TODO: remove panic and, instead, propagate the error
451+ error ! ( error = %err, "Invalid TFHE contract address" ) ;
452+ panic ! ( "Invalid TFHE contract address: {err}" ) ;
424453 } ;
425454 }
426455
427456 let mut log_iter = InfiniteLogIter :: new ( & args) ;
428457 let chain_id = log_iter. get_chain_id_or_panic ( ) . await ;
429- eprintln ! ( "Chain ID: {chain_id} " ) ;
458+ info ! ( chain_id = chain_id , "Chain ID" ) ;
430459
431460 let mut db = if !args. database_url . is_empty ( ) {
432461 if let Some ( coprocessor_api_key) = args. coprocessor_api_key {
@@ -444,6 +473,8 @@ pub async fn main(args: Args) {
444473 }
445474 Some ( db)
446475 } else {
476+ // TODO: remove panic and, instead, propagate the error
477+ error ! ( "A Coprocessor API key is required to access the database" ) ;
447478 panic ! ( "A Coprocessor API key is required to access the database" ) ;
448479 }
449480 } else {
@@ -452,46 +483,47 @@ pub async fn main(args: Args) {
452483
453484 log_iter. new_log_stream ( true ) . await ;
454485
455- let mut block_error_event_fthe = 0 ;
486+ let mut block_tfhe_errors = 0 ;
456487 while let Some ( log) = log_iter. next ( ) . await {
457488 if log_iter. is_first_of_block ( ) {
458489 log_iter. reestimated_block_time ( ) ;
459490 if let Some ( block_number) = log. block_number {
460- if block_error_event_fthe == 0 {
491+ if block_tfhe_errors == 0 {
461492 if let Some ( ref mut db) = db {
462- let last_valid_block = db. mark_prev_block_as_valid (
463- & log_iter. current_event ,
464- & log_iter. prev_event ,
465- )
466- . await ;
493+ let last_valid_block = db
494+ . mark_prev_block_as_valid (
495+ & log_iter. current_event ,
496+ & log_iter. prev_event ,
497+ )
498+ . await ;
467499 if last_valid_block. is_some ( ) {
468500 log_iter. last_valid_block = last_valid_block;
469501 }
470502 }
471503 } else {
472- eprintln ! (
473- "Errors in tfhe events: {block_error_event_fthe}"
504+ error ! (
505+ block_tfhe_errors = block_tfhe_errors,
506+ "Errors in tfhe events"
474507 ) ;
475- block_error_event_fthe = 0 ;
508+ block_tfhe_errors = 0 ;
476509 }
477- eprintln ! ( "\n --------------------" ) ;
478- eprintln ! ( "Block {block_number}" ) ;
510+ info ! ( block = block_number, "Block" ) ;
479511 }
480512 } ;
481- if block_error_event_fthe > 0 {
482- eprintln ! ( "Errors in block {block_error_event_fthe} " ) ;
513+ if block_tfhe_errors > 0 {
514+ error ! ( block_tfhe_errors = block_tfhe_errors , "Errors in block" ) ;
483515 }
484516 if !args. ignore_tfhe_events {
485517 if let Ok ( event) =
486518 TfheContract :: TfheContractEvents :: decode_log ( & log. inner )
487519 {
488520 // TODO: filter on contract address if known
489- println ! ( "TFHE { event:#?} " ) ;
521+ info ! ( tfhe_event = ?event , "TFHE event" ) ;
490522 if let Some ( ref mut db) = db {
491523 let res = db. insert_tfhe_event ( & event) . await ;
492524 if let Err ( err) = res {
493- block_error_event_fthe += 1 ;
494- eprintln ! ( "Error inserting tfhe event: {err} " ) ;
525+ block_tfhe_errors += 1 ;
526+ error ! ( error = %err , "Error inserting tfhe event" ) ;
495527 }
496528 }
497529 continue ;
@@ -501,7 +533,7 @@ pub async fn main(args: Args) {
501533 if let Ok ( event) =
502534 AclContract :: AclContractEvents :: decode_log ( & log. inner )
503535 {
504- println ! ( "ACL { event:#?} " ) ;
536+ info ! ( acl_event = ?event , "ACL event" ) ;
505537 if let Some ( ref mut db) = db {
506538 let _ = db. handle_acl_event ( & event) . await ;
507539 }
0 commit comments