Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions e2e/bin/integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ validate_builtin_metrics() {
local core_metrics=(
"kminion_exporter_up"
"kminion_exporter_offset_consumer_records_consumed_total"
"kminion_kafka_received_bytes"
"kminion_kafka_requests_received_total"
"kminion_kafka_requests_sent_total"
"kminion_kafka_sent_bytes"
"kminion_log_messages_total"
)

# Kafka cluster metrics
# Kafka cluster/broker metrics
local kafka_metrics=(
"kminion_kafka_cluster_info"
"kminion_kafka_broker_info"
Expand All @@ -111,17 +116,29 @@ validate_builtin_metrics() {
# Topic metrics
local topic_metrics=(
"kminion_kafka_topic_info"
"kminion_kafka_topic_partition_high_water_mark"
"kminion_kafka_topic_high_water_mark_sum"
"kminion_kafka_topic_partition_low_water_mark"
"kminion_kafka_topic_info_min_insync_replicas"
"kminion_kafka_topic_info_partitions_count"
"kminion_kafka_topic_info_replication_factor"
"kminion_kafka_topic_info_retention_ms"
"kminion_kafka_topic_low_water_mark_sum"
"kminion_kafka_topic_partition_low_water_mark"
"kminion_kafka_topic_high_water_mark_sum"
"kminion_kafka_topic_partition_high_water_mark"
"kminion_kafka_topic_max_timestamp"
"kminion_kafka_topic_partition_max_timestamp"
)

# Consumer group metrics
local consumer_group_metrics=(
"kminion_kafka_consumer_group_info"
"kminion_kafka_consumer_group_info_count"
"kminion_kafka_consumer_group_info_empty_groups"
"kminion_kafka_consumer_group_members"
"kminion_kafka_consumer_group_topic_assigned_partitions"
"kminion_kafka_consumer_group_topic_lag"
"kminion_kafka_consumer_group_topic_members"
"kminion_kafka_consumer_group_topic_offset_sum"
"kminion_kafka_consumer_group_topic_partition_lag"
)

# Log dir metrics
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
github.com/stretchr/testify v1.11.1
github.com/twmb/franz-go v1.20.0
github.com/twmb/franz-go/pkg/kadm v1.17.0
github.com/twmb/franz-go v1.20.1
github.com/twmb/franz-go/pkg/kadm v1.17.1
github.com/twmb/franz-go/pkg/kmsg v1.12.0
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0
go.uber.org/atomic v1.11.0
Expand All @@ -37,7 +37,7 @@ require (
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJk
github.com/jellydator/ttlcache/v3 v3.4.0 h1:YS4P125qQS0tNhtL6aeYkheEaB/m8HCqdMMP4mnWdTY=
github.com/jellydator/ttlcache/v3 v3.4.0/go.mod h1:Hw9EgjymziQD3yGsQdf1FqFdpp7YjFMd4Srg5EJlgD4=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo=
github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI=
github.com/knadh/koanf/parsers/yaml v1.1.0 h1:3ltfm9ljprAHt4jxgeYLlFPmUaunuCgu1yILuTXRdM4=
Expand Down Expand Up @@ -93,10 +93,10 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/twmb/franz-go v1.7.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro=
github.com/twmb/franz-go v1.20.0 h1:j+FLLIo8wuMtp4IV7ulT5MVsQyAtl/GJqFmncIq6BkU=
github.com/twmb/franz-go v1.20.0/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA=
github.com/twmb/franz-go/pkg/kadm v1.17.0 h1:u9iZ83OLbFu3xLKDP83NBaGPDeBRT/2OwOH+KJG0OF8=
github.com/twmb/franz-go/pkg/kadm v1.17.0/go.mod h1:VL9WdrYqaoVST5xRUzoWqSJN4itcTxdMrLE5Fhc2Prk=
github.com/twmb/franz-go v1.20.1 h1:ql6+OXi0DPJPSEeOY2zApQu+IssoRLTazl+u2cy5xAo=
github.com/twmb/franz-go v1.20.1/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA=
github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw=
github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg=
github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY=
Expand Down
53 changes: 3 additions & 50 deletions minion/list_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,49 +64,19 @@ func (s *Service) listOffsetsCached(ctx context.Context, offsetType string) (kad

// ListEndOffsets fetches the high water mark for all topic partitions.
func (s *Service) ListEndOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
if len(topics) == 0 {
// Get all topics if none specified
allTopics, err := s.getAllowedTopics(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get topics for listing end offsets: %w", err)
}
topics = allTopics
}
return s.admClient.ListEndOffsets(ctx, topics...)
}, "end")
return s.listOffsetsInternal(ctx, s.admClient.ListEndOffsets, "end")
}

// ListStartOffsets fetches the low water mark for all topic partitions.
func (s *Service) ListStartOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
if len(topics) == 0 {
// Get all topics if none specified
allTopics, err := s.getAllowedTopics(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get topics for listing start offsets: %w", err)
}
topics = allTopics
}
return s.admClient.ListStartOffsets(ctx, topics...)
}, "start")
return s.listOffsetsInternal(ctx, s.admClient.ListStartOffsets, "start")
}

// ListMaxTimestampOffsets fetches the offsets for the maximum timestamp for all topic partitions.
// This requires Kafka 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
// Uses the dedicated ListMaxTimestampOffsets method from franz-go.
func (s *Service) ListMaxTimestampOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
if len(topics) == 0 {
// Get all topics if none specified
allTopics, err := s.getAllowedTopics(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get topics for listing max timestamp offsets: %w", err)
}
topics = allTopics
}
return s.admClient.ListMaxTimestampOffsets(ctx, topics...)
}, "maxTimestamp")
return s.listOffsetsInternal(ctx, s.admClient.ListMaxTimestampOffsets, "maxTimestamp")
}

type listOffsetsFunc func(context.Context, ...string) (kadm.ListedOffsets, error)
Expand Down Expand Up @@ -159,20 +129,3 @@ func (s *Service) listOffsetsInternal(ctx context.Context, listFunc listOffsetsF

return listedOffsets, nil
}

// getAllowedTopics gets all topics from metadata and filters them according to the configuration
func (s *Service) getAllowedTopics(ctx context.Context) ([]string, error) {
metadata, err := s.GetMetadataCached(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata: %w", err)
}

var allowedTopics []string
for _, topic := range metadata.Topics {
if topic.Topic != nil && s.IsTopicAllowed(*topic.Topic) {
allowedTopics = append(allowedTopics, *topic.Topic)
}
}

return allowedTopics, nil
}