Merged
4 changes: 2 additions & 2 deletions Cargo.lock


48 changes: 21 additions & 27 deletions core/src/blockclock/fetcher.rs
@@ -209,20 +209,14 @@ impl BlockFetcher {
#[cfg(test)]
mod tests {
use super::*;
use crate::provider::create_client;
use crate::HeaderMap;

async fn client() -> Arc<JsonRpcCachedProvider> {
let rpc_url = "http://localhost:8545";
let client = create_client(rpc_url, 1, Some(660), None, None, HeaderMap::new(), None, None)
.await
.unwrap();
client

fn mock_provider() -> Arc<JsonRpcCachedProvider> {
JsonRpcCachedProvider::mock(1)
}

#[tokio::test]
async fn test_simple_interpolation() {
let clock = BlockFetcher::new(None, client().await);
#[test]
fn test_simple_interpolation() {
let clock = BlockFetcher::new(None, mock_provider());

// Let's assume perfect times, 1 block = 1 second.
let mut anchors = vec![(1, 1), (10, 10), (20, 20), (30, 30), (10_000, 10_000)];
@@ -239,38 +233,38 @@ mod tests {
}
}

#[tokio::test]
async fn test_ratio_sampling_min() {
let clock = BlockFetcher::new(Some(1.0), client().await);
#[test]
fn test_ratio_sampling_min() {
let clock = BlockFetcher::new(Some(1.0), mock_provider());
let samples = clock.block_range_samples(1000, 1234);
let actual_sampling_ratio = samples.len() as f32 / (1234 - 1000) as f32;

assert!(actual_sampling_ratio >= 1.0);

let clock = BlockFetcher::new(Some(0.1), client().await);
let clock = BlockFetcher::new(Some(0.1), mock_provider());
let samples = clock.block_range_samples(1000, 1019);
let actual_sampling_ratio = samples.len() as f32 / (1019 - 1000) as f32;

assert!(actual_sampling_ratio >= 0.1);

let clock = BlockFetcher::new(Some(0.1), client().await);
let clock = BlockFetcher::new(Some(0.1), mock_provider());
let samples = clock.block_range_samples(1000, 1600);
let actual_sampling_ratio = samples.len() as f32 / (1600 - 1000) as f32;

assert!(actual_sampling_ratio >= 0.1);
assert!(actual_sampling_ratio <= 0.15); // Ensure we don't oversample here either

let clock = BlockFetcher::new(Some(0.5), client().await);
let clock = BlockFetcher::new(Some(0.5), mock_provider());
let samples = clock.block_range_samples(1000, 1300);
let actual_sampling_ratio = samples.len() as f32 / (1300 - 1000) as f32;

assert!(actual_sampling_ratio >= 0.5);
assert!(actual_sampling_ratio <= 0.6); // Ensure we don't oversample here either
}

#[tokio::test]
async fn test_sampling() {
let clock = BlockFetcher::new(Some(0.1), client().await);
#[test]
fn test_sampling() {
let clock = BlockFetcher::new(Some(0.1), mock_provider());

assert_eq!(clock.block_range_samples(10, 10), vec![10]);
assert_eq!(clock.block_range_samples(10, 11), vec![10, 11]);
@@ -280,9 +274,9 @@ mod tests {
assert!(clock.block_range_samples(11, 10).is_empty());
}

#[tokio::test]
async fn test_sampling_unique() {
let clock = BlockFetcher::new(Some(0.1), client().await);
#[test]
fn test_sampling_unique() {
let clock = BlockFetcher::new(Some(0.1), mock_provider());
let samples = clock.block_range_samples(1000, 1600);
let unique_samples = samples.iter().cloned().collect::<std::collections::HashSet<_>>();

@@ -295,9 +289,9 @@ mod tests {
assert!(wide_samples.contains(&20_000));
}

#[tokio::test]
async fn test_sampling_window_sequence() {
let clock = BlockFetcher::new(Some(0.1), client().await);
#[test]
fn test_sampling_window_sequence() {
let clock = BlockFetcher::new(Some(0.1), mock_provider());
let samples = clock.block_range_samples(1000, 1600);

assert!(samples.windows(2).all(|w| w[0] < w[1]));
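
A note on the test changes above: the sampling tests drop #[tokio::test] because the provider is now built synchronously with JsonRpcCachedProvider::mock(1) instead of create_client(...) against a local RPC URL. The mock constructor itself is not part of this diff; the sketch below is only a hypothetical illustration of the shape it could take, assuming the provider caches its chain at construction (consistent with the provider.chain.id() calls in the later hunks). Every name here other than JsonRpcCachedProvider, mock, and chain.id() is illustrative.

```rust
use std::sync::Arc;

// Hypothetical sketch: a provider that knows its chain id at construction
// time, so tests and reorg-distance lookups never issue an eth_chainId call.
pub struct Chain {
    id: u64,
}

impl Chain {
    pub fn id(&self) -> u64 {
        self.id
    }
}

pub struct JsonRpcCachedProvider {
    pub chain: Chain,
    // Real fields (RPC client, block cache, ...) are omitted in this sketch.
}

impl JsonRpcCachedProvider {
    /// Test-only constructor: no network access, just a known chain id.
    pub fn mock(chain_id: u64) -> Arc<Self> {
        Arc::new(Self { chain: Chain { id: chain_id } })
    }
}
```
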
34 changes: 26 additions & 8 deletions core/src/indexer/fetch_logs.rs
@@ -1,5 +1,6 @@
use crate::blockclock::BlockClock;
use crate::helpers::{halved_block_number, is_relevant_block};
use crate::indexer::reorg::reorg_safe_distance_for_chain;
use crate::{
event::{config::EventProcessingConfig, RindexerEventFilter},
indexer::{reorg::handle_chain_notification, IndexingEventProgressStatus},
@@ -488,14 +489,31 @@ async fn live_indexing_stream(
let from_block = current_filter.from_block();
if from_block > safe_block_number {
if reorg_safe_distance.is_zero() {
error!(
"{}::{} - {} - LIVE INDEXING STEAM - RPC has gone back on latest block: rpc returned {}, last seen: {}",
info_log_name,
network,
IndexingEventProgressStatus::Live.log(),
latest_block_number,
from_block
);
let block_distance = latest_block_number - from_block;
let is_outside_reorg_range = block_distance
> reorg_safe_distance_for_chain(cached_provider.chain.id());

// Under normal conditions this should never fall outside the reorg range;
// if it does, we log an error, as it means the RPC state is not in sync with the blockchain.
if is_outside_reorg_range {
error!(
"{}::{} - {} - LIVE INDEXING STEAM - RPC has gone back on latest block: rpc returned {}, last seen: {}",
info_log_name,
network,
IndexingEventProgressStatus::Live.log(),
latest_block_number,
from_block
);
} else {
info!(
"{}::{} - {} - LIVE INDEXING STEAM - RPC has gone back on latest block: rpc returned {}, last seen: {}",
info_log_name,
network,
IndexingEventProgressStatus::Live.log(),
latest_block_number,
from_block
);
}
} else {
info!(
"{}::{} - {} - LIVE INDEXING STEAM - not in safe reorg block range yet block: {} > range: {}",
35 changes: 27 additions & 8 deletions core/src/indexer/process.rs
@@ -12,6 +12,7 @@ use tokio::{
use tracing::{debug, error, info};

use crate::helpers::is_relevant_block;
use crate::indexer::reorg::reorg_safe_distance_for_chain;
use crate::provider::JsonRpcCachedProvider;
use crate::{
event::{
@@ -387,14 +388,32 @@ async fn live_indexing_for_contract_event_dependencies(
// check reorg distance and skip if not safe
if from_block > safe_block_number {
if reorg_safe_distance.is_zero() {
error!(
"{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
latest_block_number,
from_block
);
let block_distance = latest_block_number - from_block;
let is_outside_reorg_range =
block_distance > reorg_safe_distance_for_chain(cached_provider.chain.id());

// Under normal conditions this should never fall outside the reorg range;
// if it does, we log an error, as it means the RPC state is not in sync with the blockchain.
if is_outside_reorg_range {
error!(
"{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
latest_block_number,
from_block
);
} else {
info!(
"{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
latest_block_number,
from_block
);
}

continue;
} else {
info!(
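
The fetch_logs.rs and process.rs hunks above make the same decision: when the RPC's reported head falls behind the block we last saw, a jump within the chain's reorg-safe distance is expected and logged at info level, while a larger jump is treated as an error because the RPC state is likely out of sync with the chain. Below is a minimal sketch of that decision as a standalone helper; the helper name is hypothetical (the PR keeps the check inline at both call sites and subtracts latest minus last seen directly, whereas this sketch uses the absolute difference so it cannot underflow).

```rust
use alloy::primitives::U64;

// Hypothetical helper capturing the shared log-level decision above.
fn head_rollback_is_anomalous(
    latest_block_number: U64,
    last_seen_block: U64,
    reorg_safe_distance: U64,
) -> bool {
    // Distance between the RPC's reported head and the last block we indexed.
    let block_distance = if last_seen_block > latest_block_number {
        last_seen_block - latest_block_number
    } else {
        latest_block_number - last_seen_block
    };
    // Within the reorg-safe distance a rollback is expected (info level);
    // beyond it the RPC is likely out of sync with the chain (error level).
    block_distance > reorg_safe_distance
}
```
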
20 changes: 9 additions & 11 deletions core/src/indexer/reorg.rs
@@ -1,4 +1,4 @@
use alloy::primitives::{U256, U64};
use alloy::primitives::U64;
use tracing::{debug, warn};

use crate::notifications::ChainStateNotification;
@@ -43,8 +43,8 @@ pub fn handle_chain_notification(
}
}

pub fn reorg_safe_distance_for_chain(chain_id: &U256) -> U64 {
if chain_id == &U256::from(1) {
pub fn reorg_safe_distance_for_chain(chain_id: u64) -> U64 {
if chain_id == 1 {
U64::from(12)
} else {
U64::from(64)
@@ -53,19 +53,17 @@ pub fn reorg_safe_distance_for_chain(chain_id: &U256) -> U64 {

#[cfg(test)]
mod tests {
use alloy::primitives::U256;

use super::*;

#[test]
fn test_reorg_safe_distance_for_chain() {
let mainnet_chain_id = U256::from(1);
assert_eq!(reorg_safe_distance_for_chain(&mainnet_chain_id), U64::from(12));
let mainnet_chain_id = 1;
assert_eq!(reorg_safe_distance_for_chain(mainnet_chain_id), U64::from(12));

let testnet_chain_id = U256::from(3);
assert_eq!(reorg_safe_distance_for_chain(&testnet_chain_id), U64::from(64));
let testnet_chain_id = 3;
assert_eq!(reorg_safe_distance_for_chain(testnet_chain_id), U64::from(64));

let other_chain_id = U256::from(42);
assert_eq!(reorg_safe_distance_for_chain(&other_chain_id), U64::from(64));
let other_chain_id = 42;
assert_eq!(reorg_safe_distance_for_chain(other_chain_id), U64::from(64));
}
}
14 changes: 6 additions & 8 deletions core/src/indexer/start.rs
@@ -155,8 +155,7 @@ async fn get_start_end_block(
}

let (end_block, indexing_distance_from_head) =
calculate_safe_block_number(reorg_safe_distance, &provider, latest_block, end_block)
.await?;
calculate_safe_block_number(reorg_safe_distance, &provider, latest_block, end_block);

Ok((start_block, end_block, indexing_distance_from_head))
}
@@ -630,22 +629,21 @@ pub async fn initialize_database(
}
}

pub async fn calculate_safe_block_number(
pub fn calculate_safe_block_number(
reorg_safe_distance: bool,
provider: &Arc<JsonRpcCachedProvider>,
latest_block: U64,
mut end_block: U64,
) -> Result<(U64, U64), StartIndexingError> {
) -> (U64, U64) {
let mut indexing_distance_from_head = U64::ZERO;
if reorg_safe_distance {
let chain_id =
provider.get_chain_id().await.map_err(StartIndexingError::GetChainIdError)?;
Contributor Author:

This was not optimal; there is no need to call eth_chain_id on every safe block number calculation. Also, this function is now a sync one, which is another win.

let reorg_safe_distance = reorg_safe_distance_for_chain(&chain_id);
let chain_id = provider.chain.id();
let reorg_safe_distance = reorg_safe_distance_for_chain(chain_id);
let safe_block_number = latest_block - reorg_safe_distance;
if end_block > safe_block_number {
end_block = safe_block_number;
}
indexing_distance_from_head = reorg_safe_distance;
}
Ok((end_block, indexing_distance_from_head))
(end_block, indexing_distance_from_head)
}
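
The contributor note above explains the motivation for the start.rs change: the chain id is already known when the provider is constructed, so calculate_safe_block_number no longer issues an eth_chainId request and no longer needs to be async or fallible. Below is a small worked example of the clamping it performs, written as a self-contained sketch that takes the chain id directly instead of &Arc<JsonRpcCachedProvider>; the _sketch suffix and the main function are illustrative, while the bodies mirror the hunks above.

```rust
use alloy::primitives::U64;

// Same chain-dependent distance as reorg_safe_distance_for_chain above.
fn reorg_safe_distance_for_chain(chain_id: u64) -> U64 {
    if chain_id == 1 {
        U64::from(12)
    } else {
        U64::from(64)
    }
}

// Sketch of the new synchronous body, with the provider replaced by a plain
// chain id so the example compiles on its own.
fn calculate_safe_block_number_sketch(
    reorg_safe_distance: bool,
    chain_id: u64,
    latest_block: U64,
    mut end_block: U64,
) -> (U64, U64) {
    let mut indexing_distance_from_head = U64::ZERO;
    if reorg_safe_distance {
        let reorg_safe_distance = reorg_safe_distance_for_chain(chain_id);
        let safe_block_number = latest_block - reorg_safe_distance;
        if end_block > safe_block_number {
            end_block = safe_block_number;
        }
        indexing_distance_from_head = reorg_safe_distance;
    }
    (end_block, indexing_distance_from_head)
}

fn main() {
    // On mainnet (chain id 1) the head-tracking end block is clamped 12 blocks back.
    let latest = U64::from(1_000_000u64);
    let (end_block, distance) = calculate_safe_block_number_sketch(true, 1, latest, latest);
    assert_eq!(end_block, U64::from(999_988u64));
    assert_eq!(distance, U64::from(12));
}
```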