Skip to content

Commit 7453431

Browse files
authored
Merge pull request #438 from prodo56/oauth-fix-for-kafka-source
Fixes issue 431 for kafka oauth source
2 parents d7fdb71 + dc7f409 commit 7453431

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

ci/environment-py37.yml

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,19 @@ dependencies:
1212
- toolz
1313
- zict
1414
- six
15-
- librdkafka=1.2.2
15+
- librdkafka=1.5.3
1616
- dask
1717
- distributed
1818
- pandas
19-
- python-confluent-kafka=1.1.0
19+
- python-confluent-kafka=1.5.0
2020
- numpydoc
2121
- sphinx
2222
- sphinx_rtd_theme
2323
- codecov
2424
- coverage
2525
- networkx
2626
- graphviz
27+
- pytest-asyncio
2728
- python-graphviz
2829
- bokeh
2930
- ipython

ci/environment-py38.yml

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ dependencies:
1212
- toolz
1313
- zict
1414
- six
15-
- librdkafka=1.2.2
15+
- librdkafka=1.5.3
1616
- dask
1717
- distributed
1818
- pandas
19-
- python-confluent-kafka=1.1.0
19+
- python-confluent-kafka=1.5.0
2020
- numpydoc
2121
- sphinx
2222
- sphinx_rtd_theme
@@ -25,6 +25,7 @@ dependencies:
2525
- networkx
2626
- graphviz
2727
- python-graphviz
28+
- pytest-asyncio
2829
- bokeh
2930
- ipython
3031
- ipykernel

streamz/sources.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -449,10 +449,14 @@ def start(self):
449449
self.stopped = False
450450
self.consumer = ck.Consumer(self.cpars)
451451
self.consumer.subscribe(self.topics)
452-
weakref.finalize(self, lambda consumer=self.consumer: _close_consumer(consumer))
452+
weakref.finalize(
453+
self, lambda consumer=self.consumer: _close_consumer(consumer)
454+
)
453455
tp = ck.TopicPartition(self.topics[0], 0, 0)
454456

455-
# blocks for consumer thread to come up
457+
# blocks for consumer thread to come up and invoke poll to
458+
# establish connection with broker to fetch oauth token for kafka
459+
self.consumer.poll(timeout=1)
456460
self.consumer.get_watermark_offsets(tp)
457461
self.loop.add_callback(self.poll_kafka)
458462

@@ -479,7 +483,8 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
479483
max_batch_size=10000, keys=False,
480484
engine=None, **kwargs):
481485
self.consumer_params = consumer_params
482-
# Override the auto-commit config to enforce custom streamz checkpointing
486+
# Override the auto-commit config to enforce custom streamz
487+
# checkpointing
483488
self.consumer_params['enable.auto.commit'] = 'false'
484489
if 'auto.offset.reset' not in self.consumer_params.keys():
485490
consumer_params['auto.offset.reset'] = 'latest'
@@ -587,7 +592,9 @@ def start(self):
587592
self.stopped = False
588593
tp = ck.TopicPartition(self.topic, 0, 0)
589594

590-
# blocks for consumer thread to come up
595+
# blocks for consumer thread to come up and invoke poll to establish
596+
# connection with broker to fetch oauth token for kafka
597+
self.consumer.poll(timeout=1)
591598
self.consumer.get_watermark_offsets(tp)
592599
self.loop.add_callback(self.poll_kafka)
593600

0 commit comments

Comments
 (0)