Skip to content

Commit 341d8a0

Browse files
authored
Do not log error if we are backwards, but in safe reorg range (#321)
1 parent de24c80 commit 341d8a0

File tree

10 files changed

+165
-158
lines changed

10 files changed

+165
-158
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/blockclock/fetcher.rs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -209,20 +209,14 @@ impl BlockFetcher {
209209
#[cfg(test)]
210210
mod tests {
211211
use super::*;
212-
use crate::provider::create_client;
213-
use crate::HeaderMap;
214-
215-
async fn client() -> Arc<JsonRpcCachedProvider> {
216-
let rpc_url = "http://localhost:8545";
217-
let client = create_client(rpc_url, 1, Some(660), None, None, HeaderMap::new(), None, None)
218-
.await
219-
.unwrap();
220-
client
212+
213+
fn mock_provider() -> Arc<JsonRpcCachedProvider> {
214+
JsonRpcCachedProvider::mock(1)
221215
}
222216

223-
#[tokio::test]
224-
async fn test_simple_interpolation() {
225-
let clock = BlockFetcher::new(None, client().await);
217+
#[test]
218+
fn test_simple_interpolation() {
219+
let clock = BlockFetcher::new(None, mock_provider());
226220

227221
// Lets assume perfect times, 1 block = 1 second.
228222
let mut anchors = vec![(1, 1), (10, 10), (20, 20), (30, 30), (10_000, 10_000)];
@@ -239,38 +233,38 @@ mod tests {
239233
}
240234
}
241235

242-
#[tokio::test]
243-
async fn test_ratio_sampling_min() {
244-
let clock = BlockFetcher::new(Some(1.0), client().await);
236+
#[test]
237+
fn test_ratio_sampling_min() {
238+
let clock = BlockFetcher::new(Some(1.0), mock_provider());
245239
let samples = clock.block_range_samples(1000, 1234);
246240
let actual_sampling_ratio = samples.len() as f32 / (1234 - 1000) as f32;
247241

248242
assert!(actual_sampling_ratio >= 1.0);
249243

250-
let clock = BlockFetcher::new(Some(0.1), client().await);
244+
let clock = BlockFetcher::new(Some(0.1), mock_provider());
251245
let samples = clock.block_range_samples(1000, 1019);
252246
let actual_sampling_ratio = samples.len() as f32 / (1019 - 1000) as f32;
253247

254248
assert!(actual_sampling_ratio >= 0.1);
255249

256-
let clock = BlockFetcher::new(Some(0.1), client().await);
250+
let clock = BlockFetcher::new(Some(0.1), mock_provider());
257251
let samples = clock.block_range_samples(1000, 1600);
258252
let actual_sampling_ratio = samples.len() as f32 / (1600 - 1000) as f32;
259253

260254
assert!(actual_sampling_ratio >= 0.1);
261255
assert!(actual_sampling_ratio <= 0.15); // Ensure we don't oversample here either
262256

263-
let clock = BlockFetcher::new(Some(0.5), client().await);
257+
let clock = BlockFetcher::new(Some(0.5), mock_provider());
264258
let samples = clock.block_range_samples(1000, 1300);
265259
let actual_sampling_ratio = samples.len() as f32 / (1300 - 1000) as f32;
266260

267261
assert!(actual_sampling_ratio >= 0.5);
268262
assert!(actual_sampling_ratio <= 0.6); // Ensure we don't oversample here either
269263
}
270264

271-
#[tokio::test]
272-
async fn test_sampling() {
273-
let clock = BlockFetcher::new(Some(0.1), client().await);
265+
#[test]
266+
fn test_sampling() {
267+
let clock = BlockFetcher::new(Some(0.1), mock_provider());
274268

275269
assert_eq!(clock.block_range_samples(10, 10), vec![10]);
276270
assert_eq!(clock.block_range_samples(10, 11), vec![10, 11]);
@@ -280,9 +274,9 @@ mod tests {
280274
assert!(clock.block_range_samples(11, 10).is_empty());
281275
}
282276

283-
#[tokio::test]
284-
async fn test_sampling_unique() {
285-
let clock = BlockFetcher::new(Some(0.1), client().await);
277+
#[test]
278+
fn test_sampling_unique() {
279+
let clock = BlockFetcher::new(Some(0.1), mock_provider());
286280
let samples = clock.block_range_samples(1000, 1600);
287281
let unique_samples = samples.iter().cloned().collect::<std::collections::HashSet<_>>();
288282

@@ -295,9 +289,9 @@ mod tests {
295289
assert!(wide_samples.contains(&20_000));
296290
}
297291

298-
#[tokio::test]
299-
async fn test_sampling_window_sequence() {
300-
let clock = BlockFetcher::new(Some(0.1), client().await);
292+
#[test]
293+
fn test_sampling_window_sequence() {
294+
let clock = BlockFetcher::new(Some(0.1), mock_provider());
301295
let samples = clock.block_range_samples(1000, 1600);
302296

303297
assert!(samples.windows(2).all(|w| w[0] < w[1]));

core/src/indexer/fetch_logs.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::blockclock::BlockClock;
22
use crate::helpers::{halved_block_number, is_relevant_block};
3+
use crate::indexer::reorg::reorg_safe_distance_for_chain;
34
use crate::{
45
event::{config::EventProcessingConfig, RindexerEventFilter},
56
indexer::{reorg::handle_chain_notification, IndexingEventProgressStatus},
@@ -488,14 +489,31 @@ async fn live_indexing_stream(
488489
let from_block = current_filter.from_block();
489490
if from_block > safe_block_number {
490491
if reorg_safe_distance.is_zero() {
491-
error!(
492-
"{}::{} - {} - LIVE INDEXING STEAM - RPC has gone back on latest block: rpc returned {}, last seen: {}",
493-
info_log_name,
494-
network,
495-
IndexingEventProgressStatus::Live.log(),
496-
latest_block_number,
497-
from_block
498-
);
492+
let block_distance = latest_block_number - from_block;
493+
let is_outside_reorg_range = block_distance
494+
> reorg_safe_distance_for_chain(cached_provider.chain.id());
495+
496+
// it should never get under normal conditions outside the reorg range,
497+
// therefore, we log an error as means RCP state is not in sync with the blockchain
498+
if is_outside_reorg_range {
499+
error!(
500+
"{}::{} - {} - LIVE INDEXING STEAM - RPC has gone back on latest block: rpc returned {}, last seen: {}",
501+
info_log_name,
502+
network,
503+
IndexingEventProgressStatus::Live.log(),
504+
latest_block_number,
505+
from_block
506+
);
507+
} else {
508+
info!(
509+
"{}::{} - {} - LIVE INDEXING STEAM - RPC has gone back on latest block: rpc returned {}, last seen: {}",
510+
info_log_name,
511+
network,
512+
IndexingEventProgressStatus::Live.log(),
513+
latest_block_number,
514+
from_block
515+
);
516+
}
499517
} else {
500518
info!(
501519
"{}::{} - {} - LIVE INDEXING STEAM - not in safe reorg block range yet block: {} > range: {}",

core/src/indexer/process.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tokio::{
1212
use tracing::{debug, error, info};
1313

1414
use crate::helpers::is_relevant_block;
15+
use crate::indexer::reorg::reorg_safe_distance_for_chain;
1516
use crate::provider::JsonRpcCachedProvider;
1617
use crate::{
1718
event::{
@@ -387,14 +388,32 @@ async fn live_indexing_for_contract_event_dependencies(
387388
// check reorg distance and skip if not safe
388389
if from_block > safe_block_number {
389390
if reorg_safe_distance.is_zero() {
390-
error!(
391-
"{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
392-
&config.info_log_name(),
393-
&config.network_contract().network,
394-
IndexingEventProgressStatus::Live.log(),
395-
latest_block_number,
396-
from_block
397-
);
391+
let block_distance = latest_block_number - from_block;
392+
let is_outside_reorg_range =
393+
block_distance > reorg_safe_distance_for_chain(cached_provider.chain.id());
394+
395+
// it should never get under normal conditions outside the reorg range,
396+
// therefore, we log an error as means RCP state is not in sync with the blockchain
397+
if is_outside_reorg_range {
398+
error!(
399+
"{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
400+
&config.info_log_name(),
401+
&config.network_contract().network,
402+
IndexingEventProgressStatus::Live.log(),
403+
latest_block_number,
404+
from_block
405+
);
406+
} else {
407+
info!(
408+
"{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
409+
&config.info_log_name(),
410+
&config.network_contract().network,
411+
IndexingEventProgressStatus::Live.log(),
412+
latest_block_number,
413+
from_block
414+
);
415+
}
416+
398417
continue;
399418
} else {
400419
info!(

core/src/indexer/reorg.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use alloy::primitives::{U256, U64};
1+
use alloy::primitives::U64;
22
use tracing::{debug, warn};
33

44
use crate::notifications::ChainStateNotification;
@@ -43,8 +43,8 @@ pub fn handle_chain_notification(
4343
}
4444
}
4545

46-
pub fn reorg_safe_distance_for_chain(chain_id: &U256) -> U64 {
47-
if chain_id == &U256::from(1) {
46+
pub fn reorg_safe_distance_for_chain(chain_id: u64) -> U64 {
47+
if chain_id == 1 {
4848
U64::from(12)
4949
} else {
5050
U64::from(64)
@@ -53,19 +53,17 @@ pub fn reorg_safe_distance_for_chain(chain_id: &U256) -> U64 {
5353

5454
#[cfg(test)]
5555
mod tests {
56-
use alloy::primitives::U256;
57-
5856
use super::*;
5957

6058
#[test]
6159
fn test_reorg_safe_distance_for_chain() {
62-
let mainnet_chain_id = U256::from(1);
63-
assert_eq!(reorg_safe_distance_for_chain(&mainnet_chain_id), U64::from(12));
60+
let mainnet_chain_id = 1;
61+
assert_eq!(reorg_safe_distance_for_chain(mainnet_chain_id), U64::from(12));
6462

65-
let testnet_chain_id = U256::from(3);
66-
assert_eq!(reorg_safe_distance_for_chain(&testnet_chain_id), U64::from(64));
63+
let testnet_chain_id = 3;
64+
assert_eq!(reorg_safe_distance_for_chain(testnet_chain_id), U64::from(64));
6765

68-
let other_chain_id = U256::from(42);
69-
assert_eq!(reorg_safe_distance_for_chain(&other_chain_id), U64::from(64));
66+
let other_chain_id = 42;
67+
assert_eq!(reorg_safe_distance_for_chain(other_chain_id), U64::from(64));
7068
}
7169
}

core/src/indexer/start.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,7 @@ async fn get_start_end_block(
155155
}
156156

157157
let (end_block, indexing_distance_from_head) =
158-
calculate_safe_block_number(reorg_safe_distance, &provider, latest_block, end_block)
159-
.await?;
158+
calculate_safe_block_number(reorg_safe_distance, &provider, latest_block, end_block);
160159

161160
Ok((start_block, end_block, indexing_distance_from_head))
162161
}
@@ -630,22 +629,21 @@ pub async fn initialize_database(
630629
}
631630
}
632631

633-
pub async fn calculate_safe_block_number(
632+
pub fn calculate_safe_block_number(
634633
reorg_safe_distance: bool,
635634
provider: &Arc<JsonRpcCachedProvider>,
636635
latest_block: U64,
637636
mut end_block: U64,
638-
) -> Result<(U64, U64), StartIndexingError> {
637+
) -> (U64, U64) {
639638
let mut indexing_distance_from_head = U64::ZERO;
640639
if reorg_safe_distance {
641-
let chain_id =
642-
provider.get_chain_id().await.map_err(StartIndexingError::GetChainIdError)?;
643-
let reorg_safe_distance = reorg_safe_distance_for_chain(&chain_id);
640+
let chain_id = provider.chain.id();
641+
let reorg_safe_distance = reorg_safe_distance_for_chain(chain_id);
644642
let safe_block_number = latest_block - reorg_safe_distance;
645643
if end_block > safe_block_number {
646644
end_block = safe_block_number;
647645
}
648646
indexing_distance_from_head = reorg_safe_distance;
649647
}
650-
Ok((end_block, indexing_distance_from_head))
648+
(end_block, indexing_distance_from_head)
651649
}

0 commit comments

Comments
 (0)