From a63d7dbbeb88e8801ca48d94930e8928e34b3af7 Mon Sep 17 00:00:00 2001 From: rightx2 Date: Sun, 6 Nov 2022 08:40:10 +0900 Subject: [PATCH] Add *args, **kwargs on create_item_exporter(s) for flexiblility --- ethereumetl/cli/stream.py | 5 ++- .../streaming/item_exporter_creator.py | 44 ++++++++++++------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/ethereumetl/cli/stream.py b/ethereumetl/cli/stream.py index 217208dba..daf7685fd 100644 --- a/ethereumetl/cli/stream.py +++ b/ethereumetl/cli/stream.py @@ -53,8 +53,9 @@ @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers') @click.option('--log-file', default=None, show_default=True, type=str, help='Log file') @click.option('--pid-file', default=None, show_default=True, type=str, help='pid file') +@click.option('--topic-prefix', default=None, show_default=True, type=str, help='topic prefix for kafka, pubsub, e.g. blockchain.ethereuem') def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types, - period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None): + period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None, topic_prefix=None): """Streams all data types to console or Google Pub/Sub.""" configure_logging(log_file) configure_signals() @@ -69,7 +70,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit streamer_adapter = EthStreamerAdapter( batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)), - item_exporter=create_item_exporters(output), + item_exporter=create_item_exporters(output, topic_prefix=topic_prefix), batch_size=batch_size, max_workers=max_workers, entity_types=entity_types diff --git a/ethereumetl/streaming/item_exporter_creator.py b/ethereumetl/streaming/item_exporter_creator.py index 18bdb22b4..08c8e3172 100644 --- a/ethereumetl/streaming/item_exporter_creator.py +++ b/ethereumetl/streaming/item_exporter_creator.py @@ -24,27 +24,32 @@ from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter -def create_item_exporters(outputs): +def create_item_exporters(outputs, *args, **kwargs): split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console'] - item_exporters = [create_item_exporter(output) for output in split_outputs] + item_exporters = [create_item_exporter(output, *args, **kwargs) for output in split_outputs] return MultiItemExporter(item_exporters) -def create_item_exporter(output): +def create_item_exporter(output, *args, **kwargs): item_exporter_type = determine_item_exporter_type(output) if item_exporter_type == ItemExporterType.PUBSUB: from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter + topic_prefix = kwargs.get("topic_prefix") + if topic_prefix and not topic_prefix.endswith("."): + topic_prefix += "." + else: + topic_prefix = output + "." # For backward compatibitliy enable_message_ordering = 'sorted' in output or 'ordered' in output item_exporter = GooglePubSubItemExporter( item_type_to_topic_mapping={ - 'block': output + '.blocks', - 'transaction': output + '.transactions', - 'log': output + '.logs', - 'token_transfer': output + '.token_transfers', - 'trace': output + '.traces', - 'contract': output + '.contracts', - 'token': output + '.tokens', + 'block': topic_prefix + 'blocks', + 'transaction': topic_prefix + 'transactions', + 'log': topic_prefix + 'logs', + 'token_transfer': topic_prefix + 'token_transfers', + 'trace': topic_prefix + 'traces', + 'contract': topic_prefix + 'contracts', + 'token': topic_prefix + 'tokens', }, message_attributes=('item_id', 'item_timestamp'), batch_max_bytes=1024 * 1024 * 5, @@ -84,14 +89,19 @@ def create_item_exporter(output): item_exporter = ConsoleItemExporter() elif item_exporter_type == ItemExporterType.KAFKA: from blockchainetl.jobs.exporters.kafka_exporter import KafkaItemExporter + topic_prefix = kwargs.get("topic_prefix") + if topic_prefix and not topic_prefix.endswith("."): + topic_prefix += "." + else: + topic_prefix = "" item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={ - 'block': 'blocks', - 'transaction': 'transactions', - 'log': 'logs', - 'token_transfer': 'token_transfers', - 'trace': 'traces', - 'contract': 'contracts', - 'token': 'tokens', + 'block': topic_prefix + 'blocks', + 'transaction': topic_prefix + 'transactions', + 'log': topic_prefix + 'logs', + 'token_transfer': topic_prefix + 'token_transfers', + 'trace': topic_prefix + 'traces', + 'contract': topic_prefix + 'contracts', + 'token': topic_prefix + 'tokens', }) else: