Skip to content

Commit 2956a46

Browse files
committed
feat: Add conditional indexing to filter Safes by initiator address (#2703)
Implement selective Safe indexing on L2 networks to reduce database size and improve indexing speed by filtering out Safes created by specific initiators. Changes: - Add ETH_EVENTS_IGNORED_INITIATORS setting to define blocklisted initiators - Add ETH_EVENTS_CONDITIONAL_INDEXING flag (auto-enabled when blocklist is set) - Create SafeToIndex model for manual allowlist management - Override process_elements in SafeEventsIndexer to filter by tx._from before storing EthereumTx records - Optimize RPC calls: fetch transactions first, filter, then fetch receipts only for allowed transactions - Add safe_allowlist management command (add/remove/list/count) - Add Django admin for SafeToIndex model - Add comprehensive tests for conditional indexing and management command Closes #2703
1 parent 6056eff commit 2956a46

File tree

8 files changed

+707
-3
lines changed

8 files changed

+707
-3
lines changed

config/settings/base.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,13 @@
609609
ETH_ERC20_LOAD_ADDRESSES_CHUNK_SIZE = env.int(
610610
"ETH_ERC20_LOAD_ADDRESSES_CHUNK_SIZE", default=500_000
611611
) # Load Safe addresses for the ERC20 indexer with a database iterator with the defined `chunk_size`
612+
ETH_EVENTS_IGNORED_INITIATORS: set[ChecksumAddress] = {
613+
ChecksumAddress(HexAddress(HexStr(address)))
614+
for address in env.list("ETH_EVENTS_IGNORED_INITIATORS", default=[])
615+
} # Initiator addresses whose created Safes should be ignored during L2 indexing
616+
617+
# Auto-enabled when any conditional indexing selector is configured (e.g., ETH_EVENTS_IGNORED_INITIATORS)
618+
ETH_EVENTS_CONDITIONAL_INDEXING: bool = bool(ETH_EVENTS_IGNORED_INITIATORS)
612619

613620
# ENABLE/DISABLE COLLECTIBLES DOWNLOAD METADATA, enable=True, disabled by default
614621
COLLECTIBLES_ENABLE_DOWNLOAD_METADATA = env.bool(

safe_transaction_service/history/admin.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
SafeLastStatus,
3333
SafeMasterCopy,
3434
SafeStatus,
35+
SafeToIndex,
3536
)
3637
from .services import IndexServiceProvider
3738
from .utils import HexField
@@ -555,6 +556,13 @@ class SafeContractDelegateAdmin(AdvancedAdminSearchMixin, admin.ModelAdmin):
555556
search_fields = ["==safe_contract__address", "==delegate", "==delegator"]
556557

557558

559+
@admin.register(SafeToIndex)
560+
class SafeToIndexAdmin(AdvancedAdminSearchMixin, admin.ModelAdmin):
561+
list_display = ("address",)
562+
ordering = ["address"]
563+
search_fields = ["==address"]
564+
565+
558566
class SafeStatusModulesListFilter(admin.SimpleListFilter):
559567
title = "Modules enabled in Safe"
560568
parameter_name = "enabled_modules"

safe_transaction_service/history/indexers/safe_events_indexer.py

Lines changed: 203 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import datetime
2+
from collections import OrderedDict
3+
from collections.abc import Sequence
24
from functools import cached_property
35
from logging import getLogger
46
from typing import Any
@@ -20,10 +22,11 @@
2022
)
2123
from safe_eth.util.util import to_0x_hex_str
2224
from web3.contract.contract import ContractEvent
23-
from web3.types import EventData
25+
from web3.types import EventData, LogReceipt, TxData, TxReceipt
2426

