diff --git a/prometheus/collect_topic_info.go b/prometheus/collect_topic_info.go index 7474ec16..baadc31a 100644 --- a/prometheus/collect_topic_info.go +++ b/prometheus/collect_topic_info.go @@ -66,6 +66,27 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me replicationFactor = len(topic.Partitions[0].Replicas) } + for _, partition := range topic.Partitions { + underReplicated := 0 + if len(partition.ISR) < len(partition.Replicas) { + underReplicated = 1 + } + + ch <- prometheus.MustNewConstMetric( + e.partitionUnderReplicated, + prometheus.GaugeValue, + float64(underReplicated), + topicName, strconv.Itoa(int(partition.Partition)), + ) + + ch <- prometheus.MustNewConstMetric( + e.partitionLeader, + prometheus.GaugeValue, + float64(partition.Leader), + topicName, strconv.Itoa(int(partition.Partition)), + ) + } + var labelsValues []string labelsValues = append(labelsValues, topicName) labelsValues = append(labelsValues, strconv.Itoa(partitionCount)) @@ -79,6 +100,12 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me float64(1), labelsValues..., ) + ch <- prometheus.MustNewConstMetric( + e.topicPartitionCount, + prometheus.GaugeValue, + float64(partitionCount), + topicName, + ) } return isOk } diff --git a/prometheus/exporter.go b/prometheus/exporter.go index d717bcfa..832cd350 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -32,11 +32,14 @@ type Exporter struct { topicLogDirSize *prometheus.Desc // Topic / Partition - topicInfo *prometheus.Desc - topicHighWaterMarkSum *prometheus.Desc - partitionHighWaterMark *prometheus.Desc - topicLowWaterMarkSum *prometheus.Desc - partitionLowWaterMark *prometheus.Desc + topicInfo *prometheus.Desc + topicHighWaterMarkSum *prometheus.Desc + partitionHighWaterMark *prometheus.Desc + topicLowWaterMarkSum *prometheus.Desc + partitionLowWaterMark *prometheus.Desc + topicPartitionCount *prometheus.Desc + partitionUnderReplicated *prometheus.Desc + partitionLeader *prometheus.Desc // Consumer Groups consumerGroupInfo *prometheus.Desc @@ -143,6 +146,27 @@ func (e *Exporter) InitializeMetrics() { nil, ) + e.topicPartitionCount = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_count"), + "Number of topic partitions", + []string{"topic_name"}, + nil, + ) + + e.partitionLeader = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_leader"), + "Leader Broker id of partition", + []string{"topic_name", "partition_id"}, + nil, + ) + + e.partitionUnderReplicated = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_under_replicated_partition"), + "Under replicated partition", + []string{"topic_name", "partition_id"}, + nil, + ) + // Consumer Group Metrics // Group Info e.consumerGroupInfo = prometheus.NewDesc(