Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions minion/config_log_dirs.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
package minion

import (
"fmt"
)

type LogDirsConfig struct {
// Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior
// to version 1.0.0 as describing log dirs was not supported back then.
Enabled bool `koanf:"enabled"`
// AllowedTopics are regex strings of topic names whose topic metrics that shall be exported.
AllowedTopics []string `koanf:"allowedTopics"`

// IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
// take precedence over allowed topics.
IgnoredTopics []string `koanf:"ignoredTopics"`
}

// Validate if provided LogDirsConfig is valid.
func (c *LogDirsConfig) Validate() error {
// Check whether each provided string is valid regex
for _, topic := range c.AllowedTopics {
_, err := compileRegex(topic)
if err != nil {
return fmt.Errorf("allowed topic string '%v' is not valid regex", topic)
}
}

for _, topic := range c.IgnoredTopics {
_, err := compileRegex(topic)
if err != nil {
return fmt.Errorf("ignored topic string '%v' is not valid regex", topic)
}
}
return nil
}

// SetDefaults for topic config
func (c *LogDirsConfig) SetDefaults() {
c.Enabled = true
c.AllowedTopics = []string{"/.*/"}
}
14 changes: 10 additions & 4 deletions minion/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ type Service struct {
cache map[string]interface{}
cacheLock sync.RWMutex

AllowedGroupIDsExpr []*regexp.Regexp
IgnoredGroupIDsExpr []*regexp.Regexp
AllowedTopicsExpr []*regexp.Regexp
IgnoredTopicsExpr []*regexp.Regexp
AllowedGroupIDsExpr []*regexp.Regexp
IgnoredGroupIDsExpr []*regexp.Regexp
AllowedTopicsExpr []*regexp.Regexp
IgnoredTopicsExpr []*regexp.Regexp
AllowedlogDirsTopicsExpr []*regexp.Regexp
IgnoredlogDirsTopicsExpr []*regexp.Regexp

client *kgo.Client
storage *Storage
Expand Down Expand Up @@ -67,6 +69,8 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
ignoredGroupIDsExpr, _ := compileRegexes(cfg.ConsumerGroups.IgnoredGroupIDs)
allowedTopicsExpr, _ := compileRegexes(cfg.Topics.AllowedTopics)
ignoredTopicsExpr, _ := compileRegexes(cfg.Topics.IgnoredTopics)
allowedlogDirsTopicsExpr, _ := compileRegexes(cfg.LogDirs.AllowedTopics)
ignoredlogDirsTopicsExpr, _ := compileRegexes(cfg.LogDirs.IgnoredTopics)

service := &Service{
Cfg: cfg,
Expand All @@ -80,6 +84,8 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
IgnoredGroupIDsExpr: ignoredGroupIDsExpr,
AllowedTopicsExpr: allowedTopicsExpr,
IgnoredTopicsExpr: ignoredTopicsExpr,
AllowedlogDirsTopicsExpr: allowedlogDirsTopicsExpr,
IgnoredlogDirsTopicsExpr: ignoredlogDirsTopicsExpr,

client: client,
storage: storage,
Expand Down
18 changes: 18 additions & 0 deletions minion/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,24 @@ func (s *Service) IsTopicAllowed(topicName string) bool {
return isAllowed
}

func (s *Service) IsLOgDirsTopicAllowed(topicName string) bool {
isAllowed := false
for _, regex := range s.AllowedlogDirsTopicsExpr {
if regex.MatchString(topicName) {
isAllowed = true
break
}
}

for _, regex := range s.IgnoredlogDirsTopicsExpr {
if regex.MatchString(topicName) {
isAllowed = false
break
}
}
return isAllowed
}

func compileRegex(expr string) (*regexp.Regexp, error) {
if strings.HasPrefix(expr, "/") && strings.HasSuffix(expr, "/") {
substr := expr[1 : len(expr)-1]
Expand Down
3 changes: 3 additions & 0 deletions prometheus/collect_log_dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func (e *Exporter) collectLogDirs(ctx context.Context, ch chan<- prometheus.Metr
continue
}
for _, topic := range dir.Topics {
if !e.minionSvc.IsLOgDirsTopicAllowed(topic.Topic) {
continue
}
topicSize := int64(0)
for _, partition := range topic.Partitions {
topicSize += partition.Size
Expand Down