Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@
"ETH_INTERNAL_TRACE_TXS_BATCH_SIZE", default=0
) # Number of `trace_transaction` calls allowed in the same RPC batch call, as results can be quite big
ETH_INTERNAL_TX_DECODED_PROCESS_BATCH = env.int(
"ETH_INTERNAL_TX_DECODED_PROCESS_BATCH", default=500
"ETH_INTERNAL_TX_DECODED_PROCESS_BATCH", default=5000
) # Number of InternalTxDecoded to process together. Keep it low to be memory friendly

# Event indexing configuration (L2 and ERC20/721)
Expand Down Expand Up @@ -596,6 +596,10 @@
PROCESSING_ENABLE_OUT_OF_ORDER_CHECK = env.bool(
"PROCESSING_ENABLE_OUT_OF_ORDER_CHECK", default=True
) # Enable out of order check, in case some transactions appear after a reindex so Safes don't get corrupt. Disabling it can speed up processing
PROCESSING_ALL_SAFES_TOGETHER = env.bool(
"PROCESSING_ALL_SAFES_TOGETHER", default=True
) # Process every Safe together in the same task. More optimal, but one problematic Safe can stuck the others


# Tokens
# ------------------------------------------------------------------------------
Expand Down
35 changes: 27 additions & 8 deletions safe_transaction_service/history/indexers/tx_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging
from abc import ABC, abstractmethod
from typing import Iterator, Optional, Sequence, Union
from typing import Optional, Sequence, Union

from django.db import transaction

