Skip to content

Latest #471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -49,4 +49,5 @@ venv/
ENV/

# etl
/last_synced_block.txt
/last_synced_block.txt
.idea
3 changes: 3 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions .idea/csv-editor.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions .idea/ethereum-etl.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions ethereumetl/cli/export_blocks_and_transactions.py
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@
'If not provided transactions will not be exported. Use "-" for stdout')
@click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.')
def export_blocks_and_transactions(start_block, end_block, batch_size, provider_uri, max_workers, blocks_output,
transactions_output, chain='ethereum'):
transactions_output, chain):
"""Exports blocks and transactions."""
provider_uri = check_classic_provider_uri(chain, provider_uri)
if blocks_output is None and transactions_output is None:
@@ -61,6 +61,8 @@ def export_blocks_and_transactions(start_block, end_block, batch_size, provider_
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
max_workers=max_workers,
item_exporter=blocks_and_transactions_item_exporter(blocks_output, transactions_output),
chain=chain,
export_blocks=blocks_output is not None,
export_transactions=transactions_output is not None)
export_transactions=transactions_output is not None,
)
job.run()
10 changes: 5 additions & 5 deletions ethereumetl/cli/export_receipts_and_logs.py
Original file line number Diff line number Diff line change
@@ -36,8 +36,8 @@

@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option('-b', '--batch-size', default=100, show_default=True, type=int, help='The number of receipts to export at a time.')
@click.option('-t', '--transaction-hashes', required=True, type=str,
help='The file containing transaction hashes, one per line.')
@click.option('-t', '--block-hashes', required=True, type=str,
help='The file containing block hashes, one per line.')
@click.option('-p', '--provider-uri', default='https://mainnet.infura.io', show_default=True, type=str,
help='The URI of the web3 provider e.g. '
'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io')
@@ -48,13 +48,13 @@
help='The output file for receipt logs. '
'If not provided receipt logs will not be exported. Use "-" for stdout')
@click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.')
def export_receipts_and_logs(batch_size, transaction_hashes, provider_uri, max_workers, receipts_output, logs_output,
def export_receipts_and_logs(batch_size, block_number, provider_uri, max_workers, receipts_output, logs_output,
chain='ethereum'):
"""Exports receipts and logs."""
provider_uri = check_classic_provider_uri(chain, provider_uri)
with smart_open(transaction_hashes, 'r') as transaction_hashes_file:
with smart_open(block_number, 'r') as blocks_file:
job = ExportReceiptsJob(
transaction_hashes_iterable=(transaction_hash.strip() for transaction_hash in transaction_hashes_file),
transaction_hashes_iterable=(block_number.strip() for block_number in blocks_file),
batch_size=batch_size,
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
max_workers=max_workers,
6 changes: 4 additions & 2 deletions ethereumetl/cli/stream.py
Original file line number Diff line number Diff line change
@@ -53,7 +53,8 @@
@click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers')
@click.option('--log-file', default=None, show_default=True, type=str, help='Log file')
@click.option('--pid-file', default=None, show_default=True, type=str, help='pid file')
def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
@click.option('--chain', default=None, show_default=True, type=str, help='chain')
def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,chain,
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
"""Streams all data types to console or Google Pub/Sub."""
configure_logging(log_file)
@@ -72,7 +73,8 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
item_exporter=create_item_exporters(output),
batch_size=batch_size,
max_workers=max_workers,
entity_types=entity_types
entity_types=entity_types,
chain=chain
)
streamer = Streamer(
blockchain_streamer_adapter=streamer_adapter,
19 changes: 15 additions & 4 deletions ethereumetl/jobs/export_blocks_job.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@

from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.json_rpc_requests import generate_get_block_by_number_json_rpc
from ethereumetl.json_rpc_requests import generate_get_block_by_number_json_rpc,generate_get_bor_author_by_number_json_rpc
from ethereumetl.mappers.block_mapper import EthBlockMapper
from ethereumetl.mappers.transaction_mapper import EthTransactionMapper
from ethereumetl.utils import rpc_response_batch_to_results, validate_range
@@ -41,8 +41,10 @@ def __init__(
batch_web3_provider,
max_workers,
item_exporter,
chain,
export_blocks=True,
export_transactions=True):
export_transactions=True,
):
validate_range(start_block, end_block)
self.start_block = start_block
self.end_block = end_block
@@ -54,6 +56,7 @@ def __init__(

self.export_blocks = export_blocks
self.export_transactions = export_transactions
self.chain = chain
if not self.export_blocks and not self.export_transactions:
raise ValueError('At least one of export_blocks or export_transactions must be True')

@@ -80,8 +83,16 @@ def _export_batch(self, block_number_batch):
self._export_block(block)

def _export_block(self, block):
if self.export_blocks:
self.item_exporter.export_item(self.block_mapper.block_to_dict(block))
if self.chain == 'polygon':
bor_author_rpc = generate_get_bor_author_by_number_json_rpc(block.number)
bor_response = self.batch_web3_provider.make_batch_request(json.dumps(next(bor_author_rpc)))
bor_result = rpc_response_batch_to_results([bor_response])
if self.export_blocks:
self.item_exporter.export_item(self.block_mapper.block_to_dict_with_author(block, next(bor_result)))
else:
if self.export_blocks:
self.item_exporter.export_item(self.block_mapper.block_to_dict(block))

if self.export_transactions:
for tx in block.transactions:
self.item_exporter.export_item(self.transaction_mapper.transaction_to_dict(tx))
15 changes: 8 additions & 7 deletions ethereumetl/jobs/export_receipts_job.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@

from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
from ethereumetl.json_rpc_requests import generate_get_receipt_json_rpc
from ethereumetl.json_rpc_requests import generate_get_receipt_json_rpc,generate_get_receipt_by_block_json_rpc
from ethereumetl.mappers.receipt_log_mapper import EthReceiptLogMapper
from ethereumetl.mappers.receipt_mapper import EthReceiptMapper
from ethereumetl.utils import rpc_response_batch_to_results
@@ -62,13 +62,14 @@ def _start(self):
def _export(self):
self.batch_work_executor.execute(self.transaction_hashes_iterable, self._export_receipts)

def _export_receipts(self, transaction_hashes):
receipts_rpc = list(generate_get_receipt_json_rpc(transaction_hashes))
def _export_receipts(self, block_number):
receipts_rpc = list(generate_get_receipt_by_block_json_rpc(block_number))
response = self.batch_web3_provider.make_batch_request(json.dumps(receipts_rpc))
results = rpc_response_batch_to_results(response)
receipts = [self.receipt_mapper.json_dict_to_receipt(result) for result in results]
for receipt in receipts:
self._export_receipt(receipt)
results_raw = rpc_response_batch_to_results(response)
for results in results_raw:
receipts = [self.receipt_mapper.json_dict_to_receipt(result) for result in results]
for receipt in receipts:
self._export_receipt(receipt)

def _export_receipt(self, receipt):
if self.export_receipts:
14 changes: 14 additions & 0 deletions ethereumetl/json_rpc_requests.py
Original file line number Diff line number Diff line change
@@ -29,6 +29,11 @@ def generate_get_block_by_number_json_rpc(block_numbers, include_transactions):
request_id=idx
)

def generate_get_bor_author_by_number_json_rpc(block_number):
yield generate_json_rpc(
method='bor_getAuthor',
params=[hex(block_number)],
)

def generate_trace_block_by_number_json_rpc(block_numbers):
for block_number in block_numbers:
@@ -49,6 +54,15 @@ def generate_get_receipt_json_rpc(transaction_hashes):
)


