@@ -42,13 +42,27 @@ pub enum ProcessEventError {
4242 ProviderCallError ( #[ from] ProviderError ) ,
4343}
4444
45- pub async fn process_event (
45+ /// Processes an event that doesn't have dependencies.
46+ /// First processes historical logs, then starts live indexing if the event is configured for live indexing.
47+ /// This function returns immediately without waiting for the indexing to complete.
48+ pub async fn process_non_blocking_event (
4649 config : EventProcessingConfig ,
47- block_until_indexed : bool ,
4850) -> Result < ( ) , ProcessEventError > {
49- debug ! ( "{} - Processing events" , config. info_log_name( ) ) ;
51+ debug ! ( "{} - Processing non blocking event" , config. info_log_name( ) ) ;
52+
53+ process_event_logs ( Arc :: new ( config) , false , false ) . await ?;
54+
55+ Ok ( ( ) )
56+ }
57+
58+ /// Processes historical logs for a blocking event that has dependencies.
59+ /// This function waits for the indexing to complete before returning.
60+ pub async fn process_blocking_event_historical_data (
61+ config : Arc < EventProcessingConfig > ,
62+ ) -> Result < ( ) , Box < ProviderError > > {
63+ debug ! ( "{} - Processing blocking event historical data" , config. info_log_name( ) ) ;
5064
51- process_event_logs ( Arc :: new ( config) , false , block_until_indexed ) . await ?;
65+ process_event_logs ( config, true , true ) . await ?;
5266
5367 Ok ( ( ) )
5468}
@@ -191,9 +205,10 @@ async fn process_contract_events_with_dependencies(
191205 let task = tokio:: spawn ( {
192206 let live_indexing_events = Arc :: clone ( & live_indexing_events) ;
193207 async move {
194- // forces live indexing off as it has to handle it a bit differently
195- process_event_logs ( Arc :: clone ( & event_processing_config) , true , true )
196- . await ?;
208+ process_blocking_event_historical_data ( Arc :: clone (
209+ & event_processing_config,
210+ ) )
211+ . await ?;
197212
198213 if event_processing_config. live_indexing ( ) {
199214 let network_contract = event_processing_config. network_contract ( ) ;
@@ -275,7 +290,7 @@ async fn live_indexing_for_contract_event_dependencies(
275290 EventDependenciesIndexingConfig { cached_provider, events, network } : EventDependenciesIndexingConfig ,
276291) {
277292 debug ! (
278- "Live indexing events on {} - {}" ,
293+ "Live indexing events on {} in order: {}" ,
279294 network,
280295 events
281296 . iter( )
@@ -359,12 +374,11 @@ async fn live_indexing_for_contract_event_dependencies(
359374 >= log_no_new_block_interval
360375 {
361376 info ! (
362- "{}::{} - {} - No new blocks published in the last 5 minutes - latest block number {}" ,
363- & config. info_log_name( ) ,
364- & config. network_contract( ) . network,
365- IndexingEventProgressStatus :: Live . log( ) ,
366- latest_block_number
367- ) ;
377+ "{} - {} - No new blocks published in the last 5 minutes - latest block number {}" ,
378+ & config. info_log_name( ) ,
379+ IndexingEventProgressStatus :: Live . log( ) ,
380+ latest_block_number
381+ ) ;
368382 ordering_live_indexing_details. last_no_new_block_log_time = Instant :: now ( ) ;
369383 * ordering_live_indexing_details_map
370384 . get ( & config. id ( ) )
@@ -396,18 +410,16 @@ async fn live_indexing_for_contract_event_dependencies(
396410 // therefore, we log an error as means RCP state is not in sync with the blockchain
397411 if is_outside_reorg_range {
398412 error ! (
399- "{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}" ,
413+ "{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}" ,
400414 & config. info_log_name( ) ,
401- & config. network_contract( ) . network,
402415 IndexingEventProgressStatus :: Live . log( ) ,
403416 latest_block_number,
404417 from_block
405418 ) ;
406419 } else {
407420 info ! (
408- "{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}" ,
421+ "{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}" ,
409422 & config. info_log_name( ) ,
410- & config. network_contract( ) . network,
411423 IndexingEventProgressStatus :: Live . log( ) ,
412424 latest_block_number,
413425 from_block
@@ -417,9 +429,8 @@ async fn live_indexing_for_contract_event_dependencies(
417429 continue ;
418430 } else {
419431 info ! (
420- "{}::{} - {} - not in safe reorg block range yet block: {} > range: {}" ,
432+ "{} - {} - not in safe reorg block range yet block: {} > range: {}" ,
421433 & config. info_log_name( ) ,
422- & config. network_contract( ) . network,
423434 IndexingEventProgressStatus :: Live . log( ) ,
424435 from_block,
425436 safe_block_number
@@ -438,16 +449,14 @@ async fn live_indexing_for_contract_event_dependencies(
438449 )
439450 {
440451 debug ! (
441- "{}::{} - {} - Skipping block {} as it's not relevant" ,
452+ "{} - {} - Skipping block {} as it's not relevant" ,
442453 & config. info_log_name( ) ,
443- & config. network_contract( ) . network,
444454 IndexingEventProgressStatus :: Live . log( ) ,
445455 from_block
446456 ) ;
447457 debug ! (
448- "{}::{} - {} - Did not need to hit RPC as no events in {} block - LogsBloom for block checked" ,
458+ "{} - {} - Did not need to hit RPC as no events in {} block - LogsBloom for block checked" ,
449459 & config. info_log_name( ) ,
450- & config. network_contract( ) . network,
451460 IndexingEventProgressStatus :: Live . log( ) ,
452461 from_block
453462 ) ;
@@ -477,9 +486,8 @@ async fn live_indexing_for_contract_event_dependencies(
477486 match cached_provider. get_logs ( & ordering_live_indexing_details. filter ) . await {
478487 Ok ( logs) => {
479488 debug ! (
480- "{}::{} - {} - Live id {} topic_id {}, Logs: {} from {} to {}" ,
489+ "{} - {} - Live id {} topic_id {}, Logs: {} from {} to {}" ,
481490 & config. info_log_name( ) ,
482- & config. network_contract( ) . network,
483491 IndexingEventProgressStatus :: Live . log( ) ,
484492 & config. id( ) ,
485493 & config. topic_id( ) ,
@@ -489,9 +497,8 @@ async fn live_indexing_for_contract_event_dependencies(
489497 ) ;
490498
491499 debug ! (
492- "{}::{} - {} - Fetched {} event logs - blocks: {} - {}" ,
500+ "{} - {} - Fetched {} event logs - blocks: {} - {}" ,
493501 & config. info_log_name( ) ,
494- & config. network_contract( ) . network,
495502 IndexingEventProgressStatus :: Live . log( ) ,
496503 logs. len( ) ,
497504 from_block,
@@ -516,12 +523,11 @@ async fn live_indexing_for_contract_event_dependencies(
516523 let complete = task. await ;
517524 if let Err ( e) = complete {
518525 error ! (
519- "{}::{} - {} - Error indexing task: {} - will try again in 200ms" ,
520- & config. info_log_name( ) ,
521- & config. network_contract( ) . network,
522- IndexingEventProgressStatus :: Live . log( ) ,
523- e
524- ) ;
526+ "{} - {} - Error indexing task: {} - will try again in 200ms" ,
527+ & config. info_log_name( ) ,
528+ IndexingEventProgressStatus :: Live . log( ) ,
529+ e
530+ ) ;
525531 break ;
526532 }
527533 ordering_live_indexing_details. last_seen_block_number = to_block;
@@ -531,9 +537,8 @@ async fn live_indexing_for_contract_event_dependencies(
531537 . filter
532538 . set_from_block ( to_block + U64 :: from ( 1 ) ) ;
533539 debug ! (
534- "{}::{} - {} - No events found between blocks {} - {}" ,
540+ "{} - {} - No events found between blocks {} - {}" ,
535541 & config. info_log_name( ) ,
536- & config. network_contract( ) . network,
537542 IndexingEventProgressStatus :: Live . log( ) ,
538543 from_block,
539544 to_block
@@ -557,9 +562,8 @@ async fn live_indexing_for_contract_event_dependencies(
557562 }
558563 Err ( err) => {
559564 error ! (
560- "{}::{} - {} - Error fetching logs: {} - will try again in 200ms" ,
565+ "{} - {} - Error fetching logs: {} - will try again in 200ms" ,
561566 & config. info_log_name( ) ,
562- & config. network_contract( ) . network,
563567 IndexingEventProgressStatus :: Live . log( ) ,
564568 err
565569 ) ;
@@ -569,9 +573,8 @@ async fn live_indexing_for_contract_event_dependencies(
569573 }
570574 Err ( err) => {
571575 error ! (
572- "{}::{} - {} - Error fetching logs: {} - will try again in 200ms" ,
576+ "{} - {} - Error fetching logs: {} - will try again in 200ms" ,
573577 & config. info_log_name( ) ,
574- & config. network_contract( ) . network,
575578 IndexingEventProgressStatus :: Live . log( ) ,
576579 err
577580 ) ;
@@ -621,7 +624,7 @@ async fn handle_logs_result(
621624) -> Result < JoinHandle < ( ) > , Box < dyn std:: error:: Error + Send > > {
622625 match result {
623626 Ok ( result) => {
624- debug ! ( "Processing logs {} - length {}" , config. event_name ( ) , result. logs. len( ) ) ;
627+ debug ! ( "{} - Processing {} logs " , config. info_log_name ( ) , result. logs. len( ) ) ;
625628
626629 let fn_data = result
627630 . logs
0 commit comments