Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions minion/list_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,40 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/twmb/franz-go/pkg/kadm"
"go.uber.org/zap"
)

func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
// ListEndOffsetsCached returns the end offsets by delegating to
// listOffsetsCached with the "end" offset type. That helper keys its cache on
// the offset type and the request ID read from ctx, so ctx must carry a string
// "requestId" value (the helper asserts on it without a check).
func (s *Service) ListEndOffsetsCached(ctx context.Context) (kadm.ListedOffsets, error) {
	const offsetType = "end"
	return s.listOffsetsCached(ctx, offsetType)
}

// ListStartOffsetsCached returns the start offsets by delegating to
// listOffsetsCached with the "start" offset type. That helper keys its cache
// on the offset type and the request ID read from ctx, so ctx must carry a
// string "requestId" value (the helper asserts on it without a check).
func (s *Service) ListStartOffsetsCached(ctx context.Context) (kadm.ListedOffsets, error) {
	const offsetType = "start"
	return s.listOffsetsCached(ctx, offsetType)
}

func (s *Service) listOffsetsCached(ctx context.Context, offsetType string) (kadm.ListedOffsets, error) {
reqId := ctx.Value("requestId").(string)
key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId
key := fmt.Sprintf("partition-%s-offsets-%s", offsetType, reqId)

if cachedRes, exists := s.getCachedItem(key); exists {
return cachedRes.(kadm.ListedOffsets), nil
}

var listFunc func(context.Context) (kadm.ListedOffsets, error)
switch offsetType {
case "end":
listFunc = s.ListEndOffsets
case "start":
listFunc = s.ListStartOffsets
default:
return nil, fmt.Errorf("invalid offset type: %s", offsetType)
}

res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
offsets, err := s.ListOffsets(ctx, timestamp)
offsets, err := listFunc(ctx)
if err != nil {
return nil, err
}
Expand All @@ -36,21 +53,32 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.
return res.(kadm.ListedOffsets), nil
}

// ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
listedOffsets, err := s.admClient.ListEndOffsets(ctx)
// ListEndOffsets fetches the high water mark for all topic partitions.
//
// The request is issued through the admin client's ListEndOffsets call and
// routed via listOffsetsInternal, which tags its errors and log lines with
// the "end" offset type.
func (s *Service) ListEndOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
	const offsetType = "end"
	fetch := s.admClient.ListEndOffsets
	return s.listOffsetsInternal(ctx, fetch, offsetType)
}

// ListStartOffsets fetches the low water mark for all topic partitions.
//
// The request is issued through the admin client's ListStartOffsets call and
// routed via listOffsetsInternal, which tags its errors and log lines with
// the "start" offset type.
func (s *Service) ListStartOffsets(ctx context.Context) (kadm.ListedOffsets, error) {
	const offsetType = "start"
	fetch := s.admClient.ListStartOffsets
	return s.listOffsetsInternal(ctx, fetch, offsetType)
}

// listOffsetsFunc is the signature of the offset-listing calls handed to
// listOffsetsInternal (in this file: s.admClient.ListEndOffsets and
// s.admClient.ListStartOffsets). listOffsetsInternal invokes it with only a
// context and never supplies the variadic string arguments.
// NOTE(review): the variadic strings are presumably topic names — confirm
// against the kadm client documentation.
type listOffsetsFunc func(context.Context, ...string) (kadm.ListedOffsets, error)

func (s *Service) listOffsetsInternal(ctx context.Context, listFunc listOffsetsFunc, offsetType string) (kadm.ListedOffsets, error) {
listedOffsets, err := listFunc(ctx)
if err != nil {
var se *kadm.ShardErrors
if !errors.As(err, &se) {
return nil, fmt.Errorf("failed to list offsets: %w", err)
return nil, fmt.Errorf("failed to list %s offsets: %w", offsetType, err)
}

if se.AllFailed {
return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err)
return nil, fmt.Errorf("failed to list %s offsets, all shard responses failed: %w", offsetType, err)
}
s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs)))
s.logger.Info(fmt.Sprintf("failed to list %s offset from some shards", offsetType), zap.Int("failed_shards", len(se.Errs)))
for _, shardErr := range se.Errs {
s.logger.Warn("shard error for listing end offsets",
s.logger.Warn(fmt.Sprintf("shard error for listing %s offsets", offsetType),
zap.Int32("broker_id", shardErr.Broker.NodeID),
zap.Error(shardErr.Err))
}
Expand All @@ -74,12 +102,12 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.Listed

// Print log line for each error type
for err, count := range errorCountByErrCode {
s.logger.Warn("failed to list some partitions watermarks",
s.logger.Warn(fmt.Sprintf("failed to list some partitions %s watermarks", offsetType),
zap.Error(err),
zap.Int("error_count", count))
}
if len(errorCountByTopic) > 0 {
s.logger.Warn("some topics had one or more partitions whose watermarks could not be fetched from Kafka",
s.logger.Warn(fmt.Sprintf("some topics had one or more partitions whose %s watermarks could not be fetched from Kafka", offsetType),
zap.Int("topics_with_errors", len(errorCountByTopic)))
}

Expand Down
4 changes: 2 additions & 2 deletions prometheus/collect_consumer_group_lags.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ func (e *Exporter) collectConsumerGroupLags(ctx context.Context, ch chan<- prome

// Low Watermarks (at the moment they are not needed at all, they could be used to calculate the lag on partitions
// that don't have any active offsets)
lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2)
lowWaterMarks, err := e.minionSvc.ListStartOffsetsCached(ctx)
if err != nil {
e.logger.Error("failed to fetch low water marks", zap.Error(err))
return false
}
// High Watermarks
highWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -1)
highWaterMarks, err := e.minionSvc.ListEndOffsetsCached(ctx)
if err != nil {
e.logger.Error("failed to fetch low water marks", zap.Error(err))
return false
Expand Down
4 changes: 2 additions & 2 deletions prometheus/collect_topic_partition_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
isOk := true

// Low Watermarks
lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2)
lowWaterMarks, err := e.minionSvc.ListStartOffsetsCached(ctx)
if err != nil {
e.logger.Error("failed to fetch low water marks", zap.Error(err))
return false
}
// High Watermarks
highWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -1)
highWaterMarks, err := e.minionSvc.ListEndOffsetsCached(ctx)
if err != nil {
e.logger.Error("failed to fetch low water marks", zap.Error(err))
return false
Expand Down