Skip to content

Commit 72fbdc6

Browse files
committed
Insert EthereumTxs and EthereumBlocks in batch
- Previously, there were inserted individually - This should be quite faster for indexing/reindexing
1 parent f09f385 commit 72fbdc6

File tree

3 files changed

+120
-130
lines changed

3 files changed

+120
-130
lines changed

safe_transaction_service/history/models.py

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
from safe_eth.safe.safe import SafeInfo
5454
from safe_eth.safe.safe_signature import SafeSignature, SafeSignatureType
5555
from safe_eth.util.util import to_0x_hex_str
56-
from web3.types import EventData
56+
from web3.types import BlockData, EventData
5757

5858
from safe_transaction_service.account_abstraction.constants import (
5959
USER_OPERATION_EVENT_TOPIC,
@@ -223,35 +223,44 @@ def __str__(self):
223223
return f"ChainId {self.chain_id}"
224224

225225

226-
class EthereumBlockManager(models.Manager):
227-
def get_or_create_from_block(self, block: dict[str, Any], confirmed: bool = False):
226+
class EthereumBlockManager(BulkCreateSignalMixin, models.Manager):
227+
def get_or_create_from_block_dict(
228+
self, block: dict[str, Any], confirmed: bool = False
229+
):
228230
try:
229231
return self.get(block_hash=block["hash"])
230232
except self.model.DoesNotExist:
231-
return self.create_from_block(block, confirmed=confirmed)
233+
return self.create_from_block_dict(block, confirmed=confirmed)
232234

233-
def create_from_block(
234-
self, block: dict[str, Any], confirmed: bool = False
235+
def from_block_dict(
236+
self, block: BlockData, confirmed: bool = False
237+
) -> "EthereumBlock":
238+
return EthereumBlock(
239+
number=block["number"],
240+
# Some networks like CELO don't provide gasLimit
241+
gas_limit=block.get("gasLimit", 0),
242+
gas_used=block["gasUsed"],
243+
timestamp=datetime.datetime.fromtimestamp(
244+
block["timestamp"], datetime.timezone.utc
245+
),
246+
block_hash=to_0x_hex_str(block["hash"]),
247+
parent_hash=to_0x_hex_str(block["parentHash"]),
248+
confirmed=confirmed,
249+
)
250+
251+
def create_from_block_dict(
252+
self, block: BlockData, confirmed: bool = False
235253
) -> "EthereumBlock":
236254
"""
237-
:param block: Block Dict returned by Web3
255+
:param block: Block Dict returned by web3.py
238256
:param confirmed: If True we will not check for reorgs in the future
239257
:return: EthereumBlock model
240258
"""
241259
try:
242260
with transaction.atomic(): # Needed for handling IntegrityError
243-
return super().create(
244-
number=block["number"],
245-
# Some networks like CELO don't provide gasLimit
246-
gas_limit=block.get("gasLimit", 0),
247-
gas_used=block["gasUsed"],
248-
timestamp=datetime.datetime.fromtimestamp(
249-
block["timestamp"], datetime.timezone.utc
250-
),
251-
block_hash=to_0x_hex_str(block["hash"]),
252-
parent_hash=to_0x_hex_str(block["parentHash"]),
253-
confirmed=confirmed,
254-
)
261+
ethereum_block = self.from_block_dict(block, confirmed=confirmed)
262+
ethereum_block.save(force_insert=True)
263+
return ethereum_block
255264
except IntegrityError:
256265
db_block = self.get(number=block["number"])
257266
if HexBytes(db_block.block_hash) == block["hash"]: # pragma: no cover
@@ -339,13 +348,13 @@ def set_not_confirmed(self):
339348
return self._set_confirmed(False)
340349

341350

342-
class EthereumTxManager(models.Manager):
343-
def create_from_tx_dict(
344-
self,
345-
tx: dict[str, Any],
346-
tx_receipt: Optional[dict[str, Any]] = None,
347-
ethereum_block: Optional[EthereumBlock] = None,
351+
class EthereumTxManager(BulkCreateSignalMixin, models.Manager):
352+
def from_tx_dict(
353+
self, tx: dict[str, Any], tx_receipt: dict[str, Any]
348354
) -> "EthereumTx":
355+
if tx_receipt is None:
356+
raise ValueError("tx_receipt cannot be empty")
357+
349358
data = HexBytes(tx.get("data") or tx.get("input"))
350359
logs = tx_receipt and [
351360
clean_receipt_log(log) for log in tx_receipt.get("logs", [])
@@ -358,25 +367,34 @@ def create_from_tx_dict(
358367
or 0
359368
)
360369

361-
return super().create(
362-
block=ethereum_block,
370+
return EthereumTx(
371+
block_id=tx["blockNumber"],
363372
tx_hash=to_0x_hex_str(HexBytes(tx["hash"])),
364-
gas_used=tx_receipt and tx_receipt["gasUsed"],
373+
gas_used=tx_receipt["gasUsed"],
365374
_from=tx["from"],
366375
gas=tx["gas"],
367376
gas_price=gas_price,
368377
max_fee_per_gas=tx.get("maxFeePerGas"),
369378
max_priority_fee_per_gas=tx.get("maxPriorityFeePerGas"),
370379
logs=logs,
371-
status=tx_receipt and tx_receipt.get("status"),
372-
transaction_index=tx_receipt and tx_receipt["transactionIndex"],
380+
status=tx_receipt.get("status"),
381+
transaction_index=tx_receipt["transactionIndex"],
373382
data=data if data else None,
374383
nonce=tx["nonce"],
375384
to=tx.get("to"),
376385
value=tx["value"],
377386
type=tx.get("type", 0),
378387
)
379388

389+
def create_from_tx_dict(
390+
self,
391+
tx: dict[str, Any],
392+
tx_receipt: dict[str, Any],
393+
) -> "EthereumTx":
394+
ethereum_tx = self.from_tx_dict(tx, tx_receipt)
395+
ethereum_tx.save()
396+
return ethereum_tx
397+
380398
def account_abstraction_txs(self) -> RawQuerySet:
381399
"""
382400
:return: Transactions containing ERC4337 `UserOperation` event

safe_transaction_service/history/services/index_service.py

Lines changed: 64 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
from dataclasses import dataclass
33
from typing import Collection, Optional, OrderedDict, Union
44

5-
from django.db import IntegrityError, transaction
5+
from django.db import transaction
66
from django.db.models import Min, Q
77

8-
from eth_typing import ChecksumAddress
8+
from eth_typing import ChecksumAddress, Hash32
99
from hexbytes import HexBytes
1010
from safe_eth.eth import EthereumClient, get_auto_ethereum_client
1111
from safe_eth.util.util import to_0x_hex_str
@@ -107,21 +107,6 @@ def __init__(
107107

108108
self.tx_processor: SafeTxProcessor = SafeTxProcessorProvider()
109109

110-
def block_get_or_create_from_block_hash(self, block_hash: int):
111-
try:
112-
return EthereumBlock.objects.get(block_hash=block_hash)
113-
except EthereumBlock.DoesNotExist:
114-
current_block_number = (
115-
self.ethereum_client.current_block_number
116-
) # For reorgs
117-
block = self.ethereum_client.get_block(block_hash)
118-
confirmed = (
119-
current_block_number - block["number"]
120-
) >= self.eth_reorg_blocks
121-
return EthereumBlock.objects.get_or_create_from_block(
122-
block, confirmed=confirmed
123-
)
124-
125110
def get_erc20_721_current_indexing_block_number(self) -> int:
126111
return IndexingStatusDb.objects.get_erc20_721_indexing_status().block_number
127112

@@ -229,40 +214,56 @@ def is_service_synced(self) -> bool:
229214

230215
return synced
231216

232-
def tx_create_or_update_from_tx_hash(self, tx_hash: str) -> "EthereumTx":
233-
try:
234-
ethereum_tx = EthereumTx.objects.get(tx_hash=tx_hash)
235-
# For txs stored before being mined
236-
if ethereum_tx.block is None:
237-
tx_receipt = self.ethereum_client.get_transaction_receipt(tx_hash)
238-
ethereum_block = self.block_get_or_create_from_block_hash(
239-
tx_receipt["blockHash"]
217+
def txs_create_or_update_from_block_hashes(
218+
self, block_hashes: set[Hash32]
219+
) -> tuple[int, dict[Hash32, EthereumBlock]]:
220+
block_hashes = list(block_hashes) # Iterate in a defined order
221+
blocks = self.ethereum_client.get_blocks(block_hashes)
222+
223+
# Validate blocks from RPC
224+
for block_hash, block in zip(block_hashes, blocks):
225+
if not block:
226+
raise BlockNotFoundException(
227+
f"Block with hash={block_hash} was not found"
240228
)
241-
ethereum_tx.update_with_block_and_receipt(ethereum_block, tx_receipt)
242-
return ethereum_tx
243-
except EthereumTx.DoesNotExist:
244-
tx_receipt = self.ethereum_client.get_transaction_receipt(tx_hash)
245-
ethereum_block = self.block_get_or_create_from_block_hash(
246-
tx_receipt["blockHash"]
247-
)
248-
tx = self.ethereum_client.get_transaction(tx_hash)
249-
return EthereumTx.objects.create_from_tx_dict(
250-
tx, tx_receipt=tx_receipt, ethereum_block=ethereum_block
229+
assert block_hash == to_0x_hex_str(
230+
block["hash"]
231+
), f"{block_hash} does not match retrieved block hash"
232+
233+
current_block_number = self.ethereum_client.current_block_number
234+
ethereum_blocks_to_insert = [
235+
EthereumBlock.objects.from_block_dict(
236+
block,
237+
confirmed=(current_block_number - block["number"])
238+
>= self.eth_reorg_blocks,
251239
)
240+
for block in blocks
241+
]
242+
inserted = EthereumBlock.objects.bulk_create_from_generator(
243+
iter(ethereum_blocks_to_insert), ignore_conflicts=True
244+
)
245+
return inserted, {
246+
HexBytes(ethereum_block.block_hash): ethereum_block
247+
for ethereum_block in ethereum_blocks_to_insert
248+
}
252249

253250
def txs_create_or_update_from_tx_hashes(
254251
self, tx_hashes: Collection[Union[str, bytes]]
255252
) -> list["EthereumTx"]:
253+
"""
254+
:param tx_hashes:
255+
:return: List of EthereumTx in the same order that `tx_hashes` were provided
256+
"""
256257
logger.debug("Don't retrieve existing txs on DB. Find them first")
257258
# Search first in database
258259
ethereum_txs_dict = OrderedDict.fromkeys(
259-
[to_0x_hex_str(HexBytes(tx_hash)) for tx_hash in tx_hashes]
260+
[HexBytes(tx_hash) for tx_hash in tx_hashes]
260261
)
261262
db_ethereum_txs = EthereumTx.objects.filter(tx_hash__in=tx_hashes).exclude(
262263
block=None
263264
)
264265
for db_ethereum_tx in db_ethereum_txs:
265-
ethereum_txs_dict[db_ethereum_tx.tx_hash] = db_ethereum_tx
266+
ethereum_txs_dict[HexBytes(db_ethereum_tx.tx_hash)] = db_ethereum_tx
266267
logger.debug("Found %d existing txs on DB", len(db_ethereum_txs))
267268

268269
# Retrieve from the node the txs missing from database
@@ -275,7 +276,7 @@ def txs_create_or_update_from_tx_hashes(
275276
if not tx_hashes_not_in_db:
276277
return list(ethereum_txs_dict.values())
277278

278-
# Get receipts for hashes not in db
279+
# Get receipts for hashes not in db. First get the receipts as they guarantee tx is mined and confirmed
279280
logger.debug("Get tx receipts for hashes not on db")
280281
tx_receipts = []
281282
for tx_hash, tx_receipt in zip(
@@ -320,50 +321,35 @@ def txs_create_or_update_from_tx_hashes(
320321

321322
block_hashes.add(to_0x_hex_str(tx["blockHash"]))
322323
txs.append(tx)
323-
logger.debug("Got txs from RPC. Getting %d blocks", len(block_hashes))
324-
325-
blocks = self.ethereum_client.get_blocks(block_hashes)
326-
block_dict = {}
327-
for block_hash, block in zip(block_hashes, blocks):
328-
block = block or self.ethereum_client.get_block(
329-
block_hash
330-
) # Retry fetching if failed
331-
if not block:
332-
raise BlockNotFoundException(
333-
f"Block with hash={block_hash} was not found"
334-
)
335-
assert block_hash == to_0x_hex_str(block["hash"])
336-
block_dict[block["hash"]] = block
337324

338325
logger.debug(
339-
"Got blocks from RPC. Inserting blocks. Creating txs or updating them if they have not receipt"
326+
"Got txs from RPC. Getting and inserting %d blocks", len(block_hashes)
327+
)
328+
number_inserted_blocks, blocks = self.txs_create_or_update_from_block_hashes(
329+
block_hashes
340330
)
331+
logger.debug("Inserted %d blocks", number_inserted_blocks)
332+
333+
logger.debug("Inserting %d transactions", len(txs))
334+
# Create new transactions or ignore if they already exist
335+
ethereum_txs_to_insert = [
336+
EthereumTx.objects.from_tx_dict(tx, tx_receipt)
337+
for tx, tx_receipt in zip(txs, tx_receipts)
338+
]
339+
number_inserted_txs = EthereumTx.objects.bulk_create_from_generator(
340+
iter(ethereum_txs_to_insert), ignore_conflicts=True
341+
)
342+
for ethereum_tx, tx in zip(ethereum_txs_to_insert, txs):
343+
# Trust they were inserted and add them to the txs dictionary
344+
assert ethereum_tx.tx_hash == to_0x_hex_str(
345+
tx["hash"]
346+
), f"{ethereum_tx.tx_hash} does not match retrieved tx hash"
347+
ethereum_tx.block = blocks[tx["blockHash"]]
348+
ethereum_txs_dict[HexBytes(ethereum_tx.tx_hash)] = ethereum_tx
349+
# Block info is required for traces
350+
351+
logger.debug("Inserted %d transactions", number_inserted_txs)
341352

342-
# Create new transactions or update them if they have no receipt
343-
current_block_number = self.ethereum_client.current_block_number
344-
for tx, tx_receipt in zip(txs, tx_receipts):
345-
block = block_dict[tx["blockHash"]]
346-
confirmed = (
347-
current_block_number - block["number"]
348-
) >= self.eth_reorg_blocks
349-
ethereum_block: EthereumBlock = (
350-
EthereumBlock.objects.get_or_create_from_block(
351-
block, confirmed=confirmed
352-
)
353-
)
354-
try:
355-
with transaction.atomic():
356-
ethereum_tx = EthereumTx.objects.create_from_tx_dict(
357-
tx, tx_receipt=tx_receipt, ethereum_block=ethereum_block
358-
)
359-
ethereum_txs_dict[to_0x_hex_str(HexBytes(ethereum_tx.tx_hash))] = (
360-
ethereum_tx
361-
)
362-
except IntegrityError: # Tx exists
363-
ethereum_tx = EthereumTx.objects.get(tx_hash=tx["hash"])
364-
# For txs stored before being mined
365-
ethereum_tx.update_with_block_and_receipt(ethereum_block, tx_receipt)
366-
ethereum_txs_dict[ethereum_tx.tx_hash] = ethereum_tx
367353
logger.debug("Blocks, transactions and receipts were inserted")
368354

369355
return list(ethereum_txs_dict.values())

safe_transaction_service/history/tests/test_models.py

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -329,23 +329,9 @@ class TestEthereumTx(TestCase):
329329
def test_create_from_tx_dict(self):
330330
for tx_mock in (type_0_tx, type_2_tx):
331331
with self.subTest(tx_mock=tx_mock):
332+
EthereumBlockFactory(number=tx_mock["tx"]["blockNumber"])
332333
tx_dict = tx_mock["tx"]
333-
ethereum_tx = EthereumTx.objects.create_from_tx_dict(tx_dict)
334-
self.assertEqual(ethereum_tx.type, tx_dict["type"], 0)
335-
self.assertEqual(ethereum_tx.gas_price, tx_dict["gasPrice"])
336-
self.assertEqual(
337-
ethereum_tx.max_fee_per_gas, tx_dict.get("maxFeePerGas")
338-
)
339-
self.assertEqual(
340-
ethereum_tx.max_priority_fee_per_gas,
341-
tx_dict.get("maxPriorityFeePerGas"),
342-
)
343-
self.assertIsNone(ethereum_tx.gas_used)
344-
self.assertIsNone(ethereum_tx.status)
345-
self.assertIsNone(ethereum_tx.transaction_index)
346-
347334
tx_receipt = tx_mock["receipt"]
348-
ethereum_tx.delete()
349335
ethereum_tx = EthereumTx.objects.create_from_tx_dict(
350336
tx_dict, tx_receipt=tx_receipt
351337
)
@@ -1346,26 +1332,26 @@ def test_remove_unused_confirmations(self):
13461332

13471333

13481334
class TestEthereumBlock(TestCase):
1349-
def test_get_or_create_from_block(self):
1335+
def test_get_or_create_from_block_dict(self):
13501336
mock_block = block_result[0]
13511337
self.assertEqual(EthereumBlock.objects.count(), 0)
1352-
db_block = EthereumBlock.objects.get_or_create_from_block(mock_block)
1338+
db_block = EthereumBlock.objects.get_or_create_from_block_dict(mock_block)
13531339
db_block.set_confirmed()
13541340
self.assertEqual(db_block.confirmed, True)
13551341
self.assertEqual(EthereumBlock.objects.count(), 1)
13561342
with mock.patch.object(
1357-
EthereumBlockManager, "create_from_block"
1358-
) as create_from_block_mock:
1343+
EthereumBlockManager, "create_from_block_dict"
1344+
) as create_from_block_dict_mock:
13591345
# Block already exists
1360-
EthereumBlock.objects.get_or_create_from_block(mock_block)
1361-
create_from_block_mock.assert_not_called()
1346+
EthereumBlock.objects.get_or_create_from_block_dict(mock_block)
1347+
create_from_block_dict_mock.assert_not_called()
13621348

13631349
# Test block with different block-hash but same block number
13641350
mock_block_2 = dict(mock_block)
13651351
mock_block_2["hash"] = fast_keccak_text("another-hash")
13661352
self.assertNotEqual(mock_block["hash"], mock_block_2["hash"])
13671353
with self.assertRaises(IntegrityError):
1368-
EthereumBlock.objects.get_or_create_from_block(mock_block_2)
1354+
EthereumBlock.objects.get_or_create_from_block_dict(mock_block_2)
13691355
self.assertEqual(EthereumBlock.objects.count(), 1)
13701356
db_block.refresh_from_db()
13711357
self.assertEqual(db_block.confirmed, False)

0 commit comments

Comments
 (0)