Skip to content

Commit 1ea6f6c

Browse files
feat: reorged-out logs: re-emitted lost logs with removed=true, and winning-chain logs with removed=false (#1862)
* eth_subscribe("logs") & eth_getFilterChanges for log filters now treat reorgs as first-class events instead of silently keeping stale logs
* Fix build and add more tests
* Remove Box::pin()
* Add more unit tests
* coderabbit: nitpicks
* coderabbit: fix tests
* coderabbit: Box::pin() is back
* coderabbit: nitpick
* coderabbit: don't suppress the gap marker
* Improve util.ts
* Add winning branch checks to tests
* coderabbit: ts test improvements
* coderabbit: better ts code
* Add isFork param
* coderabbit: nitpicks

---------

Co-authored-by: Artur Gontijo <arturgontijo@gmail.com>
1 parent 03c2da8 commit 1ea6f6c

File tree

11 files changed

+1065
-186
lines changed

11 files changed

+1065
-186
lines changed

client/rpc-core/src/types/filter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ impl Filter {
140140

141141
/// Helper for Filter matching.
142142
/// Supports conditional indexed parameters and wildcards.
143-
#[derive(Debug, Default)]
143+
#[derive(Clone, Debug, Default)]
144144
pub struct FilteredParams {
145145
pub filter: Filter,
146146
}
@@ -315,6 +315,7 @@ pub enum FilterType {
315315
#[derive(Clone, Debug)]
316316
pub struct FilterPoolItem {
317317
pub last_poll: BlockNumberOrHash,
318+
pub last_log_journal_seq: Option<u64>,
318319
pub filter_type: FilterType,
319320
pub at_block: u64,
320321
pub pending_transaction_hashes: HashSet<H256>,

client/rpc/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ scale-codec = { workspace = true }
2626
schnellru = "0.2.4"
2727
serde = { workspace = true, optional = true }
2828
thiserror = { workspace = true }
29-
tokio = { workspace = true, features = ["sync"] }
29+
tokio = { workspace = true, features = ["sync", "time"] }
3030

3131
# Substrate
3232
prometheus-endpoint = { workspace = true }

client/rpc/src/eth/filter.rs

Lines changed: 118 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ use sp_runtime::{
4040
use fc_rpc_core::{types::*, EthFilterApiServer};
4141
use fp_rpc::{EthereumRuntimeRPCApi, TransactionStatus};
4242

43-
use crate::{cache::EthBlockDataCacheTask, frontier_backend_client, internal_err};
43+
use crate::{
44+
cache::EthBlockDataCacheTask, frontier_backend_client, internal_err, LogsJournal,
45+
LogsJournalError,
46+
};
4447

4548
pub struct EthFilter<B: BlockT, C, BE, P> {
4649
client: Arc<C>,
@@ -51,6 +54,7 @@ pub struct EthFilter<B: BlockT, C, BE, P> {
5154
max_past_logs: u32,
5255
max_block_range: u32,
5356
block_data_cache: Arc<EthBlockDataCacheTask<B>>,
57+
logs_journal: Arc<LogsJournal>,
5458
_marker: PhantomData<BE>,
5559
}
5660

@@ -64,6 +68,7 @@ impl<B: BlockT, C, BE, P: TransactionPool> EthFilter<B, C, BE, P> {
6468
max_past_logs: u32,
6569
max_block_range: u32,
6670
block_data_cache: Arc<EthBlockDataCacheTask<B>>,
71+
logs_journal: Arc<LogsJournal>,
6772
) -> Self {
6873
Self {
6974
client,
@@ -74,6 +79,7 @@ impl<B: BlockT, C, BE, P: TransactionPool> EthFilter<B, C, BE, P> {
7479
max_past_logs,
7580
max_block_range,
7681
block_data_cache,
82+
logs_journal,
7783
_marker: PhantomData,
7884
}
7985
}
@@ -170,10 +176,13 @@ where
170176

171177
// Assume `max_stored_filters` is always < U256::max.
172178
let key = last_key.checked_add(U256::one()).unwrap();
179+
let last_log_journal_seq =
180+
matches!(&filter_type, FilterType::Log(_)).then(|| self.logs_journal.cursor());
173181
locked.insert(
174182
key,
175183
FilterPoolItem {
176184
last_poll: BlockNumberOrHash::Num(best_number),
185+
last_log_journal_seq,
177186
filter_type,
178187
at_block: best_number,
179188
pending_transaction_hashes,
@@ -218,7 +227,7 @@ where
218227
// To avoid issues with multiple async blocks (having different
219228
// anonymous types) we collect all necessary data in this enum then have
220229
// a single async block.
221-
enum FuturePath<B: BlockT> {
230+
enum FuturePath {
222231
Block {
223232
last: u64,
224233
next: u64,
@@ -228,8 +237,8 @@ where
228237
},
229238
Log {
230239
filter: Filter,
231-
from_number: NumberFor<B>,
232-
current_number: NumberFor<B>,
240+
cursor: u64,
241+
last_poll_block: u64,
233242
},
234243
Error(jsonrpsee::types::ErrorObjectOwned),
235244
}
@@ -238,9 +247,6 @@ where
238247
let info = self.client.info();
239248
let best_hash = info.best_hash;
240249
let best_number = UniqueSaturatedInto::<u64>::unique_saturated_into(info.best_number);
241-
// Get latest indexed block number before acquiring the lock to avoid
242-
// holding the lock across an await point.
243-
let latest_indexed_number = self.latest_indexed_block_number().await?;
244250
let pool = self.filter_pool.clone();
245251
// Try to lock.
246252
let path = if let Ok(locked) = &mut pool.lock() {
@@ -256,13 +262,14 @@ where
256262
key,
257263
FilterPoolItem {
258264
last_poll: BlockNumberOrHash::Num(next),
265+
last_log_journal_seq: None,
259266
filter_type: pool_item.filter_type.clone(),
260267
at_block: pool_item.at_block,
261268
pending_transaction_hashes: HashSet::new(),
262269
},
263270
);
264271

265-
FuturePath::<B>::Block { last, next }
272+
FuturePath::Block { last, next }
266273
}
267274
FilterType::PendingTransaction => {
268275
let previous_hashes = pool_item.pending_transaction_hashes;
@@ -287,6 +294,7 @@ where
287294
key,
288295
FilterPoolItem {
289296
last_poll: BlockNumberOrHash::Num(best_number + 1),
297+
last_log_journal_seq: None,
290298
filter_type: pool_item.filter_type.clone(),
291299
at_block: pool_item.at_block,
292300
pending_transaction_hashes: current_hashes.clone(),
@@ -302,62 +310,12 @@ where
302310
}
303311
// For each event since last poll, get a vector of ethereum logs.
304312
FilterType::Log(filter) => {
305-
// Either the filter-specific `to` block or latest indexed block.
306-
// Use latest indexed block to ensure consistency with other RPCs.
307-
let mut current_number = filter
308-
.to_block
309-
.and_then(|v| v.to_min_block_num())
310-
.map(|s| s.unique_saturated_into())
311-
.unwrap_or(latest_indexed_number);
312-
313-
if current_number > latest_indexed_number {
314-
current_number = latest_indexed_number;
315-
}
316-
317-
// The from clause is the max(last_poll, filter_from).
318-
let last_poll = pool_item
319-
.last_poll
320-
.to_min_block_num()
321-
.unwrap()
322-
.unique_saturated_into();
323-
324-
let filter_from = filter
325-
.from_block
326-
.and_then(|v| v.to_min_block_num())
327-
.map(|s| s.unique_saturated_into())
328-
.unwrap_or(last_poll);
329-
330-
let from_number = std::cmp::max(last_poll, filter_from);
331-
let block_range = current_number.saturating_sub(from_number);
332-
333-
// Validate block range before advancing last_poll. If we reject after
334-
// updating last_poll, the cursor would skip logs on the next poll.
335-
if block_range > self.max_block_range.into() {
336-
FuturePath::Error(internal_err(format!(
337-
"block range is too wide (maximum {})",
338-
self.max_block_range
339-
)))
340-
} else {
341-
// Update filter `last_poll` based on the same capped head we query.
342-
// This avoids skipping blocks when best_number is ahead of indexed data.
343-
let next_last_poll =
344-
UniqueSaturatedInto::<u64>::unique_saturated_into(current_number)
345-
.saturating_add(1);
346-
locked.insert(
347-
key,
348-
FilterPoolItem {
349-
last_poll: BlockNumberOrHash::Num(next_last_poll),
350-
filter_type: pool_item.filter_type.clone(),
351-
at_block: pool_item.at_block,
352-
pending_transaction_hashes: HashSet::new(),
353-
},
354-
);
355-
// Build the response.
356-
FuturePath::Log {
357-
filter: filter.clone(),
358-
from_number,
359-
current_number,
360-
}
313+
let cursor = pool_item.last_log_journal_seq.unwrap_or(0);
314+
let last_poll_block = pool_item.last_poll.to_min_block_num().unwrap_or(0);
315+
FuturePath::Log {
316+
filter: filter.clone(),
317+
cursor,
318+
last_poll_block,
361319
}
362320
}
363321
}
@@ -369,7 +327,6 @@ where
369327
};
370328

371329
let client = Arc::clone(&self.client);
372-
let backend = Arc::clone(&self.backend);
373330
let block_data_cache = Arc::clone(&self.block_data_cache);
374331
let max_past_logs = self.max_past_logs;
375332

@@ -393,32 +350,51 @@ where
393350
FuturePath::PendingTransaction { new_hashes } => Ok(FilterChanges::Hashes(new_hashes)),
394351
FuturePath::Log {
395352
filter,
396-
from_number,
397-
current_number,
353+
cursor,
354+
last_poll_block,
398355
} => {
399-
let logs = if backend.is_indexed() {
400-
filter_range_logs_indexed(
401-
client.as_ref(),
402-
backend.log_indexer(),
403-
&block_data_cache,
404-
max_past_logs,
405-
&filter,
406-
from_number,
407-
current_number,
408-
)
409-
.await?
410-
} else {
411-
filter_range_logs(
412-
client.as_ref(),
413-
&block_data_cache,
414-
max_past_logs,
415-
&filter,
416-
from_number,
417-
current_number,
418-
)
419-
.await?
356+
let latest_indexed = self.latest_indexed_block_number().await?;
357+
let latest_u64: u64 = latest_indexed.unique_saturated_into();
358+
if latest_u64.saturating_sub(last_poll_block) > self.max_block_range.into() {
359+
return Err(internal_err(format!(
360+
"block range is too wide (maximum {})",
361+
self.max_block_range
362+
)));
363+
}
364+
365+
let params = FilteredParams::new(filter);
366+
let (entries, next_cursor) = match self.logs_journal.snapshot_since(cursor) {
367+
Ok(snapshot) => snapshot,
368+
Err(err) => {
369+
if let Ok(locked) = &mut self.filter_pool.lock() {
370+
let _ = locked.remove(&key);
371+
}
372+
return Err(logs_journal_error(err));
373+
}
420374
};
421375

376+
let mut logs = Vec::new();
377+
for entry in entries {
378+
for log in entry.logs.iter() {
379+
if log_matches_filter(&params, log, true) {
380+
logs.push(log.clone());
381+
}
382+
}
383+
}
384+
385+
if logs.len() as u32 > max_past_logs {
386+
return Err(internal_err(format!(
387+
"query returned more than {max_past_logs} results",
388+
)));
389+
}
390+
391+
if let Ok(locked) = &mut self.filter_pool.lock() {
392+
if let Some(pool_item) = locked.get_mut(&key) {
393+
pool_item.last_log_journal_seq = Some(next_cursor);
394+
pool_item.last_poll = BlockNumberOrHash::Num(latest_u64);
395+
}
396+
}
397+
422398
Ok(FilterChanges::Logs(logs))
423399
}
424400
}
@@ -803,10 +779,41 @@ where
803779
Ok(logs)
804780
}
805781

782+
pub(crate) fn log_matches_filter(
783+
params: &FilteredParams,
784+
log: &Log,
785+
include_block_range: bool,
786+
) -> bool {
787+
let block_hash_match = log
788+
.block_hash
789+
.is_none_or(|block_hash| params.filter_block_hash(block_hash));
790+
let topics_match = params.filter.topics().is_empty() || params.filter_topics(&log.topics);
791+
let address_match = params
792+
.filter
793+
.address
794+
.as_ref()
795+
.is_none_or(|_| params.filter_address(&log.address));
796+
let block_range_match = !include_block_range
797+
|| log
798+
.block_number
799+
.is_some_and(|block_number| params.filter_block_range(block_number.low_u64()));
800+
801+
block_hash_match && topics_match && address_match && block_range_match
802+
}
803+
806804
pub(crate) fn filter_block_logs(
807805
filter: &Filter,
808806
block: EthereumBlock,
809807
transaction_statuses: Vec<TransactionStatus>,
808+
) -> Vec<Log> {
809+
filter_block_logs_with_removed(filter, block, transaction_statuses, false)
810+
}
811+
812+
pub(crate) fn filter_block_logs_with_removed(
813+
filter: &Filter,
814+
block: EthereumBlock,
815+
transaction_statuses: Vec<TransactionStatus>,
816+
removed: bool,
810817
) -> Vec<Log> {
811818
let params = FilteredParams::new(filter.clone());
812819
let mut block_log_index: u32 = 0;
@@ -827,21 +834,16 @@ pub(crate) fn filter_block_logs(
827834
transaction_index: None,
828835
log_index: None,
829836
transaction_log_index: None,
830-
removed: false,
837+
removed,
831838
};
832839

833-
let topics_match = filter.topics().is_empty() || params.filter_topics(&log.topics);
834-
let address_match = filter
835-
.address
836-
.as_ref()
837-
.is_none_or(|_| params.filter_address(&log.address));
838-
if topics_match && address_match {
839-
log.block_hash = Some(block_hash);
840-
log.block_number = Some(block.header.number);
841-
log.transaction_hash = Some(transaction_hash);
842-
log.transaction_index = Some(U256::from(status.transaction_index));
843-
log.log_index = Some(U256::from(block_log_index));
844-
log.transaction_log_index = Some(U256::from(transaction_log_index));
840+
log.block_hash = Some(block_hash);
841+
log.block_number = Some(block.header.number);
842+
log.transaction_hash = Some(transaction_hash);
843+
log.transaction_index = Some(U256::from(status.transaction_index));
844+
log.log_index = Some(U256::from(block_log_index));
845+
log.transaction_log_index = Some(U256::from(transaction_log_index));
846+
if log_matches_filter(&params, &log, false) {
845847
logs.push(log);
846848
}
847849
transaction_log_index += 1;
@@ -850,3 +852,18 @@ pub(crate) fn filter_block_logs(
850852
}
851853
logs
852854
}
855+
856+
fn logs_journal_error(err: LogsJournalError) -> jsonrpsee::types::ErrorObjectOwned {
857+
match err {
858+
LogsJournalError::CursorTooOld {
859+
cursor,
860+
earliest_available,
861+
next_cursor,
862+
} => internal_err(format!(
863+
"log filter fell behind the retained reorg journal (cursor={cursor}, earliest={earliest_available}, next={next_cursor}); recreate the filter"
864+
)),
865+
LogsJournalError::IncompleteEntry { seq } => internal_err(format!(
866+
"log filter encountered an incomplete reorg journal entry at sequence {seq}; recreate the filter"
867+
)),
868+
}
869+
}

0 commit comments

Comments (0)