From 478f6ee3f0e8bfe497e327b1637ed8591354c960 Mon Sep 17 00:00:00 2001 From: Tamas Kornai Date: Tue, 26 Mar 2024 16:47:45 +0100 Subject: [PATCH 1/2] Add committed offset metric per CG and partition Add a new committed offset metric per consumer group and per partition that can be used to judge if consumers are making progress on all partitions or if there is a poison pill on any of them. Please let me know if exporting these metrics should be configurable. --- docs/metrics.md | 4 ++++ prometheus/collect_consumer_group_lags.go | 16 ++++++++++++++++ prometheus/exporter.go | 10 +++++++++- 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/docs/metrics.md b/docs/metrics.md index 07232dbc..d317f442 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -95,6 +95,10 @@ kminion_kafka_consumer_group_topic_offset_sum{group_id="bigquery-sink",topic_nam # TYPE kminion_kafka_consumer_group_topic_partition_lag gauge kminion_kafka_consumer_group_topic_partition_lag{group_id="bigquery-sink",partition_id="10",topic_name="shop-activity"} 147481 +# HELP kminion_kafka_consumer_group_topic_partition_offset The committed offset of a consumer group for a given partition +# TYPE kminion_kafka_consumer_group_topic_partition_offset counter +kminion_kafka_consumer_group_topic_partition_offset{group_id="bigquery-sink",partition_id="10",topic_name="shop-activity"} 427 + # HELP kminion_kafka_consumer_group_topic_lag The number of messages a consumer group is lagging behind across all partitions in a topic # TYPE kminion_kafka_consumer_group_topic_lag gauge kminion_kafka_consumer_group_topic_lag{group_id="bigquery-sink",topic_name="shop-activity"} 147481 diff --git a/prometheus/collect_consumer_group_lags.go b/prometheus/collect_consumer_group_lags.go index b61fd073..c5df5703 100644 --- a/prometheus/collect_consumer_group_lags.go +++ b/prometheus/collect_consumer_group_lags.go @@ -84,6 +84,14 @@ func (e *Exporter) collectConsumerGroupLagsOffsetTopic(_ context.Context, ch cha if e.minionSvc.Cfg.ConsumerGroups.Granularity == minion.ConsumerGroupGranularityTopic { continue } + ch <- prometheus.MustNewConstMetric( + e.consumerGroupTopicPartitionOffset, + prometheus.CounterValue, + float64(partition.Value.Offset), + groupName, + topicName, + strconv.Itoa(int(partitionID)), + ) ch <- prometheus.MustNewConstMetric( e.consumerGroupTopicPartitionLag, prometheus.GaugeValue, @@ -176,6 +184,14 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan if e.minionSvc.Cfg.ConsumerGroups.Granularity == minion.ConsumerGroupGranularityTopic { continue } + ch <- prometheus.MustNewConstMetric( + e.consumerGroupTopicPartitionOffset, + prometheus.CounterValue, + float64(partition.Offset), + groupName, + topic.Topic, + strconv.Itoa(int(partition.Partition)), + ) ch <- prometheus.MustNewConstMetric( e.consumerGroupTopicPartitionLag, prometheus.GaugeValue, diff --git a/prometheus/exporter.go b/prometheus/exporter.go index d717bcfa..7c1814e2 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -44,6 +44,7 @@ type Exporter struct { consumerGroupMembersEmpty *prometheus.Desc consumerGroupTopicMembers *prometheus.Desc consumerGroupAssignedTopicPartitions *prometheus.Desc + consumerGroupTopicPartitionOffset *prometheus.Desc consumerGroupTopicOffsetSum *prometheus.Desc consumerGroupTopicPartitionLag *prometheus.Desc consumerGroupTopicLag *prometheus.Desc @@ -179,7 +180,14 @@ func (e *Exporter) InitializeMetrics() { []string{"group_id", "topic_name"}, nil, ) - // Topic / Partition Offset Sum (useful for calculating the consumed messages / sec on a topic) + // Topic Partition Offsets (useful for calculating the consumed messages / sec on a topic partition) + e.consumerGroupTopicPartitionOffset = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_partition_offset"), + "The committed offset of a consumer group for a given partition", + []string{"group_id", "topic_name", "partition_id"}, + nil, + ) + // Topic Offset Sum (useful for calculating the consumed messages / sec on a topic) e.consumerGroupTopicOffsetSum = prometheus.NewDesc( prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_offset_sum"), "The sum of all committed group offsets across all partitions in a topic", From ffcb3e05ba0248303eaabea112917ed9cb2f06b8 Mon Sep 17 00:00:00 2001 From: Tamas Kornai Date: Tue, 26 Mar 2024 17:14:40 +0100 Subject: [PATCH 2/2] consumer_group_topic_partition_offset should be a gauge Offset resets can decrease its value. --- docs/metrics.md | 2 +- prometheus/collect_consumer_group_lags.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/metrics.md b/docs/metrics.md index d317f442..f36e523d 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -96,7 +96,7 @@ kminion_kafka_consumer_group_topic_offset_sum{group_id="bigquery-sink",topic_nam kminion_kafka_consumer_group_topic_partition_lag{group_id="bigquery-sink",partition_id="10",topic_name="shop-activity"} 147481 # HELP kminion_kafka_consumer_group_topic_partition_offset The committed offset of a consumer group for a given partition -# TYPE kminion_kafka_consumer_group_topic_partition_offset counter +# TYPE kminion_kafka_consumer_group_topic_partition_offset gauge kminion_kafka_consumer_group_topic_partition_offset{group_id="bigquery-sink",partition_id="10",topic_name="shop-activity"} 427 # HELP kminion_kafka_consumer_group_topic_lag The number of messages a consumer group is lagging behind across all partitions in a topic diff --git a/prometheus/collect_consumer_group_lags.go b/prometheus/collect_consumer_group_lags.go index c5df5703..18b0db58 100644 --- a/prometheus/collect_consumer_group_lags.go +++ b/prometheus/collect_consumer_group_lags.go @@ -86,7 +86,7 @@ func (e *Exporter) collectConsumerGroupLagsOffsetTopic(_ context.Context, ch cha } ch <- prometheus.MustNewConstMetric( e.consumerGroupTopicPartitionOffset, - prometheus.CounterValue, + prometheus.GaugeValue, float64(partition.Value.Offset), groupName, topicName, @@ -186,7 +186,7 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan } ch <- prometheus.MustNewConstMetric( e.consumerGroupTopicPartitionOffset, - prometheus.CounterValue, + prometheus.GaugeValue, float64(partition.Offset), groupName, topic.Topic,