Fix: #1314 partition watcher doesn't reacts on partition number changing#1365
Fix: #1314 partition watcher doesn't reacts on partition number changing#1365arxon31 wants to merge 6 commits into
Conversation
|
@erikdw @petedannemann |
| } | ||
|
|
||
| for _, topic := range config.Topics { | ||
| cg.partitionsPerTopic[topic] = 0 |
There was a problem hiding this comment.
Why is this being set to zero instead of the actual value?
There was a problem hiding this comment.
@petedannemann
This code can be removed. I forgot that I fill this map in the assignTopicPartitions function and only after that I run partitionWatcher
Should I remove this?
There was a problem hiding this comment.
Yeah if it's not necessary let's remove this
| if cg.config.WatchPartitionChanges { | ||
| for _, topic := range cg.config.Topics { | ||
| gen.partitionWatcher(cg.config.PartitionWatchInterval, topic) | ||
| if cg.config.WatchPartitionChanges && iAmLeader { |
There was a problem hiding this comment.
Can you provide where you found this in the Kafka docs or code base? I guess this makes sense as the leader will trigger a consumer balance, which will let follows pick up partition changes
There was a problem hiding this comment.
@petedannemann
So it's my mistake, when I said that kafka says that this is correct. I misunderstood Jun Rao from this video. He did not say who exactly should run the partition watcher mechanism. But it still seems to me that partitionWatcher should only be launched by the leader. This way we create less load on the cluster and if leader disconnects broker also triggers rebalance for group and leader election
There was a problem hiding this comment.
I am a bit worried about this causing some unintended regression. Could we break that change out into a separate PR and let us think it through for a bit?
There was a problem hiding this comment.
@petedannemann
Removed from this PR
Should I make an issue where we can discuss this or just a new PR?
|
@petedannemann Thanks) |
|
any updates on this? kind of crucial fix for us when we read from a non-existent topic |
need to review from @petedannemann or somebody else |
Suggestion to fix issue #1314
Steps to reproduce the issue:
After all we have empty consumer group, and partitionWatcher monitoring for changing partition number with non-zero start number of partitions. So we have to increase partitions to receive topic into consumer group.
Solution:
We need to assign partitions number to topic in assignTopicPartitions function. So we run partitionWatcher with assigned in assignTopicPartition function number of partitions and don't scrape it inside partitionWatcher
P.S. In this PR I also changed partitionWatcher. Now it runs only if consumer is leader of consumer group. Kafka says this is correct