Skip to content

Don't abort kafka_consumer when one partition's high-watermark lookup fails#24263

Draft
piochelepiotr wants to merge 5 commits into
masterfrom
pwolski/kafka-consumer-highwater-offset-resilience
Draft

Don't abort kafka_consumer when one partition's high-watermark lookup fails#24263
piochelepiotr wants to merge 5 commits into
masterfrom
pwolski/kafka-consumer-highwater-offset-resilience

Conversation

@piochelepiotr

@piochelepiotr piochelepiotr commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

What does this PR do?

Fetch kafka_consumer high-watermark offsets via AdminClient.list_offsets (one future per partition) instead of a single batched Consumer.offsets_for_times. A partition that can't be resolved (e.g. a leaderless/offline topic) is now skipped individually, instead of raising UNKNOWN_TOPIC_OR_PART for the whole batch and aborting the entire check.

Motivation

One broken topic-partition could take the whole integration down — dropping all metrics, not just lag — especially with enable_cluster_monitoring on and on shared/Confluent Cloud clusters that contain leaderless/RF-0 or mid-deletion topics.

Review checklist (to be filled by reviewers)

  • Feature or bugfix MUST have appropriate tests (unit, integration, e2e)
  • Add qa/required if this PR needs QA validation, or qa/skip-qa if it does not. Exactly one of the two is required.
  • If you need to backport this PR to another branch, you can add the backport/<branch-name> label to the PR and it will automatically open a backport PR once this one is merged

piochelepiotr and others added 2 commits June 30, 2026 16:47
… fails

The high-watermark offset collection issues a single batched
offsets_for_times call for all topic-partitions. If any partition cannot
be resolved at all (e.g. a leaderless/offline RF-0 or mid-deletion
topic), librdkafka raises a KafkaException (UNKNOWN_TOPIC_OR_PART) for
the entire batch call, which propagates up and aborts the whole check so
no metrics are collected.

Fall back to per-partition lookups when the batched call fails, skipping
only the partitions that still raise, so healthy partitions are still
collected and the check is not aborted.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@datadog-prod-us1-6

datadog-prod-us1-6 Bot commented Jun 30, 2026

Copy link
Copy Markdown

Tests  Code Coverage

🎉 All green!

🧪 All tests passed
❄️ No new flaky tests detected

🎯 Code Coverage (details)
Patch Coverage: 100.00%
Overall Coverage: 92.27% (+4.26%)

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: d6807c7 | Docs | Datadog PR Page | Give us feedback!

piochelepiotr and others added 3 commits June 30, 2026 17:14
Replace the resilient-but-clunky offsets_for_times approach with
AdminClient.list_offsets, which returns a future per partition. Errored
partitions (e.g. leaderless/offline topics that raise
UNKNOWN_TOPIC_OR_PART) are skipped individually, so healthy partitions
are still collected without a wholesale raise or a per-partition retry
loop.

list_offsets is also the purpose-built API for latest/earliest offsets,
unlike offsets_for_times with sentinel timestamp values.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Mirror cluster_metadata.fetch_earliest_offsets in get_partition_offsets:
broaden the per-future handler to catch any Exception so one bad
partition does not abort the loop, and wrap the outer list_offsets
call so a request/broker-level failure logs a warning and returns []
instead of aborting the whole highwater collection.

Strengthen unit tests: assert list_offsets is called with
isolation_level=READ_UNCOMMITTED and the request timeout, cover the
non-Kafka per-partition error and request-level failure paths, and add
an empty-partitions test that issues no request.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@piochelepiotr piochelepiotr force-pushed the pwolski/kafka-consumer-highwater-offset-resilience branch from aed9a66 to d6807c7 Compare June 30, 2026 16:01
@dd-octo-sts

dd-octo-sts Bot commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Validation Report

All 21 validations passed.

Show details
Validation Description Status
agent-reqs Verify check versions match the Agent requirements file
ci Validate CI configuration and code coverage settings
codeowners Validate every integration has a CODEOWNERS entry
config Validate default configuration files against spec.yaml
dep Verify dependency pins are consistent and Agent-compatible
http Validate integrations use the HTTP wrapper correctly
imports Validate check imports do not use deprecated modules
integration-style Validate check code style conventions
jmx-metrics Validate JMX metrics definition files and config
labeler Validate PR labeler config matches integration directories
legacy-signature Validate no integration uses the legacy Agent check signature
license-headers Validate Python files have proper license headers
licenses Validate third-party license attribution list
metadata Validate metadata.csv metric definitions
models Validate configuration data models match spec.yaml
openmetrics Validate OpenMetrics integrations disable the metric limit
package Validate Python package metadata and naming
qa-label Validate the pull request declares whether it needs QA for the next Agent release
readmes Validate README files have required sections
saved-views Validate saved view JSON file structure and fields
version Validate version consistency between package and changelog

View full run

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant