Skip to content

Commit 4d27b37

Browse files
dobrerazvanamuraru
andcommitted
Collect metrics for a set of CG states (#7)
* Filter empty consumer groups * Collect metrics for a set of CG states * Update minion/config_consumer_group.go Co-authored-by: Adrian Muraru <[email protected]> * Implement review * Implement review * Update the CG state values --------- Co-authored-by: Adrian Muraru <[email protected]>
1 parent 8869e31 commit 4d27b37

10 files changed

+39
-12
lines changed

minion/config_consumer_group.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ type ConsumerGroupConfig struct {
3131
// IgnoredGroups are regex strings of group ids that shall be ignored/skipped when exporting metrics. Ignored groups
3232
// take precedence over allowed groups.
3333
IgnoredGroupIDs []string `koanf:"ignoredGroups"`
34+
35+
// Monitor consumer group states. Empty list means all consumer groups are monitoring regardless of its state
36+
// Allowed values are: Dead, Empty, Stable, PreparingRebalance, CompletingRebalance
37+
// Source: https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
38+
AllowedConsumerGroupStates []string `koanf:"allowedConsumerGroupStates"`
3439
}
3540

3641
func (c *ConsumerGroupConfig) SetDefaults() {
@@ -76,3 +81,13 @@ func (c *ConsumerGroupConfig) Validate() error {
7681

7782
return nil
7883
}
84+
85+
// Returns a map of allowed group states for faster lookup
86+
func (c *ConsumerGroupConfig) GetAllowedConsumerGroupStates() map[string]string {
87+
// create a map for faster lookup
88+
groupStatesMap := make(map[string]string, len(c.AllowedConsumerGroupStates))
89+
for _, state := range c.AllowedConsumerGroupStates {
90+
groupStatesMap[state] = state
91+
}
92+
return groupStatesMap
93+
}

minion/consumer_group_offsets.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@ func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[
2323
return nil, fmt.Errorf("failed to list groupsRes: %w", err)
2424
}
2525
groupIDs := make([]string, len(groupsRes.AllowedGroups.Groups))
26+
2627
for i, group := range groupsRes.AllowedGroups.Groups {
27-
if group.GroupState != "Dead" {
28-
groupIDs[i] = group.Group
29-
}
28+
groupIDs[i] = group.Group
3029
}
3130

3231
return s.listConsumerGroupOffsetsBulk(ctx, groupIDs)

minion/describe_consumer_groups.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ func (s *Service) listConsumerGroupsCached(ctx context.Context) (*GroupsInfo, er
3333
return nil, err
3434
}
3535
allowedGroups := make([]kmsg.ListGroupsResponseGroup, 0)
36+
3637
for i := range res.Groups {
37-
if s.IsGroupAllowed(res.Groups[i].Group) {
38+
if s.IsGroupAllowed(res.Groups[i].Group, res.Groups[i].GroupState) {
3839
allowedGroups = append(allowedGroups, res.Groups[i])
3940
}
4041
}

minion/storage.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (s *Storage) getNumberOfConsumedRecords() float64 {
108108
return s.consumedRecords.Load()
109109
}
110110

111-
func (s *Storage) getGroupOffsets(isAllowed func(groupName string) bool) map[string]map[string]map[int32]OffsetCommit {
111+
func (s *Storage) getGroupOffsets(isAllowed func(groupName string, groupState string) bool) map[string]map[string]map[int32]OffsetCommit {
112112
// Offsets by group, topic, partition
113113
offsetsByGroup := make(map[string]map[string]map[int32]OffsetCommit)
114114

@@ -121,7 +121,7 @@ func (s *Storage) getGroupOffsets(isAllowed func(groupName string) bool) map[str
121121
for _, offset := range offsets {
122122
val := offset.(OffsetCommit)
123123

124-
if !isAllowed(val.Key.Group) {
124+
if !isAllowed(val.Key.Group, "") {
125125
continue
126126
}
127127

minion/utils.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"strings"
77
)
88

9-
func (s *Service) IsGroupAllowed(groupName string) bool {
9+
func (s *Service) IsGroupAllowed(groupName string, groupState string) bool {
1010
isAllowed := false
1111
for _, regex := range s.AllowedGroupIDsExpr {
1212
if regex.MatchString(groupName) {
@@ -21,6 +21,15 @@ func (s *Service) IsGroupAllowed(groupName string) bool {
2121
break
2222
}
2323
}
24+
25+
if isAllowed && groupState != "" {
26+
groupStatesMap := s.Cfg.ConsumerGroups.GetAllowedConsumerGroupStates()
27+
if len(groupStatesMap) > 0 {
28+
if _, ok := groupStatesMap[groupState]; !ok {
29+
isAllowed = false
30+
}
31+
}
32+
}
2433
return isAllowed
2534
}
2635

prometheus/collect_broker_info.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package prometheus
22

33
import (
44
"context"
5+
"strconv"
6+
57
"github.com/prometheus/client_golang/prometheus"
68
"go.uber.org/zap"
7-
"strconv"
89
)
910

1011
func (e *Exporter) collectBrokerInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {

prometheus/collect_cluster_info.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package prometheus
22

33
import (
44
"context"
5+
"strconv"
6+
57
"github.com/prometheus/client_golang/prometheus"
68
"go.uber.org/zap"
7-
"strconv"
89
)
910

1011
func (e *Exporter) collectClusterInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {

prometheus/collect_exporter_metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package prometheus
22

33
import (
44
"context"
5+
56
"github.com/prometheus/client_golang/prometheus"
67
)
78

prometheus/collect_log_dirs.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package prometheus
22

33
import (
44
"context"
5+
"strconv"
6+
57
"github.com/prometheus/client_golang/prometheus"
68
"github.com/twmb/franz-go/pkg/kerr"
79
"github.com/twmb/franz-go/pkg/kgo"
810
"go.uber.org/zap"
9-
"strconv"
1011
)
1112

1213
func (e *Exporter) collectLogDirs(ctx context.Context, ch chan<- prometheus.Metric) bool {

prometheus/collect_topic_partition_offsets.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@ import (
44
"context"
55
"strconv"
66

7+
"github.com/cloudhut/kminion/v2/minion"
78
"github.com/prometheus/client_golang/prometheus"
89
"go.uber.org/zap"
9-
10-
"github.com/cloudhut/kminion/v2/minion"
1110
)
1211

1312
func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- prometheus.Metric) bool {

0 commit comments

Comments
 (0)