Skip to content

Commit 1e00f53

Browse files
committed
Expose max timestamp for each topic and/or partition (#3)
* Expose max timestamp for each topic and/or partition. Kafka 3.0+ added support for fetching the offset of the record with the max timestamp. This is useful for monitoring the steady progress of producing to a topic/partition, and also for detecting very high timestamps in a topic that can stall topic retention. This patch adds two new metrics: - kafka_topic_partition_max_timestamp: for each topic partition, reports the max timestamp in that partition - kafka_topic_max_timestamp: for each topic, reports the max timestamp in that topic, computed as the maximum timestamp across partitions See: - https://issues.apache.org/jira/browse/KAFKA-12541 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp * Update prometheus/collect_topic_partition_offsets.go
1 parent 2e0a2a5 commit 1e00f53

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

prometheus/collect_topic_partition_offsets.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
3030
return false
3131
}
3232

33+
// Highest Timestamp Offsets
34+
// NB: this requires Kafka Brokers 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
35+
// On older versions this returns the timestamp of the low watermark (earliest offset) instead
36+
maxTimestampOffsets, err := e.minionSvc.ListOffsetsCached(ctx, -3)
37+
if err != nil {
38+
e.logger.Error("failed to fetch offsets for max timestamp", zap.Error(err))
39+
return false
40+
}
41+
3342
// Process Low Watermarks
3443

3544
for topicName, partitions := range lowWaterMarks {
@@ -105,5 +114,47 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
105114
}
106115
}
107116

117+
// Process Max Timestamps
118+
for _, topic := range maxTimestampOffsets.Topics {
119+
if !e.minionSvc.IsTopicAllowed(topic.Topic) {
120+
continue
121+
}
122+
topicMaxTimestamp := int64(0)
123+
hasErrors := false
124+
for _, partition := range topic.Partitions {
125+
err := kerr.ErrorForCode(partition.ErrorCode)
126+
if err != nil {
127+
hasErrors = true
128+
isOk = false
129+
continue
130+
}
131+
if topicMaxTimestamp < partition.Timestamp {
132+
topicMaxTimestamp = partition.Timestamp
133+
}
134+
// Let's end here if partition metrics shall not be exposed
135+
if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
136+
continue
137+
}
138+
if partition.Timestamp > 0 {
139+
ch <- prometheus.MustNewConstMetric(
140+
e.partitionMaxTimestamp,
141+
prometheus.GaugeValue,
142+
float64(partition.Timestamp),
143+
topic.Topic,
144+
strconv.Itoa(int(partition.Partition)),
145+
)
146+
}
147+
}
148+
// We only want to report the max of all partition max timestamps if we receive results from all partitions
149+
// and the topic is not empty
150+
if !hasErrors && topicMaxTimestamp > 0 {
151+
ch <- prometheus.MustNewConstMetric(
152+
e.topicMaxTimestamp,
153+
prometheus.GaugeValue,
154+
float64(topicMaxTimestamp),
155+
topic.Topic,
156+
)
157+
}
158+
}
108159
return isOk
109160
}

prometheus/exporter.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type Exporter struct {
4141
partitionHighWaterMark *prometheus.Desc
4242
topicLowWaterMarkSum *prometheus.Desc
4343
partitionLowWaterMark *prometheus.Desc
44+
topicMaxTimestamp *prometheus.Desc
45+
partitionMaxTimestamp *prometheus.Desc
4446

4547
// Consumer Groups
4648
consumerGroupInfo *prometheus.Desc
@@ -172,6 +174,20 @@ func (e *Exporter) InitializeMetrics() {
172174
[]string{"topic_name"},
173175
nil,
174176
)
177+
// Partition Max Timestamp
178+
e.partitionMaxTimestamp = prometheus.NewDesc(
179+
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_max_timestamp"),
180+
"Partition Max Timestamp",
181+
[]string{"topic_name", "partition_id"},
182+
nil,
183+
)
184+
// Topic Max Timestamp
185+
e.topicMaxTimestamp = prometheus.NewDesc(
186+
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_max_timestamp"),
187+
"Topic Max Timestamp",
188+
[]string{"topic_name"},
189+
nil,
190+
)
175191

176192
// Consumer Group Metrics
177193
// Group Info

0 commit comments

Comments
 (0)