@@ -40,6 +40,8 @@ pub const DEFAULT_DEPENDENCE_CACHE_SIZE: u16 = 10_000;
4040pub const DEFAULT_DEPENDENCE_BY_CONNEXITY : bool = false ;
4141pub const DEFAULT_DEPENDENCE_CROSS_BLOCK : bool = true ;
4242
43+ const TIMEOUT_REQUEST_ON_WEBSOCKET : u64 = 15 ;
44+
4345#[ derive( Parser , Debug , Clone ) ]
4446#[ command( version, about, long_about = None ) ]
4547pub struct Args {
@@ -156,6 +158,13 @@ pub struct Args {
156158 help = "Sleep duration in seconds between catchup loop iterations"
157159 ) ]
158160 pub catchup_loop_sleep_secs : u64 ,
161+
162+ #[ arg(
163+ long,
164+ default_value_t = TIMEOUT_REQUEST_ON_WEBSOCKET ,
165+ help = "Timeout in seconds for RPC calls over websocket"
166+ ) ]
167+ pub timeout_request_websocket : u64 ,
159168}
160169
161170// TODO: to merge with Levent works
@@ -180,6 +189,7 @@ struct InfiniteLogIter {
180189 reorg_maximum_duration_in_blocks : u64 , // in blocks
181190 block_history : BlockHistory , // to detect reorgs
182191 catchup_finalization_in_blocks : u64 ,
192+ timeout_request_websocket : u64 ,
183193}
184194
185195enum BlockOrTimeoutOrNone {
@@ -243,6 +253,7 @@ impl InfiniteLogIter {
243253 args. reorg_maximum_duration_in_blocks as usize ,
244254 ) ,
245255 catchup_finalization_in_blocks : args. catchup_finalization_in_blocks ,
256+ timeout_request_websocket : args. timeout_request_websocket ,
246257 }
247258 }
248259
@@ -313,13 +324,27 @@ impl InfiniteLogIter {
313324 Ok ( provider) => provider,
314325 Err ( _) => anyhow:: bail!( "Cannot get a provider" ) ,
315326 } ;
316- provider. get_logs ( & filter) . await . map_err ( |err| {
317- if eth_rpc_err:: too_much_blocks_or_events ( & err) {
318- anyhow:: anyhow!( "Too much blocks or events: {err}" )
319- } else {
320- anyhow:: anyhow!( "Cannot get logs for {filter:?} due to {err}" )
327+ // Timeout to prevent hanging indefinitely on buggy node
328+ match tokio:: time:: timeout (
329+ Duration :: from_secs ( self . timeout_request_websocket ) ,
330+ provider. get_logs ( & filter) ,
331+ )
332+ . await
333+ {
334+ Err ( _) => {
335+ anyhow:: bail!( "Timeout getting range logs for {filter:?}" )
321336 }
322- } )
337+ Ok ( Err ( err) ) => {
338+ if eth_rpc_err:: too_much_blocks_or_events ( & err) {
339+ anyhow:: bail!( "Too much blocks or events: {err}" )
340+ } else {
341+ anyhow:: bail!(
342+ "Cannot get range logs for {filter:?} due to {err}"
343+ )
344+ }
345+ }
346+ Ok ( Ok ( logs) ) => Ok ( logs) ,
347+ }
323348 }
324349
325350 async fn deduce_block_summary (
@@ -501,17 +526,24 @@ impl InfiniteLogIter {
501526 error ! ( "No provider, inconsistent state" ) ;
502527 return Err ( anyhow:: anyhow!( "No provider, inconsistent state" ) ) ;
503528 } ;
504- let block = provider. get_block ( block_id) . await ;
505- match block {
506- Ok ( Some ( block) ) => return Ok ( block) ,
507- Ok ( None ) => error ! (
529+ let block = tokio:: time:: timeout (
530+ Duration :: from_secs ( self . timeout_request_websocket ) ,
531+ provider. get_block ( block_id) ,
532+ ) ;
533+ match block. await {
534+ Ok ( Ok ( Some ( block) ) ) => return Ok ( block) ,
535+ Ok ( Ok ( None ) ) => error ! (
508536 block_id = ?block_id,
509- "Cannot get current block {block_id}, retrying" ,
537+ "Cannot get block {block_id}, retrying" ,
510538 ) ,
511- Err ( err) => error ! (
539+ Ok ( Err ( err) ) => error ! (
512540 block_id = ?block_id,
513541 error = %err,
514- "Cannot get current block {block_id}, retrying" ,
542+ "Cannot get block {block_id}, retrying" ,
543+ ) ,
544+ Err ( _) => error ! (
545+ block_id = ?block_id,
546+ "Timeout getting block {block_id}, retrying" ,
515547 ) ,
516548 }
517549 if i != REORG_RETRY_GET_BLOCK {
@@ -530,17 +562,24 @@ impl InfiniteLogIter {
530562 error ! ( "No provider, inconsistent state" ) ;
531563 return Err ( anyhow:: anyhow!( "No provider, inconsistent state" ) ) ;
532564 } ;
533- let block = provider. get_block_by_hash ( block_hash) . await ;
534- match block {
535- Ok ( Some ( block) ) => return Ok ( block) ,
536- Ok ( None ) => error ! (
565+ let block = tokio:: time:: timeout (
566+ Duration :: from_secs ( self . timeout_request_websocket ) ,
567+ provider. get_block_by_hash ( block_hash) ,
568+ ) ;
569+ match block. await {
570+ Ok ( Ok ( Some ( block) ) ) => return Ok ( block) ,
571+ Ok ( Ok ( None ) ) => error ! (
537572 block_hash = ?block_hash,
538- "Cannot get block, retrying" ,
573+ "Cannot get block by hash , retrying" ,
539574 ) ,
540- Err ( err) => error ! (
575+ Ok ( Err ( err) ) => error ! (
541576 block_hash = ?block_hash,
542577 error = %err,
543- "Cannot get block, retrying" ,
578+ "Cannot get block by hash, retrying" ,
579+ ) ,
580+ Err ( _) => error ! (
581+ block_hash = ?block_hash,
582+ "Timeout getting block by hash, retrying" ,
544583 ) ,
545584 }
546585 if i != REORG_RETRY_GET_BLOCK {
@@ -551,7 +590,7 @@ impl InfiniteLogIter {
551590 }
552591 }
553592 Err ( anyhow:: anyhow!(
554- "Cannot get block {block_hash} after retries"
593+ "Cannot get block by hash {block_hash} after retries"
555594 ) )
556595 }
557596
@@ -568,10 +607,27 @@ impl InfiniteLogIter {
568607 error ! ( "No provider, inconsistent state" ) ;
569608 return Err ( anyhow:: anyhow!( "No provider, inconsistent state" ) ) ;
570609 } ;
571- let logs = provider. get_logs ( & filter) . await ;
572- match logs {
573- Ok ( logs) => return Ok ( logs) ,
574- Err ( err) => {
610+ match tokio:: time:: timeout (
611+ Duration :: from_secs ( self . timeout_request_websocket ) ,
612+ provider. get_logs ( & filter) ,
613+ )
614+ . await
615+ {
616+ Err ( _) => {
617+ error ! (
618+ block_hash = ?block_hash,
619+ "Timeout getting logs for block {block_hash}, retrying" ,
620+ ) ;
621+ tokio:: time:: sleep ( Duration :: from_millis (
622+ RETRY_GET_LOGS_DELAY_IN_MS ,
623+ ) )
624+ . await ;
625+ continue ;
626+ }
627+ Ok ( Ok ( logs) ) => {
628+ return Ok ( logs) ;
629+ }
630+ Ok ( Err ( err) ) => {
575631 error ! (
576632 block_hash = ?block_hash,
577633 error = %err,
@@ -853,6 +909,7 @@ impl InfiniteLogIter {
853909 let Ok ( block_logs) = self . find_last_block_and_logs ( ) . await
854910 else {
855911 error ! ( "Cannot get last block and logs" ) ;
912+ self . stream = None ; // to restart
856913 continue ;
857914 } ;
858915 warn ! (
0 commit comments