Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions docs/reference-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# file, specify the path to the config file by setting the env variable
# CONFIG_FILEPATH.
#
# The env variable name is auto generated by upper casing everything and adding
# The env variable name is auto generated by upper-casing everything and adding
# an underscore for each indentation/level. Some examples:
# kafka.rackId => KAFKA_RACKID
# kafka.tls.caFilepath => KAFKA_TLS_CAFILEPATH
Expand Down Expand Up @@ -40,7 +40,7 @@ kafka:
insecureSkipTlsVerify: false

sasl:
# Whether or not SASL authentication will be used for authentication
# Whether SASL authentication will be used for authentication
enabled: false
# Username to use for PLAIN or SCRAM mechanism
username: ""
Expand Down Expand Up @@ -111,6 +111,12 @@ minion:
# to version 1.0.0 as describing log dirs was not supported back then.
enabled: true

# ACL Metrics
acls:
# Enabled specifies whether ACL information shall be scraped and exported as metrics.
# If disabled, no ACL metrics will be collected.
enabled: false

# EndToEnd Metrics
# When enabled, kminion creates a topic which it produces to and consumes from, to measure various advanced metrics. See docs for more info
endToEnd:
Expand All @@ -135,7 +141,7 @@ minion:
replicationFactor: 1

# Rarely makes sense to change this, but maybe if you want some sort of cheap load test?
# By default (1) every broker gets one partition
# By default, (1) every broker gets one partition
partitionsPerBroker: 1

producer:
Expand All @@ -153,7 +159,7 @@ minion:
groupIdPrefix: kminion-end-to-end

# Whether KMinion should try to delete empty consumer groups with the same prefix. This can be used if you want
# KMinion to cleanup it's old consumer groups. It should only be used if you use a unique prefix for KMinion.
# KMinion to clean up it's old consumer groups. It should only be used if you use a unique prefix for KMinion.
deleteStaleConsumerGroups: false

# This defines:
Expand Down
13 changes: 13 additions & 0 deletions minion/acl_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package minion

type ACLsConfig struct {
Enabled bool `koanf:"enabled"`
}

func (c *ACLsConfig) Validate() error {
return nil
}

func (c *ACLsConfig) SetDefaults() {
c.Enabled = false
}
24 changes: 24 additions & 0 deletions minion/acls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package minion

import (
"context"
"github.com/twmb/franz-go/pkg/kmsg"
)

func (s *Service) ListAllACLs(ctx context.Context) (*kmsg.DescribeACLsResponse, error) {
req := kmsg.NewDescribeACLsRequest()
req.ResourceType = kmsg.ACLResourceTypeAny
req.ResourcePatternType = kmsg.ACLResourcePatternTypeAny
req.ResourceName = nil
req.Principal = nil
req.Host = nil
req.Operation = kmsg.ACLOperationAny
req.PermissionType = kmsg.ACLPermissionTypeAny

res, err := req.RequestWith(ctx, s.client)
if err != nil {
return nil, err
}

return res, nil
}
7 changes: 7 additions & 0 deletions minion/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ type Config struct {
Topics TopicConfig `koanf:"topics"`
LogDirs LogDirsConfig `koanf:"logDirs"`
EndToEnd e2e.Config `koanf:"endToEnd"`
ACLs ACLsConfig `koanf:"acls"`
}

func (c *Config) SetDefaults() {
c.ConsumerGroups.SetDefaults()
c.Topics.SetDefaults()
c.LogDirs.SetDefaults()
c.EndToEnd.SetDefaults()
c.ACLs.SetDefaults()
}

func (c *Config) Validate() error {
Expand All @@ -41,5 +43,10 @@ func (c *Config) Validate() error {
return fmt.Errorf("failed to validate endToEnd config: %w", err)
}

err = c.ACLs.Validate()
if err != nil {
return fmt.Errorf("failed to validate ACLs config: %w", err)
}

return nil
}
54 changes: 54 additions & 0 deletions prometheus/collect_acl_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package prometheus

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
)

func (e *Exporter) collectACLInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {
if !e.minionSvc.Cfg.ACLs.Enabled {
return true
}

ACLRes, err := e.minionSvc.ListAllACLs(ctx)
if err != nil {
e.logger.Error("failed to fetch ACLs", zap.Error(err))
return false
}

ACLsByType := getResourceTypeName(ACLRes)
totalACLs := 0
for _, count := range ACLsByType {
totalACLs += count
}

ch <- prometheus.MustNewConstMetric(
e.aclCount,
prometheus.GaugeValue,
float64(totalACLs),
)

for resourceType, count := range ACLsByType {
ch <- prometheus.MustNewConstMetric(
e.aclCountByType,
prometheus.GaugeValue,
float64(count),
resourceType,
)
}

return true
}

func getResourceTypeName(ACLResponse *kmsg.DescribeACLsResponse) map[string]int {
ACLsByType := make(map[string]int)
for _, resource := range ACLResponse.Resources {
resourceType := resource.ResourceType.String()
ACLsByType[resourceType] += len(resource.ACLs)
}

return ACLsByType
}
23 changes: 20 additions & 3 deletions prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type Exporter struct {
consumerGroupTopicPartitionLag *prometheus.Desc
consumerGroupTopicLag *prometheus.Desc
offsetCommits *prometheus.Desc

// ACLs
aclCount *prometheus.Desc
aclCountByType *prometheus.Desc
}

func NewExporter(cfg Config, logger *zap.Logger, minionSvc *minion.Service) (*Exporter, error) {
Expand Down Expand Up @@ -158,7 +162,7 @@ func (e *Exporter) InitializeMetrics() {
[]string{"group_id"},
nil,
)
// Group Empty Memmbers
// Group Empty Members
e.consumerGroupMembersEmpty = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_empty_members"),
"It will report the number of members in the consumer group with no partition assigned",
Expand Down Expand Up @@ -207,7 +211,19 @@ func (e *Exporter) InitializeMetrics() {
[]string{"group_id"},
nil,
)

// ACLs
e.aclCount = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "acls_total"),
"The total number of ACLs in the cluster",
[]string{},
nil,
)
e.aclCountByType = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "acls_by_type"),
"The number of ACLs by resource type",
[]string{"resource_type"},
nil,
)
}

// Describe implements the prometheus.Collector interface. It sends the
Expand All @@ -224,7 +240,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()

// Attach a unique id which will be used for caching (and and it's invalidation) of the kafka requests
// Attach a unique id which will be used for caching (and it's invalidation) of the kafka requests
uuid := uuid2.New()
ctx = context.WithValue(ctx, "requestId", uuid.String())

Expand All @@ -236,6 +252,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
ok = e.collectTopicPartitionOffsets(ctx, ch) && ok
ok = e.collectConsumerGroupLags(ctx, ch) && ok
ok = e.collectTopicInfo(ctx, ch) && ok
ok = e.collectACLInfo(ctx, ch) && ok

if ok {
ch <- prometheus.MustNewConstMetric(e.exporterUp, prometheus.GaugeValue, 1.0)
Expand Down