Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions prometheus/collect_topic_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,27 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
replicationFactor = len(topic.Partitions[0].Replicas)
}

for _, partition := range topic.Partitions {
underReplicated := 0
if len(partition.ISR) < len(partition.Replicas) {
underReplicated = 1
}

ch <- prometheus.MustNewConstMetric(
e.partitionUnderReplicated,
prometheus.GaugeValue,
float64(underReplicated),
topicName, strconv.Itoa(int(partition.Partition)),
)

ch <- prometheus.MustNewConstMetric(
e.partitionLeader,
prometheus.GaugeValue,
float64(partition.Leader),
topicName, strconv.Itoa(int(partition.Partition)),
)
}

var labelsValues []string
labelsValues = append(labelsValues, topicName)
labelsValues = append(labelsValues, strconv.Itoa(partitionCount))
Expand All @@ -79,6 +100,12 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
float64(1),
labelsValues...,
)
ch <- prometheus.MustNewConstMetric(
e.topicPartitionCount,
prometheus.GaugeValue,
float64(partitionCount),
topicName,
)
}
return isOk
}
Expand Down
34 changes: 29 additions & 5 deletions prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ type Exporter struct {
topicLogDirSize *prometheus.Desc

// Topic / Partition
topicInfo *prometheus.Desc
topicHighWaterMarkSum *prometheus.Desc
partitionHighWaterMark *prometheus.Desc
topicLowWaterMarkSum *prometheus.Desc
partitionLowWaterMark *prometheus.Desc
topicInfo *prometheus.Desc
topicHighWaterMarkSum *prometheus.Desc
partitionHighWaterMark *prometheus.Desc
topicLowWaterMarkSum *prometheus.Desc
partitionLowWaterMark *prometheus.Desc
topicPartitionCount *prometheus.Desc
partitionUnderReplicated *prometheus.Desc
partitionLeader *prometheus.Desc

// Consumer Groups
consumerGroupInfo *prometheus.Desc
Expand Down Expand Up @@ -143,6 +146,27 @@ func (e *Exporter) InitializeMetrics() {
nil,
)

e.topicPartitionCount = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_count"),
"Number of topic partitions",
[]string{"topic_name"},
nil,
)

e.partitionLeader = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_leader"),
"Leader Broker id of partition",
[]string{"topic_name", "partition_id"},
nil,
)

e.partitionUnderReplicated = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_under_replicated_partition"),
"Under replicated partition",
[]string{"topic_name", "partition_id"},
nil,
)

// Consumer Group Metrics
// Group Info
e.consumerGroupInfo = prometheus.NewDesc(
Expand Down