Commit 86ffbe1

Clean up

1 parent 2d6593a · commit 86ffbe1

13 files changed (+51 -46 lines)

pkg/mimirpb/split.go
Lines changed: 5 additions & 5 deletions

@@ -94,7 +94,7 @@ func SplitWriteRequestByMaxMarshalSizeRW2(req *WriteRequest, reqSize, maxSize in
         // If the next partial request doesn't have any timeseries yet, we add the series anyway, in order to avoid an infinite loop
         // if a single timeseries is bigger than the limit.
         if nextReqSize+seriesSize+symbolsSize > maxSize && len(nextReq.TimeseriesRW2) > 0 {
-            // Flush the next partial request.
+            // Finalize the next partial request.
             nextReq.SymbolsRW2 = nextReqSymbols.Symbols()
             partialReqs = append(partialReqs, nextReq)

@@ -118,7 +118,7 @@ func SplitWriteRequestByMaxMarshalSizeRW2(req *WriteRequest, reqSize, maxSize in
     }

     if len(nextReq.TimeseriesRW2) > 0 {
-        // Flush the last partial request.
+        // Finalize the next partial request.
         nextReq.SymbolsRW2 = nextReqSymbols.Symbols()
         partialReqs = append(partialReqs, nextReq)
     }

@@ -168,7 +168,7 @@ func splitTimeseriesByMaxMarshalSize(req *WriteRequest, reqSize, maxSize int) []
         // If the next partial request doesn't have any timeseries yet, we add the series anyway, in order to avoid an infinite loop
         // if a single timeseries is bigger than the limit.
         if nextReqSize+seriesSize > maxSize && nextReqTimeseriesLength > 0 {
-            // Flush the next partial request.
+            // Finalize the next partial request.
             nextReq.Timeseries = req.Timeseries[nextReqTimeseriesStart : nextReqTimeseriesStart+nextReqTimeseriesLength]
             partialReqs = append(partialReqs, nextReq)

@@ -184,7 +184,7 @@ func splitTimeseriesByMaxMarshalSize(req *WriteRequest, reqSize, maxSize int) []
     }

     if nextReqTimeseriesLength > 0 {
-        // Flush the last partial request.
+        // Finalize the last partial request.
         nextReq.Timeseries = req.Timeseries[nextReqTimeseriesStart : nextReqTimeseriesStart+nextReqTimeseriesLength]
         partialReqs = append(partialReqs, nextReq)
     }

@@ -234,7 +234,7 @@ func splitMetadataByMaxMarshalSize(req *WriteRequest, reqSize, maxSize int) []*W
         // If the next partial request doesn't have any metadata yet, we add the metadata anyway, in order to avoid an infinite loop
         // if a single metadata is bigger than the limit.
         if nextReqSize+metadataSize > maxSize && nextReqMetadataLength > 0 {
-            // Flush the next partial request.
+            // Finalize the next partial request.
             nextReq.Metadata = req.Metadata[nextReqMetadataStart : nextReqMetadataStart+nextReqMetadataLength]
             partialReqs = append(partialReqs, nextReq)
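
All five hunks in this file touch the same size-capped batching loop: items accumulate into the current partial request, which is finalized as soon as the next item would push it over maxSize, unless the partial request is still empty; that guard is what avoids an infinite loop when a single item is bigger than the limit. A minimal, generic sketch of the pattern (illustrative types and sizes, not the mimirpb API):

package main

import "fmt"

// splitBySize batches item sizes into groups whose total stays at or below maxSize.
// A single item larger than maxSize still gets its own group, mirroring the
// "add the series anyway, in order to avoid an infinite loop" guard above.
func splitBySize(sizes []int, maxSize int) [][]int {
    var groups [][]int
    var current []int
    currentSize := 0

    for _, s := range sizes {
        // Finalize the current partial group if adding this item would exceed
        // the limit and the group already holds at least one item.
        if currentSize+s > maxSize && len(current) > 0 {
            groups = append(groups, current)
            current, currentSize = nil, 0
        }
        current = append(current, s)
        currentSize += s
    }

    // Finalize the last partial group.
    if len(current) > 0 {
        groups = append(groups, current)
    }
    return groups
}

func main() {
    fmt.Println(splitBySize([]int{40, 30, 50, 120, 10}, 100)) // [[40 30] [50] [120] [10]]
}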

pkg/streamingpromql/evaluator.go
Lines changed: 1 addition & 1 deletion

@@ -166,7 +166,7 @@ func (e *Evaluator) Evaluate(ctx context.Context, observer EvaluationObserver) (
     }

     if e.engine.pedantic {
-        // Flush the root operator a second time to ensure all operators behave correctly if Finalize is called multiple times.
+        // Finalize the root operator a second time to ensure all operators behave correctly if Finalize is called multiple times.
         if err := e.root.Finalize(ctx); err != nil {
             return fmt.Errorf("pedantic mode: failed to finalize operator a second time after successfully finalizing the first time: %w", err)
         }
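
The pedantic check above calls Finalize on the root operator twice, so operators must treat repeated finalization as a no-op. A minimal sketch of the usual guard, using an illustrative type rather than one of Mimir's operators (the finalized bool field on UncachedSplit further down presumably exists for a similar reason):

package operators

import "context"

// finalizeOnceOperator treats repeated Finalize calls as no-ops: the first call
// does the real work, later calls return immediately.
type finalizeOnceOperator struct {
    finalized bool
}

func (o *finalizeOnceOperator) Finalize(ctx context.Context) error {
    if o.finalized {
        return nil // already finalized: must not fail or repeat side effects
    }
    o.finalized = true
    // ... write out cache entries, release buffers, etc.
    return nil
}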

pkg/streamingpromql/operators/functions/split_functions.go
Lines changed: 4 additions & 2 deletions

@@ -3,10 +3,11 @@
 package functions

 import (
-    "github.com/grafana/mimir/pkg/streamingpromql/optimize/plan/querysplitting/cache"
     "io"
     "math"

+    "github.com/grafana/mimir/pkg/streamingpromql/optimize/plan/querysplitting/cache"
+
     "github.com/pkg/errors"
     "github.com/prometheus/prometheus/model/histogram"
     "github.com/prometheus/prometheus/util/annotations"

@@ -17,6 +18,7 @@ import (
     "github.com/grafana/mimir/pkg/util/limiter"
 )

+// TODO: investigate Kahan summation more, are we using the compensation correctly?
 var SplitSumOverTime = NewSplitOperatorFactory[SumOverTimeIntermediate](sumOverTimeGenerate, sumOverTimeCombine, SumOverTimeCodec, SumOverTime, FUNCTION_SUM_OVER_TIME)

 func sumOverTimeGenerate(

@@ -76,7 +78,7 @@ func sumOverTimeCombine(
         return 0, false, nil, nil
     }

-    return sumF, haveFloats, sumH, nil
+    return sumF + c, haveFloats, sumH, nil
 }

 var SumOverTimeCodec = newProtoListCodec(
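
For context on the TODO above: the sumF + c return folds the accumulated compensation term into the final float sum, as in Kahan–Babuška (Neumaier) summation, where the compensation is added once at the end rather than folded into every step as in plain Kahan summation. A minimal, self-contained sketch of that pattern; the helper below is illustrative and not Mimir's actual combine code:

package main

import (
    "fmt"
    "math"
)

// kahanSumInc is the Kahan–Babuška (Neumaier) update step: it adds inc to sum
// and accumulates the low-order bits that rounding would otherwise drop into c.
func kahanSumInc(inc, sum, c float64) (float64, float64) {
    t := sum + inc
    if math.Abs(sum) >= math.Abs(inc) {
        c += (sum - t) + inc // low-order bits of inc were lost
    } else {
        c += (inc - t) + sum // low-order bits of sum were lost
    }
    return t, c
}

func main() {
    var naive, sumF, c float64
    values := []float64{1e16}
    for i := 0; i < 1000; i++ {
        values = append(values, 1)
    }
    for _, v := range values {
        naive += v
        sumF, c = kahanSumInc(v, sumF, c)
    }
    fmt.Println(naive)    // 1e+16: every small increment is lost to rounding
    fmt.Println(sumF + c) // 1.0000000000001e+16: the compensation restores them
}

Returning sumF alone would silently drop the compensation; adding c at the end is what distinguishes the Neumaier variant from plain Kahan summation.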

pkg/streamingpromql/operators/functions/split_operator.go
Lines changed: 20 additions & 20 deletions

@@ -23,9 +23,9 @@ import (
     "github.com/grafana/mimir/pkg/util/limiter"
 )

-// SplittingFunctionOverRangeVector performs range vector function calculation with intermediate result caching.
+// FunctionOverRangeVectorSplit performs range vector function calculation with intermediate result caching.
 // T is the type of intermediate result produced by the function's generate step.
-type SplittingFunctionOverRangeVector[T any] struct {
+type FunctionOverRangeVectorSplit[T any] struct {
     MemoryConsumptionTracker *limiter.MemoryConsumptionTracker
     FuncId Function
     FuncDef FunctionOverRangeVectorDefinition

@@ -55,7 +55,7 @@ type SplittingFunctionOverRangeVector[T any] struct {
     currentSeriesIdx int
 }

-var _ types.InstantVectorOperator = (*SplittingFunctionOverRangeVector[any])(nil)
+var _ types.InstantVectorOperator = (*FunctionOverRangeVectorSplit[any])(nil)

 func NewSplittingFunctionOverRangeVector[T any](
     innerNode planning.Node,

@@ -73,12 +73,12 @@ func NewSplittingFunctionOverRangeVector[T any](
     annotations *annotations.Annotations,
     memoryConsumptionTracker *limiter.MemoryConsumptionTracker,
     enableDelayedNameRemoval bool,
-) (*SplittingFunctionOverRangeVector[T], error) {
+) (*FunctionOverRangeVectorSplit[T], error) {
     if !timeRange.IsInstant {
-        return nil, fmt.Errorf("SplittingFunctionOverRangeVector only supports instant queries")
+        return nil, fmt.Errorf("FunctionOverRangeVectorSplit only supports instant queries")
     }

-    o := &SplittingFunctionOverRangeVector[T]{
+    o := &FunctionOverRangeVectorSplit[T]{
         innerNode: innerNode,
         materializer: materializer,
         queryTimeRange: timeRange,

@@ -110,11 +110,11 @@ func NewSplittingFunctionOverRangeVector[T any](
     return o, nil
 }

-func (m *SplittingFunctionOverRangeVector[T]) ExpressionPosition() posrange.PositionRange {
+func (m *FunctionOverRangeVectorSplit[T]) ExpressionPosition() posrange.PositionRange {
     return m.expressionPosition
 }

-func (m *SplittingFunctionOverRangeVector[T]) Prepare(ctx context.Context, params *types.PrepareParams) error {
+func (m *FunctionOverRangeVectorSplit[T]) Prepare(ctx context.Context, params *types.PrepareParams) error {
     var err error
     m.splits, err = m.createSplits(ctx)
     if err != nil {

@@ -132,7 +132,7 @@ func (m *SplittingFunctionOverRangeVector[T]) Prepare(ctx context.Context, param
 // createSplits creates splits for the given time range, checking for cache entries and merging contiguous uncached
 // split ranges to create uncached splits.
 // Uses pre-computed split ranges from the optimization pass.
-func (m *SplittingFunctionOverRangeVector[T]) createSplits(ctx context.Context) ([]Split[T], error) {
+func (m *FunctionOverRangeVectorSplit[T]) createSplits(ctx context.Context) ([]Split[T], error) {
     var splits []Split[T]
     var currentUncachedStart int64
     var currentUncachedRanges []Range

@@ -197,7 +197,7 @@ func (m *SplittingFunctionOverRangeVector[T]) createSplits(ctx context.Context)
     return splits, nil
 }

-func (m *SplittingFunctionOverRangeVector[T]) materializeOperatorForTimeRange(start int64, end int64) (types.RangeVectorOperator, error) {
+func (m *FunctionOverRangeVectorSplit[T]) materializeOperatorForTimeRange(start int64, end int64) (types.RangeVectorOperator, error) {
     subRange := time.Duration(end-start) * time.Millisecond

     overrideTimeParams := types.TimeRangeParams{

@@ -224,7 +224,7 @@ func (m *SplittingFunctionOverRangeVector[T]) materializeOperatorForTimeRange(st
     return innerOperator, nil
 }

-func (m *SplittingFunctionOverRangeVector[T]) SeriesMetadata(ctx context.Context, matchers types.Matchers) ([]types.SeriesMetadata, error) {
+func (m *FunctionOverRangeVectorSplit[T]) SeriesMetadata(ctx context.Context, matchers types.Matchers) ([]types.SeriesMetadata, error) {
     var err error
     var metadata []types.SeriesMetadata
     metadata, m.seriesToSplits, err = m.mergeSplitsMetadata(ctx, matchers)

@@ -243,7 +243,7 @@ func (m *SplittingFunctionOverRangeVector[T]) SeriesMetadata(ctx context.Context
     return metadata, nil
 }

-func (m *SplittingFunctionOverRangeVector[T]) mergeSplitsMetadata(ctx context.Context, matchers types.Matchers) ([]types.SeriesMetadata, [][]SplitSeries, error) {
+func (m *FunctionOverRangeVectorSplit[T]) mergeSplitsMetadata(ctx context.Context, matchers types.Matchers) ([]types.SeriesMetadata, [][]SplitSeries, error) {
     if len(m.splits) == 0 {
         return nil, nil, nil
     }

@@ -315,7 +315,7 @@ func (m *SplittingFunctionOverRangeVector[T]) mergeSplitsMetadata(ctx context.Co
     return mergedMetadata, seriesToSplits, nil
 }

-func (m *SplittingFunctionOverRangeVector[T]) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
+func (m *FunctionOverRangeVectorSplit[T]) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
     if m.currentSeriesIdx >= len(m.seriesToSplits) {
         return types.InstantVectorSeriesData{}, types.EOS
     }

@@ -367,14 +367,14 @@ func (m *SplittingFunctionOverRangeVector[T]) NextSeries(ctx context.Context) (t
     return data, nil
 }

-func (m *SplittingFunctionOverRangeVector[T]) emitAnnotation(generator types.AnnotationGenerator) {
+func (m *FunctionOverRangeVectorSplit[T]) emitAnnotation(generator types.AnnotationGenerator) {
     metricName := m.metricNames.GetMetricNameForSeries(m.currentSeriesIdx)
     pos := m.innerNode.ExpressionPosition()

     m.Annotations.Add(generator(metricName, pos))
 }

-func (m *SplittingFunctionOverRangeVector[T]) Finalize(ctx context.Context) error {
+func (m *FunctionOverRangeVectorSplit[T]) Finalize(ctx context.Context) error {
     for _, split := range m.splits {
         if err := split.Finalize(ctx); err != nil {
             return err

@@ -384,7 +384,7 @@ func (m *SplittingFunctionOverRangeVector[T]) Finalize(ctx context.Context) erro
     return nil
 }

-func (m *SplittingFunctionOverRangeVector[T]) Close() {
+func (m *FunctionOverRangeVectorSplit[T]) Close() {
     for _, split := range m.splits {
         split.Close()
     }

@@ -405,10 +405,10 @@ type SplitSeries struct {

 type CachedSplit[T any] struct {
     cachedResults cache.ReadEntry[T]
-    parent *SplittingFunctionOverRangeVector[T]
+    parent *FunctionOverRangeVectorSplit[T]
 }

-func NewCachedSplit[T any](cachedResults cache.ReadEntry[T], parent *SplittingFunctionOverRangeVector[T]) *CachedSplit[T] {
+func NewCachedSplit[T any](cachedResults cache.ReadEntry[T], parent *FunctionOverRangeVectorSplit[T]) *CachedSplit[T] {
     return &CachedSplit[T]{
         cachedResults: cachedResults,
         parent: parent,

@@ -444,7 +444,7 @@ type UncachedSplit[T any] struct {
     ranges []Range
     operator types.RangeVectorOperator

-    parent *SplittingFunctionOverRangeVector[T]
+    parent *FunctionOverRangeVectorSplit[T]

     cacheWriteEntries []cache.WriteEntry[T]
     finalized bool

@@ -456,7 +456,7 @@ func NewUncachedSplit[T any](
     ctx context.Context,
     ranges []Range,
     operator types.RangeVectorOperator,
-    parent *SplittingFunctionOverRangeVector[T],
+    parent *FunctionOverRangeVectorSplit[T],
 ) (*UncachedSplit[T], error) {
     cacheEntries := make([]cache.WriteEntry[T], len(ranges))
     var err error
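
The var _ types.InstantVectorOperator = (*FunctionOverRangeVectorSplit[any])(nil) line touched by this rename is Go's usual compile-time interface assertion: it costs nothing at runtime, but the build fails if the renamed type ever stops satisfying the interface. A standalone illustration with made-up names:

package main

// Operator stands in for an interface such as types.InstantVectorOperator.
type Operator interface {
    Close()
}

type splitOperator[T any] struct{}

func (s *splitOperator[T]) Close() {}

// Compile-time assertion: if splitOperator stops implementing Operator
// (for example after a rename or signature change), this line no longer compiles.
var _ Operator = (*splitOperator[any])(nil)

func main() {}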

pkg/streamingpromql/operators/functions/split_rate_function.go renamed to pkg/streamingpromql/operators/functions/split_rate_functions.go

File renamed without changes.

pkg/streamingpromql/optimize/plan/commonsubexpressionelimination/instant_vector_operator_test.go
Lines changed: 1 addition & 1 deletion

@@ -106,7 +106,7 @@ func TestInstantVectorOperator_Buffering(t *testing.T) {
     require.False(t, inner.Finalized)
     require.False(t, inner.Closed)

-    // Flush each consumer, and check that the inner operator was only finalized after the last consumer is finalized.
+    // Finalize each consumer, and check that the inner operator was only finalized after the last consumer is finalized.
     require.NoError(t, consumer1.Finalize(ctx))
     require.False(t, inner.Finalized)
     require.NoError(t, consumer2.Finalize(ctx))

pkg/streamingpromql/optimize/plan/commonsubexpressionelimination/range_vector_operator_test.go
Lines changed: 1 addition & 1 deletion

@@ -124,7 +124,7 @@ func TestRangeVectorOperator_Buffering(t *testing.T) {
     require.False(t, inner.finalized)
     require.False(t, inner.closed)

-    // Flush each consumer, and check that the inner operator was only finalized after the last consumer is finalized.
+    // Finalize each consumer, and check that the inner operator was only finalized after the last consumer is finalized.
     require.NoError(t, consumer1.Finalize(ctx))
     require.False(t, inner.finalized)
     require.NoError(t, consumer2.Finalize(ctx))
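
Both buffering tests assert the same contract: each consumer can be finalized independently, and the shared inner operator is finalized only after the last consumer has been finalized. A minimal sketch of one way to satisfy that contract, with illustrative types rather than the actual common-subexpression-elimination operators:

package main

import (
    "context"
    "fmt"
)

// sharedInner records whether it has been finalized.
type sharedInner struct {
    Finalized bool
}

func (s *sharedInner) Finalize(ctx context.Context) error {
    s.Finalized = true
    return nil
}

// consumer decrements a shared counter and finalizes the inner operator
// only when it is the last consumer to be finalized.
type consumer struct {
    inner     *sharedInner
    remaining *int
}

func (c *consumer) Finalize(ctx context.Context) error {
    *c.remaining--
    if *c.remaining == 0 {
        return c.inner.Finalize(ctx)
    }
    return nil
}

func main() {
    inner := &sharedInner{}
    remaining := 2
    c1 := &consumer{inner: inner, remaining: &remaining}
    c2 := &consumer{inner: inner, remaining: &remaining}

    _ = c1.Finalize(context.Background())
    fmt.Println(inner.Finalized) // false: one consumer is still outstanding
    _ = c2.Finalize(context.Background())
    fmt.Println(inner.Finalized) // true: the last consumer finalized the inner operator
}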

pkg/streamingpromql/optimize/plan/querysplitting/node.go
Lines changed: 8 additions & 1 deletion

@@ -4,6 +4,7 @@ package querysplitting

 import (
     "fmt"
+    "slices"
     "time"

     "github.com/gogo/protobuf/proto"

@@ -71,7 +72,13 @@ func (s *SplittableFunctionCall) MergeHints(other planning.Node) error {

 func (s *SplittableFunctionCall) EquivalentToIgnoringHintsAndChildren(other planning.Node) bool {
     otherSplit, ok := other.(*SplittableFunctionCall)
-    return ok && s.Inner.EquivalentToIgnoringHintsAndChildren(otherSplit.Inner)
+    if !ok {
+        return false
+    }
+
+    return slices.EqualFunc(s.SplitRanges, otherSplit.SplitRanges, func(a, b SplitRange) bool {
+        return a.Start == b.Start && a.End == b.End && a.Cacheable == b.Cacheable
+    })
 }

 func (s *SplittableFunctionCall) Describe() string {
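
The new implementation leans on slices.EqualFunc from the standard library (Go 1.21+), which returns false when the slice lengths differ and otherwise compares corresponding elements with the supplied function. A standalone sketch, using an illustrative SplitRange stand-in rather than the planner's actual type:

package main

import (
    "fmt"
    "slices"
)

// SplitRange is a stand-in for the planner's split range type (illustrative only).
type SplitRange struct {
    Start, End int64
    Cacheable  bool
}

func main() {
    a := []SplitRange{{Start: 0, End: 3600000, Cacheable: true}}
    b := []SplitRange{{Start: 0, End: 3600000, Cacheable: true}}
    c := []SplitRange{{Start: 0, End: 3600000, Cacheable: false}}

    eq := func(x, y SplitRange) bool {
        return x.Start == y.Start && x.End == y.End && x.Cacheable == y.Cacheable
    }

    // slices.EqualFunc reports whether both slices have the same length and
    // every pair of corresponding elements satisfies eq.
    fmt.Println(slices.EqualFunc(a, b, eq)) // true
    fmt.Println(slices.EqualFunc(a, c, eq)) // false
}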

pkg/streamingpromql/planning/core/matrix_selector.go
Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ type MatrixSelector struct {
 var _ planning.SplittableNode = &MatrixSelector{}

 func (m *MatrixSelector) Describe() string {
-    return describeSelector(m.Matchers, m.Timestamp, m.Offset, &m.Range, m.SkipHistogramBuckets, false)
+    return describeSelector(m.Matchers, m.Timestamp, m.Offset, &m.Range, m.SkipHistogramBuckets)
 }

 // QuerySplittingCacheKey returns the cache key for the matrix selector.

pkg/streamingpromql/planning/core/selectors.go
Lines changed: 7 additions & 9 deletions

@@ -11,7 +11,7 @@ import (
     "github.com/prometheus/prometheus/model/timestamp"
 )

-func describeSelector(matchers []*LabelMatcher, ts *time.Time, offset time.Duration, rng *time.Duration, skipHistogramBuckets bool, forCacheKey bool) string {
+func describeSelector(matchers []*LabelMatcher, ts *time.Time, offset time.Duration, rng *time.Duration, skipHistogramBuckets bool) string {
     builder := &strings.Builder{}
     builder.WriteRune('{')
     for i, m := range matchers {

@@ -31,18 +31,16 @@ func describeSelector(matchers []*LabelMatcher, ts *time.Time, offset time.Durat
         builder.WriteRune(']')
     }

-    if ts != nil && !forCacheKey { // @ modifiers are adjusted for when doing query splitting/caching
+    if ts != nil {
         builder.WriteString(" @ ")
         builder.WriteString(strconv.FormatInt(timestamp.FromTime(*ts), 10))
-        if !forCacheKey {
-            // Only include human-readable timestamp for display purposes (redundant with unix timestamp)
-            builder.WriteString(" (")
-            builder.WriteString(ts.Format(time.RFC3339Nano))
-            builder.WriteRune(')')
-        }
+        // Only include human-readable timestamp for display purposes (redundant with unix timestamp)
+        builder.WriteString(" (")
+        builder.WriteString(ts.Format(time.RFC3339Nano))
+        builder.WriteRune(')')
     }

-    if offset != 0 && !forCacheKey { // @ modifiers are adjusted for when doing query splitting/caching
+    if offset != 0 {
         builder.WriteString(" offset ")
         builder.WriteString(offset.String())
     }