
Commit aa04a5c

Fix consumer assign

1 parent 22327c3

2 files changed: +121 -0 lines changed

src/karapace/core/kafka/consumer.py

Lines changed: 53 additions & 0 deletions
@@ -143,6 +143,59 @@ def _on_assign(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None:
         topics = frozenset(partition.topic for partition in partitions)
         self._subscription = self._subscription.union(topics)
 
+        # Position partitions at committed offsets to avoid message replay
+        try:
+            # Fetch committed offsets for all assigned partitions.
+            # Use the underlying confluent-kafka consumer directly to avoid wrapper exceptions.
+            try:
+                committed = _consumer.committed(partitions, timeout=10.0)
+                self.log.debug("Fetched committed offsets: %s", committed)
+            except Exception as comm_exc:
+                self.log.warning("Failed to fetch committed offsets: %s", comm_exc)
+                # If we can't fetch committed offsets, let auto.offset.reset handle positioning
+                return
+
+            # Process each partition's committed offset
+            for tp in committed:
+                if tp.offset == -1001:  # OFFSET_INVALID
+                    # No committed offset exists, let auto.offset.reset handle it
+                    self.log.debug(
+                        "No committed offset for %s-%s; using auto.offset.reset",
+                        tp.topic,
+                        tp.partition,
+                    )
+                elif tp.offset >= 0:
+                    # A valid committed offset exists, seek to it
+                    try:
+                        _consumer.seek(tp)
+                        self.log.debug(
+                            "Seeked to committed offset %d for %s-%s",
+                            tp.offset,
+                            tp.topic,
+                            tp.partition,
+                        )
+                    except KafkaException as seek_error:
+                        self.log.warning(
+                            "Failed to seek to committed offset %d for %s-%s: %s",
+                            tp.offset,
+                            tp.topic,
+                            tp.partition,
+                            seek_error,
+                        )
+                else:
+                    # Other negative special offset (e.g. -1 for "latest"); let auto.offset.reset handle it
+                    self.log.debug(
+                        "Special offset %d for %s-%s; using auto.offset.reset",
+                        tp.offset,
+                        tp.topic,
+                        tp.partition,
+                    )
+
+        except KafkaException as e:
+            self.log.warning("Failed to fetch committed offsets during assignment: %s", e)
+            # If we can't fetch committed offsets, let auto.offset.reset handle positioning.
+            # This ensures the consumer doesn't get stuck or start from an unexpected position.
 
     def _on_revoke(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None:
         topics = frozenset(partition.topic for partition in partitions)
         self._subscription = self._subscription.difference(topics)
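For context, `_on_assign` is the rebalance callback that confluent-kafka invokes after a partition assignment completes. A minimal standalone sketch of the same resume-from-committed pattern, written against the plain confluent-kafka API (broker address, topic, and group names here are illustrative assumptions, not values from the commit):

    # Sketch of the pattern this commit applies, outside the Karapace wrapper.
    # Broker/topic/group names below are assumptions for illustration only.
    from confluent_kafka import OFFSET_INVALID, Consumer, KafkaException, TopicPartition


    def on_assign(consumer: Consumer, partitions: list[TopicPartition]) -> None:
        try:
            # committed() returns one TopicPartition per input, with .offset set to
            # the committed offset, or OFFSET_INVALID when nothing was committed
            committed = consumer.committed(partitions, timeout=10.0)
        except KafkaException:
            return  # committed offsets unavailable; auto.offset.reset decides
        for tp in committed:
            if tp.offset == OFFSET_INVALID or tp.offset < 0:
                continue  # no usable committed offset; fall back to auto.offset.reset
            consumer.seek(tp)  # resume exactly where the group left off


    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",  # assumed broker address
            "group.id": "demo-group",
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe(["demo-topic"], on_assign=on_assign)

Note that the magic number -1001 in the diff is exactly `confluent_kafka.OFFSET_INVALID`, so the literal could be replaced with the named constant.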

tests/integration/kafka_rest_apis/test_rest_consumer.py

Lines changed: 68 additions & 0 deletions
@@ -549,3 +549,71 @@ async def test_consume_grafecul_deserialization_error_handling(rest_async_client
     # Consuming records should fail gracefully if record can not be deserialized to the selected format
     assert resp.status_code == 422, f"Expected 422 response: {resp}"
     assert f"value deserialization error for format {fmt}" in resp.json()["message"]
+
+
+@pytest.mark.parametrize("trail", ["", "/"])
+async def test_resume_from_committed_offsets(rest_async_client, admin_client, producer, trail):
+    group_name = new_random_name("offset_resume_group")
+    fmt = "binary"
+    header = REST_HEADERS[fmt]
+
+    # Step 1: Create the first consumer instance in the group
+    instance_id_1 = await new_consumer(
+        rest_async_client,
+        group_name,
+        fmt=fmt,
+        trail=trail,
+        payload_override={"auto.commit.enable": "false"},
+    )
+    topic_name = new_topic(admin_client)
+
+    assign_path_1 = f"/consumers/{group_name}/instances/{instance_id_1}/assignments{trail}"
+    offsets_path_1 = f"/consumers/{group_name}/instances/{instance_id_1}/offsets{trail}"
+    consume_path_1 = f"/consumers/{group_name}/instances/{instance_id_1}/records{trail}?timeout=5000"
+
+    # Assign partition 0 of the test topic
+    res = await rest_async_client.post(
+        assign_path_1, json={"partitions": [{"topic": topic_name, "partition": 0}]}, headers=header
+    )
+    assert res.ok
+
+    # Produce 5 messages
+    for i in range(5):
+        producer.send(topic_name, value=f"msg-{i}".encode())
+    producer.flush()
+
+    # Consume them with the first consumer
+    resp = await rest_async_client.get(consume_path_1, headers=header)
+    assert resp.ok
+    data = resp.json()
+    assert len(data) == 5, f"Expected 5 messages, got {data}"
+
+    # Commit the next offset to consume (last consumed offset + 1, per Kafka convention)
+    res = await rest_async_client.post(
+        offsets_path_1,
+        json={"offsets": [{"topic": topic_name, "partition": 0, "offset": data[-1]["offset"] + 1}]},
+        headers=header,
+    )
+    assert res.ok, f"Offset commit failed: {res}"
+
+    # Step 2: Create a new consumer in the same group (simulating a rebalance or restart)
+    instance_id_2 = await new_consumer(rest_async_client, group_name, fmt=fmt, trail=trail)
+    assign_path_2 = f"/consumers/{group_name}/instances/{instance_id_2}/assignments{trail}"
+    consume_path_2 = f"/consumers/{group_name}/instances/{instance_id_2}/records{trail}?timeout=5000"
+
+    res = await rest_async_client.post(
+        assign_path_2, json={"partitions": [{"topic": topic_name, "partition": 0}]}, headers=header
+    )
+    assert res.ok
+
+    # Produce 3 more messages after the rebalance
+    for i in range(5, 8):
+        producer.send(topic_name, value=f"msg-{i}".encode())
+    producer.flush()
+
+    # Consume with the new consumer - it should only get the 3 new messages, not replay the old 5
+    resp = await rest_async_client.get(consume_path_2, headers=header)
+    assert resp.ok
+    data = resp.json()
+    values = [base64.b64decode(r["value"]).decode() for r in data]
+    assert values == ["msg-5", "msg-6", "msg-7"], f"Expected only new messages, got {values}"
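One detail worth spelling out: the offsets payload commits `data[-1]["offset"] + 1` because Kafka's commit convention stores the next offset to fetch, not the last offset read. A small worked illustration with the values from this test:

    # Worked illustration of the commit arithmetic in the test above
    last_consumed = 4              # the first consumer read offsets 0..4 (msg-0..msg-4)
    committed = last_consumed + 1  # commit 5, meaning "next offset to fetch"
    # A consumer resuming from the committed offset starts at offset 5,
    # which is why the second instance sees exactly msg-5, msg-6, msg-7.
    assert committed == 5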
