Skip to content

Commit bd27c00

Browse files
committed
feat: Add conditional indexing to filter Safes by initiator address (#2703)
Implement selective Safe indexing on L2 networks to reduce database size and improve indexing speed by filtering out Safes created by specific initiators.

Changes:
- Add ETH_EVENTS_IGNORED_INITIATORS setting to define blocklisted initiators
- Override process_elements in SafeEventsIndexer to filter by tx._from before storing EthereumTx records
- Optimize RPC calls: fetch transactions first, filter, then fetch receipts only for allowed transactions
- Add comprehensive tests for conditional indexing

Closes #2703
1 parent a4428ab commit bd27c00

File tree

4 files changed

+536
-9
lines changed

4 files changed

+536
-9
lines changed

config/settings/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,11 @@
609609
ETH_ERC20_LOAD_ADDRESSES_CHUNK_SIZE = env.int(
610610
"ETH_ERC20_LOAD_ADDRESSES_CHUNK_SIZE", default=500_000
611611
) # Load Safe addresses for the ERC20 indexer with a database iterator with the defined `chunk_size`
612+
ETH_EVENTS_IGNORED_INITIATORS: set[ChecksumAddress] = {
613+
ChecksumAddress(HexAddress(HexStr(address)))
614+
for address in env.list("ETH_EVENTS_IGNORED_INITIATORS", default=[])
615+
} # Initiator addresses whose created Safes should be ignored during L2 indexing
616+
612617

613618
# ENABLE/DISABLE COLLECTIBLES DOWNLOAD METADATA, enable=True, disabled by default
614619
COLLECTIBLES_ENABLE_DOWNLOAD_METADATA = env.bool(

safe_transaction_service/history/indexers/safe_events_indexer.py

Lines changed: 279 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import datetime
2+
from collections import OrderedDict
3+
from collections.abc import Sequence
24
from functools import cached_property
35
from logging import getLogger
4-
from typing import Any
56

67
from django.conf import settings
78

@@ -20,14 +21,16 @@
2021
)
2122
from safe_eth.util.util import to_0x_hex_str
2223
from web3.contract.contract import ContractEvent
23-
from web3.types import EventData
24+
from web3.types import EventData, LogReceipt, TxData, TxReceipt
2425

2526
from ..models import (
2627
EthereumBlock,
28+
EthereumTx,
2729
EthereumTxCallType,
2830
InternalTx,
2931
InternalTxDecoded,
3032
InternalTxType,
33+
SafeContract,
3134
SafeMasterCopy,
3235
SafeRelevantTransaction,
3336
)
@@ -65,11 +68,133 @@ def __init__(self, *args, **kwargs):
6568
kwargs.setdefault(
6669
"eth_zksync_compatible_network", settings.ETH_ZKSYNC_COMPATIBLE_NETWORK
6770
)
71+
kwargs.setdefault("ignored_initiators", settings.ETH_EVENTS_IGNORED_INITIATORS)
72+
6873
self.eth_zksync_compatible_network = kwargs["eth_zksync_compatible_network"]
74+
self.ignored_initiators = kwargs["ignored_initiators"]
75+
self.conditional_indexing_enabled = bool(self.ignored_initiators)
6976
# Cache timestamp for block hashes
7077
self.block_hashes_with_timestamp: dict[bytes, datetime.datetime] = {}
7178
super().__init__(*args, **kwargs)
7279

80+
def process_elements(self, log_receipts: Sequence[LogReceipt]) -> list[InternalTx]:
    """
    Process event logs, skipping the ones whose transaction sender
    (``tx._from``) is listed in ``self.ignored_initiators``.

    With an empty blocklist the call is delegated to the parent implementation.
    Otherwise transactions are fetched without receipts first, filtered by
    sender, and receipts are only fetched (and ``EthereumTx`` rows stored) for
    the allowed ones.

    :param log_receipts: Event logs to process
    :return: Elements returned by ``_process_decoded_elements`` for the
        allowed logs, empty list if there is nothing to process
    """
    if not log_receipts:
        return []

    if not self.ignored_initiators:
        # No blocklist configured, use standard flow
        return super().process_elements(log_receipts)

    # --- Conditional indexing enabled ---
    logger.debug("Conditional indexing: filtering events by tx._from")

    # Compare addresses case-insensitively: the configured blocklist is not
    # guaranteed to be checksummed, and a case mismatch would silently
    # disable the filter for that address.
    ignored_initiators = {address.lower() for address in self.ignored_initiators}

    def is_ignored(address: str | None) -> bool:
        return address is not None and address.lower() in ignored_initiators

    # 1. Filter already processed log receipts
    not_processed_log_receipts = [
        log_receipt
        for log_receipt in log_receipts
        if not self.element_already_processed_checker.is_processed(
            log_receipt["transactionHash"],
            log_receipt["blockHash"],
            log_receipt["logIndex"],
        )
    ]

    if not not_processed_log_receipts:
        return []

    # 2. Get unique tx_hashes preserving order
    tx_hashes = list(
        OrderedDict.fromkeys(
            HexBytes(r["transactionHash"]) for r in not_processed_log_receipts
        )
    )

    # 3. Check DB for existing txs (only the ones already linked to a block)
    db_txs: dict[bytes, EthereumTx] = {
        HexBytes(tx.tx_hash): tx
        for tx in EthereumTx.objects.filter(tx_hash__in=tx_hashes).exclude(
            block=None
        )
    }
    logger.debug("Conditional indexing: found %d existing txs in DB", len(db_txs))

    # 4. Fetch missing txs from RPC (without receipts - we'll fetch those only for allowed txs)
    missing_hashes = [tx_hash for tx_hash in tx_hashes if tx_hash not in db_txs]
    logger.debug(
        "Conditional indexing: fetching %d missing txs from RPC",
        len(missing_hashes),
    )
    fetched_txs = self._fetch_txs(missing_hashes)

    # 5. Filter by _from (blocklist check)
    # `resolved_tx_hashes` tracks every tx whose sender is known (allowed or
    # blocklisted); `allowed_tx_hashes` only the ones that must be indexed.
    resolved_tx_hashes: set[bytes] = set(db_txs)
    allowed_tx_hashes: set[bytes] = set()

    # Check existing DB txs
    for tx_hash, db_tx in db_txs.items():
        if is_ignored(db_tx._from):
            logger.debug(
                "Conditional indexing: filtering existing tx %s from blocklisted initiator %s",
                to_0x_hex_str(tx_hash),
                db_tx._from,
            )
        else:
            allowed_tx_hashes.add(tx_hash)

    # Check fetched txs, filter allowed ones
    allowed_fetched_txs: list[TxData] = []
    for tx in fetched_txs:
        tx_hash = HexBytes(tx["hash"])
        tx_from = tx["from"]
        resolved_tx_hashes.add(tx_hash)
        if is_ignored(tx_from):
            logger.debug(
                "Conditional indexing: filtering tx %s from blocklisted initiator %s",
                to_0x_hex_str(tx["hash"]),
                tx_from,
            )
        else:
            allowed_fetched_txs.append(tx)
            allowed_tx_hashes.add(tx_hash)

    logger.debug(
        "Conditional indexing: %d/%d txs allowed after filtering",
        len(allowed_tx_hashes),
        len(tx_hashes),
    )

    unresolved_tx_hashes = [
        tx_hash for tx_hash in missing_hashes if tx_hash not in resolved_tx_hashes
    ]
    if unresolved_tx_hashes:
        # NOTE(review): these logs are skipped in this run. Consider raising
        # instead so the whole batch is retried - confirm against the
        # behaviour of the parent `process_elements`.
        logger.warning(
            "Conditional indexing: %d txs could not be fetched from RPC, "
            "their events will not be marked as processed",
            len(unresolved_tx_hashes),
        )

    # 6. Fetch receipts only for allowed txs and store them
    if allowed_fetched_txs:
        number_allowed_txs_inserted = self._fetch_receipts_and_store(
            allowed_fetched_txs
        )
        logger.debug(
            "Conditional indexing: %d allowed txs inserted",
            number_allowed_txs_inserted,
        )

    # 7. Filter log_receipts to only allowed txs
    filtered_log_receipts = [
        log_receipt
        for log_receipt in not_processed_log_receipts
        if HexBytes(log_receipt["transactionHash"]) in allowed_tx_hashes
    ]

    # 8. Decode and process
    decoded_elements = self.decode_elements(filtered_log_receipts)
    processed_elements = self._process_decoded_elements(decoded_elements)

    # 9. Mark as processed the logs of every resolved tx, allowed or
    # blocklisted, so blocklisted ones are not fetched again. Logs whose tx
    # could not be fetched stay unmarked so a later pass can retry them.
    for log_receipt in not_processed_log_receipts:
        if HexBytes(log_receipt["transactionHash"]) not in resolved_tx_hashes:
            continue
        self.element_already_processed_checker.mark_as_processed(
            log_receipt["transactionHash"],
            log_receipt["blockHash"],
            log_receipt["logIndex"],
        )

    return processed_elements
197+
73198
@cached_property
74199
def contract_events(self) -> list[ContractEvent]:
75200
"""
@@ -545,6 +670,9 @@ def _process_safe_creation_events(
545670
logger.debug(
546671
"InternalTx and InternalTxDecoded objects for creation will be built"
547672
)
673+
# Track Safe addresses and their creation tx hashes for SafeContract creation
674+
safe_address_to_tx_hash: dict[ChecksumAddress, bytes] = {}
675+
548676
for safe_address in addresses_to_index:
549677
events = safe_addresses_with_creation_events[safe_address]
550678

@@ -617,11 +745,33 @@ def _process_safe_creation_events(
617745
internal_txs.append(internal_tx)
618746
internal_txs_decoded.append(internal_tx_decoded)
619747

748+
# Track for SafeContract creation
749+
safe_address_to_tx_hash[safe_address] = setup_event["transactionHash"]
750+
620751
logger.debug("InternalTx and InternalTxDecoded objects for creation were built")
621-
return InternalTx.objects.store_internal_txs_and_decoded_in_db(
752+
753+
stored_internal_txs = InternalTx.objects.store_internal_txs_and_decoded_in_db(
622754
internal_txs, internal_txs_decoded
623755
)
624756

757+
# Create SafeContract entries for newly created Safes
758+
# This ensures SafeContract exists before non-creation events are filtered
759+
# (when conditional indexing is enabled)
760+
if safe_address_to_tx_hash:
761+
logger.debug(
762+
"Creating %d SafeContract entries for new Safes",
763+
len(safe_address_to_tx_hash),
764+
)
765+
SafeContract.objects.bulk_create(
766+
[
767+
SafeContract(address=safe_address, ethereum_tx_id=tx_hash)
768+
for safe_address, tx_hash in safe_address_to_tx_hash.items()
769+
],
770+
ignore_conflicts=True, # Safe may already exist from previous indexing
771+
)
772+
773+
return stored_internal_txs
774+
625775
def _prefetch_timestamp_for_blocks(
626776
self, decoded_elements: list[EventData]
627777
) -> dict[bytes, datetime.datetime]:
@@ -646,8 +796,92 @@ def _prefetch_timestamp_for_blocks(
646796
logger.debug("Ended prefetching timestamp for every block hash")
647797
return block_hashes_with_timestamp
648798

649-
def _process_decoded_elements(self, decoded_elements: list[EventData]) -> list[Any]:
650-
processed_elements = []
799+
def _fetch_txs(self, tx_hashes: list[bytes]) -> list[TxData]:
800+
"""
801+
Fetch transactions from RPC without receipts.
802+
Used for conditional indexing to check tx._from before deciding to fetch receipts.
803+
804+
:param tx_hashes: List of transaction hashes to fetch
805+
:return: List of transactions
806+
"""
807+
if not tx_hashes:
808+
return []
809+
810+
txs: list[TxData] = []
811+
for tx_hash, tx in zip(
812+
tx_hashes,
813+
self.ethereum_client.get_transactions(tx_hashes),
814+
strict=False,
815+
):
816+
tx = tx or self.ethereum_client.get_transaction(tx_hash) # Retry if failed
817+
if tx:
818+
txs.append(tx)
819+
820+
return txs
821+
822+
def _fetch_receipts_and_store(self, txs: list[TxData]) -> int:
823+
"""
824+
Fetch receipts for allowed transactions and store them in the database.
825+
Called after filtering by tx._from to avoid fetching receipts for blocklisted txs.
826+
827+
:param txs: List of allowed transactions to fetch receipts for and store
828+
:return: Number of transactions inserted
829+
"""
830+
if not txs:
831+
return 0
832+
833+
tx_hashes = [tx["hash"] for tx in txs]
834+
835+
# Fetch receipts for allowed transactions
836+
logger.debug(
837+
"Conditional indexing: fetching %d receipts for allowed txs",
838+
len(tx_hashes),
839+
)
840+
841+
# Build list of (tx, receipt) pairs, only including successful receipt fetches
842+
txs_with_receipts: list[tuple[TxData, TxReceipt]] = []
843+
for tx, tx_receipt in zip(
844+
txs,
845+
self.ethereum_client.get_transaction_receipts(tx_hashes),
846+
strict=False,
847+
):
848+
tx_receipt = tx_receipt or self.ethereum_client.get_transaction_receipt(
849+
tx["hash"]
850+
) # Retry if failed
851+
if tx_receipt:
852+
txs_with_receipts.append((tx, tx_receipt))
853+
else:
854+
logger.warning(
855+
"Conditional indexing: failed to fetch receipt for tx %s",
856+
to_0x_hex_str(tx["hash"]),
857+
)
858+
859+
if not txs_with_receipts:
860+
return 0
861+
862+
# Collect block hashes only from txs with successful receipts
863+
block_hashes = {to_0x_hex_str(tx["blockHash"]) for tx, _ in txs_with_receipts}
864+
865+
# Create blocks
866+
logger.debug("Conditional indexing: inserting %d blocks", len(block_hashes))
867+
self.index_service.txs_create_or_update_from_block_hashes(block_hashes)
868+
869+
# Create EthereumTx records
870+
logger.debug(
871+
"Conditional indexing: inserting %d transactions", len(txs_with_receipts)
872+
)
873+
ethereum_txs_to_insert = [
874+
EthereumTx.objects.from_tx_dict(tx, receipt)
875+
for tx, receipt in txs_with_receipts
876+
]
877+
return EthereumTx.objects.bulk_create_from_generator(
878+
iter(ethereum_txs_to_insert), ignore_conflicts=True
879+
)
880+
881+
def _process_decoded_elements(
882+
self, decoded_elements: list[EventData]
883+
) -> list[InternalTx]:
884+
processed_elements: list[InternalTx] = []
651885

652886
self.block_hashes_with_timestamp = self._prefetch_timestamp_for_blocks(
653887
decoded_elements
@@ -659,18 +893,52 @@ def _process_decoded_elements(self, decoded_elements: list[EventData]) -> list[A
659893
)
660894
if safe_addresses_creation_events:
661895
# Process safe creation events
896+
# Note: When conditional indexing is enabled, events are already filtered
897+
# by tx._from in process_elements() before reaching this point
662898
creation_events_processed = self._process_safe_creation_events(
663899
safe_addresses_creation_events
664900
)
665901
processed_elements.extend(creation_events_processed)
666902

903+
elements_to_process = [
904+
element
905+
for element in decoded_elements
906+
if element["event"] not in ("SafeSetup", "ProxyCreation")
907+
]
908+
909+
# When conditional indexing is enabled, only process events for Safes
910+
# that exist in SafeContract table
911+
if self.conditional_indexing_enabled:
912+
# Get all unique Safe addresses from non-creation events
913+
non_creation_addresses = {
914+
element["address"] for element in elements_to_process
915+
}
916+
if non_creation_addresses:
917+
# Check which addresses exist in SafeContract
918+
existing_addresses = SafeContract.objects.get_existing_addresses(
919+
non_creation_addresses
920+
)
921+
len_non_creation_addresses = len(non_creation_addresses)
922+
len_existing_addresses = len(existing_addresses)
923+
logger.debug(
924+
"Conditional indexing: %d/%d Safes will be processed",
925+
len_existing_addresses,
926+
len_non_creation_addresses,
927+
)
928+
# Filter elements to only those with existing SafeContract
929+
elements_to_process = [
930+
element
931+
for element in elements_to_process
932+
if element["address"] in existing_addresses
933+
]
934+
667935
# Store everything together in the database if possible
668936
logger.debug("InternalTx and InternalTx for non creation events will be built")
669937
internal_txs_to_insert: list[InternalTx] = []
670938
internal_txs_decoded_to_insert: list[InternalTxDecoded] = []
671939
safe_relevant_txs: list[SafeRelevantTransaction] = []
672940
# Process the rest of Safe events. Store all together
673-
for decoded_element in decoded_elements:
941+
for decoded_element in elements_to_process:
674942
elements_to_insert = self._process_decoded_element(decoded_element)
675943
for element_to_insert in elements_to_insert:
676944
if isinstance(element_to_insert, InternalTx):
@@ -681,14 +949,16 @@ def _process_decoded_elements(self, decoded_elements: list[EventData]) -> list[A
681949
safe_relevant_txs.append(element_to_insert)
682950
logger.debug("InternalTx and InternalTx for non creation events were built")
683951

684-
stored_internal_txs = InternalTx.objects.store_internal_txs_and_decoded_in_db(
685-
internal_txs_to_insert, internal_txs_decoded_to_insert
952+
stored_internal_txs: list[InternalTx] = (
953+
InternalTx.objects.store_internal_txs_and_decoded_in_db(
954+
internal_txs_to_insert, internal_txs_decoded_to_insert
955+
)
686956
)
687957
logger.debug("Inserting %d SafeRelevantTransaction", len(safe_relevant_txs))
688958
SafeRelevantTransaction.objects.bulk_create(
689959
safe_relevant_txs, ignore_conflicts=True
690960
)
691-
logger.debug("Inserted SafeRelevantTransaction")
961+
logger.debug("Inserted %d SafeRelevantTransaction", len(safe_relevant_txs))
692962

693963
processed_elements.extend(stored_internal_txs)
694964
return processed_elements

0 commit comments

Comments
 (0)