Description
I have a project using Kafka in KRaft mode, with two microservices. I’m running Kafka with 3 brokers and 3 controllers, using the following Docker image: confluentinc/cp-kafka:7.8.0.
- Service 1 (Producer): Publishes messages to a Kafka topic.
- Service 2 (Consumer): Subscribes to the topic and processes messages.
Here’s the scenario:
- Both services are running.
- Service 1 sends messages: m1, m2, m3.
- Service 2 receives and processes all three messages successfully.
- I then shut down Service 2.
- While Service 2 is offline, Service 1 sends message m4.
- I restart Service 2.
- Although the Kafka consumer in Service 2 is re-initialized correctly, it does not receive message m4.
- Service 1 then sends message m5, and Service 2 successfully consumes m5.
The message m4 is completely lost on the consumer side — it is never received or processed.
Consumer Configurations
In the non-working version, the consumer is initialized like this:
consumer = KafkaConnection(
    group_id="wins-consumer-group-cost-service-01",
    offset_reset="latest"
)
When I change the consumer to use a dynamic group.id and offset_reset='earliest', the message is received:
consumer = KafkaConnection(
    group_id=f"wins-consumer-group-cost-service-{uuid.uuid4().hex[:4]}",
    offset_reset="earliest",
    enable_auto_commit=False,
    auto_commit_interval_ms=1000
)
However, using a new group ID every time prevents me from maintaining durable state across restarts, and is not practical in production.
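For comparison, here is a minimal sketch of a configuration that keeps a stable group ID while still using "earliest" as the fallback. It assumes the KafkaConnection wrapper ultimately passes a plain config dict to confluent_kafka.Consumer, which the report does not confirm. The build_consumer_config helper and the localhost:9092 address are hypothetical; the property names are the standard client settings.

```python
def build_consumer_config(group_id, bootstrap_servers="localhost:9092"):
    """Return a config dict intended for confluent_kafka.Consumer(conf).

    The helper name and the default broker address are illustrative
    assumptions, not taken from the report.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        # A fixed group ID lets the broker keep committed offsets across restarts.
        "group.id": group_id,
        # Consulted only when the group has no valid committed offset.
        "auto.offset.reset": "earliest",
        # Offsets are committed explicitly by the application.
        "enable.auto.commit": False,
    }


conf = build_consumer_config("wins-consumer-group-cost-service-01")
```

With this shape, "earliest" matters only on the group's first start or after its committed offsets have expired; on later restarts the committed offset decides where consumption resumes.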
Consumer code block:
def sourceAttr_consumer():
    consumer = None
    retry_count = 0
    topic_name = 'eys_device_wins_updated_defSourceAttrs'
    try:
        while True:
            # Recreate the consumer if it does not exist or has been closed
            if consumer is None:
                try:
                    dynamic_suffix = uuid.uuid4().hex[:4]
                    log.info("Creating new Kafka consumer...")
                    consumer = KafkaConnection(
                        group_id="wins-consumer-group-cost-service-01",
                        offset_reset="latest",
                    )
                    # Create the topic if it does not exist
                    if not consumer.topic_exists(topic_name):
                        consumer.add_topic(topic_name)
                    consumer = consumer.create_consumer()
                    consumer.subscribe([topic_name])
                    log.info(f"Subscribed to Kafka topic: {topic_name}")
                except Exception as conn_ex:
                    err
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                codes
            value = msg.value()
            if value is None:
                log.warning(f"Received message with None value. Message key: {msg.key()}")
                continue
            try:
                decoded_value = value.decode('utf-8')
            except Exception as decode_ex:
                log.error(f"Failed to decode message: {decode_ex} - raw value: {value}")
                continue
            try:
                jData = json.loads(decoded_value)
                consumer.commit(message=msg)
            except json.JSONDecodeError as json_ex:
                log.error(f"Failed to parse JSON: {json_ex} - message: {decoded_value}")
                continue
            if jData is not None:
                try:
                    consume_business_source_attr(jData)
                except Exception as business_ex:
                    log.error(f"Error in business logic: {business_ex} - message: {jData}")
    except Exception as ex:
        log.exception(f"sourceAttr_consumer crashed with: {ex}")
        retry_count += 1
        sleep_time = min(60, 2 ** retry_count)
        log.warning(f"Retrying consumer in {sleep_time} seconds...")
        time.sleep(sleep_time)
    finally:
        retry_count = 0
        if consumer is not None:
            consumer.commit()
            consumer.close()
            log.info("Kafka consumer closed.")
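The loop above commits the offset before the business logic runs, and commit(message=msg) is asynchronous by default in confluent-kafka-python. As a point of comparison, the sketch below commits synchronously and only after the handler has succeeded. It assumes create_consumer() returns a confluent_kafka.Consumer, which the poll(), msg.error() and commit(message=msg) calls suggest but the report does not state. handle_one, FakeMessage and FakeConsumer are hypothetical names; the two fake classes are stand-ins so the sketch runs without a broker.

```python
import json


def handle_one(consumer, handler, timeout=1.0):
    """Poll once and commit the offset only after handler succeeds.

    Returns True if a message was processed and committed, else False.
    """
    msg = consumer.poll(timeout)
    if msg is None or msg.error():
        return False
    value = msg.value()
    if value is None:
        return False
    payload = json.loads(value.decode("utf-8"))
    # If the handler raises, the commit below is skipped, so the message
    # is delivered again after a restart.
    handler(payload)
    consumer.commit(message=msg, asynchronous=False)
    return True


class FakeMessage:
    """Stand-in exposing the two message methods the sketch uses."""

    def __init__(self, value):
        self._value = value

    def error(self):
        return None

    def value(self):
        return self._value


class FakeConsumer:
    """Stand-in exposing poll() and commit() in the shape used above."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.committed = []

    def poll(self, timeout):
        return self._messages.pop(0) if self._messages else None

    def commit(self, message=None, asynchronous=True):
        self.committed.append(message.value())


seen = []
fake = FakeConsumer([FakeMessage(b'{"id": "m4"}')])
first = handle_one(fake, seen.append)
second = handle_one(fake, seen.append)
```

A malformed payload raises in this sketch instead of being skipped, so a real loop would still need a policy for messages that can never be parsed.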
Expected Behavior
When Service 2 (the consumer) restarts, it should resume consuming from where it left off — and receive any messages published while it was offline, such as m4.
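The expected resume behavior can be modeled with a small pure-Python function. It is a simplified illustration of how a consumer chooses its starting position on a single partition, not the client's actual code: the reset policy is consulted only when the group has no valid committed offset. resolve_start_offset is a hypothetical name, and the offsets assume m1 to m4 sit at offsets 0 to 3 of one partition.

```python
def resolve_start_offset(committed, log_start, log_end, offset_reset):
    """Return the offset a restarting consumer would start reading from.

    committed: last committed offset for the group (next offset to read),
               or None if the group has no committed offset.
    log_start / log_end: first available offset and next offset to be written.
    """
    if committed is not None and log_start <= committed <= log_end:
        # A valid committed offset always wins; the reset policy is ignored.
        return committed
    if offset_reset == "earliest":
        return log_start
    if offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported offset_reset: {offset_reset!r}")


# m1..m3 were consumed and committed (next offset 3); m4 was written at offset 3.
with_commit = resolve_start_offset(3, 0, 4, "latest")
# Same log, but the commit for the group never reached the broker.
without_commit = resolve_start_offset(None, 0, 4, "latest")
# No commit, but the fallback policy is "earliest".
fallback_earliest = resolve_start_offset(None, 0, 4, "earliest")
```

Under this model, m4 is skipped with offset_reset="latest" only if no committed offset exists for the group when Service 2 restarts. That makes it worth checking whether the commits from the previous run actually reached the broker.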
Questions
Why is the message m4 lost when the consumer group restarts with the same group.id and offset_reset=latest?
Is this expected behavior?
How can I ensure durable message consumption across consumer restarts without changing the group ID every time?