Skip to content

Commit 6d70d9c

Browse files
committed
Implement rate/increase
1 parent 8564d7a commit 6d70d9c

File tree

9 files changed

+1902
-527
lines changed

9 files changed

+1902
-527
lines changed

pkg/streamingpromql/operators/functions/factories.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,7 @@ func init() {
704704
must(RegisterFunction(FUNCTION_HISTOGRAM_SUM, "histogram_sum", parser.ValueTypeVector, InstantVectorTransformationFunctionOperatorFactory("histogram_sum", HistogramSum)))
705705
must(RegisterFunction(FUNCTION_HOUR, "hour", parser.ValueTypeVector, TimeTransformationFunctionOperatorFactory("hour", Hour)))
706706
must(RegisterFunction(FUNCTION_IDELTA, "idelta", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("idelta", Idelta)))
707-
must(RegisterFunction(FUNCTION_INCREASE, "increase", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("increase", Increase)))
707+
must(RegisterFunctionWithSplitFactory(FUNCTION_INCREASE, "increase", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("increase", Increase), SplitIncrease))
708708
must(RegisterFunction(FUNCTION_IRATE, "irate", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("irate", Irate)))
709709
must(RegisterFunction(FUNCTION_LABEL_JOIN, "label_join", parser.ValueTypeVector, LabelJoinFunctionOperatorFactory))
710710
must(RegisterFunction(FUNCTION_LABEL_REPLACE, "label_replace", parser.ValueTypeVector, LabelReplaceFunctionOperatorFactory))
@@ -722,7 +722,7 @@ func init() {
722722
must(RegisterFunction(FUNCTION_PRESENT_OVER_TIME, "present_over_time", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("present_over_time", PresentOverTime)))
723723
must(RegisterFunction(FUNCTION_QUANTILE_OVER_TIME, "quantile_over_time", parser.ValueTypeVector, QuantileOverTimeFactory))
724724
must(RegisterFunction(FUNCTION_RAD, "rad", parser.ValueTypeVector, InstantVectorTransformationFunctionOperatorFactory("rad", Rad)))
725-
must(RegisterFunction(FUNCTION_RATE, "rate", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("rate", Rate)))
725+
must(RegisterFunctionWithSplitFactory(FUNCTION_RATE, "rate", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("rate", Rate), SplitRate))
726726
must(RegisterFunction(FUNCTION_RESETS, "resets", parser.ValueTypeVector, FunctionOverRangeVectorOperatorFactory("resets", Resets)))
727727
must(RegisterFunction(FUNCTION_ROUND, "round", parser.ValueTypeVector, RoundFunctionOperatorFactory))
728728
must(RegisterFunction(FUNCTION_SCALAR, "scalar", parser.ValueTypeScalar, instantVectorToScalarOperatorFactory))

pkg/streamingpromql/operators/functions/rate_increase.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,17 @@ func rate(isRate bool) RangeVectorStepFunction {
7575
}
7676

7777
func histogramRate(isRate bool, hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
78-
firstPoint := hHead[0]
78+
firstPoint, lastPoint, delta, fpHistCount, err := calculateHistogramDelta(hHead, hTail, emitAnnotation)
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
val := calculateHistogramRate(true, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, hCount, fpHistCount)
84+
return val, nil
85+
}
86+
87+
func calculateHistogramDelta(hHead, hTail []promql.HPoint, emitAnnotation types.EmitAnnotationFunc) (firstPoint, lastPoint promql.HPoint, delta *histogram.FloatHistogram, fpHistCount float64, err error) {
88+
firstPoint = hHead[0]
7989
hHead = hHead[1:]
8090

8191
if firstPoint.H.CounterResetHint == histogram.GaugeType {
@@ -91,7 +101,7 @@ func histogramRate(isRate bool, hCount int, hHead []promql.HPoint, hTail []promq
91101

92102
// Store the original first point count before potential reset.
93103
// It's needed to calculate the rate correctly later.
94-
fpHistCount := firstPoint.H.Count
104+
fpHistCount = firstPoint.H.Count
95105

96106
// Ignore the first point if there is a counter reset between the first and second point.
97107
// This means we'll ignore any incompatibility between the layout of the first and second point,
@@ -104,7 +114,6 @@ func histogramRate(isRate bool, hCount int, hHead []promql.HPoint, hTail []promq
104114
}
105115
}
106116

107-
var lastPoint promql.HPoint
108117
if len(hTail) > 0 {
109118
lastPoint = hTail[len(hTail)-1]
110119
} else {
@@ -122,13 +131,13 @@ func histogramRate(isRate bool, hCount int, hHead []promql.HPoint, hTail []promq
122131

123132
usingCustomBuckets := firstPoint.H.UsesCustomBuckets()
124133
if lastPoint.H.UsesCustomBuckets() != usingCustomBuckets {
125-
return nil, histogram.ErrHistogramsIncompatibleSchema
134+
return promql.HPoint{}, promql.HPoint{}, nil, 0, histogram.ErrHistogramsIncompatibleSchema
126135
}
127136

128-
delta := lastPoint.H.CopyToSchema(desiredSchema)
137+
delta = lastPoint.H.CopyToSchema(desiredSchema)
129138
_, _, nhcbBoundsReconciled, err := delta.Sub(firstPoint.H)
130139
if err != nil {
131-
return nil, err
140+
return promql.HPoint{}, promql.HPoint{}, nil, 0, err
132141
}
133142
if nhcbBoundsReconciled {
134143
emitAnnotation(newSubMismatchedCustomBucketsHistogramInfo)
@@ -167,35 +176,37 @@ func histogramRate(isRate bool, hCount int, hHead []promql.HPoint, hTail []promq
167176

168177
err = accumulate(hHead)
169178
if err != nil {
170-
return nil, err
171-
179+
return promql.HPoint{}, promql.HPoint{}, nil, 0, err
172180
}
173181
err = accumulate(hTail)
174182
if err != nil {
175-
return nil, err
176-
183+
return promql.HPoint{}, promql.HPoint{}, nil, 0, err
177184
}
178185

179186
if delta.Schema != desiredSchema {
180187
delta = delta.CopyToSchema(desiredSchema)
181188
}
182189

183-
val := calculateHistogramRate(true, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, hCount, fpHistCount)
184-
return val, err
190+
return firstPoint, lastPoint, delta, fpHistCount, nil
185191
}
186192

187193
func floatRate(isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 {
188-
firstPoint := fHead[0]
194+
firstPoint, lastPoint, delta := calculateFloatDelta(fHead, fTail)
195+
val := calculateFloatRate(true, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
196+
return val
197+
}
198+
199+
func calculateFloatDelta(fHead, fTail []promql.FPoint) (firstPoint, lastPoint promql.FPoint, delta float64) {
200+
firstPoint = fHead[0]
189201
fHead = fHead[1:]
190202

191-
var lastPoint promql.FPoint
192203
if len(fTail) > 0 {
193204
lastPoint = fTail[len(fTail)-1]
194205
} else {
195206
lastPoint = fHead[len(fHead)-1]
196207
}
197208

198-
delta := lastPoint.F - firstPoint.F
209+
delta = lastPoint.F - firstPoint.F
199210
previousValue := firstPoint.F
200211

201212
accumulate := func(points []promql.FPoint) {
@@ -212,8 +223,7 @@ func floatRate(isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FP
212223
accumulate(fHead)
213224
accumulate(fTail)
214225

215-
val := calculateFloatRate(true, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
216-
return val
226+
return firstPoint, lastPoint, delta
217227
}
218228

219229
// This is based on extrapolatedRate from promql/functions.go.

0 commit comments

Comments
 (0)