Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions core/src/event/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub struct ContractEventProcessingConfig {
pub project_path: PathBuf,
pub indexer_name: String,
pub contract_name: String,
pub info_log_name: String,
pub topic_id: B256,
pub event_name: String,
pub config: Config,
Expand All @@ -46,6 +45,10 @@ pub struct ContractEventProcessingConfig {
}

impl ContractEventProcessingConfig {
/// Log prefix identifying this event: `contract::event::network`.
pub fn info_log_name(&self) -> String {
    format!(
        "{contract}::{event}::{network}",
        contract = self.contract_name,
        event = self.event_name,
        network = self.network_contract.network,
    )
}

pub fn to_event_filter(&self) -> Result<RindexerEventFilter, BuildRindexerFilterError> {
match &self.network_contract.indexing_contract_setup {
IndexingContractSetup::Address(details) => RindexerEventFilter::new_address_filter(
Expand Down Expand Up @@ -153,7 +156,7 @@ impl FactoryEventProcessingConfig {
}

pub fn info_log_name(&self) -> String {
format!("{}::{}", self.contract_name, self.event.name)
format!("{}::{}::{}", self.contract_name, self.event.name, self.network_contract.network)
}
}

Expand Down Expand Up @@ -214,7 +217,7 @@ impl EventProcessingConfig {

pub fn info_log_name(&self) -> String {
match self {
Self::ContractEventProcessing(config) => config.info_log_name.clone(),
Self::ContractEventProcessing(config) => config.info_log_name().clone(),
Self::FactoryEventProcessing(config) => config.info_log_name(),
}
}
Expand Down
98 changes: 35 additions & 63 deletions core/src/indexer/fetch_logs.rs

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions core/src/indexer/native_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,10 @@ pub async fn native_transfer_block_fetch(
// Spawn a separate task to handle notifications
if let Some(notifications) = chain_state_notification {
// Subscribe to notifications for this network
let network_clone = network.clone();
let mut notifications_clone = notifications.subscribe();
tokio::spawn(async move {
while let Ok(notification) = notifications_clone.recv().await {
handle_chain_notification(notification, "NativeTransfer", &network_clone);
handle_chain_notification(notification, "NativeTransfer");
}
});
}
Expand Down
84 changes: 43 additions & 41 deletions core/src/indexer/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,26 @@ pub enum ProcessEventError {
ProviderCallError(#[from] ProviderError),
}

pub async fn process_event(
/// Processes an event that doesn't have dependencies.
/// First processes historical logs, then starts live indexing if the event is configured for live indexing.
pub async fn process_non_blocking_event(
config: EventProcessingConfig,
block_until_indexed: bool,
) -> Result<(), ProcessEventError> {
debug!("{} - Processing events", config.info_log_name());
debug!("{} - Processing non blocking event", config.info_log_name());

process_event_logs(Arc::new(config), false, false).await?;

Ok(())
}

/// Processes the historical data of an event that HAS dependencies,
/// blocking until the historical range is fully indexed. Live indexing for
/// dependency events is handled separately by the caller
/// (see `process_contract_events_with_dependencies`).
pub async fn process_blocking_event_historical_data(
    config: Arc<EventProcessingConfig>,
) -> Result<(), Box<ProviderError>> {
    debug!("{} - Processing blocking event historical data", config.info_log_name());

    // Ordered (true) and blocking (true): dependency events must be indexed
    // in order before dependents can proceed.
    process_event_logs(config, true, true).await?;

    Ok(())
}
Expand Down Expand Up @@ -191,9 +204,10 @@ async fn process_contract_events_with_dependencies(
let task = tokio::spawn({
let live_indexing_events = Arc::clone(&live_indexing_events);
async move {
// forces live indexing off as it has to handle it a bit differently
process_event_logs(Arc::clone(&event_processing_config), true, true)
.await?;
process_blocking_event_historical_data(Arc::clone(
&event_processing_config,
))
.await?;

if event_processing_config.live_indexing() {
let network_contract = event_processing_config.network_contract();
Expand Down Expand Up @@ -275,7 +289,7 @@ async fn live_indexing_for_contract_event_dependencies(
EventDependenciesIndexingConfig { cached_provider, events, network }: EventDependenciesIndexingConfig,
) {
debug!(
"Live indexing events on {} - {}",
"Live indexing events on {} in order: {}",
network,
events
.iter()
Expand Down Expand Up @@ -359,12 +373,11 @@ async fn live_indexing_for_contract_event_dependencies(
>= log_no_new_block_interval
{
info!(
"{}::{} - {} - No new blocks published in the last 5 minutes - latest block number {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
latest_block_number
);
"{} - {} - No new blocks published in the last 5 minutes - latest block number {}",
&config.info_log_name(),
IndexingEventProgressStatus::Live.log(),
latest_block_number
);
ordering_live_indexing_details.last_no_new_block_log_time = Instant::now();
*ordering_live_indexing_details_map
.get(&config.id())
Expand Down Expand Up @@ -396,18 +409,16 @@ async fn live_indexing_for_contract_event_dependencies(
// therefore, we log an error as it means the RPC state is not in sync with the blockchain
if is_outside_reorg_range {
error!(
"{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
"{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
latest_block_number,
from_block
);
} else {
info!(
"{}::{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
"{} - {} - RPC has gone back on latest block: rpc returned {}, last seen: {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
latest_block_number,
from_block
Expand All @@ -417,9 +428,8 @@ async fn live_indexing_for_contract_event_dependencies(
continue;
} else {
info!(
"{}::{} - {} - not in safe reorg block range yet block: {} > range: {}",
"{} - {} - not in safe reorg block range yet block: {} > range: {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
from_block,
safe_block_number
Expand All @@ -438,16 +448,14 @@ async fn live_indexing_for_contract_event_dependencies(
)
{
debug!(
"{}::{} - {} - Skipping block {} as it's not relevant",
"{} - {} - Skipping block {} as it's not relevant",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
from_block
);
debug!(
"{}::{} - {} - Did not need to hit RPC as no events in {} block - LogsBloom for block checked",
"{} - {} - Did not need to hit RPC as no events in {} block - LogsBloom for block checked",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
from_block
);
Expand Down Expand Up @@ -477,9 +485,8 @@ async fn live_indexing_for_contract_event_dependencies(
match cached_provider.get_logs(&ordering_live_indexing_details.filter).await {
Ok(logs) => {
debug!(
"{}::{} - {} - Live id {} topic_id {}, Logs: {} from {} to {}",
"{} - {} - Live id {} topic_id {}, Logs: {} from {} to {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
&config.id(),
&config.topic_id(),
Expand All @@ -489,9 +496,8 @@ async fn live_indexing_for_contract_event_dependencies(
);

debug!(
"{}::{} - {} - Fetched {} event logs - blocks: {} - {}",
"{} - {} - Fetched {} event logs - blocks: {} - {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
logs.len(),
from_block,
Expand All @@ -516,12 +522,11 @@ async fn live_indexing_for_contract_event_dependencies(
let complete = task.await;
if let Err(e) = complete {
error!(
"{}::{} - {} - Error indexing task: {} - will try again in 200ms",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
e
);
"{} - {} - Error indexing task: {} - will try again in 200ms",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm inclined to say anything that self-recovers could be a warn!, whereas messages like "RPC has gone back on latest block" need to be an error since it is unlikely to self-recover

&config.info_log_name(),
IndexingEventProgressStatus::Live.log(),
e
);
break;
}
ordering_live_indexing_details.last_seen_block_number = to_block;
Expand All @@ -531,9 +536,8 @@ async fn live_indexing_for_contract_event_dependencies(
.filter
.set_from_block(to_block + U64::from(1));
debug!(
"{}::{} - {} - No events found between blocks {} - {}",
"{} - {} - No events found between blocks {} - {}",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
from_block,
to_block
Expand All @@ -557,9 +561,8 @@ async fn live_indexing_for_contract_event_dependencies(
}
Err(err) => {
error!(
"{}::{} - {} - Error fetching logs: {} - will try again in 200ms",
"{} - {} - Error fetching logs: {} - will try again in 200ms",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
err
);
Expand All @@ -569,9 +572,8 @@ async fn live_indexing_for_contract_event_dependencies(
}
Err(err) => {
error!(
"{}::{} - {} - Error fetching logs: {} - will try again in 200ms",
"{} - {} - Error fetching logs: {} - will try again in 200ms",
&config.info_log_name(),
&config.network_contract().network,
IndexingEventProgressStatus::Live.log(),
err
);
Expand Down Expand Up @@ -621,7 +623,7 @@ async fn handle_logs_result(
) -> Result<JoinHandle<()>, Box<dyn std::error::Error + Send>> {
match result {
Ok(result) => {
debug!("Processing logs {} - length {}", config.event_name(), result.logs.len());
debug!("{} - Processing {} logs", config.info_log_name(), result.logs.len());

let fn_data = result
.logs
Expand Down
17 changes: 6 additions & 11 deletions core/src/indexer/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use tracing::{debug, warn};
use crate::notifications::ChainStateNotification;

/// Handles chain state notifications (reorgs, reverts, commits)
pub fn handle_chain_notification(
notification: ChainStateNotification,
info_log_name: &str,
network: &str,
) {
pub fn handle_chain_notification(notification: ChainStateNotification, info_log_name: &str) {
match notification {
ChainStateNotification::Reorged {
revert_from_block,
Expand All @@ -18,9 +14,8 @@ pub fn handle_chain_notification(
new_tip_hash,
} => {
warn!(
"{}::{} - REORG DETECTED! Need to revert blocks {} to {} and re-index {} to {} (new tip: {})",
"{} - REORG DETECTED! Need to revert blocks {} to {} and re-index {} to {} (new tip: {})",
info_log_name,
network,
revert_from_block, revert_to_block,
new_from_block, new_to_block,
new_tip_hash
Expand All @@ -29,15 +24,15 @@ pub fn handle_chain_notification(
}
ChainStateNotification::Reverted { from_block, to_block } => {
warn!(
"{}::{} - CHAIN REVERTED! Blocks {} to {} have been reverted",
info_log_name, network, from_block, to_block
"{} - CHAIN REVERTED! Blocks {} to {} have been reverted",
info_log_name, from_block, to_block
);
// TODO: In future PR, mark affected logs as removed in the database
}
ChainStateNotification::Committed { from_block, to_block, tip_hash } => {
debug!(
"{}::{} - Chain committed: blocks {} to {} (tip: {})",
info_log_name, network, from_block, to_block, tip_hash
"{} - Chain committed: blocks {} to {} (tip: {})",
info_log_name, from_block, to_block, tip_hash
);
}
}
Expand Down
5 changes: 2 additions & 3 deletions core/src/indexer/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
last_synced::{get_last_synced_block_number, SyncConfig},
native_transfer::{native_transfer_block_fetch, NATIVE_TRANSFER_CONTRACT_NAME},
process::{
process_contracts_events_with_dependencies, process_event,
process_contracts_events_with_dependencies, process_non_blocking_event,
ProcessContractsEventsWithDependenciesError, ProcessEventError,
},
progress::IndexingEventsProgressState,
Expand Down Expand Up @@ -451,7 +451,6 @@ pub async fn start_indexing_contract_events(
project_path: project_path.clone(),
indexer_name: event.indexer_name.clone(),
contract_name: event.contract.name.clone(),
info_log_name: event.info_log_name(),
topic_id: event.topic_id,
event_name: event.event_name.clone(),
network_contract: Arc::new(network_contract.clone()),
Expand Down Expand Up @@ -505,7 +504,7 @@ pub async fn start_indexing_contract_events(
&dependencies,
);
} else {
let process_event = tokio::spawn(process_event(event_processing_config, false));
let process_event = tokio::spawn(process_non_blocking_event(event_processing_config));
non_blocking_process_events.push(process_event);
}
}
Expand Down
8 changes: 6 additions & 2 deletions core/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ impl tracing_subscriber::fmt::time::FormatTime for CustomTimer {
}
}

pub fn setup_logger(log_level: LevelFilter) {
let filter = EnvFilter::from_default_env().add_directive(log_level.into());
const LOG_LEVEL_ENV: &str = "RINDEXER_LOG";

pub fn setup_logger(default_log_level: LevelFilter) {
let filter = EnvFilter::try_from_env(LOG_LEVEL_ENV).unwrap_or(
EnvFilter::builder().with_default_directive(default_log_level.into()).parse_lossy(""),
);

let format = Format::default().with_timer(CustomTimer).with_level(true).with_target(false);

Expand Down
1 change: 1 addition & 0 deletions documentation/docs/pages/docs/changelog.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
### Features
-------------------------------------------------
- feat: check if the RPC chain id is matching the configured chain id in the yaml config on startup
- feat: add support for `RINDEXER_LOG` environment variable to control the log level of rindexer

### Bug fixes
-------------------------------------------------
Expand Down