Implement partition tracking and rebalance handling in Consumer and BatchConsumer #67
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fix "Local: Erroneous state" panic during Kafka rebalancing
Problem
When a Kafka rebalance occurs during batch processing (especially with concurrency > 1), goroutines attempt to commit offsets for partitions they no longer own, causing librdkafka to return "Erroneous state" errors which bubble up as panics.
Root Cause
Local: Erroneous stateis a librdkafka internal error that occurs when the client attempts a Kafka operation while in an invalid/transition state (e.g., after partitions were revoked during a rebalance).With batch concurrency > 1: Multiple goroutines are processing simultaneously. When rebalance starts, all goroutines try to store offsets/commit at nearly the same time while the rebalance is in progress. Kafka's transactional state machine cannot handle overlapping operations during a rebalance — one succeeds, the others panic.
Solution
onPartitionsAssigned/onPartitionsRevoked)StoreOffsetsChanges
consumer.goactivePartitionsmap, rebalance callback,isPartitionActive()check instoreMessage()batch_consumer.gostoreBatch()to only commit offsets for active partitions*_test.go