Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions minion/acls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package minion

import (
"context"
"github.com/twmb/franz-go/pkg/kmsg"
)

func (s *Service) ListAllACLs(ctx context.Context) (*kmsg.DescribeACLsResponse, error) {
req := kmsg.NewDescribeACLsRequest()
req.ResourceType = kmsg.ACLResourceTypeAny
req.ResourcePatternType = kmsg.ACLResourcePatternTypeAny
req.ResourceName = nil
req.Principal = nil
req.Host = nil
req.Operation = kmsg.ACLOperationAny
req.PermissionType = kmsg.ACLPermissionTypeAny

res, err := req.RequestWith(ctx, s.client)
if err != nil {
return nil, err
}

return res, nil
}
69 changes: 69 additions & 0 deletions prometheus/collect_acl_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package prometheus

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
)

func (e *Exporter) collectACLInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {
ACLRes, err := e.minionSvc.ListAllACLs(ctx)
if err != nil {
e.logger.Error("failed to fetch ACLs", zap.Error(err))
return false
}

ACLsByType := getResourceTypeName(ACLRes)
totalACLs := 0
for _, count := range ACLsByType {
totalACLs += count
}

ch <- prometheus.MustNewConstMetric(
e.aclCount,
prometheus.GaugeValue,
float64(totalACLs),
)

for resourceType, count := range ACLsByType {
ch <- prometheus.MustNewConstMetric(
e.aclCountByType,
prometheus.GaugeValue,
float64(count),
resourceType,
)
}

return true
}

func getResourceTypeName(ACLResponse *kmsg.DescribeACLsResponse) map[string]int {
ACLsByType := make(map[string]int)
for _, resource := range ACLResponse.Resources {
resourceType := "unknown"
switch resource.ResourceType {
case 0:
resourceType = "unknown"
case 1:
resourceType = "any"
case 2:
resourceType = "topic"
case 3:
resourceType = "group"
case 4:
resourceType = "cluster"
case 5:
resourceType = "transactional_id"
case 6:
resourceType = "delegation_token"
case 7:
resourceType = "user"
}

ACLsByType[resourceType] += len(resource.ACLs)
}

return ACLsByType
}
23 changes: 20 additions & 3 deletions prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type Exporter struct {
consumerGroupTopicPartitionLag *prometheus.Desc
consumerGroupTopicLag *prometheus.Desc
offsetCommits *prometheus.Desc

// ACLs
aclCount *prometheus.Desc
aclCountByType *prometheus.Desc
}

func NewExporter(cfg Config, logger *zap.Logger, minionSvc *minion.Service) (*Exporter, error) {
Expand Down Expand Up @@ -158,7 +162,7 @@ func (e *Exporter) InitializeMetrics() {
[]string{"group_id"},
nil,
)
// Group Empty Memmbers
// Group Empty Members
e.consumerGroupMembersEmpty = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_empty_members"),
"It will report the number of members in the consumer group with no partition assigned",
Expand Down Expand Up @@ -207,7 +211,19 @@ func (e *Exporter) InitializeMetrics() {
[]string{"group_id"},
nil,
)

// ACLs
e.aclCount = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "acls_total"),
"The total number of ACLs in the cluster",
[]string{},
nil,
)
e.aclCountByType = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "acls_by_type"),
"The number of ACLs by resource type",
[]string{"resource_type"},
nil,
)
}

// Describe implements the prometheus.Collector interface. It sends the
Expand All @@ -224,7 +240,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()

// Attach a unique id which will be used for caching (and and it's invalidation) of the kafka requests
// Attach a unique id which will be used for caching (and it's invalidation) of the kafka requests
uuid := uuid2.New()
ctx = context.WithValue(ctx, "requestId", uuid.String())

Expand All @@ -236,6 +252,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
ok = e.collectTopicPartitionOffsets(ctx, ch) && ok
ok = e.collectConsumerGroupLags(ctx, ch) && ok
ok = e.collectTopicInfo(ctx, ch) && ok
ok = e.collectACLInfo(ctx, ch) && ok

if ok {
ch <- prometheus.MustNewConstMetric(e.exporterUp, prometheus.GaugeValue, 1.0)
Expand Down