2527
from ..models import (
2628
EthereumBlock,
29+
EthereumTx,
2730
EthereumTxCallType,
2831
InternalTx,
2932
InternalTxDecoded,
@@ -70,6 +73,122 @@ def __init__(self, *args, **kwargs):
7073
self.block_hashes_with_timestamp: dict[bytes, datetime.datetime] = {}
7174
super().__init__(*args, **kwargs)
7275

76+
def process_elements(self, log_receipts: Sequence[LogReceipt]) -> list[Any]:
77+
"""
78+
Override to filter events by tx._from when conditional indexing is enabled.
79+
This avoids storing EthereumTx records for blocklisted initiators.
80+
"""
81+
if not settings.ETH_EVENTS_CONDITIONAL_INDEXING:
82+
return super().process_elements(log_receipts)
83+
84+
# --- Conditional indexing enabled ---
85+
if not log_receipts:
86+
return []
87+
88+
ignored_initiators = settings.ETH_EVENTS_IGNORED_INITIATORS
89+
if not ignored_initiators:
90+
# No blocklist configured, use standard flow
91+
return super().process_elements(log_receipts)
92+
93+
logger.debug("Conditional indexing: filtering events by tx._from")
94+
95+
# 1. Filter already processed log receipts
96+
not_processed_log_receipts = [
97+
log_receipt
98+
for log_receipt in log_receipts
99+
if not self.element_already_processed_checker.is_processed(
100+
log_receipt["transactionHash"],
101+
log_receipt["blockHash"],
102+
log_receipt["logIndex"],
103+
)
104+
]
105+
106+
if not not_processed_log_receipts:
107+
return []
108+
109+
# 2. Get unique tx_hashes preserving order
110+
tx_hashes = list(
111+
OrderedDict.fromkeys(
112+
HexBytes(r["transactionHash"]) for r in not_processed_log_receipts
113+
).keys()
114+
)
115+
116+
# 3. Check DB for existing txs
117+
db_txs: dict[bytes, EthereumTx] = {
118+
HexBytes(tx.tx_hash): tx
119+
for tx in EthereumTx.objects.filter(tx_hash__in=tx_hashes).exclude(
120+
block=None
121+
)
122+
}
123+
logger.debug("Conditional indexing: found %d existing txs in DB", len(db_txs))
124+
125+
# 4. Fetch missing txs from RPC (without receipts - we'll fetch those only for allowed txs)
126+
missing_hashes = [h for h in tx_hashes if h not in db_txs]
127+
logger.debug(
128+
"Conditional indexing: fetching %d missing txs from RPC",
129+
len(missing_hashes),
130+
)
131+
fetched_txs = self._fetch_txs(missing_hashes)
132+
133+
# 5. Filter by _from (blocklist check)
134+
allowed_tx_hashes: set[bytes] = set()
135+
136+
# Check existing DB txs
137+
for tx_hash, db_tx in db_txs.items():
138+
if db_tx._from not in ignored_initiators:
139+
allowed_tx_hashes.add(tx_hash)
140+
else:
141+
logger.debug(
142+
"Conditional indexing: filtering existing tx %s from blocklisted initiator %s",
143+
to_0x_hex_str(tx_hash),
144+
db_tx._from,
145+
)
146+
147+
# Check fetched txs, filter allowed ones
148+
allowed_fetched_txs: list[TxData] = []
149+
for tx in fetched_txs:
150+
tx_from = tx["from"]
151+
if tx_from not in ignored_initiators:
152+
allowed_fetched_txs.append(tx)
153+
allowed_tx_hashes.add(HexBytes(tx["hash"]))
154+
else:
155+
logger.debug(
156+
"Conditional indexing: filtering tx %s from blocklisted initiator %s",
157+
to_0x_hex_str(tx["hash"]),
158+
tx_from,
159+
)
160+
161+
logger.debug(
162+
"Conditional indexing: %d/%d txs allowed after filtering",
163+
len(allowed_tx_hashes),
164+
len(tx_hashes),
165+
)
166+
167+
# 6. Fetch receipts only for allowed txs and store them
168+
if allowed_fetched_txs:
169+
self._fetch_receipts_and_store(allowed_fetched_txs)
170+
171+
# 7. Filter log_receipts to only allowed txs
172+
filtered_log_receipts = [
173+
r
174+
for r in not_processed_log_receipts
175+
if HexBytes(r["transactionHash"]) in allowed_tx_hashes
176+
]
177+
178+
# 8. Decode and process
179+
decoded_elements = self.decode_elements(filtered_log_receipts)
180+
processed_elements = self._process_decoded_elements(decoded_elements)
181+
182+
# 9. Mark ALL original receipts as processed (so we don't re-fetch blocked ones)
183+
for log_receipt in not_processed_log_receipts:
184+
self.element_already_processed_checker.mark_as_processed(
185+
log_receipt["transactionHash"],
186+
log_receipt["blockHash"],
187+
log_receipt["logIndex"],
188+
)
189+
190+
return processed_elements
191+
73192
@cached_property
74193
def contract_events(self) -> list[ContractEvent]:
75194
"""
@@ -516,6 +635,9 @@ def _process_safe_creation_events(
516635
- SafeSetup
517636
- ProxyCreation
518637
638+
Note: When conditional indexing is enabled, events are filtered by tx._from
639+
in process_elements() before this method is called.
640+
519641
:param safe_addresses_with_creation_events:
520642
:return: Generated InternalTxs for safe creation
521643
"""
@@ -618,9 +740,11 @@ def _process_safe_creation_events(
618740
internal_txs_decoded.append(internal_tx_decoded)
619741

620742
logger.debug("InternalTx and InternalTxDecoded objects for creation were built")
621-
return InternalTx.objects.store_internal_txs_and_decoded_in_db(
743+
744+
stored_internal_txs = InternalTx.objects.store_internal_txs_and_decoded_in_db(
622745
internal_txs, internal_txs_decoded
623746
)
747+
return stored_internal_txs
624748

625749
def _prefetch_timestamp_for_blocks(
626750
self, decoded_elements: list[EventData]
@@ -646,6 +770,81 @@ def _prefetch_timestamp_for_blocks(
646770
logger.debug("Ended prefetching timestamp for every block hash")
647771
return block_hashes_with_timestamp
648772

773+
def _fetch_txs(self, tx_hashes: list[bytes]) -> list[TxData]:
774+
"""
775+
Fetch transactions from RPC without receipts.
776+
Used for conditional indexing to check tx._from before deciding to fetch receipts.
777+
778+
:param tx_hashes: List of transaction hashes to fetch
779+
:return: List of transactions
780+
"""
781+
if not tx_hashes:
782+
return []
783+
784+
txs: list[TxData] = []
785+
for tx_hash, tx in zip(
786+
tx_hashes,
787+
self.ethereum_client.get_transactions(tx_hashes),
788+
strict=False,
789+
):
790+
tx = tx or self.ethereum_client.get_transaction(tx_hash) # Retry if failed
791+
if tx:
792+
txs.append(tx)
793+
794+
return txs
795+
796+
def _fetch_receipts_and_store(self, txs: list[TxData]) -> None:
797+
"""
798+
Fetch receipts for allowed transactions and store them in the database.
799+
Called after filtering by tx._from to avoid fetching receipts for blocklisted txs.
800+
801+
:param txs: List of allowed transactions to fetch receipts for and store
802+
"""
803+
if not txs:
804+
return
805+
806+
tx_hashes = [tx["hash"] for tx in txs]
807+
808+
# Fetch receipts for allowed transactions
809+
logger.debug(
810+
"Conditional indexing: fetching %d receipts for allowed txs",
811+
len(tx_hashes),
812+
)
813+
receipts: list[TxReceipt] = []
814+
for tx_hash, tx_receipt in zip(
815+
tx_hashes,
816+
self.ethereum_client.get_transaction_receipts(tx_hashes),
817+
strict=False,
818+
):
819+
tx_receipt = tx_receipt or self.ethereum_client.get_transaction_receipt(
820+
tx_hash
821+
) # Retry if failed
822+
if tx_receipt:
823+
receipts.append(tx_receipt)
824+
825+
# Collect block hashes
826+
block_hashes = {to_0x_hex_str(tx["blockHash"]) for tx in txs}
827+
828+
# Create blocks
829+
logger.debug("Conditional indexing: inserting %d blocks", len(block_hashes))
830+
_, blocks = self.index_service.txs_create_or_update_from_block_hashes(
831+
block_hashes
832+
)
833+
834+
# Create EthereumTx records
835+
logger.debug("Conditional indexing: inserting %d transactions", len(txs))
836+
ethereum_txs_to_insert = [
837+
EthereumTx.objects.from_tx_dict(tx, receipt)
838+
for tx, receipt in zip(txs, receipts, strict=False)
839+
]
840+
EthereumTx.objects.bulk_create_from_generator(
841+
iter(ethereum_txs_to_insert), ignore_conflicts=True
842+
)
843+
844+
# Set block references
845+
for ethereum_tx, tx in zip(ethereum_txs_to_insert, txs, strict=False):
846+
ethereum_tx.block = blocks[tx["blockHash"]]
847+
649848
def _process_decoded_elements(self, decoded_elements: list[EventData]) -> list[Any]:
650849
processed_elements = []
651850

@@ -659,6 +858,8 @@ def _process_decoded_elements(self, decoded_elements: list[EventData]) -> list[A
659858
)
660859
if safe_addresses_creation_events:
661860
# Process safe creation events
861+
# Note: When conditional indexing is enabled, events are already filtered
862+
# by tx._from in process_elements() before reaching this point
662863
creation_events_processed = self._process_safe_creation_events(
663864
safe_addresses_creation_events
664865
)

0 commit comments

Comments
 (0)