Expand Down Expand Up @@ -376,23 +376,42 @@ def process_decoded_transaction(

@transaction.atomic
def process_decoded_transactions(
self, internal_txs_decoded: Iterator[InternalTxDecoded]
self, internal_txs_decoded: Sequence[InternalTxDecoded]
) -> list[bool]:
"""
Optimize to process multiple transactions in a batch

:param internal_txs_decoded:
:return:
:return: list of `True` if an element was processed correctly, `False` otherwise.
"""
internal_tx_ids = []
results: list[bool] = []
contract_addresses = set()
if not internal_txs_decoded:
return results

internal_tx_ids = []
internal_txs_decoded_list = internal_txs_decoded
contract_addresses = {
internal_tx_decoded.internal_tx._from
for internal_tx_decoded in internal_txs_decoded_list
}
banned_addresses = set(
SafeContract.objects.get_banned_addresses(addresses=contract_addresses)
)

try:
for internal_tx_decoded in internal_txs_decoded:
for internal_tx_decoded in internal_txs_decoded_list:
contract_address = internal_tx_decoded.internal_tx._from
contract_addresses.add(contract_address)
internal_tx_ids.append(internal_tx_decoded.internal_tx_id)
results.append(self.__process_decoded_transaction(internal_tx_decoded))
if contract_address in banned_addresses:
logger.info(
"Ignoring decoded internal txs for banned safe %s",
contract_address,
)
results.append(False)
else:
results.append(
self.__process_decoded_transaction(internal_tx_decoded)
)

# Set all as decoded in the same batch
InternalTxDecoded.objects.filter(internal_tx__in=internal_tx_ids).update(
Expand Down
34 changes: 25 additions & 9 deletions safe_transaction_service/history/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,17 +1195,17 @@ def pending_for_safes(self):
"""
:return: Pending `InternalTxDecoded` sorted by block number and then transaction index inside the block
"""
return self.not_processed().order_by_processing_queue()
return (
self.not_processed()
.order_by_processing_queue()
.select_related("internal_tx", "internal_tx__ethereum_tx")
)

def pending_for_safe(self, safe_address: ChecksumAddress):
"""
:return: Pending `InternalTxDecoded` sorted by block number and then transaction index inside the block
"""
return (
self.pending_for_safes()
.filter(internal_tx___from=safe_address)
.select_related("internal_tx", "internal_tx__ethereum_tx")
)
return self.pending_for_safes().filter(internal_tx___from=safe_address)

def safes_pending_to_be_processed(self) -> QuerySet[ChecksumAddress]:
"""
Expand Down Expand Up @@ -1814,12 +1814,28 @@ class Meta:


class SafeContractManager(models.Manager):
def get_banned_safes(self) -> QuerySet[ChecksumAddress]:
return self.filter(banned=True).values_list("address", flat=True)
def get_banned_addresses(
self, addresses: Optional[list[ChecksumAddress]] = None
) -> QuerySet[ChecksumAddress]:
return self.banned(addresses=addresses).values_list("address", flat=True)


class SafeContractQuerySet(models.QuerySet):
def banned(
self, addresses: Optional[list[ChecksumAddress]] = None
) -> QuerySet["SafeContract"]:
"""
:param addresses: If provided, only those `addresses` will be filtered.
:return: Banned addresses
"""
queryset = self.filter(banned=True)
if addresses:
queryset = queryset.filter(address__in=addresses)
return queryset


class SafeContract(models.Model):
objects = SafeContractManager()
objects = SafeContractManager.from_queryset(SafeContractQuerySet)()
created = models.DateTimeField(auto_now_add=True, db_index=True)
address = EthereumAddressBinaryField(primary_key=True)
ethereum_tx = models.ForeignKey(
Expand Down
70 changes: 62 additions & 8 deletions safe_transaction_service/history/services/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,61 @@ def fix_out_of_order(
SafeLastStatus.objects.filter(address=address).delete()
logger.info("[%s] Ended fixing out of order", address)

def process_decoded_txs(self, safe_address: ChecksumAddress) -> int:
def process_all_decoded_txs(self) -> int:
"""
Process all the pending `InternalTxDecoded` for every Safe

:return: Number of `InternalTxDecoded` processed
"""
# Use chunks for memory issues
total_processed_txs = 0

# Don't check out of order multiple times for a Safe
checked_out_of_order: set[ChecksumAddress] = set()

while True:
internal_txs_decoded = list(
InternalTxDecoded.objects.pending_for_safes()[
: self.eth_internal_tx_decoded_process_batch
]
)
if not internal_txs_decoded:
break

# Check if a new decoded tx appeared before other already processed (due to a reindex)
if self.processing_enable_out_of_order_check:
safe_addresses_to_check = {
internal_tx_decoded.internal_tx._from
for internal_tx_decoded in internal_txs_decoded
if internal_tx_decoded.internal_tx._from not in checked_out_of_order
}
logger.info(
"Checking out of order transactions for %d Safes",
len(safe_addresses_to_check),
)
for safe_address in safe_addresses_to_check:
if InternalTxDecoded.objects.out_of_order_for_safe(safe_address):
logger.error(
"[%s] Found out of order transactions", safe_address
)
self.fix_out_of_order(
safe_address,
InternalTxDecoded.objects.pending_for_safe(safe_address)[
0
].internal_tx,
)
checked_out_of_order.add(safe_address)
logger.info(
"Checked out of order transactions for %d Safes",
len(safe_addresses_to_check),
)

total_processed_txs += len(
self.tx_processor.process_decoded_transactions(internal_txs_decoded)
)
return total_processed_txs

def process_decoded_txs_for_safe(self, safe_address: ChecksumAddress) -> int:
"""
Process all the pending `InternalTxDecoded` for a Safe

Expand All @@ -472,15 +526,15 @@ def process_decoded_txs(self, safe_address: ChecksumAddress) -> int:
# Use chunks for memory issues
total_processed_txs = 0
while True:
internal_txs_decoded_queryset = InternalTxDecoded.objects.pending_for_safe(
safe_address
)[: self.eth_internal_tx_decoded_process_batch]
if not internal_txs_decoded_queryset:
internal_txs_decoded = list(
InternalTxDecoded.objects.pending_for_safe(safe_address)[
: self.eth_internal_tx_decoded_process_batch
]
)
if not internal_txs_decoded:
break
total_processed_txs += len(
self.tx_processor.process_decoded_transactions(
internal_txs_decoded_queryset
)
self.tx_processor.process_decoded_transactions(internal_txs_decoded)
)
return total_processed_txs

Expand Down
51 changes: 25 additions & 26 deletions safe_transaction_service/history/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
EthereumBlock,
InternalTxDecoded,
MultisigTransaction,
SafeContract,
SafeContractDelegate,
)
from .services import (
Expand Down Expand Up @@ -253,35 +252,35 @@ def index_safe_events_task(self) -> Optional[tuple[int, int]]:
def process_decoded_internal_txs_task(self) -> Optional[int]:
with contextlib.suppress(LockError):
with only_one_running_task(self):
logger.info("Start process decoded internal txs")
count = 0
banned_safes = set(SafeContract.objects.get_banned_safes())
for (
safe_to_process
) in InternalTxDecoded.objects.safes_pending_to_be_processed().iterator():
if safe_to_process in banned_safes:
logger.info(
"Ignoring decoded internal txs for banned safe %s",
safe_to_process,
)
# Mark traces as processed so they are not reprocessed all the time
# If not, `InternalTxDecoded` index with `decoded=True` can grow to
# a point were `safes_pending_to_be_processed` takes minutes to complete
InternalTxDecoded.objects.for_safe(
safe_to_process
).not_processed().update(processed=True)
else:
if settings.PROCESSING_ALL_SAFES_TOGETHER:
# We can process all Safes together, big optimization
logger.info(
"Start process decoded internal txs for every Safe together"
)
index_service = IndexServiceProvider()
return index_service.process_all_decoded_txs()
else:
# We need to process Safes individually
logger.info(
"Start process decoded internal txs for every Safe in a different task"
)
count = 0
for (
safe_to_process
) in (
InternalTxDecoded.objects.safes_pending_to_be_processed().iterator()
):
process_decoded_internal_txs_for_safe_task.delay(
safe_to_process, reindex_master_copies=True
)
count += 1

if not count:
logger.info("No Safes to process")
else:
logger.info("%d Safes to process", count)

return count
(
logger.info("%d Safes to process", count)
if count
else logger.info("No Safes to process")
)
return count


@app.shared_task(bind=True)
Expand All @@ -301,7 +300,7 @@ def process_decoded_internal_txs_for_safe_task(
with only_one_running_task(self, lock_name_suffix=safe_address):
logger.info("[%s] Start processing decoded internal txs", safe_address)
index_service: IndexService = IndexServiceProvider()
number_processed = index_service.process_decoded_txs(safe_address)
number_processed = index_service.process_decoded_txs_for_safe(safe_address)
logger.info(
"[%s] Processed %d decoded transactions", safe_address, number_processed
)
Expand Down
30 changes: 22 additions & 8 deletions safe_transaction_service/history/tests/test_index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,28 +126,36 @@ def test_is_service_synced(self, current_block_number_mock: PropertyMock):
current_block_number_mock.side_effect = RequestsConnectionError
self.assertFalse(self.index_service.is_service_synced())

def test_process_decoded_txs(self):
def test_process_decoded_txs_for_safe(self):
safe_address = Account.create().address
with mock.patch.object(
IndexService, "fix_out_of_order"
) as fix_out_of_order_mock:
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)
self.assertEqual(
self.index_service.process_decoded_txs_for_safe(safe_address), 0
)
fix_out_of_order_mock.assert_not_called()

# Setup for a random Safe should not be processed
InternalTxDecodedFactory(
function_name="setup",
)
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)
self.assertEqual(
self.index_service.process_decoded_txs_for_safe(safe_address), 0
)

setup_internal_tx = InternalTxDecodedFactory(
function_name="setup",
internal_tx___from=safe_address,
)
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 1)
self.assertEqual(
self.index_service.process_decoded_txs_for_safe(safe_address), 1
)
fix_out_of_order_mock.assert_not_called()
# After processed, it should not be processed again
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)
self.assertEqual(
self.index_service.process_decoded_txs_for_safe(safe_address), 0
)

exec_transactions = [
InternalTxDecodedFactory(
Expand All @@ -157,15 +165,21 @@ def test_process_decoded_txs(self):
for _ in range(3)
]

self.assertEqual(self.index_service.process_decoded_txs(safe_address), 3)
self.assertEqual(
self.index_service.process_decoded_txs_for_safe(safe_address), 3
)
fix_out_of_order_mock.assert_not_called()
# After processed, they should not be processed again
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 0)
self.assertEqual(
self.index_service.process_decoded_txs_for_safe(safe_address), 0
)

# Add a transaction out of order
exec_transactions[1].processed = False
exec_transactions[1].save(update_fields=["processed"])
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 1)
self.assertEqual(
self.index_service.process_decoded_txs_for_safe(safe_address), 1
)
# Out of order transaction was detected
fix_out_of_order_mock.assert_called_with(
safe_address, exec_transactions[1].internal_tx
Expand Down
Loading