diff --git a/e2e/bin/integration-test.sh b/e2e/bin/integration-test.sh index 60562ec..24a321f 100755 --- a/e2e/bin/integration-test.sh +++ b/e2e/bin/integration-test.sh @@ -100,9 +100,14 @@ validate_builtin_metrics() { local core_metrics=( "kminion_exporter_up" "kminion_exporter_offset_consumer_records_consumed_total" + "kminion_kafka_received_bytes" + "kminion_kafka_requests_received_total" + "kminion_kafka_requests_sent_total" + "kminion_kafka_sent_bytes" + "kminion_log_messages_total" ) - # Kafka cluster metrics + # Kafka cluster/broker metrics local kafka_metrics=( "kminion_kafka_cluster_info" "kminion_kafka_broker_info" @@ -111,17 +116,29 @@ validate_builtin_metrics() { # Topic metrics local topic_metrics=( "kminion_kafka_topic_info" - "kminion_kafka_topic_partition_high_water_mark" - "kminion_kafka_topic_high_water_mark_sum" - "kminion_kafka_topic_partition_low_water_mark" + "kminion_kafka_topic_info_min_insync_replicas" + "kminion_kafka_topic_info_partitions_count" + "kminion_kafka_topic_info_replication_factor" + "kminion_kafka_topic_info_retention_ms" "kminion_kafka_topic_low_water_mark_sum" + "kminion_kafka_topic_partition_low_water_mark" + "kminion_kafka_topic_high_water_mark_sum" + "kminion_kafka_topic_partition_high_water_mark" + "kminion_kafka_topic_max_timestamp" + "kminion_kafka_topic_partition_max_timestamp" ) # Consumer group metrics local consumer_group_metrics=( "kminion_kafka_consumer_group_info" + "kminion_kafka_consumer_group_info_count" + "kminion_kafka_consumer_group_info_empty_groups" "kminion_kafka_consumer_group_members" + "kminion_kafka_consumer_group_topic_assigned_partitions" "kminion_kafka_consumer_group_topic_lag" + "kminion_kafka_consumer_group_topic_members" + "kminion_kafka_consumer_group_topic_offset_sum" + "kminion_kafka_consumer_group_topic_partition_lag" ) # Log dir metrics diff --git a/go.mod b/go.mod index 5e76a51..d61eaa7 100644 --- a/go.mod +++ b/go.mod @@ -16,8 +16,8 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.23.2 github.com/stretchr/testify v1.11.1 - github.com/twmb/franz-go v1.20.0 - github.com/twmb/franz-go/pkg/kadm v1.17.0 + github.com/twmb/franz-go v1.20.1 + github.com/twmb/franz-go/pkg/kadm v1.17.1 github.com/twmb/franz-go/pkg/kmsg v1.12.0 github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 go.uber.org/atomic v1.11.0 @@ -37,7 +37,7 @@ require ( github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.1 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect diff --git a/go.sum b/go.sum index 5650e68..42f2cad 100644 --- a/go.sum +++ b/go.sum @@ -38,8 +38,8 @@ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJk github.com/jellydator/ttlcache/v3 v3.4.0 h1:YS4P125qQS0tNhtL6aeYkheEaB/m8HCqdMMP4mnWdTY= github.com/jellydator/ttlcache/v3 v3.4.0/go.mod h1:Hw9EgjymziQD3yGsQdf1FqFdpp7YjFMd4Srg5EJlgD4= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo= github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/parsers/yaml v1.1.0 h1:3ltfm9ljprAHt4jxgeYLlFPmUaunuCgu1yILuTXRdM4= @@ -93,10 +93,10 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/twmb/franz-go v1.7.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro= -github.com/twmb/franz-go v1.20.0 h1:j+FLLIo8wuMtp4IV7ulT5MVsQyAtl/GJqFmncIq6BkU= -github.com/twmb/franz-go v1.20.0/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA= -github.com/twmb/franz-go/pkg/kadm v1.17.0 h1:u9iZ83OLbFu3xLKDP83NBaGPDeBRT/2OwOH+KJG0OF8= -github.com/twmb/franz-go/pkg/kadm v1.17.0/go.mod h1:VL9WdrYqaoVST5xRUzoWqSJN4itcTxdMrLE5Fhc2Prk= +github.com/twmb/franz-go v1.20.1 h1:ql6+OXi0DPJPSEeOY2zApQu+IssoRLTazl+u2cy5xAo= +github.com/twmb/franz-go v1.20.1/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA= +github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw= +github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg= github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc= github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY= diff --git a/minion/list_offsets.go b/minion/list_offsets.go index 51d6b9e..9f7c7d5 100644 --- a/minion/list_offsets.go +++ b/minion/list_offsets.go @@ -64,49 +64,19 @@ func (s *Service) listOffsetsCached(ctx context.Context, offsetType string) (kad // ListEndOffsets fetches the high water mark for all topic partitions. func (s *Service) ListEndOffsets(ctx context.Context) (kadm.ListedOffsets, error) { - return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) { - if len(topics) == 0 { - // Get all topics if none specified - allTopics, err := s.getAllowedTopics(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get topics for listing end offsets: %w", err) - } - topics = allTopics - } - return s.admClient.ListEndOffsets(ctx, topics...) - }, "end") + return s.listOffsetsInternal(ctx, s.admClient.ListEndOffsets, "end") } // ListStartOffsets fetches the low water mark for all topic partitions. func (s *Service) ListStartOffsets(ctx context.Context) (kadm.ListedOffsets, error) { - return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) { - if len(topics) == 0 { - // Get all topics if none specified - allTopics, err := s.getAllowedTopics(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get topics for listing start offsets: %w", err) - } - topics = allTopics - } - return s.admClient.ListStartOffsets(ctx, topics...) - }, "start") + return s.listOffsetsInternal(ctx, s.admClient.ListStartOffsets, "start") } // ListMaxTimestampOffsets fetches the offsets for the maximum timestamp for all topic partitions. // This requires Kafka 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541) // Uses the dedicated ListMaxTimestampOffsets method from franz-go. func (s *Service) ListMaxTimestampOffsets(ctx context.Context) (kadm.ListedOffsets, error) { - return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) { - if len(topics) == 0 { - // Get all topics if none specified - allTopics, err := s.getAllowedTopics(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get topics for listing max timestamp offsets: %w", err) - } - topics = allTopics - } - return s.admClient.ListMaxTimestampOffsets(ctx, topics...) - }, "maxTimestamp") + return s.listOffsetsInternal(ctx, s.admClient.ListMaxTimestampOffsets, "maxTimestamp") } type listOffsetsFunc func(context.Context, ...string) (kadm.ListedOffsets, error) @@ -159,20 +129,3 @@ func (s *Service) listOffsetsInternal(ctx context.Context, listFunc listOffsetsF return listedOffsets, nil } - -// getAllowedTopics gets all topics from metadata and filters them according to the configuration -func (s *Service) getAllowedTopics(ctx context.Context) ([]string, error) { - metadata, err := s.GetMetadataCached(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get metadata: %w", err) - } - - var allowedTopics []string - for _, topic := range metadata.Topics { - if topic.Topic != nil && s.IsTopicAllowed(*topic.Topic) { - allowedTopics = append(allowedTopics, *topic.Topic) - } - } - - return allowedTopics, nil -}