diff --git a/minion/list_offsets.go b/minion/list_offsets.go index ae8617f6..fe425810 100644 --- a/minion/list_offsets.go +++ b/minion/list_offsets.go @@ -4,23 +4,40 @@ import ( "context" "errors" "fmt" - "strconv" "time" "github.com/twmb/franz-go/pkg/kadm" "go.uber.org/zap" ) -func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) { +func (s *Service) ListEndOffsetsCached(ctx context.Context) (kadm.ListedOffsets, error) { + return s.listOffsetsCached(ctx, "end") +} + +func (s *Service) ListStartOffsetsCached(ctx context.Context) (kadm.ListedOffsets, error) { + return s.listOffsetsCached(ctx, "start") +} + +func (s *Service) listOffsetsCached(ctx context.Context, offsetType string) (kadm.ListedOffsets, error) { reqId := ctx.Value("requestId").(string) - key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId + key := fmt.Sprintf("partition-%s-offsets-%s", offsetType, reqId) if cachedRes, exists := s.getCachedItem(key); exists { return cachedRes.(kadm.ListedOffsets), nil } + var listFunc func(context.Context) (kadm.ListedOffsets, error) + switch offsetType { + case "end": + listFunc = s.ListEndOffsets + case "start": + listFunc = s.ListStartOffsets + default: + return nil, fmt.Errorf("invalid offset type: %s", offsetType) + } + res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) { - offsets, err := s.ListOffsets(ctx, timestamp) + offsets, err := listFunc(ctx) if err != nil { return nil, err } @@ -36,21 +53,32 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm. return res.(kadm.ListedOffsets), nil } -// ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions -func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) { - listedOffsets, err := s.admClient.ListEndOffsets(ctx) +// ListEndOffsets fetches the high water mark for all topic partitions. +func (s *Service) ListEndOffsets(ctx context.Context) (kadm.ListedOffsets, error) { + return s.listOffsetsInternal(ctx, s.admClient.ListEndOffsets, "end") +} + +// ListStartOffsets fetches the low water mark for all topic partitions. +func (s *Service) ListStartOffsets(ctx context.Context) (kadm.ListedOffsets, error) { + return s.listOffsetsInternal(ctx, s.admClient.ListStartOffsets, "start") +} + +type listOffsetsFunc func(context.Context, ...string) (kadm.ListedOffsets, error) + +func (s *Service) listOffsetsInternal(ctx context.Context, listFunc listOffsetsFunc, offsetType string) (kadm.ListedOffsets, error) { + listedOffsets, err := listFunc(ctx) if err != nil { var se *kadm.ShardErrors if !errors.As(err, &se) { - return nil, fmt.Errorf("failed to list offsets: %w", err) + return nil, fmt.Errorf("failed to list %s offsets: %w", offsetType, err) } if se.AllFailed { - return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err) + return nil, fmt.Errorf("failed to list %s offsets, all shard responses failed: %w", offsetType, err) } - s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs))) + s.logger.Info(fmt.Sprintf("failed to list %s offset from some shards", offsetType), zap.Int("failed_shards", len(se.Errs))) for _, shardErr := range se.Errs { - s.logger.Warn("shard error for listing end offsets", + s.logger.Warn(fmt.Sprintf("shard error for listing %s offsets", offsetType), zap.Int32("broker_id", shardErr.Broker.NodeID), zap.Error(shardErr.Err)) } @@ -74,12 +102,12 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.Listed // Print log line for each error type for err, count := range errorCountByErrCode { - s.logger.Warn("failed to list some partitions watermarks", + s.logger.Warn(fmt.Sprintf("failed to list some partitions %s watermarks", offsetType), zap.Error(err), zap.Int("error_count", count)) } if len(errorCountByTopic) > 0 { - s.logger.Warn("some topics had one or more partitions whose watermarks could not be fetched from Kafka", + s.logger.Warn(fmt.Sprintf("some topics had one or more partitions whose %s watermarks could not be fetched from Kafka", offsetType), zap.Int("topics_with_errors", len(errorCountByTopic))) } diff --git a/prometheus/collect_consumer_group_lags.go b/prometheus/collect_consumer_group_lags.go index b641766e..6d1a5d2f 100644 --- a/prometheus/collect_consumer_group_lags.go +++ b/prometheus/collect_consumer_group_lags.go @@ -27,13 +27,13 @@ func (e *Exporter) collectConsumerGroupLags(ctx context.Context, ch chan<- prome // Low Watermarks (at the moment they are not needed at all, they could be used to calculate the lag on partitions // that don't have any active offsets) - lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2) + lowWaterMarks, err := e.minionSvc.ListStartOffsetsCached(ctx) if err != nil { e.logger.Error("failed to fetch low water marks", zap.Error(err)) return false } // High Watermarks - highWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -1) + highWaterMarks, err := e.minionSvc.ListEndOffsetsCached(ctx) if err != nil { e.logger.Error("failed to fetch low water marks", zap.Error(err)) return false diff --git a/prometheus/collect_topic_partition_offsets.go b/prometheus/collect_topic_partition_offsets.go index 4ca66956..784cfe6b 100644 --- a/prometheus/collect_topic_partition_offsets.go +++ b/prometheus/collect_topic_partition_offsets.go @@ -18,13 +18,13 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p isOk := true // Low Watermarks - lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2) + lowWaterMarks, err := e.minionSvc.ListStartOffsetsCached(ctx) if err != nil { e.logger.Error("failed to fetch low water marks", zap.Error(err)) return false } // High Watermarks - highWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -1) + highWaterMarks, err := e.minionSvc.ListEndOffsetsCached(ctx) if err != nil { e.logger.Error("failed to fetch low water marks", zap.Error(err)) return false