Skip to content

Issue with manual offset commit when using ConsumePartition and OffsetManager  #3036

Open
@krishnapal2545

Description

@krishnapal2545

I am encountering an issue with manually committing offsets when using the ConsumePartition method in Sarama. After consuming messages and calling OffsetManager.ManagePartition.MarkOffset to mark the offset, it seems that the offsets are not being properly committed, and the consumer continues to re-read messages from the beginning upon restart. I have verified that I am using the correct offset manager methods and committing the offsets manually, but the behavior persists.

Steps to Reproduce:
Set up a Kafka consumer group using Sarama with ConsumePartition.
Consume messages from a specific partition of a topic.
Use OffsetManager.ManagePartition.MarkOffset MarkOffset to mark the offset for the next message after processing each message.
Call Commit() on the OffsetManager to commit the offsets manually.
Restart the consumer.
Observe that the consumer reads messages from the beginning of the topic, even though offsets were committed manually.

Expected Behavior: After manually committing the offsets, the consumer should start from the last committed offset, not from the beginning of the topic, upon restart.

Actual Behavior: The consumer re-reads all messages from the beginning, ignoring the manually committed offsets.

Sarama Version: v1.43.3

go

config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = false  // Auto-commit disabled
config.Consumer.Return.Errors = true

package main

import (
	"fmt"
	"github.com/IBM/sarama"
	"log"
)

func main() {
	config := sarama.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.AutoCommit.Enable = false

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal("Error creating consumer:", err)
	}
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal("Error consuming partition:", err)
	}
	defer partitionConsumer.Close()

	offsetManager, err := sarama.NewOffsetManagerFromClient("my-consumer-group", consumer)
	if err != nil {
		log.Fatal("Error creating offset manager:", err)
	}
	defer offsetManager.Close()

	offsetMgr, err := offsetManager.ManagePartition("my-topic", 0)
	if err != nil {
		log.Fatal("Error managing partition:", err)
	}
	defer offsetMgr.Close()

	for msg := range partitionConsumer.Messages() {
		// Process the message
		fmt.Printf("Consumed message: %s\n", string(msg.Value))

		// Mark the offset (next message to read)
		offsetMgr.MarkOffset(msg.Offset+1, "")

		// Commit the offset (mark the offset as committed)
		err := offsetManager.Commit()  // Commit manually
		if err != nil {
			log.Printf("Error committing offset: %v", err)
		} else {
			log.Printf("Successfully committed offset: %d", msg.Offset+1)
		}
	}
}

Activity

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions