diff --git a/ethereumetl/cli/export_all.py b/ethereumetl/cli/export_all.py index 60fc47f4f..5f79308fb 100644 --- a/ethereumetl/cli/export_all.py +++ b/ethereumetl/cli/export_all.py @@ -114,11 +114,12 @@ def get_partitions(start, end, partition_batch_size, provider_uri): 'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io') @click.option('-o', '--output-dir', default='output', show_default=True, type=str, help='Output directory, partitioned in Hive style.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('-B', '--export-batch-size', default=100, show_default=True, type=int, help='The number of requests in JSON RPC batches.') @click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.') -def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, export_batch_size, +def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, max_retries, export_batch_size, chain='ethereum'): """Exports all data for a range of blocks.""" provider_uri = check_classic_provider_uri(chain, provider_uri) export_all_common(get_partitions(start, end, partition_batch_size, provider_uri), - output_dir, provider_uri, max_workers, export_batch_size) + output_dir, provider_uri, max_workers, export_batch_size, max_retries) diff --git a/ethereumetl/cli/export_blocks_and_transactions.py b/ethereumetl/cli/export_blocks_and_transactions.py index 88564be96..0a731a7fa 100644 --- a/ethereumetl/cli/export_blocks_and_transactions.py +++ b/ethereumetl/cli/export_blocks_and_transactions.py @@ -41,13 +41,14 @@ help='The URI of the web3 provider e.g. ' 'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('--blocks-output', default=None, show_default=True, type=str, help='The output file for blocks. If not provided blocks will not be exported. Use "-" for stdout') @click.option('--transactions-output', default=None, show_default=True, type=str, help='The output file for transactions. ' 'If not provided transactions will not be exported. Use "-" for stdout') @click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.') -def export_blocks_and_transactions(start_block, end_block, batch_size, provider_uri, max_workers, blocks_output, +def export_blocks_and_transactions(start_block, end_block, batch_size, provider_uri, max_workers, max_retries, blocks_output, transactions_output, chain='ethereum'): """Exports blocks and transactions.""" provider_uri = check_classic_provider_uri(chain, provider_uri) @@ -60,6 +61,7 @@ def export_blocks_and_transactions(start_block, end_block, batch_size, provider_ batch_size=batch_size, batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), max_workers=max_workers, + max_retries=max_retries, item_exporter=blocks_and_transactions_item_exporter(blocks_output, transactions_output), export_blocks=blocks_output is not None, export_transactions=transactions_output is not None) diff --git a/ethereumetl/cli/export_contracts.py b/ethereumetl/cli/export_contracts.py index ae7ba765b..8f2aa7f98 100644 --- a/ethereumetl/cli/export_contracts.py +++ b/ethereumetl/cli/export_contracts.py @@ -40,11 +40,12 @@ help='The file containing contract addresses, one per line.') @click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('-p', '--provider-uri', default='https://mainnet.infura.io', show_default=True, type=str, help='The URI of the web3 provider e.g. ' 'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io') @click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.') -def export_contracts(batch_size, contract_addresses, output, max_workers, provider_uri, chain='ethereum'): +def export_contracts(batch_size, contract_addresses, output, max_workers, max_retries, provider_uri, chain='ethereum'): """Exports contracts bytecode and sighashes.""" check_classic_provider_uri(chain, provider_uri) with smart_open(contract_addresses, 'r') as contract_addresses_file: @@ -55,6 +56,7 @@ def export_contracts(batch_size, contract_addresses, output, max_workers, provid batch_size=batch_size, batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), item_exporter=contracts_item_exporter(output), - max_workers=max_workers) + max_workers=max_workers, + max_retries=max_retries) job.run() diff --git a/ethereumetl/cli/export_geth_traces.py b/ethereumetl/cli/export_geth_traces.py index 8359d7a35..40089e20a 100644 --- a/ethereumetl/cli/export_geth_traces.py +++ b/ethereumetl/cli/export_geth_traces.py @@ -39,10 +39,11 @@ @click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file for geth traces. If not specified stdout is used.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('-p', '--provider-uri', required=True, type=str, help='The URI of the web3 provider e.g. ' 'file://$HOME/Library/Ethereum/geth.ipc or http://localhost:8545/') -def export_geth_traces(start_block, end_block, batch_size, output, max_workers, provider_uri): +def export_geth_traces(start_block, end_block, batch_size, output, max_workers, max_retries, provider_uri): """Exports traces from geth node.""" job = ExportGethTracesJob( start_block=start_block, @@ -50,6 +51,7 @@ def export_geth_traces(start_block, end_block, batch_size, output, max_workers, batch_size=batch_size, batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), max_workers=max_workers, + max_retries=max_retries, item_exporter=geth_traces_item_exporter(output)) job.run() diff --git a/ethereumetl/cli/export_origin.py b/ethereumetl/cli/export_origin.py index 419821e2f..998ae7548 100644 --- a/ethereumetl/cli/export_origin.py +++ b/ethereumetl/cli/export_origin.py @@ -40,9 +40,10 @@ @click.option('--marketplace-output', default='-', show_default=True, type=str, help='The output file for marketplace data. If not specified stdout is used.') @click.option('--shop-output', default='-', show_default=True, type=str, help='The output file for shop data. If not specified stdout is used.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('-p', '--provider-uri', required=True, type=str, help='The URI of the web3 provider e.g. file://$HOME/Library/Ethereum/geth.ipc or http://localhost:8545/') -def export_origin(start_block, end_block, batch_size, marketplace_output, shop_output, max_workers, provider_uri): +def export_origin(start_block, end_block, batch_size, marketplace_output, shop_output, max_workers, max_retries, provider_uri): """Exports Origin Protocol data.""" job = ExportOriginJob( start_block=start_block, @@ -52,5 +53,6 @@ def export_origin(start_block, end_block, batch_size, marketplace_output, shop_o ipfs_client=get_origin_ipfs_client(), marketplace_listing_exporter=origin_marketplace_listing_item_exporter(marketplace_output), shop_product_exporter=origin_shop_product_item_exporter(shop_output), - max_workers=max_workers) + max_workers=max_workers, + max_retries=max_retries) job.run() \ No newline at end of file diff --git a/ethereumetl/cli/export_receipts_and_logs.py b/ethereumetl/cli/export_receipts_and_logs.py index 95ff491a8..e104489fb 100644 --- a/ethereumetl/cli/export_receipts_and_logs.py +++ b/ethereumetl/cli/export_receipts_and_logs.py @@ -42,13 +42,14 @@ help='The URI of the web3 provider e.g. ' 'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('--receipts-output', default=None, show_default=True, type=str, help='The output file for receipts. If not provided receipts will not be exported. Use "-" for stdout') @click.option('--logs-output', default=None, show_default=True, type=str, help='The output file for receipt logs. ' 'If not provided receipt logs will not be exported. Use "-" for stdout') @click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.') -def export_receipts_and_logs(batch_size, transaction_hashes, provider_uri, max_workers, receipts_output, logs_output, +def export_receipts_and_logs(batch_size, transaction_hashes, provider_uri, max_workers, max_retries, receipts_output, logs_output, chain='ethereum'): """Exports receipts and logs.""" provider_uri = check_classic_provider_uri(chain, provider_uri) @@ -58,6 +59,7 @@ def export_receipts_and_logs(batch_size, transaction_hashes, provider_uri, max_w batch_size=batch_size, batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), max_workers=max_workers, + max_retries=max_retries, item_exporter=receipts_and_logs_item_exporter(receipts_output, logs_output), export_receipts=receipts_output is not None, export_logs=logs_output is not None) diff --git a/ethereumetl/cli/export_token_transfers.py b/ethereumetl/cli/export_token_transfers.py index d9a6fa29f..d1f299b92 100644 --- a/ethereumetl/cli/export_token_transfers.py +++ b/ethereumetl/cli/export_token_transfers.py @@ -40,10 +40,11 @@ @click.option('-b', '--batch-size', default=100, show_default=True, type=int, help='The number of blocks to filter at a time.') @click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('-p', '--provider-uri', required=True, type=str, help='The URI of the web3 provider e.g. file://$HOME/Library/Ethereum/geth.ipc or http://localhost:8545/') @click.option('-t', '--tokens', default=None, show_default=True, type=str, multiple=True, help='The list of token addresses to filter by.') -def export_token_transfers(start_block, end_block, batch_size, output, max_workers, provider_uri, tokens): +def export_token_transfers(start_block, end_block, batch_size, output, max_workers, max_retries, provider_uri, tokens): """Exports ERC20/ERC721 transfers.""" job = ExportTokenTransfersJob( start_block=start_block, @@ -52,5 +53,6 @@ def export_token_transfers(start_block, end_block, batch_size, output, max_worke web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))), item_exporter=token_transfers_item_exporter(output), max_workers=max_workers, + max_retries=max_retries, tokens=tokens) job.run() diff --git a/ethereumetl/cli/export_tokens.py b/ethereumetl/cli/export_tokens.py index 941276254..91d1bb391 100644 --- a/ethereumetl/cli/export_tokens.py +++ b/ethereumetl/cli/export_tokens.py @@ -41,11 +41,12 @@ help='The file containing token addresses, one per line.') @click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('-p', '--provider-uri', default='https://mainnet.infura.io', show_default=True, type=str, help='The URI of the web3 provider e.g. ' 'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io') @click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.') -def export_tokens(token_addresses, output, max_workers, provider_uri, chain='ethereum'): +def export_tokens(token_addresses, output, max_workers, max_retries, provider_uri, chain='ethereum'): """Exports ERC20/ERC721 tokens.""" provider_uri = check_classic_provider_uri(chain, provider_uri) with smart_open(token_addresses, 'r') as token_addresses_file: @@ -53,6 +54,7 @@ def export_tokens(token_addresses, output, max_workers, provider_uri, chain='eth token_addresses_iterable=(token_address.strip() for token_address in token_addresses_file), web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))), item_exporter=tokens_item_exporter(output), - max_workers=max_workers) + max_workers=max_workers, + max_retries=max_retries) job.run() diff --git a/ethereumetl/cli/export_traces.py b/ethereumetl/cli/export_traces.py index 378e5161b..84a2be366 100644 --- a/ethereumetl/cli/export_traces.py +++ b/ethereumetl/cli/export_traces.py @@ -40,6 +40,7 @@ @click.option('-b', '--batch-size', default=5, show_default=True, type=int, help='The number of blocks to filter at a time.') @click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('-p', '--provider-uri', required=True, type=str, help='The URI of the web3 provider e.g. ' 'file://$HOME/.local/share/io.parity.ethereum/jsonrpc.ipc or http://localhost:8545/') @@ -47,7 +48,7 @@ @click.option('--daofork-traces/--no-daofork-traces', default=False, show_default=True, help='Whether to include daofork traces') @click.option('-t', '--timeout', default=60, show_default=True, type=int, help='IPC or HTTP request timeout.') @click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.') -def export_traces(start_block, end_block, batch_size, output, max_workers, provider_uri, +def export_traces(start_block, end_block, batch_size, output, max_workers, max_retries, provider_uri, genesis_traces, daofork_traces, timeout=60, chain='ethereum'): """Exports traces from parity node.""" if chain == 'classic' and daofork_traces == True: @@ -60,6 +61,7 @@ def export_traces(start_block, end_block, batch_size, output, max_workers, provi web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri, timeout=timeout))), item_exporter=traces_item_exporter(output), max_workers=max_workers, + max_retries=max_retries, include_genesis_traces=genesis_traces, include_daofork_traces=daofork_traces) diff --git a/ethereumetl/cli/extract_contracts.py b/ethereumetl/cli/extract_contracts.py index dd2c25918..85c38847a 100644 --- a/ethereumetl/cli/extract_contracts.py +++ b/ethereumetl/cli/extract_contracts.py @@ -39,7 +39,8 @@ @click.option('-b', '--batch-size', default=100, show_default=True, type=int, help='The number of blocks to filter at a time.') @click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') -def extract_contracts(traces, batch_size, output, max_workers): +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') +def extract_contracts(traces, batch_size, output, max_workers, max_retries): """Extracts contracts from traces file.""" set_max_field_size_limit() @@ -53,6 +54,7 @@ def extract_contracts(traces, batch_size, output, max_workers): traces_iterable=traces_iterable, batch_size=batch_size, max_workers=max_workers, + max_retries=max_retries, item_exporter=contracts_item_exporter(output)) job.run() diff --git a/ethereumetl/cli/extract_geth_traces.py b/ethereumetl/cli/extract_geth_traces.py index ab21d2ec2..02b28e86a 100644 --- a/ethereumetl/cli/extract_geth_traces.py +++ b/ethereumetl/cli/extract_geth_traces.py @@ -37,7 +37,9 @@ @click.option('-b', '--batch-size', default=100, show_default=True, type=int, help='The number of blocks to filter at a time.') @click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') -def extract_geth_traces(input, batch_size, output, max_workers): +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') + +def extract_geth_traces(input, batch_size, output, max_workers, max_retries): """Extracts geth traces from JSON lines file.""" with smart_open(input, 'r') as geth_traces_file: if input.endswith('.json'): @@ -48,6 +50,7 @@ def extract_geth_traces(input, batch_size, output, max_workers): traces_iterable=traces_iterable, batch_size=batch_size, max_workers=max_workers, + max_retries=max_retries, item_exporter=traces_item_exporter(output)) job.run() diff --git a/ethereumetl/cli/extract_token_transfers.py b/ethereumetl/cli/extract_token_transfers.py index 650b5277c..82e06b66a 100644 --- a/ethereumetl/cli/extract_token_transfers.py +++ b/ethereumetl/cli/extract_token_transfers.py @@ -39,8 +39,9 @@ @click.option('-b', '--batch-size', default=100, show_default=True, type=int, help='The number of blocks to filter at a time.') @click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('--values-as-strings', default=False, show_default=True, is_flag=True, help='Whether to convert values to strings.') -def extract_token_transfers(logs, batch_size, output, max_workers, values_as_strings=False): +def extract_token_transfers(logs, batch_size, output, max_workers, max_retries, values_as_strings=False): """Extracts ERC20/ERC721 transfers from logs file.""" with smart_open(logs, 'r') as logs_file: if logs.endswith('.json'): @@ -52,6 +53,7 @@ def extract_token_transfers(logs, batch_size, output, max_workers, values_as_str logs_iterable=logs_reader, batch_size=batch_size, max_workers=max_workers, + max_retries=max_retries, item_exporter=token_transfers_item_exporter(output, converters=converters)) job.run() diff --git a/ethereumetl/cli/stream.py b/ethereumetl/cli/stream.py index 1ff978e04..4724715f3 100644 --- a/ethereumetl/cli/stream.py +++ b/ethereumetl/cli/stream.py @@ -50,10 +50,11 @@ @click.option('-b', '--batch-size', default=10, show_default=True, type=int, help='How many blocks to batch in single request') @click.option('-B', '--block-batch-size', default=1, show_default=True, type=int, help='How many blocks to batch in single sync round') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers') +@click.option('-r', '--max-retries', default=5, show_default=True, type=int, help='The maximum number of retries') @click.option('--log-file', default=None, show_default=True, type=str, help='Log file') @click.option('--pid-file', default=None, show_default=True, type=str, help='pid file') def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types, - period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None): + period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, max_retries=5, log_file=None, pid_file=None): """Streams all data types to console or Google Pub/Sub.""" configure_logging(log_file) configure_signals() @@ -71,6 +72,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit item_exporter=create_item_exporters(output), batch_size=batch_size, max_workers=max_workers, + max_retries=max_retries, entity_types=entity_types ) streamer = Streamer( diff --git a/ethereumetl/executors/batch_work_executor.py b/ethereumetl/executors/batch_work_executor.py index fcae6a8b7..a81dd66eb 100644 --- a/ethereumetl/executors/batch_work_executor.py +++ b/ethereumetl/executors/batch_work_executor.py @@ -37,10 +37,12 @@ BATCH_CHANGE_COOLDOWN_PERIOD_SECONDS = 2 * 60 - # Executes the given work in batches, reducing the batch size exponentially in case of errors. class BatchWorkExecutor: - def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCEPTIONS, max_retries=5): + + DEFAULT_MAX_RETRIES = 5 + + def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCEPTIONS, max_retries=DEFAULT_MAX_RETRIES): self.batch_size = starting_batch_size self.max_batch_size = starting_batch_size self.latest_batch_size_change_time = None diff --git a/ethereumetl/jobs/export_all_common.py b/ethereumetl/jobs/export_all_common.py index ba4b03e47..0b7eec9c3 100644 --- a/ethereumetl/jobs/export_all_common.py +++ b/ethereumetl/jobs/export_all_common.py @@ -63,7 +63,7 @@ def extract_csv_column_unique(input, output, column): output_file.write(row[column] + '\n') -def export_all_common(partitions, output_dir, provider_uri, max_workers, batch_size): +def export_all_common(partitions, output_dir, provider_uri, max_workers, batch_size, max_retries): for batch_start_block, batch_end_block, partition_dir in partitions: # # # start # # # @@ -118,6 +118,7 @@ def export_all_common(partitions, output_dir, provider_uri, max_workers, batch_s batch_size=batch_size, batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), max_workers=max_workers, + max_retries=max_retries, item_exporter=blocks_and_transactions_item_exporter(blocks_file, transactions_file), export_blocks=blocks_file is not None, export_transactions=transactions_file is not None) @@ -148,7 +149,8 @@ def export_all_common(partitions, output_dir, provider_uri, max_workers, batch_s batch_size=batch_size, web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))), item_exporter=token_transfers_item_exporter(token_transfers_file), - max_workers=max_workers) + max_workers=max_workers, + max_retries=max_retries) job.run() # # # receipts_and_logs # # # @@ -200,6 +202,7 @@ def export_all_common(partitions, output_dir, provider_uri, max_workers, batch_s batch_size=batch_size, batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), max_workers=max_workers, + max_retries=max_retries, item_exporter=receipts_and_logs_item_exporter(receipts_file, logs_file), export_receipts=receipts_file is not None, export_logs=logs_file is not None) @@ -239,7 +242,8 @@ def export_all_common(partitions, output_dir, provider_uri, max_workers, batch_s batch_size=batch_size, batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), item_exporter=contracts_item_exporter(contracts_file), - max_workers=max_workers) + max_workers=max_workers, + max_retries=max_retries) job.run() # # # tokens # # # @@ -274,7 +278,8 @@ def export_all_common(partitions, output_dir, provider_uri, max_workers, batch_s token_addresses_iterable=(token_address.strip() for token_address in token_addresses), web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))), item_exporter=tokens_item_exporter(tokens_file), - max_workers=max_workers) + max_workers=max_workers, + max_retries=max_retries) job.run() # # # finish # # # diff --git a/ethereumetl/jobs/export_blocks_job.py b/ethereumetl/jobs/export_blocks_job.py index 6f4eff7a7..317825dd0 100644 --- a/ethereumetl/jobs/export_blocks_job.py +++ b/ethereumetl/jobs/export_blocks_job.py @@ -42,14 +42,19 @@ def __init__( max_workers, item_exporter, export_blocks=True, - export_transactions=True): + export_transactions=True, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): validate_range(start_block, end_block) self.start_block = start_block self.end_block = end_block self.batch_web3_provider = batch_web3_provider - self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=batch_size, + max_workers=max_workers, + max_retries=max_retries + ) self.item_exporter = item_exporter self.export_blocks = export_blocks @@ -72,6 +77,7 @@ def _export(self): def _export_batch(self, block_number_batch): blocks_rpc = list(generate_get_block_by_number_json_rpc(block_number_batch, self.export_transactions)) + response = self.batch_web3_provider.make_batch_request(json.dumps(blocks_rpc)) results = rpc_response_batch_to_results(response) blocks = [self.block_mapper.json_dict_to_block(result) for result in results] diff --git a/ethereumetl/jobs/export_contracts_job.py b/ethereumetl/jobs/export_contracts_job.py index 08f36bc52..0adac38c4 100644 --- a/ethereumetl/jobs/export_contracts_job.py +++ b/ethereumetl/jobs/export_contracts_job.py @@ -40,11 +40,16 @@ def __init__( batch_size, batch_web3_provider, max_workers, - item_exporter): + item_exporter, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): self.batch_web3_provider = batch_web3_provider self.contract_addresses_iterable = contract_addresses_iterable - self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=batch_size, + max_workers=max_workers, + max_retries=max_retries + ) self.item_exporter = item_exporter self.contract_service = EthContractService() diff --git a/ethereumetl/jobs/export_geth_traces_job.py b/ethereumetl/jobs/export_geth_traces_job.py index 745c403e2..a051f57fe 100644 --- a/ethereumetl/jobs/export_geth_traces_job.py +++ b/ethereumetl/jobs/export_geth_traces_job.py @@ -38,14 +38,19 @@ def __init__( batch_size, batch_web3_provider, max_workers, - item_exporter): + item_exporter, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): validate_range(start_block, end_block) self.start_block = start_block self.end_block = end_block self.batch_web3_provider = batch_web3_provider - self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=batch_size, + max_workers=max_workers, + max_retries=max_retries + ) self.item_exporter = item_exporter self.geth_trace_mapper = EthGethTraceMapper() diff --git a/ethereumetl/jobs/export_origin_job.py b/ethereumetl/jobs/export_origin_job.py index 58d2e931f..1bb3a7a64 100644 --- a/ethereumetl/jobs/export_origin_job.py +++ b/ethereumetl/jobs/export_origin_job.py @@ -26,7 +26,8 @@ def __init__( ipfs_client, marketplace_listing_exporter, shop_product_exporter, - max_workers): + max_workers, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): validate_range(start_block, end_block) self.start_block = start_block self.end_block = end_block @@ -36,7 +37,11 @@ def __init__( self.marketplace_listing_exporter = marketplace_listing_exporter self.shop_product_exporter = shop_product_exporter - self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=batch_size, + max_workers=max_workers, + max_retries=max_retries + ) self.event_extractor = OriginEventExtractor(ipfs_client) diff --git a/ethereumetl/jobs/export_receipts_job.py b/ethereumetl/jobs/export_receipts_job.py index 3dc1622cb..641d4d02e 100644 --- a/ethereumetl/jobs/export_receipts_job.py +++ b/ethereumetl/jobs/export_receipts_job.py @@ -41,11 +41,16 @@ def __init__( max_workers, item_exporter, export_receipts=True, - export_logs=True): + export_logs=True, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): self.batch_web3_provider = batch_web3_provider self.transaction_hashes_iterable = transaction_hashes_iterable - self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=batch_size, + max_workers=max_workers, + max_retries=max_retries + ) self.item_exporter = item_exporter self.export_receipts = export_receipts diff --git a/ethereumetl/jobs/export_token_transfers_job.py b/ethereumetl/jobs/export_token_transfers_job.py index b0d6f6f1b..b5d737e74 100644 --- a/ethereumetl/jobs/export_token_transfers_job.py +++ b/ethereumetl/jobs/export_token_transfers_job.py @@ -37,7 +37,8 @@ def __init__( web3, item_exporter, max_workers, - tokens=None): + tokens=None, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): validate_range(start_block, end_block) self.start_block = start_block self.end_block = end_block @@ -46,7 +47,11 @@ def __init__( self.tokens = tokens self.item_exporter = item_exporter - self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=batch_size, + max_workers=max_workers, + max_retries=max_retries + ) self.receipt_log_mapper = EthReceiptLogMapper() self.token_transfer_mapper = EthTokenTransferMapper() diff --git a/ethereumetl/jobs/export_tokens_job.py b/ethereumetl/jobs/export_tokens_job.py index 5e0c88288..9471bca83 100644 --- a/ethereumetl/jobs/export_tokens_job.py +++ b/ethereumetl/jobs/export_tokens_job.py @@ -28,10 +28,14 @@ class ExportTokensJob(BaseJob): - def __init__(self, web3, item_exporter, token_addresses_iterable, max_workers): + def __init__(self, web3, item_exporter, token_addresses_iterable, max_workers, max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): self.item_exporter = item_exporter self.token_addresses_iterable = token_addresses_iterable - self.batch_work_executor = BatchWorkExecutor(1, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=1, + max_workers=max_workers, + max_retries=max_retries + ) self.token_service = EthTokenService(web3, clean_user_provided_content) self.token_mapper = EthTokenMapper() diff --git a/ethereumetl/jobs/export_traces_job.py b/ethereumetl/jobs/export_traces_job.py index 50a3c2c0e..138ff22ef 100644 --- a/ethereumetl/jobs/export_traces_job.py +++ b/ethereumetl/jobs/export_traces_job.py @@ -41,7 +41,8 @@ def __init__( item_exporter, max_workers, include_genesis_traces=False, - include_daofork_traces=False): + include_daofork_traces=False, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): validate_range(start_block, end_block) self.start_block = start_block self.end_block = end_block @@ -49,7 +50,11 @@ def __init__( self.web3 = web3 # TODO: use batch_size when this issue is fixed https://github.com/paritytech/parity-ethereum/issues/9822 - self.batch_work_executor = BatchWorkExecutor(1, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=1, + max_workers=max_workers, + max_retries=max_retries + ) self.item_exporter = item_exporter self.trace_mapper = EthTraceMapper() diff --git a/ethereumetl/jobs/extract_contracts_job.py b/ethereumetl/jobs/extract_contracts_job.py index 508f54372..57c3619ba 100644 --- a/ethereumetl/jobs/extract_contracts_job.py +++ b/ethereumetl/jobs/extract_contracts_job.py @@ -37,10 +37,15 @@ def __init__( traces_iterable, batch_size, max_workers, - item_exporter): + item_exporter, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): self.traces_iterable = traces_iterable - self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=batch_size, + max_workers=max_workers, + max_retries=max_retries + ) self.item_exporter = item_exporter self.contract_service = EthContractService() diff --git a/ethereumetl/jobs/extract_geth_traces_job.py b/ethereumetl/jobs/extract_geth_traces_job.py index d1f8e0985..2405bc009 100644 --- a/ethereumetl/jobs/extract_geth_traces_job.py +++ b/ethereumetl/jobs/extract_geth_traces_job.py @@ -32,10 +32,15 @@ def __init__( traces_iterable, batch_size, max_workers, - item_exporter): + item_exporter, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): self.traces_iterable = traces_iterable - self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=batch_size, + max_workers=max_workers, + max_retries=max_retries + ) self.item_exporter = item_exporter self.trace_mapper = EthTraceMapper() diff --git a/ethereumetl/jobs/extract_token_transfers_job.py b/ethereumetl/jobs/extract_token_transfers_job.py index 342f3dd3c..61bab1429 100644 --- a/ethereumetl/jobs/extract_token_transfers_job.py +++ b/ethereumetl/jobs/extract_token_transfers_job.py @@ -33,10 +33,15 @@ def __init__( logs_iterable, batch_size, max_workers, - item_exporter): + item_exporter, + max_retries=BatchWorkExecutor.DEFAULT_MAX_RETRIES): self.logs_iterable = logs_iterable - self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers) + self.batch_work_executor = BatchWorkExecutor( + starting_batch_size=batch_size, + max_workers=max_workers, + max_retries=max_retries + ) self.item_exporter = item_exporter self.receipt_log_mapper = EthReceiptLogMapper() diff --git a/ethereumetl/jobs/extract_tokens_job.py b/ethereumetl/jobs/extract_tokens_job.py index 62679de71..07cc0af83 100644 --- a/ethereumetl/jobs/extract_tokens_job.py +++ b/ethereumetl/jobs/extract_tokens_job.py @@ -25,8 +25,8 @@ class ExtractTokensJob(ExportTokensJob): - def __init__(self, web3, item_exporter, contracts_iterable, max_workers): - super().__init__(web3, item_exporter, [], max_workers) + def __init__(self, web3, item_exporter, contracts_iterable, max_workers, max_retries): + super().__init__(web3, item_exporter, [], max_workers, max_retries) self.contracts_iterable = contracts_iterable def _export(self): diff --git a/ethereumetl/streaming/eth_streamer_adapter.py b/ethereumetl/streaming/eth_streamer_adapter.py index 7fcf39377..ea2873348 100644 --- a/ethereumetl/streaming/eth_streamer_adapter.py +++ b/ethereumetl/streaming/eth_streamer_adapter.py @@ -24,11 +24,13 @@ def __init__( item_exporter=ConsoleItemExporter(), batch_size=100, max_workers=5, + max_retries=5, entity_types=tuple(EntityType.ALL_FOR_STREAMING)): self.batch_web3_provider = batch_web3_provider self.item_exporter = item_exporter self.batch_size = batch_size self.max_workers = max_workers + self.max_retries = max_retries self.entity_types = entity_types self.item_id_calculator = EthItemIdCalculator() self.item_timestamp_calculator = EthItemTimestampCalculator() @@ -110,6 +112,7 @@ def _export_blocks_and_transactions(self, start_block, end_block): batch_size=self.batch_size, batch_web3_provider=self.batch_web3_provider, max_workers=self.max_workers, + max_retries=self.max_retries, item_exporter=blocks_and_transactions_item_exporter, export_blocks=self._should_export(EntityType.BLOCK), export_transactions=self._should_export(EntityType.TRANSACTION) @@ -126,6 +129,7 @@ def _export_receipts_and_logs(self, transactions): batch_size=self.batch_size, batch_web3_provider=self.batch_web3_provider, max_workers=self.max_workers, + max_retries=self.max_retries, item_exporter=exporter, export_receipts=self._should_export(EntityType.RECEIPT), export_logs=self._should_export(EntityType.LOG) @@ -141,6 +145,7 @@ def _extract_token_transfers(self, logs): logs_iterable=logs, batch_size=self.batch_size, max_workers=self.max_workers, + max_retries=self.max_retries, item_exporter=exporter) job.run() token_transfers = exporter.get_items('token_transfer') @@ -154,6 +159,7 @@ def _export_traces(self, start_block, end_block): batch_size=self.batch_size, web3=ThreadLocalProxy(lambda: build_web3(self.batch_web3_provider)), max_workers=self.max_workers, + max_retries=self.max_retries, item_exporter=exporter ) job.run() @@ -166,6 +172,7 @@ def _export_contracts(self, traces): traces_iterable=traces, batch_size=self.batch_size, max_workers=self.max_workers, + max_retries=self.max_retries, item_exporter=exporter ) job.run() @@ -178,6 +185,7 @@ def _extract_tokens(self, contracts): contracts_iterable=contracts, web3=ThreadLocalProxy(lambda: build_web3(self.batch_web3_provider)), max_workers=self.max_workers, + max_retries=self.max_retries, item_exporter=exporter ) job.run()