Skip to content

Commit 58e79db

Browse files
committed
[INTERNAL] Add a new metrics to count all groups included those ignored
This is useful to capture the total number of groups in kafka
1 parent f01e512 commit 58e79db

4 files changed

+38
-17
lines changed

minion/consumer_group_offsets.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[
2222
if err != nil {
2323
return nil, fmt.Errorf("failed to list groupsRes: %w", err)
2424
}
25-
groupIDs := make([]string, len(groupsRes.Groups))
26-
for i, group := range groupsRes.Groups {
25+
groupIDs := make([]string, len(groupsRes.AllowedGroups.Groups))
26+
for i, group := range groupsRes.AllowedGroups.Groups {
2727
groupIDs[i] = group.Group
2828
}
2929

minion/describe_consumer_groups.go

+22-14
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@ type DescribeConsumerGroupsResponse struct {
1616
Groups *kmsg.DescribeGroupsResponse
1717
}
1818

19-
func (s *Service) listConsumerGroupsCached(ctx context.Context) (*kmsg.ListGroupsResponse, error) {
20-
key := "list-consumer-groups"
19+
type GroupsInfo struct {
20+
AllowedGroups *kmsg.ListGroupsResponse
21+
AllGroupsCount int
22+
}
23+
24+
func (s *Service) listConsumerGroupsCached(ctx context.Context) (*GroupsInfo, error) {
25+
keyAllowedGroups := "list-consumer-groups"
2126

22-
if cachedRes, exists := s.getCachedItem(key); exists {
23-
return cachedRes.(*kmsg.ListGroupsResponse), nil
27+
if cachedRes, exists := s.getCachedItem(keyAllowedGroups); exists {
28+
return cachedRes.(*GroupsInfo), nil
2429
}
25-
res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
30+
groups, err, _ := s.requestGroup.Do(keyAllowedGroups, func() (interface{}, error) {
2631
res, err := s.listConsumerGroups(ctx)
2732
if err != nil {
2833
return nil, err
@@ -34,15 +39,19 @@ func (s *Service) listConsumerGroupsCached(ctx context.Context) (*kmsg.ListGroup
3439
}
3540
}
3641
res.Groups = allowedGroups
37-
s.setCachedItem(key, res, 120*time.Second)
42+
groups := &GroupsInfo{
43+
AllGroupsCount: len(res.Groups),
44+
AllowedGroups: res,
45+
}
46+
s.setCachedItem(keyAllowedGroups, groups, 120*time.Second)
3847

39-
return res, nil
48+
return groups, nil
4049
})
4150
if err != nil {
4251
return nil, err
4352
}
4453

45-
return res.(*kmsg.ListGroupsResponse), nil
54+
return groups.(*GroupsInfo), nil
4655
}
4756

4857
func (s *Service) listConsumerGroups(ctx context.Context) (*kmsg.ListGroupsResponse, error) {
@@ -59,14 +68,14 @@ func (s *Service) listConsumerGroups(ctx context.Context) (*kmsg.ListGroupsRespo
5968
return res, nil
6069
}
6170

62-
func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsumerGroupsResponse, error) {
71+
func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsumerGroupsResponse, int, error) {
6372
listRes, err := s.listConsumerGroupsCached(ctx)
6473
if err != nil {
65-
return nil, err
74+
return nil, -1, err
6675
}
6776

68-
groupIDs := make([]string, len(listRes.Groups))
69-
for i, group := range listRes.Groups {
77+
groupIDs := make([]string, len(listRes.AllowedGroups.Groups))
78+
for i, group := range listRes.AllowedGroups.Groups {
7079
groupIDs[i] = group.Group
7180
}
7281

@@ -90,6 +99,5 @@ func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsume
9099
Groups: res,
91100
})
92101
}
93-
94-
return describedGroups, nil
102+
return describedGroups, listRes.AllGroupsCount, nil
95103
}

prometheus/collect_consumer_groups.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
1515
if !e.minionSvc.Cfg.ConsumerGroups.Enabled {
1616
return true
1717
}
18-
groups, err := e.minionSvc.DescribeConsumerGroups(ctx)
18+
groups, allGroups, err := e.minionSvc.DescribeConsumerGroups(ctx)
1919
if err != nil {
2020
e.logger.Error("failed to collect consumer groups, because Kafka request failed", zap.Error(err))
2121
return false
@@ -138,6 +138,11 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
138138
}
139139
}
140140
}
141+
ch <- prometheus.MustNewConstMetric(
142+
e.consumerGroupInfoAllGroups,
143+
prometheus.GaugeValue,
144+
float64(allGroups),
145+
)
141146
ch <- prometheus.MustNewConstMetric(
142147
e.consumerGroupInfoEmptyGroups,
143148
prometheus.GaugeValue,

prometheus/exporter.go

+8
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type Exporter struct {
4444

4545
// Consumer Groups
4646
consumerGroupInfo *prometheus.Desc
47+
consumerGroupInfoAllGroups *prometheus.Desc
4748
consumerGroupInfoEmptyGroups *prometheus.Desc
4849
consumerGroupMembers *prometheus.Desc
4950
consumerGroupMembersEmpty *prometheus.Desc
@@ -180,6 +181,13 @@ func (e *Exporter) InitializeMetrics() {
180181
[]string{"group_id", "protocol", "protocol_type", "state", "coordinator_id"},
181182
nil,
182183
)
184+
// Group Info - All Groups
185+
e.consumerGroupInfoAllGroups = prometheus.NewDesc(
186+
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_info_count"),
187+
"Consumer Group info metrics. It will report the number of all groups in cluster.",
188+
nil,
189+
nil,
190+
)
183191
// Group Info - Empty Groups
184192
e.consumerGroupInfoEmptyGroups = prometheus.NewDesc(
185193
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_info_empty_groups"),

0 commit comments

Comments
 (0)