@@ -25,6 +25,15 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
2525 return false
2626 }
2727
28+ // Highest Timestamp Offsets
29+ // NB: this requires Kafka Brokers 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
30+ // In older versions this is returning the timestamp of the low watermarks (earliest offset)
31+ maxTimestampOffsets , err := e .minionSvc .ListOffsetsCached (ctx , - 3 )
32+ if err != nil {
33+ e .logger .Error ("failed to fetch offsets for max timestamp" , zap .Error (err ))
34+ return false
35+ }
36+
2837 // Process Low Watermarks
2938 for _ , topic := range lowWaterMarks .Topics {
3039 if ! e .minionSvc .IsTopicAllowed (topic .Topic ) {
@@ -101,5 +110,47 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
101110 }
102111 }
103112
113+ // Process Max Timestamps
114+ for _ , topic := range maxTimestampOffsets .Topics {
115+ if ! e .minionSvc .IsTopicAllowed (topic .Topic ) {
116+ continue
117+ }
118+ topicMaxTimestamp := int64 (0 )
119+ hasErrors := false
120+ for _ , partition := range topic .Partitions {
121+ err := kerr .ErrorForCode (partition .ErrorCode )
122+ if err != nil {
123+ hasErrors = true
124+ isOk = false
125+ continue
126+ }
127+ if topicMaxTimestamp < partition .Timestamp {
128+ topicMaxTimestamp = partition .Timestamp
129+ }
130+ // Let's end here if partition metrics shall not be exposed
131+ if e .minionSvc .Cfg .Topics .Granularity == minion .TopicGranularityTopic {
132+ continue
133+ }
134+ if partition .Timestamp > 0 {
135+ ch <- prometheus .MustNewConstMetric (
136+ e .partitionMaxTimestamp ,
137+ prometheus .GaugeValue ,
138+ float64 (partition .Timestamp ),
139+ topic .Topic ,
140+ strconv .Itoa (int (partition .Partition )),
141+ )
142+ }
143+ }
144+ // We only want to report the max of all partition max timestamps if we receive results from all partitions
145+ // and the topic is not empty
146+ if ! hasErrors && topicMaxTimestamp > 0 {
147+ ch <- prometheus .MustNewConstMetric (
148+ e .topicMaxTimestamp ,
149+ prometheus .GaugeValue ,
150+ float64 (topicMaxTimestamp ),
151+ topic .Topic ,
152+ )
153+ }
154+ }
104155 return isOk
105156}
0 commit comments