Skip to content

Commit 746c1e4

Browse files
committed
fix: low watermark with admin api
1 parent 4b0fb1f commit 746c1e4

File tree

3 files changed

+45
-17
lines changed

3 files changed

+45
-17
lines changed

minion/list_offsets.go

Lines changed: 41 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -4,23 +4,40 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"strconv"
87
"time"
98

109
"github.com/twmb/franz-go/pkg/kadm"
1110
"go.uber.org/zap"
1211
)
1312

14-
func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
13+
func (s *Service) ListEndOffsetsCached(ctx context.Context) (kadm.ListedOffsets, error) {
14+
return s.listOffsetsCached(ctx, "end")
15+
}
16+
17+
func (s *Service) ListStartOffsetsCached(ctx context.Context) (kadm.ListedOffsets, error) {
18+
return s.listOffsetsCached(ctx, "start")
19+
}
20+
21+
func (s *Service) listOffsetsCached(ctx context.Context, offsetType string) (kadm.ListedOffsets, error) {
1522
reqId := ctx.Value("requestId").(string)
16-
key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId
23+
key := fmt.Sprintf("partition-%s-offsets-%s", offsetType, reqId)
1724

1825
if cachedRes, exists := s.getCachedItem(key); exists {
1926
return cachedRes.(kadm.ListedOffsets), nil
2027
}
2128

29+
var listFunc func(context.Context) (kadm.ListedOffsets, error)
30+
switch offsetType {
31+
case "end":
32+
listFunc = s.ListEndOffsets
33+
case "start":
34+
listFunc = s.ListStartOffsets
35+
default:
36+
return nil, fmt.Errorf("invalid offset type: %s", offsetType)
37+
}
38+
2239
res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
23-
offsets, err := s.ListOffsets(ctx, timestamp)
40+
offsets, err := listFunc(ctx)
2441
if err != nil {
2542
return nil, err
2643
}
@@ -36,21 +53,32 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.
3653
return res.(kadm.ListedOffsets), nil
3754
}
3855

39-
// ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions
40-
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
41-
listedOffsets, err := s.admClient.ListEndOffsets(ctx)
56+
// ListEndOffsets fetches the high water mark for all topic partitions.
57+
func (s *Service) ListEndOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
58+
return s.listOffsetsInternal(ctx, s.admClient.ListEndOffsets, "end")
59+
}
60+
61+
// ListStartOffsets fetches the low water mark for all topic partitions.
62+
func (s *Service) ListStartOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
63+
return s.listOffsetsInternal(ctx, s.admClient.ListStartOffsets, "start")
64+
}
65+
66+
type listOffsetsFunc func(context.Context, ...string) (kadm.ListedOffsets, error)
67+
68+
func (s *Service) listOffsetsInternal(ctx context.Context, listFunc listOffsetsFunc, offsetType string) (kadm.ListedOffsets, error) {
69+
listedOffsets, err := listFunc(ctx)
4270
if err != nil {
4371
var se *kadm.ShardErrors
4472
if !errors.As(err, &se) {
45-
return nil, fmt.Errorf("failed to list offsets: %w", err)
73+
return nil, fmt.Errorf("failed to list %s offsets: %w", offsetType, err)
4674
}
4775

4876
if se.AllFailed {
49-
return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err)
77+
return nil, fmt.Errorf("failed to list %s offsets, all shard responses failed: %w", offsetType, err)
5078
}
51-
s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs)))
79+
s.logger.Info(fmt.Sprintf("failed to list %s offset from some shards", offsetType), zap.Int("failed_shards", len(se.Errs)))
5280
for _, shardErr := range se.Errs {
53-
s.logger.Warn("shard error for listing end offsets",
81+
s.logger.Warn(fmt.Sprintf("shard error for listing %s offsets", offsetType),
5482
zap.Int32("broker_id", shardErr.Broker.NodeID),
5583
zap.Error(shardErr.Err))
5684
}
@@ -74,12 +102,12 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.Listed
74102

75103
// Print log line for each error type
76104
for err, count := range errorCountByErrCode {
77-
s.logger.Warn("failed to list some partitions watermarks",
105+
s.logger.Warn(fmt.Sprintf("failed to list some partitions %s watermarks", offsetType),
78106
zap.Error(err),
79107
zap.Int("error_count", count))
80108
}
81109
if len(errorCountByTopic) > 0 {
82-
s.logger.Warn("some topics had one or more partitions whose watermarks could not be fetched from Kafka",
110+
s.logger.Warn(fmt.Sprintf("some topics had one or more partitions whose %s watermarks could not be fetched from Kafka", offsetType),
83111
zap.Int("topics_with_errors", len(errorCountByTopic)))
84112
}
85113

prometheus/collect_consumer_group_lags.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,13 +27,13 @@ func (e *Exporter) collectConsumerGroupLags(ctx context.Context, ch chan<- prome
2727

2828
// Low Watermarks (at the moment they are not needed at all; they could be used to calculate the lag on partitions
2929
// that don't have any active offsets)
30-
lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2)
30+
lowWaterMarks, err := e.minionSvc.ListStartOffsetsCached(ctx)
3131
if err != nil {
3232
e.logger.Error("failed to fetch low water marks", zap.Error(err))
3333
return false
3434
}
3535
// High Watermarks
36-
highWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -1)
36+
highWaterMarks, err := e.minionSvc.ListEndOffsetsCached(ctx)
3737
if err != nil {
3838
e.logger.Error("failed to fetch low water marks", zap.Error(err))
3939
return false

prometheus/collect_topic_partition_offsets.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,13 +18,13 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
1818
isOk := true
1919

2020
// Low Watermarks
21-
lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2)
21+
lowWaterMarks, err := e.minionSvc.ListStartOffsetsCached(ctx)
2222
if err != nil {
2323
e.logger.Error("failed to fetch low water marks", zap.Error(err))
2424
return false
2525
}
2626
// High Watermarks
27-
highWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -1)
27+
highWaterMarks, err := e.minionSvc.ListEndOffsetsCached(ctx)
2828
if err != nil {
2929
e.logger.Error("failed to fetch low water marks", zap.Error(err))
3030
return false

0 commit comments

Comments
 (0)