Commit 34a5aa6

backend: use kadm client to list partition offsets (#277)
This should also fix issue #269
1 parent f47c58b commit 34a5aa6

6 files changed: +89 -111 lines
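Before the per-file diffs, a minimal sketch of the kadm pattern this commit switches to, assuming only the franz-go APIs visible in the diff: kadm.NewClient wraps an existing *kgo.Client, ListEndOffsets returns a kadm.ListedOffsets map keyed by topic and partition, and each entry carries its own Err instead of a raw Kafka error code. The broker address is a placeholder and the program is illustrative only, not kminion code.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker address; kminion reuses its already-configured client instead.
	client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// kadm wraps an existing *kgo.Client, as done in minion/service.go below.
	adm := kadm.NewClient(client)

	// ListEndOffsets returns the high water marks for all topic partitions.
	endOffsets, err := adm.ListEndOffsets(context.Background())
	if err != nil {
		panic(err)
	}

	// Each partition entry carries its own Err, replacing the manual
	// kerr.ErrorForCode(partition.ErrorCode) checks removed by this commit.
	endOffsets.Each(func(o kadm.ListedOffset) {
		if o.Err != nil {
			fmt.Printf("%s/%d: %v\n", o.Topic, o.Partition, o.Err)
			return
		}
		fmt.Printf("%s/%d high water mark: %d\n", o.Topic, o.Partition, o.Offset)
	})
}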

go.mod (+1)
@@ -43,6 +43,7 @@ require (
 	github.com/prometheus/client_model v0.6.1 // indirect
 	github.com/prometheus/common v0.60.1 // indirect
 	github.com/prometheus/procfs v0.15.1 // indirect
+	github.com/twmb/franz-go/pkg/kadm v1.14.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.28.0 // indirect
 	golang.org/x/net v0.30.0 // indirect

go.sum (+2)
@@ -320,6 +320,8 @@ github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
 github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
 github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
 github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
+github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs=
+github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4=
 github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
 github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
 github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=

minion/list_offsets.go (+29 -44)
@@ -2,21 +2,21 @@ package minion
 
 import (
 	"context"
+	"errors"
 	"fmt"
-	"github.com/twmb/franz-go/pkg/kerr"
-	"go.uber.org/zap"
 	"strconv"
 	"time"
 
-	"github.com/twmb/franz-go/pkg/kmsg"
+	"github.com/twmb/franz-go/pkg/kadm"
+	"go.uber.org/zap"
 )
 
-func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
+func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
 	reqId := ctx.Value("requestId").(string)
 	key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId
 
 	if cachedRes, exists := s.getCachedItem(key); exists {
-		return cachedRes.(*kmsg.ListOffsetsResponse), nil
+		return cachedRes.(kadm.ListedOffsets), nil
 	}
 
 	res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
@@ -33,70 +33,55 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg
 		return nil, err
 	}
 
-	return res.(*kmsg.ListOffsetsResponse), nil
+	return res.(kadm.ListedOffsets), nil
 }
 
 // ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions
-func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
-	metadata, err := s.GetMetadataCached(ctx)
+func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
+	listedOffsets, err := s.admClient.ListEndOffsets(ctx)
 	if err != nil {
-		return nil, fmt.Errorf("failed to list consumer groups: %w", err)
-	}
-
-	topicReqs := make([]kmsg.ListOffsetsRequestTopic, len(metadata.Topics))
-	for i, topic := range metadata.Topics {
-		req := kmsg.NewListOffsetsRequestTopic()
-		req.Topic = *topic.Topic
-
-		partitionReqs := make([]kmsg.ListOffsetsRequestTopicPartition, len(topic.Partitions))
-		for j, partition := range topic.Partitions {
-			partitionReqs[j] = kmsg.NewListOffsetsRequestTopicPartition()
-			partitionReqs[j].Partition = partition.Partition
-			partitionReqs[j].Timestamp = timestamp
+		var se *kadm.ShardErrors
+		if !errors.As(err, &se) {
+			return nil, fmt.Errorf("failed to list offsets: %w", err)
 		}
-		req.Partitions = partitionReqs
-
-		topicReqs[i] = req
-	}
 
-	req := kmsg.NewListOffsetsRequest()
-	req.Topics = topicReqs
-
-	res, err := req.RequestWith(ctx, s.client)
-	if err != nil {
-		return res, err
+		if se.AllFailed {
+			return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err)
+		}
+		s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs)))
+		for _, shardErr := range se.Errs {
+			s.logger.Warn("shard error for listing end offsets",
+				zap.Int32("broker_id", shardErr.Broker.NodeID),
+				zap.Error(shardErr.Err))
+		}
 	}
 
 	// Log inner errors before returning them. We do that inside of this function to avoid duplicate logging as the response
 	// are cached for each scrape anyways.
 	//
 	// Create two metrics to aggregate error logs in few messages. Logging one message per occured partition error
 	// is too much. Typical errors are LEADER_NOT_AVAILABLE etc.
-	errorCountByErrCode := make(map[int16]int)
+	errorCountByErrCode := make(map[error]int)
 	errorCountByTopic := make(map[string]int)
 
 	// Iterate on all partitions
-	for _, topic := range res.Topics {
-		for _, partition := range topic.Partitions {
-			err := kerr.TypedErrorForCode(partition.ErrorCode)
-			if err != nil {
-				errorCountByErrCode[partition.ErrorCode]++
-				errorCountByTopic[topic.Topic]++
-			}
+	listedOffsets.Each(func(offset kadm.ListedOffset) {
+		if offset.Err != nil {
+			errorCountByTopic[offset.Topic]++
+			errorCountByErrCode[offset.Err]++
 		}
-	}
+	})
 
 	// Print log line for each error type
-	for errCode, count := range errorCountByErrCode {
-		typedErr := kerr.TypedErrorForCode(errCode)
+	for err, count := range errorCountByErrCode {
 		s.logger.Warn("failed to list some partitions watermarks",
-			zap.Error(typedErr),
+			zap.Error(err),
 			zap.Int("error_count", count))
 	}
 	if len(errorCountByTopic) > 0 {
 		s.logger.Warn("some topics had one or more partitions whose watermarks could not be fetched from Kafka",
 			zap.Int("topics_with_errors", len(errorCountByTopic)))
 	}
 
-	return res, nil
+	return listedOffsets, nil
 }
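The partial-failure handling introduced above relies on kadm.ShardErrors: list requests fan out to several brokers, and kadm reports per-broker failures while still returning the offsets it did get. A self-contained sketch of that branching, assuming the same kadm API as the diff; the helper name and log output are illustrative, not kminion code.

package example

import (
	"errors"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kadm"
)

// handleListOffsetsErr mirrors the branching in ListOffsets above: unknown
// errors and total failures abort, partial shard failures are only logged so
// the partial kadm.ListedOffsets result can still be used.
func handleListOffsetsErr(err error) error {
	if err == nil {
		return nil
	}
	var se *kadm.ShardErrors
	if !errors.As(err, &se) {
		return fmt.Errorf("failed to list offsets: %w", err)
	}
	if se.AllFailed {
		return fmt.Errorf("failed to list offsets, all shard responses failed: %w", err)
	}
	for _, shardErr := range se.Errs {
		// Per-broker failure; offsets from the remaining brokers are still usable.
		log.Printf("broker %d failed to list offsets: %v", shardErr.Broker.NodeID, shardErr.Err)
	}
	return nil
}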

minion/service.go (+7 -3)
@@ -10,6 +10,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/pkg/kmsg"
 	"github.com/twmb/franz-go/pkg/kversion"
@@ -33,8 +34,9 @@ type Service struct {
 	AllowedTopicsExpr []*regexp.Regexp
 	IgnoredTopicsExpr []*regexp.Regexp
 
-	client  *kgo.Client
-	storage *Storage
+	client    *kgo.Client
+	admClient *kadm.Client
+	storage   *Storage
 }
 
 func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricsNamespace string, ctx context.Context) (*Service, error) {
@@ -82,7 +84,9 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
 		AllowedTopicsExpr: allowedTopicsExpr,
 		IgnoredTopicsExpr: ignoredTopicsExpr,
 
-		client:  client,
+		client:    client,
+		admClient: kadm.NewClient(client),
+
 		storage: storage,
 	}
 

prometheus/collect_consumer_group_lags.go (+31 -43)
@@ -6,8 +6,8 @@ import (
 	"strconv"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kerr"
-	"github.com/twmb/franz-go/pkg/kmsg"
 	"go.uber.org/zap"
 
 	"github.com/cloudhut/kminion/v2/minion"
@@ -211,61 +211,49 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan
 	return isOk
 }
 
-func (e *Exporter) waterMarksByTopic(lowMarks *kmsg.ListOffsetsResponse, highMarks *kmsg.ListOffsetsResponse) map[string]map[int32]waterMark {
+func (e *Exporter) waterMarksByTopic(lowMarks kadm.ListedOffsets, highMarks kadm.ListedOffsets) map[string]map[int32]waterMark {
 	type partitionID = int32
 	type topicName = string
 	waterMarks := make(map[topicName]map[partitionID]waterMark)
 
-	for _, topic := range lowMarks.Topics {
-		_, exists := waterMarks[topic.Topic]
+	for topic, lowMarksByPartitionID := range lowMarks {
+		_, exists := waterMarks[topic]
 		if !exists {
-			waterMarks[topic.Topic] = make(map[partitionID]waterMark)
+			waterMarks[topic] = make(map[partitionID]waterMark)
 		}
-		for _, partition := range topic.Partitions {
-			err := kerr.ErrorForCode(partition.ErrorCode)
-			if err != nil {
+
+		for _, lowOffset := range lowMarksByPartitionID {
+			if lowOffset.Err != nil {
 				e.logger.Debug("failed to get partition low water mark, inner kafka error",
-					zap.String("topic_name", topic.Topic),
-					zap.Int32("partition_id", partition.Partition),
-					zap.Error(err))
+					zap.String("topic_name", lowOffset.Topic),
+					zap.Int32("partition_id", lowOffset.Partition),
+					zap.Error(lowOffset.Err))
 				continue
 			}
-			waterMarks[topic.Topic][partition.Partition] = waterMark{
-				TopicName:     topic.Topic,
-				PartitionID:   partition.Partition,
-				LowWaterMark:  partition.Offset,
-				HighWaterMark: -1,
-			}
-		}
-	}
 
-	for _, topic := range highMarks.Topics {
-		mark, exists := waterMarks[topic.Topic]
-		if !exists {
-			e.logger.Error("got high water marks for a topic but no low watermarks", zap.String("topic_name", topic.Topic))
-			delete(waterMarks, topic.Topic)
-			continue
-		}
-		for _, partition := range topic.Partitions {
-			err := kerr.ErrorForCode(partition.ErrorCode)
-			if err != nil {
-				e.logger.Debug("failed to get partition high water mark, inner kafka error",
-					zap.String("topic_name", topic.Topic),
-					zap.Int32("partition_id", partition.Partition),
-					zap.Error(err))
-				continue
-			}
-			partitionMark, exists := mark[partition.Partition]
+			higOffset, exists := highMarks.Lookup(lowOffset.Topic, lowOffset.Partition)
 			if !exists {
-				e.logger.Error("got high water marks for a topic's partition but no low watermarks",
-					zap.String("topic_name", topic.Topic),
-					zap.Int32("partition_id", partition.Partition),
-					zap.Int64("offset", partition.Offset))
-				delete(waterMarks, topic.Topic)
+				e.logger.Error("got low water marks for a topic's partition but no high watermarks",
+					zap.String("topic_name", lowOffset.Topic),
+					zap.Int32("partition_id", lowOffset.Partition),
+					zap.Int64("offset", lowOffset.Offset))
+				delete(waterMarks, lowOffset.Topic)
 				break // Topic watermarks are invalid -> delete & skip this topic
 			}
-			partitionMark.HighWaterMark = partition.Offset
-			waterMarks[topic.Topic][partition.Partition] = partitionMark
+			if higOffset.Err != nil {
+				e.logger.Debug("failed to get partition low water mark, inner kafka error",
+					zap.String("topic_name", lowOffset.Topic),
+					zap.Int32("partition_id", lowOffset.Partition),
+					zap.Error(lowOffset.Err))
+				continue
+			}
+
+			waterMarks[lowOffset.Topic][lowOffset.Partition] = waterMark{
+				TopicName:     lowOffset.Topic,
+				PartitionID:   lowOffset.Partition,
+				LowWaterMark:  lowOffset.Offset,
+				HighWaterMark: higOffset.Offset,
+			}
 		}
 	}
 
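The rewritten waterMarksByTopic above pairs each low-watermark entry with its high watermark via ListedOffsets.Lookup. A short sketch of that pairing under the same assumptions; the function name and output are illustrative only, not kminion code.

package example

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
)

// printRetained pairs start and end offsets the same way waterMarksByTopic
// does, skipping partitions without a usable pair of watermarks.
func printRetained(lowOffsets, highOffsets kadm.ListedOffsets) {
	lowOffsets.Each(func(low kadm.ListedOffset) {
		high, ok := highOffsets.Lookup(low.Topic, low.Partition)
		if !ok || low.Err != nil || high.Err != nil {
			return
		}
		// High minus low water mark = number of records currently retained.
		fmt.Printf("%s/%d retained records: %d\n", low.Topic, low.Partition, high.Offset-low.Offset)
	})
}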

prometheus/collect_topic_partition_offsets.go (+19 -21)
@@ -5,7 +5,6 @@ import (
 	"strconv"
 
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/twmb/franz-go/pkg/kerr"
 	"go.uber.org/zap"
 
 	"github.com/cloudhut/kminion/v2/minion"
@@ -32,31 +31,31 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
 	}
 
 	// Process Low Watermarks
-	for _, topic := range lowWaterMarks.Topics {
-		if !e.minionSvc.IsTopicAllowed(topic.Topic) {
+
+	for topicName, partitions := range lowWaterMarks {
+		if !e.minionSvc.IsTopicAllowed(topicName) {
 			continue
 		}
 
 		waterMarkSum := int64(0)
 		hasErrors := false
-		for _, partition := range topic.Partitions {
-			err := kerr.ErrorForCode(partition.ErrorCode)
-			if err != nil {
+		for _, offset := range partitions {
+			if offset.Err != nil {
 				hasErrors = true
 				isOk = false
 				continue
 			}
-			waterMarkSum += partition.Offset
+			waterMarkSum += offset.Offset
 			// Let's end here if partition metrics shall not be exposed
 			if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
 				continue
 			}
 			ch <- prometheus.MustNewConstMetric(
 				e.partitionLowWaterMark,
 				prometheus.GaugeValue,
-				float64(partition.Offset),
-				topic.Topic,
-				strconv.Itoa(int(partition.Partition)),
+				float64(offset.Offset),
+				topicName,
+				strconv.Itoa(int(offset.Partition)),
 			)
 		}
 		// We only want to report the sum of all partition marks if we receive watermarks from all partition
@@ -65,35 +64,34 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
 				e.topicLowWaterMarkSum,
 				prometheus.GaugeValue,
 				float64(waterMarkSum),
-				topic.Topic,
+				topicName,
 			)
 		}
 	}
 
-	for _, topic := range highWaterMarks.Topics {
-		if !e.minionSvc.IsTopicAllowed(topic.Topic) {
+	for topicName, partitions := range highWaterMarks {
+		if !e.minionSvc.IsTopicAllowed(topicName) {
 			continue
 		}
 		waterMarkSum := int64(0)
 		hasErrors := false
-		for _, partition := range topic.Partitions {
-			err := kerr.ErrorForCode(partition.ErrorCode)
-			if err != nil {
+		for _, offset := range partitions {
+			if offset.Err != nil {
 				hasErrors = true
 				isOk = false
 				continue
 			}
-			waterMarkSum += partition.Offset
+			waterMarkSum += offset.Offset
 			// Let's end here if partition metrics shall not be exposed
 			if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
 				continue
 			}
 			ch <- prometheus.MustNewConstMetric(
 				e.partitionHighWaterMark,
 				prometheus.GaugeValue,
-				float64(partition.Offset),
-				topic.Topic,
-				strconv.Itoa(int(partition.Partition)),
+				float64(offset.Offset),
+				topicName,
+				strconv.Itoa(int(offset.Partition)),
 			)
 		}
 		// We only want to report the sum of all partition marks if we receive watermarks from all partitions
@@ -102,7 +100,7 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
 				e.topicHighWaterMarkSum,
 				prometheus.GaugeValue,
 				float64(waterMarkSum),
-				topic.Topic,
+				topicName,
 			)
 		}
 	}
