Skip to content

Commit ed86638

Browse files
author
amuraru
committed
fix: low watermark with admin api
Signed-off-by: matteo.gazzetta <[email protected]>
1 parent cf6ad4f commit ed86638

File tree

3 files changed

+61
-18
lines changed

3 files changed

+61
-18
lines changed

minion/list_offsets.go

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,49 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"strconv"
87
"time"
98

109
"github.com/twmb/franz-go/pkg/kadm"
1110
"go.uber.org/zap"
1211
)
1312

14-
func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
13+
func (s *Service) ListEndOffsetsCached(ctx context.Context) (kadm.ListedOffsets, error) {
14+
return s.listOffsetsCached(ctx, "end")
15+
}
16+
17+
func (s *Service) ListStartOffsetsCached(ctx context.Context) (kadm.ListedOffsets, error) {
18+
return s.listOffsetsCached(ctx, "start")
19+
}
20+
21+
func (s *Service) ListMaxTimestampOffsetsCached(ctx context.Context) (kadm.ListedOffsets, error) {
22+
return s.listOffsetsCached(ctx, "maxTimestamp")
23+
}
24+
25+
func (s *Service) listOffsetsCached(ctx context.Context, offsetType string) (kadm.ListedOffsets, error) {
1526
reqId, ok := ctx.Value(RequestIDKey).(string)
1627
if !ok || reqId == "" {
1728
reqId = "default"
1829
}
19-
key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId
30+
key := fmt.Sprintf("partition-%s-offsets-%s", offsetType, reqId)
2031

2132
if cachedRes, exists := s.getCachedItem(key); exists {
2233
return cachedRes.(kadm.ListedOffsets), nil
2334
}
2435

36+
var listFunc func(context.Context) (kadm.ListedOffsets, error)
37+
switch offsetType {
38+
case "end":
39+
listFunc = s.ListEndOffsets
40+
case "start":
41+
listFunc = s.ListStartOffsets
42+
case "maxTimestamp":
43+
listFunc = s.ListMaxTimestampOffsets
44+
default:
45+
return nil, fmt.Errorf("invalid offset type: %s", offsetType)
46+
}
47+
2548
res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
26-
offsets, err := s.ListOffsets(ctx, timestamp)
49+
offsets, err := listFunc(ctx)
2750
if err != nil {
2851
return nil, err
2952
}
@@ -39,21 +62,41 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.
3962
return res.(kadm.ListedOffsets), nil
4063
}
4164

42-
// ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions
43-
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
44-
listedOffsets, err := s.admClient.ListEndOffsets(ctx)
65+
// ListEndOffsets fetches the high water mark for all topic partitions.
66+
func (s *Service) ListEndOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
67+
return s.listOffsetsInternal(ctx, s.admClient.ListEndOffsets, "end")
68+
}
69+
70+
// ListStartOffsets fetches the low water mark for all topic partitions.
71+
func (s *Service) ListStartOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
72+
return s.listOffsetsInternal(ctx, s.admClient.ListStartOffsets, "start")
73+
}
74+
75+
// ListMaxTimestampOffsets fetches the offsets for the maximum timestamp for all topic partitions.
76+
// This requires Kafka 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
77+
// Uses timestamp -3 which returns the offset for the record with the largest timestamp.
78+
func (s *Service) ListMaxTimestampOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
79+
return s.listOffsetsInternal(ctx, func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error) {
80+
return s.admClient.ListOffsetsAfterMilli(ctx, -3, topics...)
81+
}, "maxTimestamp")
82+
}
83+
84+
type listOffsetsFunc func(context.Context, ...string) (kadm.ListedOffsets, error)
85+
86+
func (s *Service) listOffsetsInternal(ctx context.Context, listFunc listOffsetsFunc, offsetType string) (kadm.ListedOffsets, error) {
87+
listedOffsets, err := listFunc(ctx)
4588
if err != nil {
4689
var se *kadm.ShardErrors
4790
if !errors.As(err, &se) {
48-
return nil, fmt.Errorf("failed to list offsets: %w", err)
91+
return nil, fmt.Errorf("failed to list %s offsets: %w", offsetType, err)
4992
}
5093

5194
if se.AllFailed {
52-
return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err)
95+
return nil, fmt.Errorf("failed to list %s offsets, all shard responses failed: %w", offsetType, err)
5396
}
54-
s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs)))
97+
s.logger.Info(fmt.Sprintf("failed to list %s offset from some shards", offsetType), zap.Int("failed_shards", len(se.Errs)))
5598
for _, shardErr := range se.Errs {
56-
s.logger.Warn("shard error for listing end offsets",
99+
s.logger.Warn(fmt.Sprintf("shard error for listing %s offsets", offsetType),
57100
zap.Int32("broker_id", shardErr.Broker.NodeID),
58101
zap.Error(shardErr.Err))
59102
}
@@ -77,12 +120,12 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.Listed
77120

78121
// Print log line for each error type
79122
for err, count := range errorCountByErrCode {
80-
s.logger.Warn("failed to list some partitions watermarks",
123+
s.logger.Warn(fmt.Sprintf("failed to list some partitions %s watermarks", offsetType),
81124
zap.Error(err),
82125
zap.Int("error_count", count))
83126
}
84127
if len(errorCountByTopic) > 0 {
85-
s.logger.Warn("some topics had one or more partitions whose watermarks could not be fetched from Kafka",
128+
s.logger.Warn(fmt.Sprintf("some topics had one or more partitions whose %s watermarks could not be fetched from Kafka", offsetType),
86129
zap.Int("topics_with_errors", len(errorCountByTopic)))
87130
}
88131

prometheus/collect_consumer_group_lags.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ func (e *Exporter) collectConsumerGroupLags(ctx context.Context, ch chan<- prome
2626

2727
// Low Watermarks (at the moment they are not needed at all, they could be used to calculate the lag on partitions
2828
// that don't have any active offsets)
29-
lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2)
29+
lowWaterMarks, err := e.minionSvc.ListStartOffsetsCached(ctx)
3030
if err != nil {
3131
e.logger.Error("failed to fetch low water marks", zap.Error(err))
3232
return false
3333
}
3434
// High Watermarks
35-
highWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -1)
35+
highWaterMarks, err := e.minionSvc.ListEndOffsetsCached(ctx)
3636
if err != nil {
3737
e.logger.Error("failed to fetch low water marks", zap.Error(err))
3838
return false

prometheus/collect_topic_partition_offsets.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
1717
isOk := true
1818

1919
// Low Watermarks
20-
lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2)
20+
lowWaterMarks, err := e.minionSvc.ListStartOffsetsCached(ctx)
2121
if err != nil {
2222
e.logger.Error("failed to fetch low water marks", zap.Error(err))
2323
return false
2424
}
2525
// High Watermarks
26-
highWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -1)
26+
highWaterMarks, err := e.minionSvc.ListEndOffsetsCached(ctx)
2727
if err != nil {
2828
e.logger.Error("failed to fetch low water marks", zap.Error(err))
2929
return false
@@ -32,7 +32,7 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
3232
// Highest Timestamp Offsets
3333
// NB: this requires Kafka Brokers 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
3434
// In older versions this is returning the timestamp of the low watermarks (earliest offset)
35-
maxTimestampOffsets, err := e.minionSvc.ListOffsetsCached(ctx, -3)
35+
maxTimestampOffsets, err := e.minionSvc.ListMaxTimestampOffsetsCached(ctx)
3636
if err != nil {
3737
e.logger.Error("failed to fetch offsets for max timestamp", zap.Error(err))
3838
return false

0 commit comments

Comments
 (0)