Bug: REST Proxy ignores committed offsets for lazy-assigned partitions, causing message replay #1138

@lukaesch

Description

What happened?

When using the Kafka REST Proxy v2 API with consumer subscriptions, partitions that are assigned during rebalancing or are not immediately polled start consuming from the beginning of the topic (auto.offset.reset=earliest) instead of using previously committed offsets. This causes massive message replay and data duplication.

Reproduction Steps:

  1. Create a REST Proxy consumer with auto.offset.reset=earliest and auto.commit.enable=false:
curl -X POST "https://rest-proxy/consumers/test-group" \
  -H "Content-Type: application/vnd.kafka.avro.v2+json" \
  -d '{
    "name": "test-consumer",
    "format": "avro",
    "auto.offset.reset": "earliest",
    "auto.commit.enable": "false"
  }'
  2. Subscribe to 3 topics with multiple partitions:
curl -X POST "https://rest-proxy/consumers/test-group/instances/test-consumer/subscription" \
  -H "Content-Type: application/vnd.kafka.avro.v2+json" \
  -d '{"topics": ["topic1", "topic2", "topic3"]}'
  3. Consume messages with a small buffer (activates only a single partition):
curl -X GET "https://rest-proxy/consumers/test-group/instances/test-consumer/records?max_bytes=1000"
# Returns messages from partition 1, offsets 11325-11400
  4. Commit offsets:
curl -X POST "https://rest-proxy/consumers/test-group/instances/test-consumer/offsets" \
  -H "Content-Type: application/vnd.kafka.avro.v2+json" \
  -d '{"offsets": [{"topic": "topic1", "partition": 1, "offset": 11401}]}'
  5. Check committed offsets for all partitions:
curl -X GET "https://rest-proxy/consumers/test-group/instances/test-consumer/offsets" \
  -H "Content-Type: application/vnd.kafka.avro.v2+json" \
  -d '{
    "partitions": [
      {"topic": "topic1", "partition": 0},
      {"topic": "topic1", "partition": 1},
      {"topic": "topic1", "partition": 2}
    ]
  }'
# Example response showing existing committed offsets:
# P0: 11144, P1: 11401, P2: 12670
  6. Consume with a larger buffer (activates more partitions):
curl -X GET "https://rest-proxy/consumers/test-group/instances/test-consumer/records?max_bytes=1000000"
# BUG: Returns OLD messages from P0 starting at 9732 (should start at 11144)
#      Returns OLD messages from P2 starting at 10331 (should start at 12670)

Root Cause:

The bug is in /src/karapace/core/kafka/consumer.py lines 142-148. The _on_assign callback only tracks topic subscriptions but fails to position newly assigned partitions at their committed offsets:

def _on_assign(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None:
    topics = frozenset(partition.topic for partition in partitions)
    self._subscription = self._subscription.union(topics)
    # BUG: Missing offset positioning logic!

This causes partitions that aren't immediately polled after assignment to remain at their default position (auto.offset.reset), ignoring committed offsets.
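
To confirm that the committed offsets really do exist broker-side while the proxy replays, they can be queried directly with confluent-kafka, bypassing the REST Proxy entirely. A minimal sketch; the broker address is an assumption, the group id matches the repro above:

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumption: broker reachable locally
    "group.id": "test-group",
    "enable.auto.commit": False,
})
# committed() asks the group coordinator for the committed offsets (timeout in seconds)
committed = consumer.committed([TopicPartition("topic1", p) for p in range(3)], timeout=5.0)
for tp in committed:
    print(f"{tp.topic}[{tp.partition}] committed offset: {tp.offset}")
consumer.close()

If this prints the offsets from step 5 while step 6 still replays older messages, the commits are intact on the broker and the proxy's consumer is simply not using them.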

What did you expect to happen?

When a consumer subscribes to topics and partitions are assigned (either initially or during rebalancing), ALL partitions should resume from their last committed offsets if they exist.

Expected behavior:

  • P0 should start at offset 11144 (its committed offset)
  • P1 should start at offset 11401 (its committed offset)
  • P2 should start at offset 12670 (its committed offset)
  • Only partitions WITHOUT committed offsets should use auto.offset.reset

Example of correct behavior:

Consumer group: test-group
Committed offsets: P0=11144, P1=11401, P2=12670

After consumer recreation or rebalancing:
- Fetch from P0 → First message at offset 11144 ✓
- Fetch from P1 → First message at offset 11401 ✓
- Fetch from P2 → First message at offset 12670 ✓

What else do we need to know?

Environment:

  • Karapace version: Latest (reproduced on main branch)
  • Kafka version: 3.x
  • REST Proxy API: v2
  • Consumer configuration: auto.offset.reset=earliest, auto.commit.enable=false

Impact:
This bug affects production systems, causing:

  • Message replay after consumer recreation
  • Data duplication in downstream systems
  • Incorrect processing when partitions rebalance
  • Severity that grows with the size of the topic backlog (more history to replay)

Workaround:
Using manual partition assignment instead of subscription works correctly, but loses the benefits of automatic rebalancing.
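
For reference, the manual-assignment workaround replaces the subscription call from step 2 with the v2 assignments endpoint (same consumer instance as in the repro):

curl -X POST "https://rest-proxy/consumers/test-group/instances/test-consumer/assignments" \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -d '{
    "partitions": [
      {"topic": "topic1", "partition": 0},
      {"topic": "topic1", "partition": 1},
      {"topic": "topic1", "partition": 2}
    ]
  }'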

Proposed Fix:
The _on_assign callback should retrieve committed offsets and seek to them:

def _on_assign(self, consumer: Consumer, partitions: list[TopicPartition]) -> None:
    topics = frozenset(partition.topic for partition in partitions)
    self._subscription = self._subscription.union(topics)

    # FIX: Position newly assigned partitions at their committed offsets.
    # Note: confluent-kafka's committed() takes `timeout` in seconds and returns
    # the input TopicPartitions with the committed offset filled in, or a
    # negative sentinel (OFFSET_INVALID) where nothing has been committed.
    committed = consumer.committed(partitions, timeout=5.0)
    for committed_partition in committed:
        if committed_partition.offset >= 0:
            consumer.seek(committed_partition)

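One caveat with this sketch: in confluent-kafka, seek() can only reposition a partition that is already being actively fetched, which may not yet be the case inside the rebalance callback. An alternative (a sketch under the same assumptions, not tested against Karapace) is to hand the committed offsets straight to assign():

def _on_assign(self, consumer: Consumer, partitions: list[TopicPartition]) -> None:
    topics = frozenset(partition.topic for partition in partitions)
    self._subscription = self._subscription.union(topics)

    # committed() fills in each TopicPartition's offset with the group's
    # committed offset, leaving OFFSET_INVALID where nothing was committed.
    committed = consumer.committed(partitions, timeout=5.0)
    # assign() starts fetching at the explicit offsets; entries still at
    # OFFSET_INVALID should fall back to auto.offset.reset, as today.
    consumer.assign(committed)
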
Additional Notes:

  • Bug is more visible with larger max_bytes values (more partitions activate simultaneously)
  • Bug always occurs after consumer recreation (all partitions need repositioning)
  • Tests in /tests/integration/kafka_rest_apis/test_rest_consumer.py primarily use manual assignment, so they miss this subscription-based bug; a subscription-based regression test could look like the sketch below
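
A hypothetical regression-test sketch (assumes a running REST Proxy fixture exposing base_url, plus the pre-committed offsets from the repro above):

import requests

V2 = {"Content-Type": "application/vnd.kafka.v2+json"}

def test_subscription_resumes_from_committed(base_url: str) -> None:
    instance = f"{base_url}/consumers/test-group/instances/test-consumer"
    # Subscribe (the buggy path) instead of manually assigning partitions
    requests.post(f"{instance}/subscription", headers=V2,
                  json={"topics": ["topic1"]}).raise_for_status()
    # A large buffer activates all partitions at once, as in step 6
    records = requests.get(f"{instance}/records?max_bytes=1000000",
                           headers={"Accept": "application/vnd.kafka.avro.v2+json"}).json()
    committed = {0: 11144, 1: 11401, 2: 12670}  # offsets from the repro above
    for record in records:
        assert record["offset"] >= committed[record["partition"]]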