def generate_get_receipt_by_block_json_rpc(block_number):
for idx, block_number in enumerate(block_number):
yield generate_json_rpc(
method='eth_getBlockReceipts',
params=[hex(int(block_number))],
request_id=idx
)


def generate_get_code_json_rpc(contract_addresses, block='latest'):
for idx, contract_address in enumerate(contract_addresses):
yield generate_json_rpc(
26 changes: 26 additions & 0 deletions ethereumetl/mappers/block_mapper.py
Original file line number Diff line number Diff line change
@@ -105,3 +105,29 @@ def block_to_dict(self, block):
'withdrawals_root': block.withdrawals_root,
'withdrawals': block.withdrawals,
}

def block_to_dict_with_author(self, block, bor_result):
return {
'type': 'block',
'number': block.number,
'hash': block.hash,
'parent_hash': block.parent_hash,
'nonce': block.nonce,
'sha3_uncles': block.sha3_uncles,
'logs_bloom': block.logs_bloom,
'transactions_root': block.transactions_root,
'state_root': block.state_root,
'receipts_root': block.receipts_root,
'miner': bor_result,
'difficulty': block.difficulty,
'total_difficulty': block.total_difficulty,
'size': block.size,
'extra_data': block.extra_data,
'gas_limit': block.gas_limit,
'gas_used': block.gas_used,
'timestamp': block.timestamp,
'transaction_count': block.transaction_count,
'base_fee_per_gas': block.base_fee_per_gas,
'withdrawals_root': block.withdrawals_root,
'withdrawals': block.withdrawals
}
1 change: 1 addition & 0 deletions ethereumetl/mappers/receipt_mapper.py
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ def __init__(self, receipt_log_mapper=None):
self.receipt_log_mapper = receipt_log_mapper

