diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py
index ac75e242b..32e6a07ba 100644
--- a/blockchainetl/jobs/exporters/kafka_exporter.py
+++ b/blockchainetl/jobs/exporters/kafka_exporter.py
@@ -2,7 +2,7 @@
import json
import logging
-from kafka import KafkaProducer
+from confluent_kafka import Producer
from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter
@@ -13,28 +13,41 @@ def __init__(self, output, item_type_to_topic_mapping, converters=()):
self.item_type_to_topic_mapping = item_type_to_topic_mapping
self.converter = CompositeItemConverter(converters)
self.connection_url = self.get_connection_url(output)
- print(self.connection_url)
- self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
+ self.topic_prefix = self.get_topic_prefix(output)
+ print(self.connection_url, self.topic_prefix)
+ self.producer = Producer({
+ 'bootstrap.servers': self.connection_url,
+ 'transactional.id': 'ethereumetl',
+ 'enable.idempotence': True,
+ })
def get_connection_url(self, output):
try:
return output.split('/')[1]
- except KeyError:
- raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092"')
+ except IndexError:
+ raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092" or "kafka/127.0.0.1:9092/"')
+
+ def get_topic_prefix(self, output):
+ try:
+ return output.split('/')[2] + "."
+ except IndexError:
+ return ''
def open(self):
- pass
+ self.producer.init_transactions()
def export_items(self, items):
+ self.producer.begin_transaction()
for item in items:
self.export_item(item)
-
+ self.producer.commit_transaction()
+
def export_item(self, item):
item_type = item.get('type')
if item_type is not None and item_type in self.item_type_to_topic_mapping:
data = json.dumps(item).encode('utf-8')
- print(data)
- return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
+ logging.debug(data)
+ return self.producer.produce(self.topic_prefix + self.item_type_to_topic_mapping[item_type], data)
else:
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
@@ -43,9 +56,8 @@ def convert_items(self, items):
yield self.converter.convert_item(item)
def close(self):
- pass
-
-
+ pass
+
def group_by_item_type(items):
result = collections.defaultdict(list)
for item in items:
diff --git a/docs/commands.md b/docs/commands.md
index 2be620cda..e8b1124d7 100644
--- a/docs/commands.md
+++ b/docs/commands.md
@@ -213,7 +213,7 @@ e.g. `-e block,transaction,log,token_transfer,trace,contract,token`.
- For Postgres: `--output=postgresql+pg8000://:@:/`,
e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
- For GCS: `--output=gs://`. Make sure to install and initialize `gcloud` cli.
- - For Kafka: `--output=kafka/:`, e.g. `--output=kafka/127.0.0.1:9092`
+ - For Kafka: `--output=kafka/:/`, e.g. `--output=kafka/127.0.0.1:9092` or `--output=kafka/127.0.0.1:9092/crypto_ethereum`.
- Those output types can be combined with a comma e.g. `--output=gs://,projects//topics/crypto_ethereum`
The [schema](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema)
diff --git a/setup.py b/setup.py
index 3804545c1..8447f52de 100644
--- a/setup.py
+++ b/setup.py
@@ -46,7 +46,7 @@ def read(fname):
'timeout-decorator==0.4.1',
'google-cloud-pubsub==2.1.0',
'google-cloud-storage==1.33.0',
- 'kafka-python==2.0.2',
+ 'confluent-kafka==1.9.0',
'sqlalchemy==1.4',
'pg8000==1.16.6',
# This library is a dependency for google-cloud-pubsub, starting from 0.3.22 it requires Rust,