Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions tests/kafkatest/tests/produce_consume_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from ducktape.utils.util import wait_until

from kafkatest.utils import validate_delivery
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.verifiable_producer import VerifiableProducer


class ProduceConsumeValidateTest(Test):
"""This class provides a shared template for tests which follow the common pattern of:
Expand Down Expand Up @@ -130,3 +133,69 @@ def check_lost_data(missing_records):
self.mark_for_collect(s)

assert succeeded, error_msg

def test_simple_consumer_commit(self):
"""
Verify that a simple consumer commits offsets and does not re-consume
messages after a restart.
"""

# Topic configuration
self.topic = "simple-consumer-commit-topic"
self.num_partitions = 1
self.replication_factor = 1

self.kafka.create_topic(
self.topic,
num_partitions=self.num_partitions,
replication_factor=self.replication_factor
)

# Number of messages to produce
self.num_messages = 100

# Create producer
self.producer = VerifiableProducer(
self.test_context,
num_nodes=1,
kafka=self.kafka,
topic=self.topic,
throughput=100,
message_validator=None
)

# Create consumer with manual commit (important!)
self.consumer = VerifiableConsumer(
self.test_context,
num_nodes=1,
kafka=self.kafka,
topic=self.topic,
group_id="simple-consumer-commit-group",
enable_autocommit=False
)
# Run produce-consume, restart consumer in between
self.run_produce_consume_validate(self._restart_consumer)

def _restart_consumer(self):
"""
Stop the consumer after offsets are committed and restart it
to ensure consumption resumes from committed offsets.
"""
# Ensure consumer has consumed some messages
wait_until(
lambda: len(self.consumer.messages_consumed[1]) > 0,
timeout_sec=30,
err_msg="Consumer did not consume messages before restart"
)

# Stop consumer (commit should have happened)
self.consumer.stop()
self.consumer.wait()

# Restart consumer
self.consumer.start()