Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execution: Add support for double_exponential_smoothing function #517

Merged
merged 9 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ The engine intends to have full compatibility with the original engine used in P

The following table shows operations which are currently supported by the engine

| Type | Supported | Priority |
|------------------------|----------------------------------------------------------------------------------------|----------|
| Binary expressions | Full support | |
| Histograms | Full support | |
| Subqueries | Full support | |
| Aggregations | Full support | |
| Aggregations over time | Full support except for `quantile_over_time` with non-constant argument | Medium |
| Functions | Full support except for `holt_winters` and `predict_linear` with non-constant argument | Medium |
| Type | Supported | Priority |
|------------------------|-------------------------------------------------------------------------|----------|
| Binary expressions | Full support | |
| Histograms | Full support | |
| Subqueries | Full support | |
| Aggregations | Full support | |
| Aggregations over time | Full support except for `quantile_over_time` with non-constant argument | Medium |
| Functions | Full support except for `predict_linear` with non-constant argument | Medium |

## Design

Expand Down
9 changes: 9 additions & 0 deletions engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ func BenchmarkRangeQuery(b *testing.B) {
query: `absent(nonexistent)`,
storage: sixHourDataset,
},
{
name: "double exponential smoothing",
query: `double_exponential_smoothing(http_requests_total[1m], 0.1, 0.1)`,
storage: sixHourDataset,
},
}

opts := engine.Opts{
Expand Down Expand Up @@ -541,6 +546,10 @@ func BenchmarkInstantQuery(b *testing.B) {
name: "subquery sum_over_time",
query: `sum_over_time(count(http_requests_total)[1h:10s])`,
},
{
name: "double exponential smoothing",
query: `double_exponential_smoothing(http_requests_total[1m], 0.1, 0.1)`,
},
}

