Skip to content

Commit c3f4923

Browse files
committed
Implement rate/increase
1 parent 8564d7a commit c3f4923

24 files changed

+1927
-551
lines changed

integration/getting_started_with_gossiped_ring_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func runTestGettingStartedWithGossipedRing(t *testing.T, mimir1 *e2emimir.MimirS
126126
require.NoError(t, mimir1.WaitSumMetrics(e2e.Equals(0+blocksLoadedOffset), "cortex_bucket_store_blocks_loaded"))
127127
require.NoError(t, mimir2.WaitSumMetrics(e2e.Equals(0+blocksLoadedOffset), "cortex_bucket_store_blocks_loaded"))
128128

129-
// Finalize blocks from ingesters to the store.
129+
// Flush blocks from ingesters to the store.
130130
for _, instance := range []*e2emimir.MimirService{mimir1, mimir2} {
131131
res, err := e2e.DoGet("http://" + instance.HTTPEndpoint() + "/ingester/flush")
132132
require.NoError(t, err)

pkg/frontend/querymiddleware/shard_active_series.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (s *shardActiveSeriesMiddleware) writeMergedResponse(ctx context.Context, c
276276
item.Reset(labels.EmptyLabels())
277277
labelBuilderPool.Put(item)
278278

279-
// Finalize the stream buffer if it's getting too large.
279+
// Flush the stream buffer if it's getting too large.
280280
if stream.Buffered() > jsoniterMaxBufferSize {
281281
_ = stream.Flush()
282282
}
@@ -344,7 +344,7 @@ func (s *shardActiveSeriesMiddleware) writeMergedResponseWithZeroAllocationDecod
344344
// Write the value as is, since it's already a JSON array.
345345
stream.WriteRaw(rawStr)
346346

347-
// Finalize the stream buffer if it's getting too large.
347+
// Flush the stream buffer if it's getting too large.
348348
if stream.Buffered() > jsoniterMaxBufferSize {
349349
_ = stream.Flush()
350350
}

pkg/ingester/ingester.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,7 @@ func (i *Ingester) generateHeadStatisticsForAllUsers(context.Context) error {
580580
}
581581

582582
// NewForFlusher is a special version of ingester used by Flusher. This
583-
// ingester is not ingesting anything, its only purpose is to react on Finalize
583+
// ingester is not ingesting anything, its only purpose is to react on Flush
584584
// method and flush all openened TSDBs when called.
585585
func NewForFlusher(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
586586
i, err := newIngester(cfg, limits, registerer, logger)
@@ -593,7 +593,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
593593
i.limiter = NewLimiter(limits, flusherLimiterStrategy{})
594594

595595
// This ingester will not start any subservices (lifecycler, compaction, shipping),
596-
// and will only open TSDBs, wait for Finalize to be called, and then close TSDBs again.
596+
// and will only open TSDBs, wait for Flush to be called, and then close TSDBs again.
597597
i.BasicService = services.NewIdleService(i.startingForFlusher, i.stoppingForFlusher)
598598
return i, nil
599599
}

pkg/ingester/ingester_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5894,7 +5894,7 @@ func Benchmark_Ingester_MetricsForLabelMatchers(b *testing.B) {
58945894
i := createIngesterWithSeries(b, userID, numSeries, numSamplesPerSeries, startTimestamp, step)
58955895
ctx := user.InjectOrgID(context.Background(), "test")
58965896

5897-
// Finalize the ingester to ensure blocks have been compacted, so we'll test
5897+
// Flush the ingester to ensure blocks have been compacted, so we'll test
58985898
// fetching labels from blocks.
58995899
i.Flush()
59005900

@@ -8432,7 +8432,7 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) {
84328432
lockState, err := db.acquireAppendLock(0)
84338433
require.NoError(t, err)
84348434

8435-
// Finalize handler only triggers compactions, but doesn't wait for them to finish. We cannot use ?wait=true here,
8435+
// Flush handler only triggers compactions, but doesn't wait for them to finish. We cannot use ?wait=true here,
84368436
// because it would deadlock -- flush will wait for appendLock to be released.
84378437
i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/ingester/flush", nil))
84388438

pkg/ingester/label_names_and_values.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func labelValuesCardinality(
162162
if respSize < msgSizeThreshold {
163163
continue
164164
}
165-
// Finalize the response when reached message threshold.
165+
// Flush the response when reached message threshold.
166166
if err := client.SendLabelValuesCardinalityResponse(srv, &resp); err != nil {
167167
return err
168168
}

pkg/mimirpb/split.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func SplitWriteRequestByMaxMarshalSizeRW2(req *WriteRequest, reqSize, maxSize in
9494
// If the next partial request doesn't have any timeseries yet, we add the series anyway, in order to avoid an infinite loop
9595
// if a single timeseries is bigger than the limit.
9696
if nextReqSize+seriesSize+symbolsSize > maxSize && len(nextReq.TimeseriesRW2) > 0 {
97-
// Finalize the next partial request.
97+
// Flush the next partial request.
9898
nextReq.SymbolsRW2 = nextReqSymbols.Symbols()
9999
partialReqs = append(partialReqs, nextReq)
100100

@@ -118,7 +118,7 @@ func SplitWriteRequestByMaxMarshalSizeRW2(req *WriteRequest, reqSize, maxSize in
118118
}
119119

120120
if len(nextReq.TimeseriesRW2) > 0 {
121-
// Finalize the last partial request.
121+
// Flush the last partial request.
122122
nextReq.SymbolsRW2 = nextReqSymbols.Symbols()
123123
partialReqs = append(partialReqs, nextReq)
124124
}
@@ -168,7 +168,7 @@ func splitTimeseriesByMaxMarshalSize(req *WriteRequest, reqSize, maxSize int) []
168168
// If the next partial request doesn't have any timeseries yet, we add the series anyway, in order to avoid an infinite loop
169169
// if a single timeseries is bigger than the limit.
170170
if nextReqSize+seriesSize > maxSize && nextReqTimeseriesLength > 0 {
171-
// Finalize the next partial request.
171+
// Flush the next partial request.
172172
nextReq.Timeseries = req.Timeseries[nextReqTimeseriesStart : nextReqTimeseriesStart+nextReqTimeseriesLength]
173173
partialReqs = append(partialReqs, nextReq)
174174

@@ -184,7 +184,7 @@ func splitTimeseriesByMaxMarshalSize(req *WriteRequest, reqSize, maxSize int) []
184184
}
185185

186186
if nextReqTimeseriesLength > 0 {
187-
// Finalize the last partial request.
187+
// Flush the last partial request.
188188
nextReq.Timeseries = req.Timeseries[nextReqTimeseriesStart : nextReqTimeseriesStart+nextReqTimeseriesLength]
189189
partialReqs = append(partialReqs, nextReq)
190190
}
@@ -234,7 +234,7 @@ func splitMetadataByMaxMarshalSize(req *WriteRequest, reqSize, maxSize int) []*W
234234
// If the next partial request doesn't have any metadata yet, we add the metadata anyway, in order to avoid an infinite loop
235235
// if a single metadata is bigger than the limit.
236236
if nextReqSize+metadataSize > maxSize && nextReqMetadataLength > 0 {
237-
// Finalize the next partial request.
237+
// Flush the next partial request.
238238
nextReq.Metadata = req.Metadata[nextReqMetadataStart : nextReqMetadataStart+nextReqMetadataLength]
239239
partialReqs = append(partialReqs, nextReq)
240240

@@ -250,7 +250,7 @@ func splitMetadataByMaxMarshalSize(req *WriteRequest, reqSize, maxSize int) []*W
250250
}
251251

252252
if nextReqMetadataLength > 0 {
253-
// Finalize the last partial request.
253+
// Flush the last partial request.
254254
nextReq.Metadata = req.Metadata[nextReqMetadataStart : nextReqMetadataStart+nextReqMetadataLength]
255255
partialReqs = append(partialReqs, nextReq)
256256
}

pkg/streamingpromql/evaluator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func (e *Evaluator) Evaluate(ctx context.Context, observer EvaluationObserver) (
166166
}
167167

168168
if e.engine.pedantic {
169-
// Finalize the root operator a second time to ensure all operators behave correctly if Finalize is called multiple times.
169+
// Flush the root operator a second time to ensure all operators behave correctly if Finalize is called multiple times.
170170
if err := e.root.Finalize(ctx); err != nil {
171171
return fmt.Errorf("pedantic mode: failed to finalize operator a second time after successfully finalizing the first time: %w", err)
172172
}

pkg/streamingpromql/operators/functions/factories.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,7 @@ func init() {
704704
must(RegisterFunction(FUNCTION_HISTOGRAM_SUM, "histogram_sum", parser.ValueTypeVector, InstantVectorTransformationFunctionOperatorFactory("histogram_sum", HistogramSum)))
705705
must(RegisterFunction(FUNCTION_HOUR, "hour", parser.ValueTypeVector, TimeTransformationFunctionOperatorFactory("hour", Hour)))
706706
must(RegisterFunction(FUNCTION_IDELTA, "idelta", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("idelta", Idelta)))
707-
must(RegisterFunction(FUNCTION_INCREASE, "increase", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("increase", Increase)))
707+
must(RegisterFunctionWithSplitFactory(FUNCTION_INCREASE, "increase", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("increase", Increase), SplitIncrease))
708708
must(RegisterFunction(FUNCTION_IRATE, "irate", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("irate", Irate)))
709709
must(RegisterFunction(FUNCTION_LABEL_JOIN, "label_join", parser.ValueTypeVector, LabelJoinFunctionOperatorFactory))
710710
must(RegisterFunction(FUNCTION_LABEL_REPLACE, "label_replace", parser.ValueTypeVector, LabelReplaceFunctionOperatorFactory))
@@ -722,7 +722,7 @@ func init() {
722722
must(RegisterFunction(FUNCTION_PRESENT_OVER_TIME, "present_over_time", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("present_over_time", PresentOverTime)))
723723
must(RegisterFunction(FUNCTION_QUANTILE_OVER_TIME, "quantile_over_time", parser.ValueTypeVector, QuantileOverTimeFactory))
724724
must(RegisterFunction(FUNCTION_RAD, "rad", parser.ValueTypeVector, InstantVectorTransformationFunctionOperatorFactory("rad", Rad)))
725-
must(RegisterFunction(FUNCTION_RATE, "rate", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("rate", Rate)))
725+
must(RegisterFunctionWithSplitFactory(FUNCTION_RATE, "rate", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("rate", Rate), SplitRate))
726726
must(RegisterFunction(FUNCTION_RESETS, "resets", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("resets", Resets)))
727727
must(RegisterFunction(FUNCTION_ROUND, "round", parser.ValueTypeVector, RoundFunctionOperatorFactory))
728728
must(RegisterFunction(FUNCTION_SCALAR, "scalar", parser.ValueTypeScalar, instantVectorToScalarOperatorFactory))

pkg/streamingpromql/operators/functions/function_over_instant_vector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type FunctionOverInstantVector struct {
2121
// as an argument. We can assume this will always be the Inner operator and therefore
2222
// what we use for the SeriesMetadata.
2323
Inner types.InstantVectorOperator
24-
// Any scalar arguments will be read once and passed to FuncDef.SeriesDataFunc.
24+
// Any scalar arguments will be read once and passed to Func.SeriesDataFunc.
2525
ScalarArgs []types.ScalarOperator
2626
MemoryConsumptionTracker *limiter.MemoryConsumptionTracker
2727
Func FunctionOverInstantVectorDefinition

pkg/streamingpromql/operators/functions/function_over_range_vector.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,13 @@ func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.Instant
132132

133133
for {
134134
step, err := m.Inner.NextStepSamples(ctx)
135+
135136
// nolint:errorlint // errors.Is introduces a performance overhead, and NextStepSamples is guaranteed to return exactly EOS, never a wrapped error.
136137
if err == types.EOS {
137138
if m.seriesValidationFunc != nil {
138139
m.seriesValidationFunc(data, m.metricNames.GetMetricNameForSeries(m.currentSeriesIndex), m.emitAnnotationFunc)
139140
}
141+
140142
return data, nil
141143
} else if err != nil {
142144
return types.InstantVectorSeriesData{}, err
@@ -146,7 +148,6 @@ func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.Instant
146148
if err != nil {
147149
return types.InstantVectorSeriesData{}, err
148150
}
149-
150151
if hasFloat {
151152
if data.Floats == nil {
152153
// Only get FPoint slice once we are sure we have float points.

0 commit comments

Comments
 (0)