Skip to content

Commit 9ec9282

Browse files
committed
Expose max timestamp for each topic and/or partition (#3)
* Expose max timestamp for each topic and/or partition Kafka 3.0+ added support to fetch offset with the max timestamp. This is useful in monitoring the steady progress of producing to a topic/partion but also to detect very high timestamps in the topic that can stuck topic retention. This patch adds two new metrics: - kafka_topic_partition_max_timestamp: for each topic partition reports the max timestamp in that partition - kafka_topic_max_timestamp: for each topic reports the max timestamp in that topic, computed as the maximum timestamp across partitions See: - https://issues.apache.org/jira/browse/KAFKA-12541 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp * Update prometheus/collect_topic_partition_offsets.go
1 parent 205d739 commit 9ec9282

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

prometheus/collect_topic_partition_offsets.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
2525
return false
2626
}
2727

28+
// Highest Timestamp Offsets
29+
// NB: this requires Kafka Brokers 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
30+
// In older versions this is returning the timestamp of the low watermarks (earliest offset)
31+
maxTimestampOffsets, err := e.minionSvc.ListOffsetsCached(ctx, -3)
32+
if err != nil {
33+
e.logger.Error("failed to fetch offsets for max timestamp", zap.Error(err))
34+
return false
35+
}
36+
2837
// Process Low Watermarks
2938
for _, topic := range lowWaterMarks.Topics {
3039
if !e.minionSvc.IsTopicAllowed(topic.Topic) {
@@ -101,5 +110,47 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
101110
}
102111
}
103112

113+
// Process Max Timestamps
114+
for _, topic := range maxTimestampOffsets.Topics {
115+
if !e.minionSvc.IsTopicAllowed(topic.Topic) {
116+
continue
117+
}
118+
topicMaxTimestamp := int64(0)
119+
hasErrors := false
120+
for _, partition := range topic.Partitions {
121+
err := kerr.ErrorForCode(partition.ErrorCode)
122+
if err != nil {
123+
hasErrors = true
124+
isOk = false
125+
continue
126+
}
127+
if topicMaxTimestamp < partition.Timestamp {
128+
topicMaxTimestamp = partition.Timestamp
129+
}
130+
// Let's end here if partition metrics shall not be exposed
131+
if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
132+
continue
133+
}
134+
if partition.Timestamp > 0 {
135+
ch <- prometheus.MustNewConstMetric(
136+
e.partitionMaxTimestamp,
137+
prometheus.GaugeValue,
138+
float64(partition.Timestamp),
139+
topic.Topic,
140+
strconv.Itoa(int(partition.Partition)),
141+
)
142+
}
143+
}
144+
// We only want to report the max of all partition max timestamps if we receive results from all partitions
145+
// and the topic is not empty
146+
if !hasErrors && topicMaxTimestamp > 0 {
147+
ch <- prometheus.MustNewConstMetric(
148+
e.topicMaxTimestamp,
149+
prometheus.GaugeValue,
150+
float64(topicMaxTimestamp),
151+
topic.Topic,
152+
)
153+
}
154+
}
104155
return isOk
105156
}

prometheus/exporter.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type Exporter struct {
4141
partitionHighWaterMark *prometheus.Desc
4242
topicLowWaterMarkSum *prometheus.Desc
4343
partitionLowWaterMark *prometheus.Desc
44+
topicMaxTimestamp *prometheus.Desc
45+
partitionMaxTimestamp *prometheus.Desc
4446

4547
// Consumer Groups
4648
consumerGroupInfo *prometheus.Desc
@@ -172,6 +174,20 @@ func (e *Exporter) InitializeMetrics() {
172174
[]string{"topic_name"},
173175
nil,
174176
)
177+
// Partition Max Timestamp
178+
e.partitionMaxTimestamp = prometheus.NewDesc(
179+
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_max_timestamp"),
180+
"Partition Max Timestamp",
181+
[]string{"topic_name", "partition_id"},
182+
nil,
183+
)
184+
// Topic Max Timestamp
185+
e.topicMaxTimestamp = prometheus.NewDesc(
186+
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_max_timestamp"),
187+
"Topic Max Timestamp",
188+
[]string{"topic_name"},
189+
nil,
190+
)
175191

176192
// Consumer Group Metrics
177193
// Group Info

0 commit comments

Comments
 (0)