Skip to content

Commit 20e5c6c

Browse files
author
amuraru
committed
temp fix
1 parent 084bc29 commit 20e5c6c

File tree

2 files changed

+24
-54
lines changed

2 files changed

+24
-54
lines changed

e2e/bin/integration-test.sh

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,14 @@ validate_builtin_metrics() {
100100
local core_metrics=(
101101
"kminion_exporter_up"
102102
"kminion_exporter_offset_consumer_records_consumed_total"
103+
"kminion_kafka_received_bytes"
104+
"kminion_kafka_requests_received_total"
105+
"kminion_kafka_requests_sent_total"
106+
"kminion_kafka_sent_bytes"
107+
"kminion_log_messages_total"
103108
)
104109

105-
# Kafka cluster metrics
110+
# Kafka cluster/broker metrics
106111
local kafka_metrics=(
107112
"kminion_kafka_cluster_info"
108113
"kminion_kafka_broker_info"
@@ -111,17 +116,29 @@ validate_builtin_metrics() {
111116
# Topic metrics
112117
local topic_metrics=(
113118
"kminion_kafka_topic_info"
114-
"kminion_kafka_topic_partition_high_water_mark"
115-
"kminion_kafka_topic_high_water_mark_sum"
116-
"kminion_kafka_topic_partition_low_water_mark"
119+
"kminion_kafka_topic_info_min_insync_replicas"
120+
"kminion_kafka_topic_info_partitions_count"
121+
"kminion_kafka_topic_info_replication_factor"
122+
"kminion_kafka_topic_info_retention_ms"
117123
"kminion_kafka_topic_low_water_mark_sum"
124+
"kminion_kafka_topic_partition_low_water_mark"
125+
"kminion_kafka_topic_high_water_mark_sum"
126+
"kminion_kafka_topic_partition_high_water_mark"
127+
"kminion_kafka_topic_max_timestamp"
128+
"kminion_kafka_topic_partition_max_timestamp"
118129
)
119130

120131
# Consumer group metrics
121132
local consumer_group_metrics=(
122133
"kminion_kafka_consumer_group_info"
134+
"kminion_kafka_consumer_group_info_count"
135+
"kminion_kafka_consumer_group_info_empty_groups"
123136
"kminion_kafka_consumer_group_members"
137+
"kminion_kafka_consumer_group_topic_assigned_partitions"
124138
"kminion_kafka_consumer_group_topic_lag"
139+
"kminion_kafka_consumer_group_topic_members"
140+
"kminion_kafka_consumer_group_topic_offset_sum"
141+
"kminion_kafka_consumer_group_topic_partition_lag"
125142
)
126143

127144
# Log dir metrics

minion/list_offsets.go

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -64,49 +64,19 @@ func (s *Service) listOffsetsCached(ctx context.Context, offsetType string) (kad
6464

6565
// ListEndOffsets fetches the high water mark for all topic partitions.
6666
func (s *Service) ListEndOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
67-
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
68-
if len(topics) == 0 {
69-
// Get all topics if none specified
70-
allTopics, err := s.getAllowedTopics(ctx)
71-
if err != nil {
72-
return nil, fmt.Errorf("failed to get topics for listing end offsets: %w", err)
73-
}
74-
topics = allTopics
75-
}
76-
return s.admClient.ListEndOffsets(ctx, topics...)
77-
}, "end")
67+
return s.listOffsetsInternal(ctx, s.admClient.ListEndOffsets, "end")
7868
}
7969

8070
// ListStartOffsets fetches the low water mark for all topic partitions.
8171
func (s *Service) ListStartOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
82-
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
83-
if len(topics) == 0 {
84-
// Get all topics if none specified
85-
allTopics, err := s.getAllowedTopics(ctx)
86-
if err != nil {
87-
return nil, fmt.Errorf("failed to get topics for listing start offsets: %w", err)
88-
}
89-
topics = allTopics
90-
}
91-
return s.admClient.ListStartOffsets(ctx, topics...)
92-
}, "start")
72+
return s.listOffsetsInternal(ctx, s.admClient.ListStartOffsets, "start")
9373
}
9474

9575
// ListMaxTimestampOffsets fetches the offsets for the maximum timestamp for all topic partitions.
9676
// This requires Kafka 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
9777
// Uses the dedicated ListMaxTimestampOffsets method from franz-go.
9878
func (s *Service) ListMaxTimestampOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
99-
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
100-
if len(topics) == 0 {
101-
// Get all topics if none specified
102-
allTopics, err := s.getAllowedTopics(ctx)
103-
if err != nil {
104-
return nil, fmt.Errorf("failed to get topics for listing max timestamp offsets: %w", err)
105-
}
106-
topics = allTopics
107-
}
108-
return s.admClient.ListMaxTimestampOffsets(ctx, topics...)
109-
}, "maxTimestamp")
79+
return s.listOffsetsInternal(ctx, s.admClient.ListMaxTimestampOffsets, "maxTimestamp")
11080
}
11181

11282
type listOffsetsFunc func(context.Context, ...string) (kadm.ListedOffsets, error)
@@ -159,20 +129,3 @@ func (s *Service) listOffsetsInternal(ctx context.Context, listFunc listOffsetsF
159129

160130
return listedOffsets, nil
161131
}
162-
163-
// getAllowedTopics gets all topics from metadata and filters them according to the configuration
164-
func (s *Service) getAllowedTopics(ctx context.Context) ([]string, error) {
165-
metadata, err := s.GetMetadataCached(ctx)
166-
if err != nil {
167-
return nil, fmt.Errorf("failed to get metadata: %w", err)
168-
}
169-
170-
var allowedTopics []string
171-
for _, topic := range metadata.Topics {
172-
if topic.Topic != nil && s.IsTopicAllowed(*topic.Topic) {
173-
allowedTopics = append(allowedTopics, *topic.Topic)
174-
}
175-
}
176-
177-
return allowedTopics, nil
178-
}

0 commit comments

Comments
 (0)