Skip to content

Commit 1df80c1

Browse files
committed
Optimize tx processing
Process every Safe together: - Less database sessions/transactions - Less database queries to get internal txs for every Safe
1 parent 4634018 commit 1df80c1

File tree

9 files changed

+225
-80
lines changed

9 files changed

+225
-80
lines changed

config/settings/base.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@
547547
"ETH_INTERNAL_TRACE_TXS_BATCH_SIZE", default=0
548548
) # Number of `trace_transaction` calls allowed in the same RPC batch call, as results can be quite big
549549
ETH_INTERNAL_TX_DECODED_PROCESS_BATCH = env.int(
550-
"ETH_INTERNAL_TX_DECODED_PROCESS_BATCH", default=500
550+
"ETH_INTERNAL_TX_DECODED_PROCESS_BATCH", default=5000
551551
) # Number of InternalTxDecoded to process together. Keep it low to be memory friendly
552552

553553
# Event indexing configuration (L2 and ERC20/721)
@@ -596,6 +596,10 @@
596596
PROCESSING_ENABLE_OUT_OF_ORDER_CHECK = env.bool(
597597
"PROCESSING_ENABLE_OUT_OF_ORDER_CHECK", default=True
598598
) # Enable out of order check, in case some transactions appear after a reindex so Safes don't get corrupt. Disabling it can speed up processing
599+
PROCESSING_ALL_SAFES_TOGETHER = env.bool(
600+
"PROCESSING_ALL_SAFES_TOGETHER", default=True
601+
) # Process every Safe together in the same task. More optimal, but one problematic Safe can stuck the others
602+
599603

600604
# Tokens
601605
# ------------------------------------------------------------------------------

safe_transaction_service/history/indexers/tx_processor.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import logging
66
from abc import ABC, abstractmethod
7-
from typing import Iterator, Optional, Sequence, Union
7+
from typing import Optional, Sequence, Union
88

99
from django.db import transaction
1010

