Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 27 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -700,33 +700,33 @@ walkdir = "2.3.3"
vergen-git2 = "1.0.5"

# [patch.crates-io]
# alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-contract = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-json-rpc = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-network-primitives = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types-admin = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types-debug = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types-eth = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types-mev = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-rpc-types-txpool = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-serde = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-signer-local = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-transport = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-transport-http = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-transport-ipc = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "cfb13aa" }
# alloy-consensus = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-contract = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-eips = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-genesis = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-json-rpc = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-network = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-network-primitives = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-provider = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types-admin = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types-debug = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types-eth = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types-mev = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-rpc-types-txpool = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-serde = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-signer = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-signer-local = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-transport = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-transport-http = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-transport-ipc = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
#
# op-alloy-consensus = { git = "https://github.com/alloy-rs/op-alloy", rev = "ad607c1" }
# op-alloy-network = { git = "https://github.com/alloy-rs/op-alloy", rev = "ad607c1" }
Expand Down
32 changes: 10 additions & 22 deletions crates/rpc/rpc-eth-types/src/logs_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use alloy_consensus::TxReceipt;
use alloy_eips::{eip2718::Encodable2718, BlockNumHash};
use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{FilteredParams, Log};
use alloy_rpc_types_eth::{Filter, Log};
use reth_chainspec::ChainInfo;
use reth_errors::ProviderError;
use reth_primitives_traits::{BlockBody, RecoveredBlock, SignedTransaction};
Expand All @@ -14,7 +14,7 @@ use std::sync::Arc;