for _, tc := range cases {
Expand Down
105 changes: 105 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5872,3 +5872,108 @@ func queryExplanation(q promql.Query) string {

return fmt.Sprintf("Query: %s\nExplanation:\n%s\n", q.String(), b.String())
}

// Adapted from: https://github.com/prometheus/prometheus/blob/906f6a33b60cec2596018ac8cc97ac41b16b06b7/promql/promqltest/testdata/functions.test#L814
//
// TestDoubleExponentialSmoothing runs the same double_exponential_smoothing
// range queries through the upstream Prometheus engine and through this
// engine, and asserts the two results are equal (via the package-level
// `comparer`). Cases cover pure float series with flat, rising, and falling
// trends, plus mixed float/histogram and histogram-only series.
func TestDoubleExponentialSmoothing(t *testing.T) {
	t.Parallel()

	const (
		testTimeout    = 1 * time.Hour
		testMaxSamples = math.MaxInt64 // effectively unlimited samples for the comparison
		testQueryStart = 0
		testQueryEnd   = 3600
		testQueryStep  = 30
	)

	// Default query window; individual cases may override via start/end/step.
	defaultStart := time.Unix(testQueryStart, 0)
	defaultEnd := time.Unix(testQueryEnd, 0)
	defaultStep := testQueryStep * time.Second

	cases := []struct {
		name string

		// load is a promqltest `load` block defining the input series;
		// query is the PromQL expression evaluated against it.
		load  string
		query string

		// Optional overrides for the default query window (zero values
		// mean "use the default").
		start time.Time
		end   time.Time
		step  time.Duration
	}{
		{
			name: "double exponential smoothing basic",
			load: `load 30s
			            http_requests_total{pod="nginx-1"} 1+1x15
			            http_requests_total{pod="nginx-2"} 1+2x18`,
			query: `double_exponential_smoothing(http_requests_total[5m], 0.1, 0.1)`,
		},
		{
			name: "double exponential smoothing with positive trend",
			load: `load 10s
			            http_requests{job="api-server", instance="0", group="production"}	0+10x1000 100+30x1000
			            http_requests{job="api-server", instance="1", group="production"}	0+20x1000 200+30x1000`,
			query: `double_exponential_smoothing(http_requests[5m], 0.01, 0.1)`,
		},
		{
			name: "double exponential smoothing with negative trend",
			load: `load 10s
			            http_requests{job="api-server", instance="0", group="production"}	8000-10x1000
			            http_requests{job="api-server", instance="1", group="production"}	0-20x1000`,
			query: `double_exponential_smoothing(http_requests[5m], 0.01, 0.1)`,
		},
		{
			// Series switches from floats to native histograms partway
			// through; both engines should agree on how that is handled.
			name: "double exponential smoothing with mixed histogram data",
			load: `load 30s
			            http_requests_mix{job="api-server", instance="0"}	0+10x1000 100+30x1000 {{schema:0 count:1 sum:2}}x1000
			            http_requests_mix{job="api-server", instance="1"}	0+20x1000 200+30x1000 {{schema:0 count:1 sum:2}}x1000`,
			query: `double_exponential_smoothing(http_requests_mix[5m], 0.01, 0.1)`,
		},
		{
			name: "double exponential smoothing with pure histogram data",
			load: `load 30s
			            http_requests_histogram{job="api-server", instance="1"}	{{schema:0 count:1 sum:2}}x1000`,
			query: `double_exponential_smoothing(http_requests_histogram[5m], 0.01, 0.1)`,
		},
	}

	for _, tcase := range cases {
		t.Run(tcase.name, func(t *testing.T) {
			t.Parallel()

			storage := promqltest.LoadedStorage(t, tcase.load)
			defer storage.Close()

			opts := promql.EngineOpts{
				Timeout:              testTimeout,
				MaxSamples:           testMaxSamples,
				EnableNegativeOffset: true,
				EnableAtModifier:     true,
			}

			// Apply per-case overrides of the query window, if any.
			start := defaultStart
			if !tcase.start.IsZero() {
				start = tcase.start
			}
			end := defaultEnd
			if !tcase.end.IsZero() {
				end = tcase.end
			}
			step := defaultStep
			if tcase.step != 0 {
				step = tcase.step
			}

			// Reference result from the upstream Prometheus engine.
			ctx := context.Background()
			oldEngine := promql.NewEngine(opts)
			q1, err := oldEngine.NewRangeQuery(ctx, storage, nil, tcase.query, start, end, step)
			testutil.Ok(t, errors.Wrap(err, "create old engine range query"))
			oldResult := q1.Exec(ctx)

			// Result from this engine for the same query and window.
			newEngine := engine.New(engine.Opts{EngineOpts: opts})
			q2, err := newEngine.NewRangeQuery(ctx, storage, nil, tcase.query, start, end, step)
			testutil.Ok(t, errors.Wrap(err, "create new engine range query"))
			newResult := q2.Exec(ctx)

			testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q2))
		})
	}
}
13 changes: 12 additions & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *lo
}

var scalarArg model.VectorOperator
var scalarArg2 model.VectorOperator
switch e.Func.Name {
case "quantile_over_time":
// quantile_over_time(scalar, range-vector)
Expand All @@ -226,9 +227,19 @@ func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *lo
if err != nil {
return nil, err
}
case "double_exponential_smoothing":
// double_exponential_smoothing(range-vector, scalar, scalar)
scalarArg, err = newOperator(ctx, e.Args[1], storage, opts, hints)
if err != nil {
return nil, err
}
scalarArg2, err = newOperator(ctx, e.Args[2], storage, opts, hints)
if err != nil {
return nil, err
}
}

return scan.NewSubqueryOperator(model.NewVectorPool(opts.StepsBatch), inner, scalarArg, &outerOpts, e, t)
return scan.NewSubqueryOperator(model.NewVectorPool(opts.StepsBatch), inner, scalarArg, scalarArg2, &outerOpts, e, t)
}

func newInstantVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
Expand Down
33 changes: 28 additions & 5 deletions execution/scan/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (
type subqueryOperator struct {
telemetry.OperatorTelemetry

next model.VectorOperator
paramOp model.VectorOperator
next model.VectorOperator
paramOp model.VectorOperator
paramOp2 model.VectorOperator

pool *model.VectorPool
call ringbuffer.FunctionCall
Expand All @@ -47,10 +48,13 @@ type subqueryOperator struct {
buffers []*ringbuffer.GenericRingBuffer

// params holds the function parameter for each step.
params []float64
// quantile_over time and predict_linear use one parameter (params)
// double_exponential_smoothing uses two (params, params2) for (sf, tf)
params []float64
params2 []float64
}

func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
func NewSubqueryOperator(pool *model.VectorPool, next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
call, err := ringbuffer.NewRangeVectorFunc(funcExpr.Func.Name)
if err != nil {
return nil, err
Expand All @@ -63,6 +67,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera
o := &subqueryOperator{
next: next,
paramOp: paramOp,
paramOp2: paramOp2,
call: call,
pool: pool,
funcExpr: funcExpr,
Expand All @@ -75,6 +80,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera
stepsBatch: opts.StepsBatch,
lastCollected: -1,
params: make([]float64, opts.StepsBatch),
params2: make([]float64, opts.StepsBatch),
}
o.OperatorTelemetry = telemetry.NewSubqueryTelemetry(o, opts)

Expand All @@ -89,6 +95,8 @@ func (o *subqueryOperator) Explain() (next []model.VectorOperator) {
switch o.funcExpr.Func.Name {
case "quantile_over_time", "predict_linear":
return []model.VectorOperator{o.paramOp, o.next}
case "double_exponential_smoothing":
return []model.VectorOperator{o.paramOp, o.paramOp2, o.next}
default:
return []model.VectorOperator{o.next}
}
Expand Down Expand Up @@ -127,6 +135,21 @@ func (o *subqueryOperator) Next(ctx context.Context) ([]model.StepVector, error)
o.paramOp.GetPool().PutVectors(args)
}

if o.paramOp2 != nil { // double_exponential_smoothing
args, err := o.paramOp2.Next(ctx)
if err != nil {
return nil, err
}
for i := range args {
o.params2[i] = math.NaN()
if len(args[i].Samples) == 1 {
o.params2[i] = args[i].Samples[0]
}
o.paramOp2.GetPool().PutStepVector(args[i])
}
o.paramOp2.GetPool().PutVectors(args)
}

res := o.pool.GetVectorBatch()
for i := 0; o.currentStep <= o.maxt && i < o.stepsBatch; i++ {
mint := o.currentStep - o.subQuery.Range.Milliseconds() - o.subQuery.OriginalOffset.Milliseconds() + 1
Expand Down Expand Up @@ -171,7 +194,7 @@ func (o *subqueryOperator) Next(ctx context.Context) ([]model.StepVector, error)

sv := o.pool.GetStepVector(o.currentStep)
for sampleId, rangeSamples := range o.buffers {
f, h, ok, err := rangeSamples.Eval(ctx, o.params[i], nil)
f, h, ok, err := rangeSamples.Eval(ctx, o.params[i], o.params2[i], nil)
if err != nil {
return nil, err
}
Expand Down
90 changes: 88 additions & 2 deletions ringbuffer/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ type FunctionArgs struct {
Offset int64
MetricAppearedTs *int64

// Only holt-winters uses two arguments, we fall back for that.
// quantile_over_time and predict_linear use one, so we only use one here.
ScalarPoint float64
ScalarPoint float64
ScalarPoint2 float64 // only for double_exponential_smoothing (trend factor)
}

type FunctionCall func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error)
Expand Down Expand Up @@ -299,6 +299,33 @@ var rangeVectorFuncs = map[string]FunctionCall{
v := predictLinear(f.Samples, f.ScalarPoint, f.StepTime)
return &v, nil, true, nil
},
	// double_exponential_smoothing(range-vector, sf, tf): Holt's linear
	// smoothing over the samples in the window. sf arrives in ScalarPoint,
	// tf in ScalarPoint2 (see FunctionArgs). Returns no value (ok=false)
	// when the input cannot be smoothed.
	"double_exponential_smoothing": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) {
		// Fewer than two samples: smoothing needs a level and a trend seed.
		if len(f.Samples) < 2 {
			// NOTE(review): a lone histogram sample triggers the
			// mixed-floats-histograms warning here — confirm this matches
			// upstream Prometheus annotation behavior for histogram-only input.
			if len(f.Samples) == 1 && f.Samples[0].V.H != nil {
				warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, f.ctx)
				return nil, nil, false, nil
			}
			return nil, nil, false, nil
		}

		// Annotate mix of float and histogram: any histogram sample in the
		// window makes the whole evaluation a no-op with a warning.
		for _, s := range f.Samples {
			if s.V.H != nil {
				warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, f.ctx)
				return nil, nil, false, nil
			}
		}

		sf := f.ScalarPoint  // smoothing factor or alpha
		tf := f.ScalarPoint2 // trend factor argument or beta

		// ok is false when sf/tf fall outside (0, 1); no value is emitted.
		v, ok := doubleExponentialSmoothing(f.Samples, sf, tf)
		if !ok {
			return nil, nil, false, nil
		}

		return &v, nil, true, nil
	},
}

func NewRangeVectorFunc(name string) (FunctionCall, error) {
Expand Down Expand Up @@ -735,6 +762,65 @@ func predictLinear(points []Sample, duration float64, stepTime int64) float64 {
return slope*duration + intercept
}

// Based on https://github.com/prometheus/prometheus/blob/8baad1a73e471bd3cf3175a1608199e27484f179/promql/functions.go#L438
// doubleExponentialSmoothing implements Holt's linear (double exponential)
// smoothing over the given float samples. It behaves like a weighted moving
// average in which older observations carry exponentially less weight, while
// also tracking a trend component. The smoothing factor sf ("alpha") controls
// how strongly history influences the level — lower sf means more history.
// The trend factor tf ("beta") controls how strongly historical trends carry
// forward — higher tf means more trend influence. Both must lie strictly in
// (0, 1). The boolean result is false when no value could be computed.
// Algorithm taken from https://en.wikipedia.org/wiki/Exponential_smoothing
func doubleExponentialSmoothing(points []Sample, sf, tf float64) (float64, bool) {
	// Both factors must be strictly inside the unit interval.
	if sf <= 0 || sf >= 1 || tf <= 0 || tf >= 1 {
		return 0, false
	}

	// A level and a trend seed require at least two points.
	if len(points) < 2 {
		return 0, false
	}

	// Histogram samples cannot be smoothed; refuse the whole window.
	for i := range points {
		if points[i].V.H != nil {
			return 0, false
		}
	}

	// Seed the level with the first observation and the trend with the
	// first difference; prevLevel starts at zero and is ignored on the
	// first trend update (see calcTrendValue for i == 0).
	var prevLevel float64
	level := points[0].V.F
	trend := points[1].V.F - points[0].V.F

	for i := 1; i < len(points); i++ {
		// Update the trend from the last two levels, then blend the new
		// observation (scaled by sf) with the trend-projected level.
		trend = calcTrendValue(i-1, tf, prevLevel, level, trend)
		projected := (1 - sf) * (level + trend)
		prevLevel, level = level, sf*points[i].V.F+projected
	}

	return level, true
}

// calcTrendValue computes the trend component at index i — roughly the slope
// of the smoothed series at that point. tf is the trend factor ("beta"),
// s0 and s1 are the previous and current smoothed values, and b is the
// previous trend value.
func calcTrendValue(i int, tf, s0, s1, b float64) float64 {
	// The very first step has no prior level pair to derive a slope from,
	// so the seed trend is returned unchanged.
	if i == 0 {
		return b
	}
	// Blend the freshly observed slope with the previous trend,
	// weighted by the trend factor.
	return tf*(s1-s0) + (1-tf)*b
}

func resets(points []Sample) float64 {
var histogramPoints []Sample
var floatPoints []Sample
Expand Down
3 changes: 2 additions & 1 deletion ringbuffer/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ func (r *GenericRingBuffer) Reset(mint int64, evalt int64) {
r.items = r.items[:keep]
}

func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, metricAppearedTs *int64) (*float64, *histogram.FloatHistogram, bool, error) {
// Eval invokes the buffer's range-vector function over the samples currently
// held in the ring buffer for the current step. scalarArg is the function's
// scalar parameter; scalarArg2 is a second scalar used only by
// double_exponential_smoothing (the trend factor).
func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, scalarArg2 float64, metricAppearedTs *int64) (*float64, *histogram.FloatHistogram, bool, error) {
	args := FunctionArgs{
		ctx:              ctx,
		Samples:          r.items,
		StepTime:         r.currentStep,
		SelectRange:      r.selectRange,
		Offset:           r.offset,
		MetricAppearedTs: metricAppearedTs,
		ScalarPoint:      scalarArg,
		// The second scalar is only meaningful for double_exponential_smoothing.
		ScalarPoint2: scalarArg2,
	}
	return r.call(args)
}
Expand Down
Loading
Loading