Commit 9405e06

execution: Add support for double_exponential_smoothing function (#517)
* Reset commit - add
  Signed-off-by: Subhramit Basu Bhowmick <[email protected]>
* Fix failing workflows
  Signed-off-by: Subhramit Basu Bhowmick <[email protected]>
* Working implementation - use matrix selector
  Signed-off-by: subhramit <[email protected]>
* Rename parameter to old one, remove comment
  Signed-off-by: subhramit <[email protected]>
* Refine comment for parameter
  Signed-off-by: subhramit <[email protected]>
* Apply linter suggestion
  Signed-off-by: Subhramit Basu Bhowmick <[email protected]>
* Improve implementation (add in explain)
  Signed-off-by: subhramit <[email protected]>
* Use two parameter operator
  Signed-off-by: subhramit <[email protected]>
* Move `calcTrendValue` down
  Signed-off-by: subhramit <[email protected]>

---------

Signed-off-by: Subhramit Basu Bhowmick <[email protected]>
Signed-off-by: subhramit <[email protected]>
1 parent 815830c commit 9405e06

10 files changed: +276 -20 lines changed

README.md

+8 -8

@@ -10,14 +10,14 @@ The engine intends to have full compatibility with the original engine used in Prometheus
 
 The following table shows operations which are currently supported by the engine
 
-| Type | Supported | Priority |
-|------|-----------|----------|
-| Binary expressions | Full support | |
-| Histograms | Full support | |
-| Subqueries | Full support | |
-| Aggregations | Full support | |
-| Aggregations over time | Full support except for `quantile_over_time` with non-constant argument | Medium |
-| Functions | Full support except for `holt_winters` and `predict_linear` with non-constant argument | Medium |
+| Type | Supported | Priority |
+|------|-----------|----------|
+| Binary expressions | Full support | |
+| Histograms | Full support | |
+| Subqueries | Full support | |
+| Aggregations | Full support | |
+| Aggregations over time | Full support except for `quantile_over_time` with non-constant argument | Medium |
+| Functions | Full support except for `predict_linear` with non-constant argument | Medium |
 
 ## Design

engine/bench_test.go

+9

@@ -300,6 +300,11 @@ func BenchmarkRangeQuery(b *testing.B) {
             query:   `absent(nonexistent)`,
             storage: sixHourDataset,
         },
+        {
+            name:    "double exponential smoothing",
+            query:   `double_exponential_smoothing(http_requests_total[1m], 0.1, 0.1)`,
+            storage: sixHourDataset,
+        },
     }
 
     opts := engine.Opts{
@@ -541,6 +546,10 @@ func BenchmarkInstantQuery(b *testing.B) {
             name:  "subquery sum_over_time",
             query: `sum_over_time(count(http_requests_total)[1h:10s])`,
         },
+        {
+            name:  "double exponential smoothing",
+            query: `double_exponential_smoothing(http_requests_total[1m], 0.1, 0.1)`,
+        },
     }
 
     for _, tc := range cases {

engine/engine_test.go

+105

@@ -5872,3 +5872,108 @@ func queryExplanation(q promql.Query) string {
 
     return fmt.Sprintf("Query: %s\nExplanation:\n%s\n", q.String(), b.String())
 }
+
+// Adapted from: https://github.com/prometheus/prometheus/blob/906f6a33b60cec2596018ac8cc97ac41b16b06b7/promql/promqltest/testdata/functions.test#L814
+func TestDoubleExponentialSmoothing(t *testing.T) {
+    t.Parallel()
+
+    const (
+        testTimeout    = 1 * time.Hour
+        testMaxSamples = math.MaxInt64
+        testQueryStart = 0
+        testQueryEnd   = 3600
+        testQueryStep  = 30
+    )
+
+    defaultStart := time.Unix(testQueryStart, 0)
+    defaultEnd := time.Unix(testQueryEnd, 0)
+    defaultStep := testQueryStep * time.Second
+
+    cases := []struct {
+        name string
+
+        load  string
+        query string
+
+        start time.Time
+        end   time.Time
+        step  time.Duration
+    }{
+        {
+            name: "double exponential smoothing basic",
+            load: `load 30s
+                http_requests_total{pod="nginx-1"} 1+1x15
+                http_requests_total{pod="nginx-2"} 1+2x18`,
+            query: `double_exponential_smoothing(http_requests_total[5m], 0.1, 0.1)`,
+        },
+        {
+            name: "double exponential smoothing with positive trend",
+            load: `load 10s
+                http_requests{job="api-server", instance="0", group="production"} 0+10x1000 100+30x1000
+                http_requests{job="api-server", instance="1", group="production"} 0+20x1000 200+30x1000`,
+            query: `double_exponential_smoothing(http_requests[5m], 0.01, 0.1)`,
+        },
+        {
+            name: "double exponential smoothing with negative trend",
+            load: `load 10s
+                http_requests{job="api-server", instance="0", group="production"} 8000-10x1000
+                http_requests{job="api-server", instance="1", group="production"} 0-20x1000`,
+            query: `double_exponential_smoothing(http_requests[5m], 0.01, 0.1)`,
+        },
+        {
+            name: "double exponential smoothing with mixed histogram data",
+            load: `load 30s
+                http_requests_mix{job="api-server", instance="0"} 0+10x1000 100+30x1000 {{schema:0 count:1 sum:2}}x1000
+                http_requests_mix{job="api-server", instance="1"} 0+20x1000 200+30x1000 {{schema:0 count:1 sum:2}}x1000`,
+            query: `double_exponential_smoothing(http_requests_mix[5m], 0.01, 0.1)`,
+        },
+        {
+            name: "double exponential smoothing with pure histogram data",
+            load: `load 30s
+                http_requests_histogram{job="api-server", instance="1"} {{schema:0 count:1 sum:2}}x1000`,
+            query: `double_exponential_smoothing(http_requests_histogram[5m], 0.01, 0.1)`,
+        },
+    }
+
+    for _, tcase := range cases {
+        t.Run(tcase.name, func(t *testing.T) {
+            t.Parallel()
+
+            storage := promqltest.LoadedStorage(t, tcase.load)
+            defer storage.Close()
+
+            opts := promql.EngineOpts{
+                Timeout:              testTimeout,
+                MaxSamples:           testMaxSamples,
+                EnableNegativeOffset: true,
+                EnableAtModifier:     true,
+            }
+
+            start := defaultStart
+            if !tcase.start.IsZero() {
+                start = tcase.start
+            }
+            end := defaultEnd
+            if !tcase.end.IsZero() {
+                end = tcase.end
+            }
+            step := defaultStep
+            if tcase.step != 0 {
+                step = tcase.step
+            }
+
+            ctx := context.Background()
+            oldEngine := promql.NewEngine(opts)
+            q1, err := oldEngine.NewRangeQuery(ctx, storage, nil, tcase.query, start, end, step)
+            testutil.Ok(t, errors.Wrap(err, "create old engine range query"))
+            oldResult := q1.Exec(ctx)
+
+            newEngine := engine.New(engine.Opts{EngineOpts: opts})
+            q2, err := newEngine.NewRangeQuery(ctx, storage, nil, tcase.query, start, end, step)
+            testutil.Ok(t, errors.Wrap(err, "create new engine range query"))
+            newResult := q2.Exec(ctx)
+
+            testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q2))
+        })
+    }
+}

execution/execution.go

+12 -1

@@ -213,6 +213,7 @@ func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *lo
     }
 
     var scalarArg model.VectorOperator
+    var scalarArg2 model.VectorOperator
     switch e.Func.Name {
     case "quantile_over_time":
         // quantile_over_time(scalar, range-vector)
@@ -226,9 +227,19 @@ func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *lo
         if err != nil {
             return nil, err
         }
+    case "double_exponential_smoothing":
+        // double_exponential_smoothing(range-vector, scalar, scalar)
+        scalarArg, err = newOperator(ctx, e.Args[1], storage, opts, hints)
+        if err != nil {
+            return nil, err
+        }
+        scalarArg2, err = newOperator(ctx, e.Args[2], storage, opts, hints)
+        if err != nil {
+            return nil, err
+        }
     }
 
-    return scan.NewSubqueryOperator(model.NewVectorPool(opts.StepsBatch), inner, scalarArg, &outerOpts, e, t)
+    return scan.NewSubqueryOperator(model.NewVectorPool(opts.StepsBatch), inner, scalarArg, scalarArg2, &outerOpts, e, t)
 }
 
 func newInstantVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {

execution/scan/subquery.go

+28 -5

@@ -24,8 +24,9 @@ import (
 type subqueryOperator struct {
     telemetry.OperatorTelemetry
 
-    next    model.VectorOperator
-    paramOp model.VectorOperator
+    next     model.VectorOperator
+    paramOp  model.VectorOperator
+    paramOp2 model.VectorOperator
 
     pool *model.VectorPool
     call ringbuffer.FunctionCall
@@ -47,10 +48,13 @@ type subqueryOperator struct {
     buffers []*ringbuffer.GenericRingBuffer
 
     // params holds the function parameter for each step.
-    params []float64
+    // quantile_over_time and predict_linear use one parameter (params);
+    // double_exponential_smoothing uses two (params, params2) for (sf, tf).
+    params  []float64
+    params2 []float64
 }
 
-func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
+func NewSubqueryOperator(pool *model.VectorPool, next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
     call, err := ringbuffer.NewRangeVectorFunc(funcExpr.Func.Name)
     if err != nil {
        return nil, err
@@ -63,6 +67,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera
     o := &subqueryOperator{
         next:     next,
         paramOp:  paramOp,
+        paramOp2: paramOp2,
         call:     call,
         pool:     pool,
         funcExpr: funcExpr,
@@ -75,6 +80,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera
         stepsBatch:    opts.StepsBatch,
         lastCollected: -1,
         params:        make([]float64, opts.StepsBatch),
+        params2:       make([]float64, opts.StepsBatch),
     }
     o.OperatorTelemetry = telemetry.NewSubqueryTelemetry(o, opts)
 
@@ -89,6 +95,8 @@ func (o *subqueryOperator) Explain() (next []model.VectorOperator) {
     switch o.funcExpr.Func.Name {
     case "quantile_over_time", "predict_linear":
         return []model.VectorOperator{o.paramOp, o.next}
+    case "double_exponential_smoothing":
+        return []model.VectorOperator{o.paramOp, o.paramOp2, o.next}
     default:
         return []model.VectorOperator{o.next}
     }
@@ -127,6 +135,21 @@ func (o *subqueryOperator) Next(ctx context.Context) ([]model.StepVector, error)
         o.paramOp.GetPool().PutVectors(args)
     }
 
+    if o.paramOp2 != nil { // double_exponential_smoothing
+        args, err := o.paramOp2.Next(ctx)
+        if err != nil {
+            return nil, err
+        }
+        for i := range args {
+            o.params2[i] = math.NaN()
+            if len(args[i].Samples) == 1 {
+                o.params2[i] = args[i].Samples[0]
+            }
+            o.paramOp2.GetPool().PutStepVector(args[i])
+        }
+        o.paramOp2.GetPool().PutVectors(args)
+    }
+
     res := o.pool.GetVectorBatch()
     for i := 0; o.currentStep <= o.maxt && i < o.stepsBatch; i++ {
         mint := o.currentStep - o.subQuery.Range.Milliseconds() - o.subQuery.OriginalOffset.Milliseconds() + 1
@@ -171,7 +194,7 @@ func (o *subqueryOperator) Next(ctx context.Context) ([]model.StepVector, error)
 
         sv := o.pool.GetStepVector(o.currentStep)
         for sampleId, rangeSamples := range o.buffers {
-            f, h, ok, err := rangeSamples.Eval(ctx, o.params[i], nil)
+            f, h, ok, err := rangeSamples.Eval(ctx, o.params[i], o.params2[i], nil)
             if err != nil {
                 return nil, err
             }

ringbuffer/functions.go

+88 -2

@@ -26,9 +26,9 @@ type FunctionArgs struct {
     Offset           int64
     MetricAppearedTs *int64
 
-    // Only holt-winters uses two arguments, we fall back for that.
     // quantile_over_time and predict_linear use one, so we only use one here.
-    ScalarPoint float64
+    ScalarPoint  float64
+    ScalarPoint2 float64 // only for double_exponential_smoothing (trend factor)
 }
 
 type FunctionCall func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error)
@@ -299,6 +299,33 @@ var rangeVectorFuncs = map[string]FunctionCall{
         v := predictLinear(f.Samples, f.ScalarPoint, f.StepTime)
         return &v, nil, true, nil
     },
+    "double_exponential_smoothing": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) {
+        if len(f.Samples) < 2 {
+            if len(f.Samples) == 1 && f.Samples[0].V.H != nil {
+                warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, f.ctx)
+                return nil, nil, false, nil
+            }
+            return nil, nil, false, nil
+        }
+
+        // Annotate mix of float and histogram.
+        for _, s := range f.Samples {
+            if s.V.H != nil {
+                warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, f.ctx)
+                return nil, nil, false, nil
+            }
+        }
+
+        sf := f.ScalarPoint  // smoothing factor or alpha
+        tf := f.ScalarPoint2 // trend factor argument or beta
+
+        v, ok := doubleExponentialSmoothing(f.Samples, sf, tf)
+        if !ok {
+            return nil, nil, false, nil
+        }
+
+        return &v, nil, true, nil
+    },
 }
 
 func NewRangeVectorFunc(name string) (FunctionCall, error) {
@@ -735,6 +762,65 @@ func predictLinear(points []Sample, duration float64, stepTime int64) float64 {
     return slope*duration + intercept
 }
 
+// Based on https://github.com/prometheus/prometheus/blob/8baad1a73e471bd3cf3175a1608199e27484f179/promql/functions.go#L438
+// doubleExponentialSmoothing calculates the smoothed out value for the given series.
+// It is similar to a weighted moving average, where historical data has exponentially less influence on the current data.
+// It also accounts for trends in data. The smoothing factor (0 < sf < 1), aka "alpha", affects how historical data will affect the current data.
+// A lower smoothing factor increases the influence of historical data.
+// The trend factor (0 < tf < 1), aka "beta", affects how trends in historical data will affect the current data.
+// A higher trend factor increases the influence of trends.
+// Algorithm taken from https://en.wikipedia.org/wiki/Exponential_smoothing
+func doubleExponentialSmoothing(points []Sample, sf, tf float64) (float64, bool) {
+    // Check that the input parameters are valid
+    if sf <= 0 || sf >= 1 || tf <= 0 || tf >= 1 {
+        return 0, false
+    }
+
+    // Can't do the smoothing operation with less than two points
+    if len(points) < 2 {
+        return 0, false
+    }
+
+    // Check for histograms in the samples
+    for _, s := range points {
+        if s.V.H != nil {
+            return 0, false
+        }
+    }
+
+    var s0, s1, b float64
+    // Set initial values
+    s1 = points[0].V.F
+    b = points[1].V.F - points[0].V.F
+
+    // Run the smoothing operation
+    for i := 1; i < len(points); i++ {
+        // Scale the raw value against the smoothing factor
+        x := sf * points[i].V.F
+        // Scale the last smoothed value with the trend at this point
+        b = calcTrendValue(i-1, tf, s0, s1, b)
+        y := (1 - sf) * (s1 + b)
+        s0, s1 = s1, x+y
+    }
+
+    return s1, true
+}
+
+// calcTrendValue calculates the trend value at the given index i.
+// This is somewhat analogous to the slope of the trend at the given index.
+// The argument "tf" is the trend factor.
+// The argument "s0" is the previous smoothed value.
+// The argument "s1" is the current smoothed value.
+// The argument "b" is the previous trend value.
+func calcTrendValue(i int, tf, s0, s1, b float64) float64 {
+    if i == 0 {
+        return b
+    }
+    x := tf * (s1 - s0)
+    y := (1 - tf) * b
+    return x + y
+}
+
 func resets(points []Sample) float64 {
     var histogramPoints []Sample
     var floatPoints []Sample
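For reference, the loop added in `doubleExponentialSmoothing` above follows the standard Holt linear ("double exponential") smoothing recurrence. A compact restatement, using 1-based sample indices, with $\alpha$ = `sf` (smoothing factor), $\beta$ = `tf` (trend factor) and $x_t$ the t-th float sample in the range:

$$s_1 = x_1, \qquad b_1 = x_2 - x_1,$$

$$s_t = \alpha x_t + (1 - \alpha)(s_{t-1} + b_{t-1}), \qquad b_t = \beta\,(s_t - s_{t-1}) + (1 - \beta)\, b_{t-1} \quad \text{for } t \ge 2.$$

The Go code defers the trend update to `calcTrendValue` at the start of the next iteration, which yields the same sequence of levels; the value returned for the range is the final level $s_n$.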
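A minimal, standalone sketch of the same recurrence, for readers who want to try it outside the engine: the function name `doubleExpSmooth` and the toy series in `main` are illustrative only, and it works on a bare `[]float64` instead of the engine's `Sample` type.

```go
package main

import "fmt"

// doubleExpSmooth mirrors the recurrence used by doubleExponentialSmoothing,
// but on a plain float64 slice. sf is the smoothing factor (alpha), tf is the
// trend factor (beta); both must lie strictly between 0 and 1.
func doubleExpSmooth(points []float64, sf, tf float64) (float64, bool) {
	if sf <= 0 || sf >= 1 || tf <= 0 || tf >= 1 || len(points) < 2 {
		return 0, false
	}
	// Seed the level with the first sample and the trend with the first difference.
	s := points[0]
	b := points[1] - points[0]
	for i := 1; i < len(points); i++ {
		// Level update: blend the new sample with the previous forecast s + b.
		sNext := sf*points[i] + (1-sf)*(s+b)
		// Trend update: blend the observed level change with the previous trend.
		b = tf*(sNext-s) + (1-tf)*b
		s = sNext
	}
	return s, true
}

func main() {
	// A short, perfectly linear series: the level and trend track it exactly,
	// so the smoothed value equals the last sample.
	series := []float64{1, 2, 3, 4, 5, 6}
	if v, ok := doubleExpSmooth(series, 0.1, 0.1); ok {
		fmt.Printf("smoothed value: %.4f\n", v) // prints 6.0000
	}
}
```

With noisier input the returned value is pulled toward the series' recent level plus its estimated trend rather than the raw last sample, which is the behaviour the engine tests exercise for the positive- and negative-trend cases.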

ringbuffer/generic.go

+2 -1

@@ -106,14 +106,15 @@ func (r *GenericRingBuffer) Reset(mint int64, evalt int64) {
     r.items = r.items[:keep]
 }
 
-func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, metricAppearedTs *int64) (*float64, *histogram.FloatHistogram, bool, error) {
+func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, scalarArg2 float64, metricAppearedTs *int64) (*float64, *histogram.FloatHistogram, bool, error) {
     return r.call(FunctionArgs{
         ctx:              ctx,
         Samples:          r.items,
         StepTime:         r.currentStep,
         SelectRange:      r.selectRange,
         Offset:           r.offset,
         ScalarPoint:      scalarArg,
+        ScalarPoint2:     scalarArg2, // only for double_exponential_smoothing
         MetricAppearedTs: metricAppearedTs,
     })
 }