/// Returns all matching of a block's receipts when the transaction hashes are known.
pub fn matching_block_logs_with_tx_hashes<'a, I, R>(
filter: &FilteredParams,
filter: &Filter,
block_num_hash: BlockNumHash,
tx_hashes_and_receipts: I,
removed: bool,
Expand All @@ -23,13 +23,18 @@ where
I: IntoIterator<Item = (TxHash, &'a R)>,
R: TxReceipt<Log = alloy_primitives::Log> + 'a,
{
if !filter.matches_block(&block_num_hash) {
return vec![];
}

let mut all_logs = Vec::new();
// Tracks the index of a log in the entire block.
let mut log_index: u64 = 0;

// Iterate over transaction hashes and receipts and append matching logs.
for (receipt_idx, (tx_hash, receipt)) in tx_hashes_and_receipts.into_iter().enumerate() {
for log in receipt.logs() {
if log_matches_filter(block_num_hash, log, filter) {
if filter.matches(log) {
let log = Log {
inner: log.clone(),
block_hash: Some(block_num_hash.hash),
Expand Down Expand Up @@ -63,7 +68,7 @@ pub enum ProviderOrBlock<'a, P: BlockReader> {
pub fn append_matching_block_logs<P>(
all_logs: &mut Vec<Log>,
provider_or_block: ProviderOrBlock<'_, P>,
filter: &FilteredParams,
filter: &Filter,
block_num_hash: BlockNumHash,
receipts: &[P::Receipt],
removed: bool,
Expand All @@ -86,7 +91,7 @@ where
let mut transaction_hash = None;

for log in receipt.logs() {
if log_matches_filter(block_num_hash, log, filter) {
if filter.matches(log) {
// if this is the first match in the receipt's logs, look up the transaction hash
if transaction_hash.is_none() {
transaction_hash = match &provider_or_block {
Expand Down Expand Up @@ -139,23 +144,6 @@ where
Ok(())
}

/// Returns true if the log matches the filter and should be included.
///
/// A log is included when either no filter is configured, or the block
/// number, block hash, log address and topics all pass the configured
/// filter parameters.
pub fn log_matches_filter(
    block: BlockNumHash,
    log: &alloy_primitives::Log,
    params: &FilteredParams,
) -> bool {
    // No filter configured means everything matches.
    if params.filter.is_none() {
        return true
    }

    // Every criterion must pass for the log to be included.
    params.filter_block_range(block.number) &&
        params.filter_block_hash(block.hash) &&
        params.filter_address(&log.address) &&
        params.filter_topics(log.topics())
}

/// Computes the block range based on the filter range and current block numbers
pub fn get_filter_block_range(
from_block: Option<u64>,
Expand Down
89 changes: 41 additions & 48 deletions crates/rpc/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use alloy_consensus::BlockHeader;
use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
PendingTransactionFilterKind,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -443,7 +443,7 @@ where
maybe_block
.map(ProviderOrBlock::Block)
.unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
&FilteredParams::new(Some(filter)),
&filter,
block_num_hash,
&receipts,
false,
Expand Down Expand Up @@ -518,61 +518,54 @@ where
}

let mut all_logs = Vec::new();
let filter_params = FilteredParams::new(Some(filter.clone()));

// derive bloom filters from filter input, so we can check headers for matching logs
let address_filter = FilteredParams::address_filter(&filter.address);
let topics_filter = FilteredParams::topics_filter(&filter.topics);

// loop over the range of new blocks and check logs if the filter matches the log's bloom
// filter
for (from, to) in
BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
{
let headers = self.provider().headers_range(from..=to)?;
for (idx, header) in headers
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this diff block is mostly unindenting, as the if was moved to a .filter()

.iter()
.enumerate()
.filter(|(_, header)| filter.matches_bloom(header.logs_bloom()))
{
// these are consecutive headers, so we can use the parent hash of the next
// block to get the current header's hash
let block_hash = match headers.get(idx + 1) {
Some(child) => child.parent_hash(),
None => self
.provider()
.block_hash(header.number())?
.ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
};

for (idx, header) in headers.iter().enumerate() {
// only if filter matches
if FilteredParams::matches_address(header.logs_bloom(), &address_filter) &&
FilteredParams::matches_topics(header.logs_bloom(), &topics_filter)
let num_hash = BlockNumHash::new(header.number(), block_hash);
if let Some((receipts, maybe_block)) =
self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await?
{
// these are consecutive headers, so we can use the parent hash of the next
// block to get the current header's hash
let block_hash = match headers.get(idx + 1) {
Some(child) => child.parent_hash(),
None => self
.provider()
.block_hash(header.number())?
.ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
};

let num_hash = BlockNumHash::new(header.number(), block_hash);
if let Some((receipts, maybe_block)) =
self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await?
{
append_matching_block_logs(
&mut all_logs,
maybe_block
.map(ProviderOrBlock::Block)
.unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
&filter_params,
num_hash,
&receipts,
false,
header.timestamp(),
)?;

// size check but only if range is multiple blocks, so we always return all
// logs of a single block
let is_multi_block_range = from_block != to_block;
if let Some(max_logs_per_response) = limits.max_logs_per_response {
if is_multi_block_range && all_logs.len() > max_logs_per_response {
return Err(EthFilterError::QueryExceedsMaxResults {
max_logs: max_logs_per_response,
from_block,
to_block: num_hash.number.saturating_sub(1),
});
}
append_matching_block_logs(
&mut all_logs,
maybe_block
.map(ProviderOrBlock::Block)
.unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
filter,
num_hash,
&receipts,
false,
header.timestamp(),
)?;

// size check but only if range is multiple blocks, so we always return all
// logs of a single block
let is_multi_block_range = from_block != to_block;
if let Some(max_logs_per_response) = limits.max_logs_per_response {
if is_multi_block_range && all_logs.len() > max_logs_per_response {
return Err(EthFilterError::QueryExceedsMaxResults {
max_logs: max_logs_per_response,
from_block,
to_block: num_hash.number.saturating_sub(1),
});
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/rpc/rpc/src/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{
pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
FilteredParams, Header, Log,
Filter, Header, Log,
};
use futures::StreamExt;
use jsonrpsee::{
Expand Down Expand Up @@ -105,11 +105,11 @@ where
SubscriptionKind::Logs => {
// if no params are provided, used default filter params
let filter = match params {
Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)),
Some(Params::Logs(filter)) => *filter,
Some(Params::Bool(_)) => {
return Err(invalid_params_rpc_err("Invalid params for logs"))
}
_ => FilteredParams::default(),
_ => Default::default(),
};
pipe_from_stream(accepted_sink, pubsub.log_stream(filter)).await
}
Expand Down Expand Up @@ -308,7 +308,7 @@ where
}

/// Returns a stream that yields all logs that match the given filter.
fn log_stream(&self, filter: FilteredParams) -> impl Stream<Item = Log> {
fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends").block_receipts()
Expand Down
13 changes: 3 additions & 10 deletions examples/db-access/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_ethereum::{
providers::ReadOnlyConfig, AccountReader, BlockReader, BlockSource, HeaderProvider,
ReceiptProvider, StateProvider, TransactionsProvider,
},
rpc::eth::primitives::{Filter, FilteredParams},
rpc::eth::primitives::Filter,
TransactionSigned,
};

Expand Down Expand Up @@ -201,21 +201,14 @@ fn receipts_provider_example<
// TODO: Make it clearer how to choose between event_signature(topic0) (event name) and the
// other 3 indexed topics. This API is a bit clunky and not obvious to use at the moment.
let filter = Filter::new().address(addr).event_signature(topic);
let filter_params = FilteredParams::new(Some(filter));
let address_filter = FilteredParams::address_filter(&addr.into());
let topics_filter = FilteredParams::topics_filter(&[topic.into()]);

// 3. If the address & topics filters match do something. We use the outer check against the
// bloom filter stored in the header to avoid having to query the receipts table when there
// is no instance of any event that matches the filter in the header.
if FilteredParams::matches_address(bloom, &address_filter) &&
FilteredParams::matches_topics(bloom, &topics_filter)
{
if filter.matches_bloom(bloom) {
let receipts = provider.receipt(header_num)?.ok_or(eyre::eyre!("receipt not found"))?;
for log in &receipts.logs {
if filter_params.filter_address(&log.address) &&
filter_params.filter_topics(log.topics())
{
if filter.matches(log) {
// Do something with the log e.g. decode it.
println!("Matching log found! {log:?}")
}
Expand Down
Loading