@@ -376,23 +376,42 @@ def process_decoded_transaction(
376376

377377
@transaction.atomic
378378
def process_decoded_transactions(
379-
self, internal_txs_decoded: Iterator[InternalTxDecoded]
379+
self, internal_txs_decoded: Sequence[InternalTxDecoded]
380380
) -> list[bool]:
381381
"""
382382
Optimize to process multiple transactions in a batch
383+
383384
:param internal_txs_decoded:
384-
:return:
385+
:return: list of `True` if an element was processed correctly, `False` otherwise.
385386
"""
386-
internal_tx_ids = []
387387
results: list[bool] = []
388-
contract_addresses = set()
388+
if not internal_txs_decoded:
389+
return results
390+
391+
internal_tx_ids = []
392+
internal_txs_decoded_list = internal_txs_decoded
393+
contract_addresses = {
394+
internal_tx_decoded.internal_tx._from
395+
for internal_tx_decoded in internal_txs_decoded_list
396+
}
397+
banned_addresses = set(
398+
SafeContract.objects.get_banned_addresses(addresses=contract_addresses)
399+
)
389400

390401
try:
391-
for internal_tx_decoded in internal_txs_decoded:
402+
for internal_tx_decoded in internal_txs_decoded_list:
392403
contract_address = internal_tx_decoded.internal_tx._from
393-
contract_addresses.add(contract_address)
394404
internal_tx_ids.append(internal_tx_decoded.internal_tx_id)
395-
results.append(self.__process_decoded_transaction(internal_tx_decoded))
405+
if contract_address in banned_addresses:
406+
logger.info(
407+
"Ignoring decoded internal txs for banned safe %s",
408+
contract_address,
409+
)
410+
results.append(False)
411+
else:
412+
results.append(
413+
self.__process_decoded_transaction(internal_tx_decoded)
414+
)
396415

397416
# Set all as decoded in the same batch
398417
InternalTxDecoded.objects.filter(internal_tx__in=internal_tx_ids).update(

safe_transaction_service/history/models.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,17 +1195,17 @@ def pending_for_safes(self):
11951195
"""
11961196
:return: Pending `InternalTxDecoded` sorted by block number and then transaction index inside the block
11971197
"""
1198-
return self.not_processed().order_by_processing_queue()
1198+
return (
1199+
self.not_processed()
1200+
.order_by_processing_queue()
1201+
.select_related("internal_tx", "internal_tx__ethereum_tx")
1202+
)
11991203

12001204
def pending_for_safe(self, safe_address: ChecksumAddress):
12011205
"""
12021206
:return: Pending `InternalTxDecoded` sorted by block number and then transaction index inside the block
12031207
"""
1204-
return (
1205-
self.pending_for_safes()
1206-
.filter(internal_tx___from=safe_address)
1207-
.select_related("internal_tx", "internal_tx__ethereum_tx")
1208-
)
1208+
return self.pending_for_safes().filter(internal_tx___from=safe_address)
12091209

12101210
def safes_pending_to_be_processed(self) -> QuerySet[ChecksumAddress]:
12111211
"""
@@ -1814,12 +1814,28 @@ class Meta:
18141814

18151815

18161816
class SafeContractManager(models.Manager):
1817-
def get_banned_safes(self) -> QuerySet[ChecksumAddress]:
1818-
return self.filter(banned=True).values_list("address", flat=True)
1817+
def get_banned_addresses(
1818+
self, addresses: Optional[list[ChecksumAddress]] = None
1819+
) -> QuerySet[ChecksumAddress]:
1820+
return self.banned(addresses=addresses).values_list("address", flat=True)
1821+
1822+
1823+
class SafeContractQuerySet(models.QuerySet):
1824+
def banned(
1825+
self, addresses: Optional[list[ChecksumAddress]] = None
1826+
) -> QuerySet["SafeContract"]:
1827+
"""
1828+
:param addresses: If provided, only those `addresses` will be filtered.
1829+
:return: Banned addresses
1830+
"""
1831+
queryset = self.filter(banned=True)
1832+
if addresses:
1833+
queryset = queryset.filter(address__in=addresses)
1834+
return queryset
18191835

18201836

18211837
class SafeContract(models.Model):
1822-
objects = SafeContractManager()
1838+
objects = SafeContractManager.from_queryset(SafeContractQuerySet)()
18231839
created = models.DateTimeField(auto_now_add=True, db_index=True)
18241840
address = EthereumAddressBinaryField(primary_key=True)
18251841
ethereum_tx = models.ForeignKey(

safe_transaction_service/history/services/index_service.py

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,61 @@ def fix_out_of_order(
450450
SafeLastStatus.objects.filter(address=address).delete()
451451
logger.info("[%s] Ended fixing out of order", address)
452452

453-
def process_decoded_txs(self, safe_address: ChecksumAddress) -> int:
453+
def process_all_decoded_txs(self) -> int:
454+
"""
455+
Process all the pending `InternalTxDecoded` for every Safe
456+
457+
:return: Number of `InternalTxDecoded` processed
458+
"""
459+
# Use chunks for memory issues
460+
total_processed_txs = 0
461+
462+
# Don't check out of order multiple times for a Safe
463+
checked_out_of_order: set[ChecksumAddress] = set()
464+
465+
while True:
466+
internal_txs_decoded = list(
467+
InternalTxDecoded.objects.pending_for_safes()[
468+
: self.eth_internal_tx_decoded_process_batch
469+
]
470+
)
471+
if not internal_txs_decoded:
472+
break
473+
474+
# Check if a new decoded tx appeared before other already processed (due to a reindex)
475+
if self.processing_enable_out_of_order_check:
476+
safe_addresses_to_check = {
477+
internal_tx_decoded.internal_tx._from
478+
for internal_tx_decoded in internal_txs_decoded
479+
if internal_tx_decoded.internal_tx._from not in checked_out_of_order
480+
}
481+
logger.info(
482+
"Checking out of order transactions for %d Safes",
483+
len(safe_addresses_to_check),
484+
)
485+
for safe_address in safe_addresses_to_check:
486+
if InternalTxDecoded.objects.out_of_order_for_safe(safe_address):
487+
logger.error(
488+
"[%s] Found out of order transactions", safe_address
489+
)
490+
self.fix_out_of_order(
491+
safe_address,
492+
InternalTxDecoded.objects.pending_for_safe(safe_address)[
493+
0
494+
].internal_tx,
495+
)
496+
checked_out_of_order.add(safe_address)
497+
logger.info(
498+
"Checked out of order transactions for %d Safes",
499+
len(safe_addresses_to_check),
500+
)
501+
502+
total_processed_txs += len(
503+
self.tx_processor.process_decoded_transactions(internal_txs_decoded)
504+
)
505+
return total_processed_txs
506+
507+
def process_decoded_txs_for_safe(self, safe_address: ChecksumAddress) -> int:
454508
"""
455509
Process all the pending `InternalTxDecoded` for a Safe
456510
@@ -472,15 +526,15 @@ def process_decoded_txs(self, safe_address: ChecksumAddress) -> int:
472526
# Use chunks for memory issues
473527
total_processed_txs = 0
474528
while True:
475-
internal_txs_decoded_queryset = InternalTxDecoded.objects.pending_for_safe(
476-
safe_address
477-
)[: self.eth_internal_tx_decoded_process_batch]
478-
if not internal_txs_decoded_queryset:
529+
internal_txs_decoded = list(
530+
InternalTxDecoded.objects.pending_for_safe(safe_address)[
531+
: self.eth_internal_tx_decoded_process_batch
532+
]
533+
)
534+
if not internal_txs_decoded:
479535
break
480536
total_processed_txs += len(
481-
self.tx_processor.process_decoded_transactions(
482-
internal_txs_decoded_queryset
483-
)
537+
self.tx_processor.process_decoded_transactions(internal_txs_decoded)
484538
)
485539
return total_processed_txs
486540

safe_transaction_service/history/tasks.py

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
EthereumBlock,
3131
InternalTxDecoded,
3232
MultisigTransaction,
33-
SafeContract,
3433
SafeContractDelegate,
3534
)
3635
from .services import (
@@ -253,35 +252,35 @@ def index_safe_events_task(self) -> Optional[tuple[int, int]]:
253252
def process_decoded_internal_txs_task(self) -> Optional[int]:
254253
with contextlib.suppress(LockError):
255254
with only_one_running_task(self):
256-
logger.info("Start process decoded internal txs")
257-
count = 0
258-
banned_safes = set(SafeContract.objects.get_banned_safes())
259-
for (
260-
safe_to_process
261-
) in InternalTxDecoded.objects.safes_pending_to_be_processed().iterator():
262-
if safe_to_process in banned_safes:
263-
logger.info(
264-
"Ignoring decoded internal txs for banned safe %s",
265-
safe_to_process,
266-
)
267-
# Mark traces as processed so they are not reprocessed all the time
268-
# If not, `InternalTxDecoded` index with `decoded=True` can grow to
269-
# a point were `safes_pending_to_be_processed` takes minutes to complete
270-
InternalTxDecoded.objects.for_safe(
271-
safe_to_process
272-
).not_processed().update(processed=True)
273-
else:
255+
if settings.PROCESSING_ALL_SAFES_TOGETHER:
256+
# We can process all Safes together, big optimization
257+
logger.info(
258+
"Start process decoded internal txs for every Safe together"
259+
)
260+
index_service = IndexServiceProvider()
261+
return index_service.process_all_decoded_txs()
262+
else:
263+
# We need to process Safes individually
264+
logger.info(
265+
"Start process decoded internal txs for every Safe in a different task"
266+
)
267+
count = 0
268+
for (
269+
safe_to_process
270+
) in (
271+
InternalTxDecoded.objects.safes_pending_to_be_processed().iterator()
272+
):
274273
process_decoded_internal_txs_for_safe_task.delay(
275274
safe_to_process, reindex_master_copies=True
276275
)
277276
count += 1
278277

279-
if not count:
280-
logger.info("No Safes to process")
281-
else:
282-
logger.info("%d Safes to process", count)
283-
284-
return count
278+
(
279+
logger.info("%d Safes to process", count)
280+
if count
281+
else logger.info("No Safes to process")
282+
)
283+
return count
285284

286285

287286
@app.shared_task(bind=True)
@@ -301,7 +300,7 @@ def process_decoded_internal_txs_for_safe_task(
301300
with only_one_running_task(self, lock_name_suffix=safe_address):
302301
logger.info("[%s] Start processing decoded internal txs", safe_address)
303302
index_service: IndexService = IndexServiceProvider()
304-
number_processed = index_service.process_decoded_txs(safe_address)
303+
number_processed = index_service.process_decoded_txs_for_safe(safe_address)
305304
logger.info(
306305
"[%s] Processed %d decoded transactions", safe_address, number_processed
307306
)

safe_transaction_service/history/tests/test_index_service.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,28 +126,36 @@ def test_is_service_synced(self, current_block_number_mock: PropertyMock):
126126
current_block_number_mock.side_effect = RequestsConnectionError
127127
self.assertFalse(self.index_service.is_service_synced())
128128

129-
def test_process_decoded_txs(self):
129+
def test_process_decoded_txs_for_safe(self):
130130
safe_address = Account.create().address
131131
with mock.patch.object(
132132
IndexService, "fix_out_of_order"
133133
) as fix_out_of_order_mock:
134-
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)
134+
self.assertEqual(
135+
self.index_service.process_decoded_txs_for_safe(safe_address), 0
136+
)
135137
fix_out_of_order_mock.assert_not_called()
136138

137139
# Setup for a random Safe should not be processed
138140
InternalTxDecodedFactory(
139141
function_name="setup",
140142
)
141-
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)
143+
self.assertEqual(
144+
self.index_service.process_decoded_txs_for_safe(safe_address), 0
145+
)
142146

143147
setup_internal_tx = InternalTxDecodedFactory(
144148
function_name="setup",
145149
internal_tx___from=safe_address,
146150
)
147-
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 1)
151+
self.assertEqual(
152+
self.index_service.process_decoded_txs_for_safe(safe_address), 1
153+
)
148154
fix_out_of_order_mock.assert_not_called()
149155
# After processed, it should not be processed again
150-
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)
156+
self.assertEqual(
157+
self.index_service.process_decoded_txs_for_safe(safe_address), 0
158+
)
151159

152160
exec_transactions = [
153161
InternalTxDecodedFactory(
@@ -157,15 +165,21 @@ def test_process_decoded_txs(self):
157165
for _ in range(3)
158166
]
159167

160-
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 3)
168+
self.assertEqual(
169+
self.index_service.process_decoded_txs_for_safe(safe_address), 3
170+
)
161171
fix_out_of_order_mock.assert_not_called()
162172
# After processed, they should not be processed again
163-
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)
173+
self.assertEqual(
174+
self.index_service.process_decoded_txs_for_safe(safe_address), 0
175+
)
164176

165177
# Add a transaction out of order
166178
exec_transactions[1].processed = False
167179
exec_transactions[1].save(update_fields=["processed"])
168-
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 1)
180+
self.assertEqual(
181+
self.index_service.process_decoded_txs_for_safe(safe_address), 1
182+
)
169183
# Out of order transaction was detected
170184
fix_out_of_order_mock.assert_called_with(
171185
safe_address, exec_transactions[1].internal_tx

0 commit comments

Comments
 (0)