Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 48 additions & 30 deletions safe_transaction_service/history/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from safe_eth.safe.safe import SafeInfo
from safe_eth.safe.safe_signature import SafeSignature, SafeSignatureType
from safe_eth.util.util import to_0x_hex_str
from web3.types import EventData
from web3.types import BlockData, EventData

from safe_transaction_service.account_abstraction.constants import (
USER_OPERATION_EVENT_TOPIC,
Expand Down Expand Up @@ -223,35 +223,44 @@ def __str__(self):
return f"ChainId {self.chain_id}"


class EthereumBlockManager(models.Manager):
def get_or_create_from_block(self, block: dict[str, Any], confirmed: bool = False):
class EthereumBlockManager(BulkCreateSignalMixin, models.Manager):
def get_or_create_from_block_dict(
self, block: dict[str, Any], confirmed: bool = False
):
try:
return self.get(block_hash=block["hash"])
except self.model.DoesNotExist:
return self.create_from_block(block, confirmed=confirmed)
return self.create_from_block_dict(block, confirmed=confirmed)

def create_from_block(
self, block: dict[str, Any], confirmed: bool = False
def from_block_dict(
self, block: BlockData, confirmed: bool = False
) -> "EthereumBlock":
return EthereumBlock(
number=block["number"],
# Some networks like CELO don't provide gasLimit
gas_limit=block.get("gasLimit", 0),
gas_used=block["gasUsed"],
timestamp=datetime.datetime.fromtimestamp(
block["timestamp"], datetime.timezone.utc
),
block_hash=to_0x_hex_str(block["hash"]),
parent_hash=to_0x_hex_str(block["parentHash"]),
confirmed=confirmed,
)

def create_from_block_dict(
self, block: BlockData, confirmed: bool = False
) -> "EthereumBlock":
"""
:param block: Block Dict returned by Web3
:param block: Block Dict returned by web3.py
:param confirmed: If True we will not check for reorgs in the future
:return: EthereumBlock model
"""
try:
with transaction.atomic(): # Needed for handling IntegrityError
return super().create(
number=block["number"],
# Some networks like CELO don't provide gasLimit
gas_limit=block.get("gasLimit", 0),
gas_used=block["gasUsed"],
timestamp=datetime.datetime.fromtimestamp(
block["timestamp"], datetime.timezone.utc
),
block_hash=to_0x_hex_str(block["hash"]),
parent_hash=to_0x_hex_str(block["parentHash"]),
confirmed=confirmed,
)
ethereum_block = self.from_block_dict(block, confirmed=confirmed)
ethereum_block.save(force_insert=True)
return ethereum_block
except IntegrityError:
db_block = self.get(number=block["number"])
if HexBytes(db_block.block_hash) == block["hash"]: # pragma: no cover
Expand Down Expand Up @@ -339,13 +348,13 @@ def set_not_confirmed(self):
return self._set_confirmed(False)


class EthereumTxManager(models.Manager):
def create_from_tx_dict(
self,
tx: dict[str, Any],
tx_receipt: Optional[dict[str, Any]] = None,
ethereum_block: Optional[EthereumBlock] = None,
class EthereumTxManager(BulkCreateSignalMixin, models.Manager):
def from_tx_dict(
self, tx: dict[str, Any], tx_receipt: dict[str, Any]
) -> "EthereumTx":
if tx_receipt is None:
raise ValueError("tx_receipt cannot be empty")

data = HexBytes(tx.get("data") or tx.get("input"))
logs = tx_receipt and [
clean_receipt_log(log) for log in tx_receipt.get("logs", [])
Expand All @@ -358,25 +367,34 @@ def create_from_tx_dict(
or 0
)

return super().create(
block=ethereum_block,
return EthereumTx(
block_id=tx["blockNumber"],
tx_hash=to_0x_hex_str(HexBytes(tx["hash"])),
gas_used=tx_receipt and tx_receipt["gasUsed"],
gas_used=tx_receipt["gasUsed"],
_from=tx["from"],
gas=tx["gas"],
gas_price=gas_price,
max_fee_per_gas=tx.get("maxFeePerGas"),
max_priority_fee_per_gas=tx.get("maxPriorityFeePerGas"),
logs=logs,
status=tx_receipt and tx_receipt.get("status"),
transaction_index=tx_receipt and tx_receipt["transactionIndex"],
status=tx_receipt.get("status"),
transaction_index=tx_receipt["transactionIndex"],
data=data if data else None,
nonce=tx["nonce"],
to=tx.get("to"),
value=tx["value"],
type=tx.get("type", 0),
)

def create_from_tx_dict(
self,
tx: dict[str, Any],
tx_receipt: dict[str, Any],
) -> "EthereumTx":
ethereum_tx = self.from_tx_dict(tx, tx_receipt)
ethereum_tx.save()
return ethereum_tx

def account_abstraction_txs(self) -> RawQuerySet:
"""
:return: Transactions containing ERC4337 `UserOperation` event
Expand Down
142 changes: 64 additions & 78 deletions safe_transaction_service/history/services/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from dataclasses import dataclass
from typing import Collection, Optional, OrderedDict, Union

from django.db import IntegrityError, transaction
from django.db import transaction
from django.db.models import Min, Q

from eth_typing import ChecksumAddress
from eth_typing import ChecksumAddress, Hash32
from hexbytes import HexBytes
from safe_eth.eth import EthereumClient, get_auto_ethereum_client
from safe_eth.util.util import to_0x_hex_str
Expand Down Expand Up @@ -107,21 +107,6 @@ def __init__(

self.tx_processor: SafeTxProcessor = SafeTxProcessorProvider()

def block_get_or_create_from_block_hash(self, block_hash: int):
try:
return EthereumBlock.objects.get(block_hash=block_hash)
except EthereumBlock.DoesNotExist:
current_block_number = (
self.ethereum_client.current_block_number
) # For reorgs
block = self.ethereum_client.get_block(block_hash)
confirmed = (
current_block_number - block["number"]
) >= self.eth_reorg_blocks
return EthereumBlock.objects.get_or_create_from_block(
block, confirmed=confirmed
)

def get_erc20_721_current_indexing_block_number(self) -> int:
return IndexingStatusDb.objects.get_erc20_721_indexing_status().block_number

Expand Down Expand Up @@ -229,40 +214,56 @@ def is_service_synced(self) -> bool:

return synced

def tx_create_or_update_from_tx_hash(self, tx_hash: str) -> "EthereumTx":
try:
ethereum_tx = EthereumTx.objects.get(tx_hash=tx_hash)
# For txs stored before being mined
if ethereum_tx.block is None:
tx_receipt = self.ethereum_client.get_transaction_receipt(tx_hash)
ethereum_block = self.block_get_or_create_from_block_hash(
tx_receipt["blockHash"]
def txs_create_or_update_from_block_hashes(
self, block_hashes: set[Hash32]
) -> tuple[int, dict[Hash32, EthereumBlock]]:
block_hashes = list(block_hashes) # Iterate in a defined order
blocks = self.ethereum_client.get_blocks(block_hashes)

# Validate blocks from RPC
for block_hash, block in zip(block_hashes, blocks):
if not block:
raise BlockNotFoundException(
f"Block with hash={block_hash} was not found"
)
ethereum_tx.update_with_block_and_receipt(ethereum_block, tx_receipt)
return ethereum_tx
except EthereumTx.DoesNotExist:
tx_receipt = self.ethereum_client.get_transaction_receipt(tx_hash)
ethereum_block = self.block_get_or_create_from_block_hash(
tx_receipt["blockHash"]
)
tx = self.ethereum_client.get_transaction(tx_hash)
return EthereumTx.objects.create_from_tx_dict(
tx, tx_receipt=tx_receipt, ethereum_block=ethereum_block
assert block_hash == to_0x_hex_str(
block["hash"]
), f"{block_hash} does not match retrieved block hash"

current_block_number = self.ethereum_client.current_block_number
ethereum_blocks_to_insert = [
EthereumBlock.objects.from_block_dict(
block,
confirmed=(current_block_number - block["number"])
>= self.eth_reorg_blocks,
)
for block in blocks
]
inserted = EthereumBlock.objects.bulk_create_from_generator(
iter(ethereum_blocks_to_insert), ignore_conflicts=True
)
return inserted, {
HexBytes(ethereum_block.block_hash): ethereum_block
for ethereum_block in ethereum_blocks_to_insert
}

def txs_create_or_update_from_tx_hashes(
self, tx_hashes: Collection[Union[str, bytes]]
) -> list["EthereumTx"]:
"""
:param tx_hashes:
:return: List of EthereumTx in the same order that `tx_hashes` were provided
"""
logger.debug("Don't retrieve existing txs on DB. Find them first")
# Search first in database
ethereum_txs_dict = OrderedDict.fromkeys(
[to_0x_hex_str(HexBytes(tx_hash)) for tx_hash in tx_hashes]
[HexBytes(tx_hash) for tx_hash in tx_hashes]
)
db_ethereum_txs = EthereumTx.objects.filter(tx_hash__in=tx_hashes).exclude(
block=None
)
for db_ethereum_tx in db_ethereum_txs:
ethereum_txs_dict[db_ethereum_tx.tx_hash] = db_ethereum_tx
ethereum_txs_dict[HexBytes(db_ethereum_tx.tx_hash)] = db_ethereum_tx
logger.debug("Found %d existing txs on DB", len(db_ethereum_txs))

# Retrieve from the node the txs missing from database
Expand All @@ -275,7 +276,7 @@ def txs_create_or_update_from_tx_hashes(
if not tx_hashes_not_in_db:
return list(ethereum_txs_dict.values())

# Get receipts for hashes not in db
# Get receipts for hashes not in db. First get the receipts as they guarantee tx is mined and confirmed
logger.debug("Get tx receipts for hashes not on db")
tx_receipts = []
for tx_hash, tx_receipt in zip(
Expand Down Expand Up @@ -320,50 +321,35 @@ def txs_create_or_update_from_tx_hashes(

block_hashes.add(to_0x_hex_str(tx["blockHash"]))
txs.append(tx)
logger.debug("Got txs from RPC. Getting %d blocks", len(block_hashes))

blocks = self.ethereum_client.get_blocks(block_hashes)
block_dict = {}
for block_hash, block in zip(block_hashes, blocks):
block = block or self.ethereum_client.get_block(
block_hash
) # Retry fetching if failed
if not block:
raise BlockNotFoundException(
f"Block with hash={block_hash} was not found"
)
assert block_hash == to_0x_hex_str(block["hash"])
block_dict[block["hash"]] = block

logger.debug(
"Got blocks from RPC. Inserting blocks. Creating txs or updating them if they have not receipt"
"Got txs from RPC. Getting and inserting %d blocks", len(block_hashes)
)
number_inserted_blocks, blocks = self.txs_create_or_update_from_block_hashes(
block_hashes
)
logger.debug("Inserted %d blocks", number_inserted_blocks)

logger.debug("Inserting %d transactions", len(txs))
# Create new transactions or ignore if they already exist
ethereum_txs_to_insert = [
EthereumTx.objects.from_tx_dict(tx, tx_receipt)
for tx, tx_receipt in zip(txs, tx_receipts)
]
number_inserted_txs = EthereumTx.objects.bulk_create_from_generator(
iter(ethereum_txs_to_insert), ignore_conflicts=True
)
for ethereum_tx, tx in zip(ethereum_txs_to_insert, txs):
# Trust they were inserted and add them to the txs dictionary
assert ethereum_tx.tx_hash == to_0x_hex_str(
tx["hash"]
), f"{ethereum_tx.tx_hash} does not match retrieved tx hash"
ethereum_tx.block = blocks[tx["blockHash"]]
ethereum_txs_dict[HexBytes(ethereum_tx.tx_hash)] = ethereum_tx
# Block info is required for traces

logger.debug("Inserted %d transactions", number_inserted_txs)

# Create new transactions or update them if they have no receipt
current_block_number = self.ethereum_client.current_block_number
for tx, tx_receipt in zip(txs, tx_receipts):
block = block_dict[tx["blockHash"]]
confirmed = (
current_block_number - block["number"]
) >= self.eth_reorg_blocks
ethereum_block: EthereumBlock = (
EthereumBlock.objects.get_or_create_from_block(
block, confirmed=confirmed
)
)
try:
with transaction.atomic():
ethereum_tx = EthereumTx.objects.create_from_tx_dict(
tx, tx_receipt=tx_receipt, ethereum_block=ethereum_block
)
ethereum_txs_dict[to_0x_hex_str(HexBytes(ethereum_tx.tx_hash))] = (
ethereum_tx
)
except IntegrityError: # Tx exists
ethereum_tx = EthereumTx.objects.get(tx_hash=tx["hash"])
# For txs stored before being mined
ethereum_tx.update_with_block_and_receipt(ethereum_block, tx_receipt)
ethereum_txs_dict[ethereum_tx.tx_hash] = ethereum_tx
logger.debug("Blocks, transactions and receipts were inserted")

return list(ethereum_txs_dict.values())
Expand Down
30 changes: 8 additions & 22 deletions safe_transaction_service/history/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,23 +329,9 @@ class TestEthereumTx(TestCase):
def test_create_from_tx_dict(self):
for tx_mock in (type_0_tx, type_2_tx):
with self.subTest(tx_mock=tx_mock):
EthereumBlockFactory(number=tx_mock["tx"]["blockNumber"])
tx_dict = tx_mock["tx"]
ethereum_tx = EthereumTx.objects.create_from_tx_dict(tx_dict)
self.assertEqual(ethereum_tx.type, tx_dict["type"], 0)
self.assertEqual(ethereum_tx.gas_price, tx_dict["gasPrice"])
self.assertEqual(
ethereum_tx.max_fee_per_gas, tx_dict.get("maxFeePerGas")
)
self.assertEqual(
ethereum_tx.max_priority_fee_per_gas,
tx_dict.get("maxPriorityFeePerGas"),
)
self.assertIsNone(ethereum_tx.gas_used)
self.assertIsNone(ethereum_tx.status)
self.assertIsNone(ethereum_tx.transaction_index)

tx_receipt = tx_mock["receipt"]
ethereum_tx.delete()
ethereum_tx = EthereumTx.objects.create_from_tx_dict(
tx_dict, tx_receipt=tx_receipt
)
Expand Down Expand Up @@ -1346,26 +1332,26 @@ def test_remove_unused_confirmations(self):


class TestEthereumBlock(TestCase):
def test_get_or_create_from_block(self):
def test_get_or_create_from_block_dict(self):
mock_block = block_result[0]
self.assertEqual(EthereumBlock.objects.count(), 0)
db_block = EthereumBlock.objects.get_or_create_from_block(mock_block)
db_block = EthereumBlock.objects.get_or_create_from_block_dict(mock_block)
db_block.set_confirmed()
self.assertEqual(db_block.confirmed, True)
self.assertEqual(EthereumBlock.objects.count(), 1)
with mock.patch.object(
EthereumBlockManager, "create_from_block"
) as create_from_block_mock:
EthereumBlockManager, "create_from_block_dict"
) as create_from_block_dict_mock:
# Block already exists
EthereumBlock.objects.get_or_create_from_block(mock_block)
create_from_block_mock.assert_not_called()
EthereumBlock.objects.get_or_create_from_block_dict(mock_block)
create_from_block_dict_mock.assert_not_called()

# Test block with different block-hash but same block number
mock_block_2 = dict(mock_block)
mock_block_2["hash"] = fast_keccak_text("another-hash")
self.assertNotEqual(mock_block["hash"], mock_block_2["hash"])
with self.assertRaises(IntegrityError):
EthereumBlock.objects.get_or_create_from_block(mock_block_2)
EthereumBlock.objects.get_or_create_from_block_dict(mock_block_2)
self.assertEqual(EthereumBlock.objects.count(), 1)
db_block.refresh_from_db()
self.assertEqual(db_block.confirmed, False)
Expand Down