Skip to content

Commit 60cd863

Browse files
author
amuraru
committed
temp fix
1 parent 084bc29 commit 60cd863

File tree

2 files changed

+10
-56
lines changed

2 files changed

+10
-56
lines changed

e2e/bin/integration-test.sh

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ validate_e2e_metrics() {
5151
"kminion_end_to_end_roundtrip_latency_seconds"
5252
"kminion_end_to_end_offset_commit_latency_seconds"
5353
)
54+
echo ""
55+
log_info "Available E2E metrics:"
56+
echo "$metrics" | grep -E "kminion_end_to_end_" || echo " (none found)"
5457

5558
local missing_metrics=()
5659
for metric in "${required_metrics[@]}"; do
@@ -62,9 +65,6 @@ validate_e2e_metrics() {
6265
if [ ${#missing_metrics[@]} -ne 0 ]; then
6366
log_error "Missing required E2E metrics:"
6467
printf ' - %s\n' "${missing_metrics[@]}"
65-
echo ""
66-
log_info "Available E2E metrics:"
67-
echo "$metrics" | grep -E "kminion_end_to_end_" || echo " (none found)"
6868
return 1
6969
fi
7070

@@ -95,6 +95,9 @@ validate_builtin_metrics() {
9595

9696
local metrics
9797
metrics=$(fetch_metrics "$METRICS_URL") || return 1
98+
echo ""
99+
log_info "Available built-in metrics:"
100+
echo "$metrics" | grep "^kminion_"
98101

99102
# Core exporter metrics
100103
local core_metrics=(
@@ -142,9 +145,7 @@ validate_builtin_metrics() {
142145
if [ ${#missing_metrics[@]} -ne 0 ]; then
143146
log_error "Missing required built-in metrics:"
144147
printf ' - %s\n' "${missing_metrics[@]}"
145-
echo ""
146-
log_info "Available metrics:"
147-
echo "$metrics" | grep "^kminion_"
148+
148149
return 1
149150
fi
150151

minion/list_offsets.go

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -64,49 +64,19 @@ func (s *Service) listOffsetsCached(ctx context.Context, offsetType string) (kad
6464

6565
// ListEndOffsets fetches the high water mark for all topic partitions.
6666
func (s *Service) ListEndOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
67-
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
68-
if len(topics) == 0 {
69-
// Get all topics if none specified
70-
allTopics, err := s.getAllowedTopics(ctx)
71-
if err != nil {
72-
return nil, fmt.Errorf("failed to get topics for listing end offsets: %w", err)
73-
}
74-
topics = allTopics
75-
}
76-
return s.admClient.ListEndOffsets(ctx, topics...)
77-
}, "end")
67+
return s.listOffsetsInternal(ctx, s.admClient.ListEndOffsets, "end")
7868
}
7969

8070
// ListStartOffsets fetches the low water mark for all topic partitions.
8171
func (s *Service) ListStartOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
82-
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
83-
if len(topics) == 0 {
84-
// Get all topics if none specified
85-
allTopics, err := s.getAllowedTopics(ctx)
86-
if err != nil {
87-
return nil, fmt.Errorf("failed to get topics for listing start offsets: %w", err)
88-
}
89-
topics = allTopics
90-
}
91-
return s.admClient.ListStartOffsets(ctx, topics...)
92-
}, "start")
72+
return s.listOffsetsInternal(ctx, s.admClient.ListStartOffsets, "start")
9373
}
9474

9575
// ListMaxTimestampOffsets fetches the offsets for the maximum timestamp for all topic partitions.
9676
// This requires Kafka 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
9777
// Uses the dedicated ListMaxTimestampOffsets method from franz-go.
9878
func (s *Service) ListMaxTimestampOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
99-
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
100-
if len(topics) == 0 {
101-
// Get all topics if none specified
102-
allTopics, err := s.getAllowedTopics(ctx)
103-
if err != nil {
104-
return nil, fmt.Errorf("failed to get topics for listing max timestamp offsets: %w", err)
105-
}
106-
topics = allTopics
107-
}
108-
return s.admClient.ListMaxTimestampOffsets(ctx, topics...)
109-
}, "maxTimestamp")
79+
return s.listOffsetsInternal(ctx, s.admClient.ListMaxTimestampOffsets, "maxTimestamp")
11080
}
11181

11282
type listOffsetsFunc func(context.Context, ...string) (kadm.ListedOffsets, error)
@@ -159,20 +129,3 @@ func (s *Service) listOffsetsInternal(ctx context.Context, listFunc listOffsetsF
159129

160130
return listedOffsets, nil
161131
}
162-
163-
// getAllowedTopics gets all topics from metadata and filters them according to the configuration
164-
func (s *Service) getAllowedTopics(ctx context.Context) ([]string, error) {
165-
metadata, err := s.GetMetadataCached(ctx)
166-
if err != nil {
167-
return nil, fmt.Errorf("failed to get metadata: %w", err)
168-
}
169-
170-
var allowedTopics []string
171-
for _, topic := range metadata.Topics {
172-
if topic.Topic != nil && s.IsTopicAllowed(*topic.Topic) {
173-
allowedTopics = append(allowedTopics, *topic.Topic)
174-
}
175-
}
176-
177-
return allowedTopics, nil
178-
}

0 commit comments

Comments
 (0)