Description
I get an error when I run the wordcount example from an unmodified clone of the git repo. The timestamp appears to be passed as a tuple where an integer is expected.
I ran zookeeper-server-start.sh, kafka-server-start.sh, example.py, and source_client.py.
Source client output:
producing a b c to wks-wordcount-example-topic
source_client.py:9: DeprecationWarning: PY_SSIZE_T_CLEAN will be required for '#' formats
  p.produce(topic, data.encode('utf-8'))
producing a b to wks-wordcount-example-topic
producing a to wks-wordcount-example-topic
example.py output:
WARNING:winton_kafka_streams.processor._stream_thread(Thread-1):Unexpected state transition from RUNNING to ASSIGNING_PARTITIONS.
WARNING:winton_kafka_streams.processor._stream_thread(Thread-1):Unexpected state transition from RUNNING to NOT_RUNNING.
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_stream_thread.py", line 140, in run
    self.process_and_punctuate()
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_stream_thread.py", line 181, in process_and_punctuate
    if task.process():
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_stream_task.py", line 122, in process
    self.topology.sources[topic].process(key, value)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/topology.py", line 25, in process
    self.processor.process(key, value)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/processor.py", line 35, in process
    self.context.forward(key, value)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/processor_context.py", line 49, in forward
    child.process(key, value)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/topology.py", line 25, in process
    self.processor.process(key, value)
  File "example.py", line 37, in process
    self.word_count_store[word] = count + 1
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/state/logging/change_logging_state_store.py", line 46, in __setitem__
    self.change_logger.log_change(key_bytes, value_bytes)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/state/logging/store_change_logger.py", line 10, in log_change
    self.record_collector.send(self.topic, key, value, self.context.timestamp, partition=self.partition)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_record_collector.py", line 38, in send
    self.producer.produce(topic, ser_value, ser_key, partition, self.on_delivery, partitioner, timestamp)
TypeError: an integer is required (got type tuple)
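
To illustrate the mismatch in the traceback above, here is a minimal sketch of normalizing the timestamp before it reaches `produce`. It assumes the tuple has the `(timestamp_type, timestamp)` shape that confluent_kafka's `Message.timestamp()` returns; I have not confirmed that this is where `self.context.timestamp` gets its value. The helper name `to_epoch_millis` is hypothetical and not part of winton_kafka_streams.

```python
def to_epoch_millis(timestamp):
    """Return an integer timestamp from an int or a (type, value) pair.

    Hypothetical helper, not part of winton_kafka_streams.
    """
    if isinstance(timestamp, tuple):
        # Assumption: the tuple is shaped like the result of
        # confluent_kafka's Message.timestamp(), i.e.
        # (timestamp_type, timestamp_in_milliseconds).
        _timestamp_type, value = timestamp
        return int(value)
    # Already a plain number: pass it through as an int.
    return int(timestamp)


if __name__ == "__main__":
    # A tuple like the one that seems to trigger the TypeError.
    print(to_epoch_millis((1, 1589000000000)))
    # A plain integer is returned unchanged.
    print(to_epoch_millis(1589000000000))
```

If that assumption holds, applying a conversion like this to the timestamp argument in `_record_collector.py` before the call to `self.producer.produce(...)` might avoid the `TypeError`, but I have not tested it against the library.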