Skip to content

Commit 8c674cf

Browse files
fix: resolve task blocking dependencies (#115)
* fix: resolve race condition of dependency blocking * changelog.mdx * use try_join_all
1 parent 4f98d00 commit 8c674cf

File tree

4 files changed

+49
-13
lines changed

4 files changed

+49
-13
lines changed

core/src/abi.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ pub enum ReadAbiError {
179179
impl ABIItem {
180180
pub fn format_event_signature(&self) -> Result<String, ParamTypeError> {
181181
let name = &self.name;
182-
let params = self.inputs.iter()
182+
let params = self
183+
.inputs
184+
.iter()
183185
.map(Self::format_param_type)
184186
.collect::<Result<Vec<_>, _>>()?
185187
.join(",");
@@ -193,14 +195,16 @@ impl ABIItem {
193195

194196
let type_str = match base_type {
195197
"tuple" => {
196-
let inner = input.components.as_ref()
198+
let inner = input
199+
.components
200+
.as_ref()
197201
.ok_or(ParamTypeError::MissingComponents)?
198202
.iter()
199203
.map(Self::format_param_type)
200204
.collect::<Result<Vec<_>, _>>()?
201205
.join(",");
202206
format!("({})", inner)
203-
},
207+
}
204208
_ => base_type.to_string(),
205209
};
206210

core/src/indexer/process.rs

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,39 @@ pub enum ProcessEventError {
3636
BuildFilterError(#[from] BuildRindexerFilterError),
3737
}
3838

39-
pub async fn process_event(config: EventProcessingConfig) -> Result<(), ProcessEventError> {
39+
pub async fn process_event(
40+
config: EventProcessingConfig,
41+
block_until_indexed: bool,
42+
) -> Result<(), ProcessEventError> {
4043
debug!("{} - Processing events", config.info_log_name);
4144

42-
process_event_logs(Arc::new(config), false).await?;
45+
process_event_logs(Arc::new(config), false, block_until_indexed).await?;
4346

4447
Ok(())
4548
}
4649

50+
/// note block_until_indexed:
51+
/// Whether to wait for all indexing tasks to complete for an event before returning
52+
// (needed for dependency indexing)
4753
async fn process_event_logs(
4854
config: Arc<EventProcessingConfig>,
4955
force_no_live_indexing: bool,
56+
block_until_indexed: bool,
5057
) -> Result<(), Box<ProviderError>> {
5158
let mut logs_stream = fetch_logs_stream(Arc::clone(&config), force_no_live_indexing);
59+
let mut tasks = Vec::new();
5260

5361
while let Some(result) = logs_stream.next().await {
54-
handle_logs_result(Arc::clone(&config), result)
62+
let task = handle_logs_result(Arc::clone(&config), result)
63+
.await
64+
.map_err(|e| Box::new(ProviderError::CustomError(e.to_string())))?;
65+
66+
tasks.push(task);
67+
}
68+
69+
if block_until_indexed {
70+
// Wait for all tasks in parallel
71+
futures::future::try_join_all(tasks)
5572
.await
5673
.map_err(|e| Box::new(ProviderError::CustomError(e.to_string())))?;
5774
}
@@ -153,7 +170,7 @@ async fn process_contract_events_with_dependencies(
153170
))?;
154171

155172
// forces live indexing off as it has to handle it a bit differently
156-
process_event_logs(Arc::clone(event_processing_config), true).await?;
173+
process_event_logs(Arc::clone(event_processing_config), true, true).await?;
157174

158175
if event_processing_config.live_indexing {
159176
let rindexer_event_filter = event_processing_config.to_event_filter()?;
@@ -392,7 +409,18 @@ async fn live_indexing_for_contract_event_dependencies<'a>(
392409
.await;
393410

394411
match result {
395-
Ok(_) => {
412+
Ok(task) => {
413+
let complete = task.await;
414+
if let Err(e) = complete {
415+
error!(
416+
"{} - {} - Error indexing task: {} - will try again in 200ms",
417+
&config.info_log_name,
418+
IndexingEventProgressStatus::Live.log(),
419+
e
420+
);
421+
drop(permit);
422+
break;
423+
}
396424
ordering_live_indexing_details
397425
.last_seen_block_number = to_block;
398426
if logs_empty {
@@ -476,7 +504,7 @@ async fn live_indexing_for_contract_event_dependencies<'a>(
476504
async fn handle_logs_result(
477505
config: Arc<EventProcessingConfig>,
478506
result: Result<FetchLogsResult, Box<dyn std::error::Error + Send>>,
479-
) -> Result<(), Box<dyn std::error::Error + Send>> {
507+
) -> Result<JoinHandle<()>, Box<dyn std::error::Error + Send>> {
480508
match result {
481509
Ok(result) => {
482510
debug!("Processing logs {} - length {}", config.event_name, result.logs.len());
@@ -495,18 +523,21 @@ async fn handle_logs_result(
495523
.collect::<Vec<_>>();
496524

497525
if !fn_data.is_empty() {
498-
if config.index_event_in_order {
526+
return if config.index_event_in_order {
499527
config.trigger_event(fn_data).await;
500528
update_progress_and_last_synced(config, result.to_block);
529+
Ok(tokio::spawn(async {})) // Return a completed task
501530
} else {
502-
tokio::spawn(async move {
531+
let task = tokio::spawn(async move {
503532
config.trigger_event(fn_data).await;
504533
update_progress_and_last_synced(config, result.to_block);
505534
});
535+
536+
Ok(task)
506537
}
507538
}
508539

509-
Ok(())
540+
Ok(tokio::spawn(async {})) // Return a completed task
510541
}
511542
Err(e) => {
512543
error!("Error fetching logs: {:?}", e);

core/src/indexer/start.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ pub async fn start_indexing(
239239
dependencies,
240240
);
241241
} else {
242-
let process_event = tokio::spawn(process_event(event_processing_config));
242+
let process_event = tokio::spawn(process_event(event_processing_config, false));
243243
non_blocking_process_events.push(process_event);
244244
}
245245
}

documentation/docs/pages/docs/changelog.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
### Bug fixes
1010
-------------------------------------------------
11+
fix: resolve race condition of dependency blocking indexing
1112

1213
### Breaking changes
1314
-------------------------------------------------

0 commit comments

Comments
 (0)