def json_dict_to_receipt(self, json_dict):

receipt = EthReceipt()

receipt.transaction_hash = json_dict.get('transactionHash')
15 changes: 9 additions & 6 deletions ethereumetl/streaming/eth_streamer_adapter.py
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ def __init__(
item_exporter=ConsoleItemExporter(),
batch_size=100,
max_workers=5,
chain="ethereum",
entity_types=tuple(EntityType.ALL_FOR_STREAMING)):
self.batch_web3_provider = batch_web3_provider
self.item_exporter = item_exporter
@@ -32,6 +33,7 @@ def __init__(
self.entity_types = entity_types
self.item_id_calculator = EthItemIdCalculator()
self.item_timestamp_calculator = EthItemTimestampCalculator()
self.chain=chain

def open(self):
self.item_exporter.open()
@@ -44,12 +46,12 @@ def export_all(self, start_block, end_block):
# Export blocks and transactions
blocks, transactions = [], []
if self._should_export(EntityType.BLOCK) or self._should_export(EntityType.TRANSACTION):
blocks, transactions = self._export_blocks_and_transactions(start_block, end_block)
blocks, transactions = self._export_blocks_and_transactions(start_block, end_block,self.chain)

# Export receipts and logs
receipts, logs = [], []
if self._should_export(EntityType.RECEIPT) or self._should_export(EntityType.LOG):
receipts, logs = self._export_receipts_and_logs(transactions)
receipts, logs = self._export_receipts_and_logs(blocks)

# Extract token transfers
token_transfers = []
@@ -102,7 +104,7 @@ def export_all(self, start_block, end_block):

self.item_exporter.export_items(all_items)

def _export_blocks_and_transactions(self, start_block, end_block):
def _export_blocks_and_transactions(self, start_block, end_block, chain):
blocks_and_transactions_item_exporter = InMemoryItemExporter(item_types=['block', 'transaction'])
blocks_and_transactions_job = ExportBlocksJob(
start_block=start_block,
@@ -112,17 +114,18 @@ def _export_blocks_and_transactions(self, start_block, end_block):
max_workers=self.max_workers,
item_exporter=blocks_and_transactions_item_exporter,
export_blocks=self._should_export(EntityType.BLOCK),
export_transactions=self._should_export(EntityType.TRANSACTION)
export_transactions=self._should_export(EntityType.TRANSACTION),
chain=chain
)
blocks_and_transactions_job.run()
blocks = blocks_and_transactions_item_exporter.get_items('block')
transactions = blocks_and_transactions_item_exporter.get_items('transaction')
return blocks, transactions

def _export_receipts_and_logs(self, transactions):
def _export_receipts_and_logs(self, blocks):
exporter = InMemoryItemExporter(item_types=['receipt', 'log'])
job = ExportReceiptsJob(
transaction_hashes_iterable=(transaction['hash'] for transaction in transactions),
transaction_hashes_iterable=(blocks['number'] for blocks in blocks),
batch_size=self.batch_size,
batch_web3_provider=self.batch_web3_provider,
max_workers=self.max_workers,
18 changes: 18 additions & 0 deletions ethereumetl/utils.py
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@

import itertools
import warnings
import base58

from ethereumetl.misc.retriable_value_error import RetriableValueError

@@ -47,6 +48,7 @@ def to_int_or_none(val):
except ValueError:
return None


def to_float_or_none(val):
if isinstance(val, float):
return val
@@ -58,6 +60,7 @@ def to_float_or_none(val):
print("can't cast %s to float" % val)
return val


def chunk_string(string, length):
return (string[0 + i:length + i] for i in range(0, len(string), length))

@@ -68,6 +71,21 @@ def to_normalized_address(address):
return address.lower()


def hex_to_base58(hex_string):
if hex_string is None:
return hex_string
if hex_string[:2] in ["0x", "0X"]:
hex_string = "41" + hex_string[2:]
bytes_str = bytes.fromhex(hex_string)
base58_str = base58.b58encode_check(bytes_str)
return base58_str.decode("UTF-8")


def base58_to_hex(self, base58_string):
asc_string = base58.b58decode_check(base58_string)
return asc_string.hex().upper()


def validate_range(range_start_incl, range_end_incl):
if range_start_incl < 0 or range_end_incl < 0:
raise ValueError('range_start and range_end must be greater or equal to 0')
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ def read(fname):

setup(
name='ethereum-etl',
version='2.3.1',
version='2.3.4',
author='Evgeny Medvedev',
author_email='evge.medvedev@gmail.com',
description='Tools for exporting Ethereum blockchain data to CSV or JSON',