
Commit 77b7cda

kafka_consumer: bound and refine estimated_consumer_lag (#24167)
* kafka_consumer: bound and refine estimated_consumer_lag

  Cap left-extrapolation of the broker timestamp cache so a consumer offset older than the oldest cached sample cannot extrapolate more than 10 minutes past it, keeping estimated_consumer_lag bounded.

  Use max(consumer_offset, low_watermark) as the offset basis for lag-in-time when cluster monitoring is enabled: messages below the low watermark are out of retention and unreachable, so they should not inflate the time lag.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: add changelog entry for PR #24167

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: compact and prune the broker-timestamp cache

  Replace single-oldest eviction with batch compaction (Visvalingam-Whyatt) triggered when the cache reaches capacity: keep the oldest and newest samples and drop the points that least distort the offset/timestamp curve, so the cache spans a longer history at a coarsening resolution and high lag is interpolated rather than extrapolated. At the same trigger, prune samples below the earliest consumer offset (keeping one anchor) since no consumer will ever interpolate there.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: prune broker-timestamp cache by low watermark

  Use the partition low watermark as the prune floor when cluster monitoring is enabled (the physically meaningful "lowest readable offset"), falling back to the earliest committed consumer offset otherwise. The low watermark is now fetched before the cache update and reused for both pruning and the lag-in-time floor, so there is no extra broker call.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: fetch low watermark offsets once and share them

  Previously the log-start (low watermark) offsets were fetched twice per run when cluster monitoring and data streams were both enabled: once by the metadata collector for partition.size/topic.size/throughput, and again by the lag path for the lag-in-time and cache-pruning floor. Fetch them once in check(), gated on cluster monitoring, over all non-internal topic partitions, and share the result with both the data-streams lag path and the metadata collector. Removes the duplicate list_offsets(earliest) call and the divergent internal-topic handling.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: reuse _fetch_earliest_offsets instead of a parallel fetch

  Drop the PR-added Client.get_low_watermark_offsets and the _get_low_watermark_offsets wrapper, which duplicated the existing ClusterMetadataCollector._fetch_earliest_offsets. The check now calls _fetch_earliest_offsets once under cluster monitoring and shares the result with both the data-streams lag/pruning path and the topic-metadata collection, so the earliest offsets are still fetched only once per run. This reverts client.py to master and keeps the cluster_metadata.py change to a small signature tweak.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: use low_watermark_offsets directly in topic metadata

  Drop the redundant earliest_offsets alias and reference the passed-in low_watermark_offsets directly.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: address review feedback on lag bounding

  - Clarify that the left-extrapolation cap bounds lag-in-time regardless of cluster monitoring or the low-watermark floor, and document why there is no symmetric right-side clamp (the newest cached sample is the just-collected highwater, which the consumer offset can never exceed).
  - Promote ClusterMetadataCollector.fetch_earliest_offsets to a public method since KafkaCheck now calls it across the class boundary.
  - Log a debug line when the cache-prune floor falls back from the low watermark to the earliest consumer offset.
  - Extract the Visvalingam-Whyatt significance closure into a module-level _interpolation_error helper.
  - Parameterize the _visvalingam_whyatt tests; add direct tests for _earliest_consumer_offsets, _prune_below_anchor, and the left-extrapolation cap through report_consumer_offsets_and_lag without a low watermark.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: trim comments to a single note on the extrapolation cap

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: move extrapolation-cap comment to the clamp line

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: reuse fetched topic partitions in topic metadata collection

  Pass the topic-partition map computed in check() through collect_all_metadata into _collect_topic_metadata instead of fetching it again, so the cluster monitoring path makes the same number of get_topic_partitions calls as before.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: satisfy ruff formatting for collect_all_metadata call

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* kafka_consumer: clear full timestamp cache on reset, test pruning end-to-end

  When a reset is detected (any cached offset above the new highwater), clear the entire cache instead of only dropping entries above the highwater. The VW compactor always preserves the minimum cached offset as an endpoint, so old-generation low-offset entries would never age out and would poison lag interpolation indefinitely after a partial reset.

  Also replaces the direct private-method test for consumer-floor pruning with a dd_run_check test that exercises the full check() path, and adds tests for the new clear-on-reset behaviour.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: satisfy ruff formatting for new unit tests

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: shorten reset-detection comment

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: trim reset test comment

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: test timestamp compaction via dd_run_check instead of private method

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: replace _prune_below_anchor direct tests with dd_run_check tests

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: satisfy ruff formatting for prune_below_anchor replacement tests

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: replace private method tests with public method tests

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: test that lag accuracy is preserved after VW compaction

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: parametrize VW compaction test with 4 cases

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
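The compaction described in the commit message can be illustrated with a deliberately naive sketch. It is not the committed implementation: the names `compact` and `interpolation_error` are hypothetical, the sample data is invented, and the loop rescans every interior point on each pass instead of using a heap. It only shows the idea of keeping both endpoints and dropping the samples whose removal least distorts the offset/timestamp curve.

```python
def interpolation_error(offsets, i, timestamps):
    """Distance between sample i and the straight line joining its two neighbors."""
    before, o, after = offsets[i - 1], offsets[i], offsets[i + 1]
    t0, t1 = timestamps[before], timestamps[after]
    predicted = t0 + (t1 - t0) * (o - before) / (after - before)
    return abs(timestamps[o] - predicted)


def compact(timestamps, target_count):
    """Drop the least significant interior samples until target_count remain."""
    offsets = sorted(timestamps)
    while len(offsets) > max(2, target_count):
        # Indexes 0 and -1 are never candidates, so the oldest and newest samples survive.
        i = min(
            range(1, len(offsets) - 1),
            key=lambda k: interpolation_error(offsets, k, timestamps),
        )
        del timestamps[offsets.pop(i)]
    return timestamps


# Invented data: 1 offset/second up to offset 40, then the producer speeds up.
cache = {0: 0.0, 10: 10.0, 20: 20.0, 30: 30.0, 40: 40.0, 140: 50.0, 240: 60.0}
compact(cache, 3)
print(sorted(cache))  # [0, 40, 240]
```

The collinear samples are removed first, so the surviving interior point is the one at offset 40, where the production rate changes. That is the sample interpolation most depends on.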
1 parent 8c05418 commit 77b7cda

4 files changed

Lines changed: 425 additions & 57 deletions


Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Improve the accuracy of ``estimated_consumer_lag`` for consumers that are far behind: cap interpolation for offsets older than the cached broker history, use the low watermark as a floor for the lag offset when cluster monitoring is enabled, and retain a longer broker-timestamp history by compacting the cache (Visvalingam-Whyatt) and pruning samples below the lowest readable offset (the low watermark, or the earliest consumer offset when cluster monitoring is disabled) instead of evicting the oldest one.
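The first two points of this entry, the cap and the low-watermark floor, can be made concrete with a small worked example. This is a sketch under stated assumptions, not the check's code: `estimate_timestamp` is a hypothetical, simplified stand-in for the check's interpolation helper, and the sample offsets and timestamps are invented. Only the 600-second limit is taken from the commit.

```python
LAG_EXTRAPOLATION_LIMIT_SECONDS = 600  # same value as the constant added in this commit


def estimate_timestamp(samples, offset):
    """Linearly estimate the broker timestamp of `offset` from {offset: timestamp} samples."""
    offsets = sorted(samples)
    if len(offsets) < 2:
        return None
    # Use the bracketing pair, or the two oldest samples when offset precedes the cache.
    before, after = offsets[0], offsets[1]
    for lo, hi in zip(offsets, offsets[1:]):
        if lo <= offset:
            before, after = lo, hi
    slope = (samples[after] - samples[before]) / float(after - before)
    timestamp = slope * (offset - after) + samples[after]
    if offset < before:
        # Left of the cache: never go more than the limit past the oldest sample.
        timestamp = max(timestamp, samples[before] - LAG_EXTRAPOLATION_LIMIT_SECONDS)
    return timestamp


# Invented cache: two samples, produced at 1 offset/second; 1100 is the highwater.
samples = {1000: 10_000.0, 1100: 10_100.0}
producer_timestamp = samples[1100]
consumer_offset = 0

# Uncapped extrapolation would give 1100 s of lag; the cap limits it to 100 + 600.
capped_lag = producer_timestamp - estimate_timestamp(samples, consumer_offset)

# With a low watermark of 900, offsets below it are unreadable and do not count.
low_watermark = 900
effective_offset = max(consumer_offset, low_watermark)
floored_lag = producer_timestamp - estimate_timestamp(samples, effective_offset)

print(capped_lag, floored_lag)  # 700.0 200.0
```

The floor is applied to the offset before estimation, while the cap is applied to the estimated timestamp afterwards, so the cap still bounds the result when no low watermark is available.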

kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py

Lines changed: 5 additions & 9 deletions
@@ -204,7 +204,7 @@ def _parallel_fetch(self, fn: Callable[[str], Any], subjects: list[str], error_l
                     self.log.warning("Error fetching %s for %s: %s", error_label, subject, e)
         return results

-    def collect_all_metadata(self, highwater_offsets):
+    def collect_all_metadata(self, highwater_offsets, low_watermark_offsets, topic_partitions):
         try:
             shared_metadata = self.client.kafka_client.list_topics(timeout=self.config._request_timeout)
         except Exception as e:
@@ -217,7 +217,7 @@ def collect_all_metadata(self, highwater_offsets):
             self.log.error("Error collecting broker metadata: %s", e)

         try:
-            self._collect_topic_metadata(shared_metadata, highwater_offsets)
+            self._collect_topic_metadata(shared_metadata, highwater_offsets, low_watermark_offsets, topic_partitions)
         except Exception as e:
             self.log.error("Error collecting topic metadata: %s", e)

@@ -386,7 +386,7 @@ def _collect_broker_metadata(self, metadata=None):
                 "data-streams-message",
             )

-    def _fetch_earliest_offsets(self, topic_partitions):
+    def fetch_earliest_offsets(self, topic_partitions):
         """Batch-fetch log-start offsets via AdminClient.list_offsets(earliest).

         Uses ListOffsets with the EARLIEST_TIMESTAMP sentinel, which the broker
@@ -441,11 +441,9 @@ def _fetch_earliest_offsets(self, topic_partitions):
             )
         return result

-    def _collect_topic_metadata(self, metadata, highwater_offsets):
+    def _collect_topic_metadata(self, metadata, highwater_offsets, low_watermark_offsets, topic_partitions):
         self.log.debug("Collecting topic metadata")

-        topic_partitions = self.client.get_topic_partitions()
-
         cluster_id = self.config._kafka_cluster_id_override or (
             metadata.cluster_id if hasattr(metadata, 'cluster_id') else 'unknown'
         )
@@ -455,8 +453,6 @@ def _collect_topic_metadata(self, metadata, highwater_offsets):

         self.check.gauge('topic.count', len(topic_partitions), tags=self.config._get_tags(cluster_id))

-        earliest_offsets = self._fetch_earliest_offsets(topic_partitions)
-
         now_ts = time.time()
         prev_ts = None
         previous_partition_offsets = {}
@@ -496,7 +492,7 @@ def _collect_topic_metadata(self, metadata, highwater_offsets):

                 partition_metadata = topic_metadata.partitions.get(partition_id)
                 latest = highwater_offsets.get((topic_name, partition_id), 0)
-                earliest = earliest_offsets.get((topic_name, partition_id))
+                earliest = low_watermark_offsets.get((topic_name, partition_id))

                 if earliest is None:
                     have_all_earliest = False

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 101 additions & 17 deletions
@@ -1,6 +1,7 @@
 # (C) Datadog, Inc. 2019-present
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
+import heapq
 import json
 from collections import defaultdict
 from time import time
@@ -18,6 +19,8 @@

 MAX_TIMESTAMPS = 1000

+LAG_EXTRAPOLATION_LIMIT_SECONDS = 600
+

 class KafkaCheck(AgentCheck):
     __NAMESPACE__ = 'kafka'
@@ -67,6 +70,8 @@ def check(self, _):
         # Fetch the broker highwater offsets
         highwater_offsets = {}
         broker_timestamps = defaultdict(dict)
+        low_watermark_offsets = {}
+        topic_partitions = {}
         cluster_id = ""
         persistent_cache_key = "broker_timestamps_"
         consumer_contexts_count = self.count_consumer_contexts(consumer_offsets)
@@ -86,9 +91,17 @@ def check(self, _):
                         partitions.add((topic, partition))
                 # Expected format: ({(topic, partition): offset}, cluster_id)
                 highwater_offsets, cluster_id = self.get_highwater_offsets(partitions)
+                if self.config._cluster_monitoring_enabled:
+                    topic_partitions = self.client.get_topic_partitions()
+                    low_watermark_offsets = self.metadata_collector.fetch_earliest_offsets(topic_partitions)
                 if self._data_streams_enabled:
                     broker_timestamps = self._load_broker_timestamps(persistent_cache_key)
-                    self._add_broker_timestamps(broker_timestamps, highwater_offsets)
+                    if low_watermark_offsets:
+                        prune_floors = low_watermark_offsets
+                    else:
+                        self.log.debug("No low watermarks available; pruning cache by earliest consumer offset")
+                        prune_floors = self._earliest_consumer_offsets(consumer_offsets)
+                    self._add_broker_timestamps(broker_timestamps, highwater_offsets, prune_floors)
                     self._save_broker_timestamps(broker_timestamps, persistent_cache_key)
             else:
                 self.warning("Context limit reached. Skipping highwater offset collection.")
@@ -129,6 +142,7 @@ def check(self, _):
                 reporting_limit - len(highwater_offsets),
                 broker_timestamps,
                 cluster_id,
+                low_watermark_offsets,
             )

         # Collect cluster metadata if enabled
# Collect cluster metadata if enabled
@@ -137,7 +151,7 @@ def check(self, _):
             self._send_cluster_monitoring_heartbeat(total_contexts, cluster_id, connect_status)

             try:
-                self.metadata_collector.collect_all_metadata(highwater_offsets)
+                self.metadata_collector.collect_all_metadata(highwater_offsets, low_watermark_offsets, topic_partitions)
             except Exception as e:
                 self.log.error("Error collecting cluster metadata: %s", e)

@@ -274,22 +288,29 @@ def _load_broker_timestamps(self, persistent_cache_key):
             self.log.warning('Could not read broker timestamps from cache: %s', str(e))
         return broker_timestamps

-    def _add_broker_timestamps(self, broker_timestamps, highwater_offsets):
+    def _earliest_consumer_offsets(self, consumer_offsets):
+        """Return the lowest committed offset per (topic, partition) across all consumer groups."""
+        earliest = {}
+        for offsets in consumer_offsets.values():
+            for topic_partition, offset in offsets.items():
+                if topic_partition not in earliest or offset < earliest[topic_partition]:
+                    earliest[topic_partition] = offset
+        return earliest
+
+    def _add_broker_timestamps(self, broker_timestamps, highwater_offsets, prune_floors=None):
+        prune_floors = prune_floors or {}
         for (topic, partition), highwater_offset in highwater_offsets.items():
             timestamps = broker_timestamps["{}_{}".format(topic, partition)]
-            # If the highwater offset went backwards (topic recreated,
-            # retention wipe, or offset reset) any cached pair with a larger
-            # offset points to a now-nonexistent message and would poison
-            # interpolation. Drop those entries.
-            stale = [o for o in timestamps if o > highwater_offset]
-            for o in stale:
-                del timestamps[o]
+            # Reset detected: clear the whole cache. Low-offset survivors are from the
+            # previous generation and VW pins the minimum endpoint, so they'd never age out.
+            if any(o > highwater_offset for o in timestamps):
+                timestamps.clear()
             timestamps[highwater_offset] = time()
-            # If there's too many timestamps, we delete the oldest one (by
-            # timestamp, not by offset — evicting by min offset would discard
-            # the fresh post-reset entries and keep poisonous stale ones).
-            if len(timestamps) > self._max_timestamps:
-                del timestamps[min(timestamps, key=timestamps.get)]
+            if len(timestamps) >= self._max_timestamps:
+                prune_floor = prune_floors.get((topic, partition))
+                if prune_floor is not None:
+                    _prune_below_anchor(timestamps, prune_floor)
+                _visvalingam_whyatt(timestamps, max(2, self._max_timestamps // 2))

     def _save_broker_timestamps(self, broker_timestamps, persistent_cache_key):
         """Saves broker timestamps to persistent cache."""
@@ -312,9 +333,16 @@ def report_highwater_offsets(self, highwater_offsets, contexts_limit, cluster_id
         self.log.debug('%s highwater offsets reported', reported_contexts)

     def report_consumer_offsets_and_lag(
-        self, consumer_offsets, highwater_offsets, contexts_limit, broker_timestamps, cluster_id
+        self,
+        consumer_offsets,
+        highwater_offsets,
+        contexts_limit,
+        broker_timestamps,
+        cluster_id,
+        low_watermark_offsets=None,
     ):
         """Report the consumer offsets and consumer lag."""
+        low_watermark_offsets = low_watermark_offsets or {}
         reported_contexts = 0
         self.log.debug("Reporting consumer offsets and lag metrics")
         for consumer_group, offsets in consumer_offsets.items():
@@ -388,7 +416,9 @@ def report_consumer_offsets_and_lag(
                     timestamps = broker_timestamps["{}_{}".format(topic, partition)]
                     # The producer timestamp can be not set if there was an error fetching broker offsets.
                     producer_timestamp = timestamps.get(producer_offset, None)
-                    consumer_timestamp = _get_interpolated_timestamp(timestamps, consumer_offset)
+                    low_watermark = low_watermark_offsets.get((topic, partition))
+                    effective_offset = consumer_offset if low_watermark is None else max(consumer_offset, low_watermark)
+                    consumer_timestamp = _get_interpolated_timestamp(timestamps, effective_offset)
                     if consumer_timestamp is None or producer_timestamp is None:
                         continue
                     lag = producer_timestamp - consumer_timestamp
@@ -502,4 +532,58 @@ def _get_interpolated_timestamp(timestamps, offset):
     timestamp_after = timestamps[offset_after]
     slope = (timestamp_after - timestamp_before) / float(offset_after - offset_before)
     timestamp = slope * (offset - offset_after) + timestamp_after
+
+    if offset < offset_before:
+        # Cap how far past the oldest cached sample we extrapolate, so estimated lag stays bounded.
+        timestamp = max(timestamp, timestamp_before - LAG_EXTRAPOLATION_LIMIT_SECONDS)
     return timestamp
+
+
+def _prune_below_anchor(timestamps, floor):
+    below = [o for o in timestamps if o < floor]
+    if len(below) <= 1:
+        return
+    anchor = max(below)
+    for o in below:
+        if o != anchor:
+            del timestamps[o]
+
+
+def _visvalingam_whyatt(timestamps, target_count):
+    if len(timestamps) <= target_count:
+        return timestamps
+
+    offsets = sorted(timestamps)
+    prev = {o: (offsets[i - 1] if i > 0 else None) for i, o in enumerate(offsets)}
+    nxt = {o: (offsets[i + 1] if i < len(offsets) - 1 else None) for i, o in enumerate(offsets)}
+    alive = set(offsets)
+
+    current = {}
+    heap = []
+    for o in offsets:
+        if prev[o] is not None and nxt[o] is not None:
+            current[o] = _interpolation_error(o, prev, nxt, timestamps)
+            heap.append((current[o], o))
+    heapq.heapify(heap)
+
+    remaining = len(offsets)
+    while remaining > target_count and heap:
+        error, o = heapq.heappop(heap)
+        if o not in alive or error != current.get(o):
+            continue
+        before, after = prev[o], nxt[o]
+        alive.discard(o)
+        del timestamps[o]
+        remaining -= 1
+        nxt[before], prev[after] = after, before
+        for neighbor in (before, after):
+            if prev[neighbor] is not None and nxt[neighbor] is not None:
+                current[neighbor] = _interpolation_error(neighbor, prev, nxt, timestamps)
+                heapq.heappush(heap, (current[neighbor], neighbor))
+    return timestamps
+
+
+def _interpolation_error(o, prev, nxt, timestamps):
+    before, after = prev[o], nxt[o]
+    predicted = timestamps[before] + (timestamps[after] - timestamps[before]) * (o - before) / (after - before)
+    return abs(timestamps[o] - predicted)
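The pruning and reset rules in this file can be tried in isolation. The sketch below is illustrative only: `prune_below_anchor` and `add_sample` are hypothetical standalone equivalents written for this example, with invented data and an explicit `now` argument in place of `time()`. It shows the two behaviours the diff introduces: one sample below the floor is kept as an anchor, and a backwards-moving highwater clears the whole cache.

```python
def prune_below_anchor(timestamps, floor):
    """Drop samples below `floor`, keeping the highest of them as an anchor."""
    below = sorted(o for o in timestamps if o < floor)
    # Everything except the last (highest) sub-floor offset is removed.
    for o in below[:-1]:
        del timestamps[o]


def add_sample(timestamps, highwater_offset, now):
    """Record a highwater sample, clearing the cache if offsets went backwards."""
    if any(o > highwater_offset for o in timestamps):
        timestamps.clear()
    timestamps[highwater_offset] = now


cache = {100: 1.0, 200: 2.0, 300: 3.0, 400: 4.0}

prune_below_anchor(cache, floor=250)
after_prune = sorted(cache)
print(after_prune)  # [200, 300, 400]

add_sample(cache, highwater_offset=50, now=5.0)  # highwater moved backwards
print(cache)  # {50: 5.0}
```

Keeping offset 200 matters because an offset at the floor (250 here) is then still bracketed by 200 and 300, so its timestamp is interpolated rather than extrapolated.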
