@@ -107,8 +107,8 @@ pub enum ProcessContractEventsWithDependenciesError {
107107 #[ error( "Could not build filter: {0}" ) ]
108108 BuildFilterError ( #[ from] BuildRindexerFilterError ) ,
109109
110- #[ error( "Event config not found" ) ]
111- EventConfigNotFound ,
110+ #[ error( "Event config not found for contract: {0} and event: {1} " ) ]
111+ EventConfigNotFound ( String , String ) ,
112112
113113 #[ error( "Could not run all the logs processes {0}" ) ]
114114 JoinError ( #[ from] JoinError ) ,
@@ -118,6 +118,7 @@ pub enum ProcessContractEventsWithDependenciesError {
118118pub struct OrderedLiveIndexingDetails {
119119 pub filter : RindexerEventFilter ,
120120 pub last_seen_block_number : U64 ,
121+ pub last_no_new_block_log_time : Instant ,
121122}
122123
123124async fn process_contract_events_with_dependencies (
@@ -141,10 +142,15 @@ async fn process_contract_events_with_dependencies(
141142 let event_processing_config = event_processing_config
142143 . iter ( )
143144 . find ( |e| {
144- e. contract_name == dependency. contract_name &&
145+ // TODO - this is a hacky way to check if it's a filter event
146+ ( e. contract_name == dependency. contract_name ||
147+ e. contract_name . replace ( "Filter" , "" ) == dependency. contract_name ) &&
145148 e. event_name == dependency. event_name
146149 } )
147- . ok_or ( ProcessContractEventsWithDependenciesError :: EventConfigNotFound ) ?;
150+ . ok_or ( ProcessContractEventsWithDependenciesError :: EventConfigNotFound (
151+ dependency. contract_name ,
152+ dependency. event_name ,
153+ ) ) ?;
148154
149155 // forces live indexing off as it has to handle it a bit differently
150156 process_event_logs ( Arc :: clone ( event_processing_config) , true ) . await ?;
@@ -165,9 +171,18 @@ async fn process_contract_events_with_dependencies(
165171
166172 let results = join_all ( tasks) . await ;
167173 for result in results {
168- if let Err ( e) = result {
169- error ! ( "Error processing logs: {:?}" , e) ;
170- return Err ( ProcessContractEventsWithDependenciesError :: JoinError ( e) ) ;
174+ match result {
175+ Ok ( result) => match result {
176+ Ok ( _) => { }
177+ Err ( e) => {
178+ error ! ( "Error processing logs due to dependencies error: {:?}" , e) ;
179+ return Err ( e) ;
180+ }
181+ } ,
182+ Err ( e) => {
183+ error ! ( "Error processing logs: {:?}" , e) ;
184+ return Err ( ProcessContractEventsWithDependenciesError :: JoinError ( e) ) ;
185+ }
171186 }
172187 }
173188
@@ -210,12 +225,15 @@ async fn live_indexing_for_contract_event_dependencies<'a>(
210225
211226 ordering_live_indexing_details_map. insert (
212227 config. topic_id ,
213- Arc :: new ( Mutex :: new ( OrderedLiveIndexingDetails { filter, last_seen_block_number } ) ) ,
228+ Arc :: new ( Mutex :: new ( OrderedLiveIndexingDetails {
229+ filter,
230+ last_seen_block_number,
231+ last_no_new_block_log_time : Instant :: now ( ) ,
232+ } ) ) ,
214233 ) ;
215234 }
216235
217236 // this is used for less busy chains to make sure they know rindexer is still alive
218- let mut last_no_new_block_log_time = Instant :: now ( ) ;
219237 let log_no_new_block_interval = Duration :: from_secs ( 20 ) ;
220238
221239 loop {
@@ -243,15 +261,24 @@ async fn live_indexing_for_contract_event_dependencies<'a>(
243261 & config. info_log_name,
244262 IndexingEventProgressStatus :: Live . log( )
245263 ) ;
246- if last_no_new_block_log_time. elapsed ( ) >= log_no_new_block_interval
264+ if ordering_live_indexing_details
265+ . last_no_new_block_log_time
266+ . elapsed ( ) >=
267+ log_no_new_block_interval
247268 {
248269 info ! (
249270 "{} - {} - No new blocks published in the last 20 seconds - latest block number {}" ,
250271 & config. info_log_name,
251272 IndexingEventProgressStatus :: Live . log( ) ,
252273 latest_block_number
253274 ) ;
254- last_no_new_block_log_time = Instant :: now ( ) ;
275+ ordering_live_indexing_details. last_no_new_block_log_time =
276+ Instant :: now ( ) ;
277+ * ordering_live_indexing_details_map
278+ . get ( & config. topic_id )
279+ . expect ( "Failed to get ordering_live_indexing_details_map" )
280+ . lock ( )
281+ . await = ordering_live_indexing_details;
255282 }
256283 continue ;
257284 }
0 commit comments