Skip to content

Commit 4230034

Browse files
engine: remove implicit fallback (#518)
Signed-off-by: Michael Hoffmann <[email protected]> Co-authored-by: Michael Hoffmann <[email protected]>
1 parent fab1185 commit 4230034

7 files changed

+108
-430
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ The project is currently under active development.
66

77
## Roadmap
88

9-
The engine intends to have full compatibility with the original engine used in Prometheus. Since implementing the full specification will take time, we aim to add support for most commonly used expressions while falling back to the original engine for operations that are not yet supported. This will allow us to have smaller and faster releases, and gather feedback on a regular basis. Instructions on using the engine will be added after we have enough confidence in its correctness.
9+
The engine intends to have full compatibility with the original engine used in Prometheus. Since implementing the full specification will take time, we aim to add support for most commonly used expressions. Instructions on using the engine will be added after we have enough confidence in its correctness. If the engine encounters an expression it does not support, it will return an error that can be tested with `engine.IsUnimplemented(err)`; the calling code is expected to handle the fallback itself.
1010

1111
The following table shows operations which are currently supported by the engine
1212

engine/bench_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ func BenchmarkSingleQuery(b *testing.B) {
102102
query := "sum(rate(http_requests_total[2m]))"
103103
opts := engine.Opts{
104104
EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second},
105-
DisableFallback: true,
106105
SelectorBatchSize: 256,
107106
}
108107
b.ReportAllocs()

engine/distributed_test.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,9 @@ func TestDistributedAggregations(t *testing.T) {
193193
}
194194

195195
queries := []struct {
196-
name string
197-
query string
198-
rangeStart time.Time
199-
expectFallback bool
196+
name string
197+
query string
198+
rangeStart time.Time
200199
}{
201200
{name: "binop with selector and constant series", query: `bar or on () vector(0)`},
202201
{name: "binop with aggregation and constant series", query: `sum(bar) or on () vector(0)`},
@@ -226,7 +225,7 @@ func TestDistributedAggregations(t *testing.T) {
226225
{name: "binary nested with constants", query: `(1 + 2) + (1 atan2 (-1 % -1))`},
227226
{name: "binary nested with functions", query: `(1 + exp(vector(1))) + (1 atan2 (-1 % -1))`},
228227
{name: "filtered selector interaction", query: `sum by (region) (bar{region="east"}) / sum by (region) (bar)`},
229-
{name: "unsupported aggregation", query: `count_values("pod", bar)`, expectFallback: true},
228+
{name: "unsupported aggregation", query: `count_values("pod", bar)`},
230229
{name: "absent_over_time for non-existing metric", query: `absent_over_time(foo[2m])`},
231230
{name: "absent_over_time for existing metric", query: `absent_over_time(bar{pod="nginx-1"}[2m])`},
232231
{name: "absent for non-existing metric", query: `absent(foo)`},
@@ -249,7 +248,7 @@ func TestDistributedAggregations(t *testing.T) {
249248
{name: "query with @start() absolute timestamp", query: `sum(bar @ start())`},
250249
{name: "query with @end() timestamp", query: `sum(bar @ end())`},
251250
{name: "query with numeric timestamp", query: `sum(bar @ 140.000)`},
252-
{name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`, expectFallback: true},
251+
{name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`},
253252
{name: `subquery with @end() timestamp`, query: `bar @ 100.000 - bar @ 150.000`},
254253
}
255254

@@ -306,7 +305,6 @@ func TestDistributedAggregations(t *testing.T) {
306305
for _, queryOpts := range allQueryOpts {
307306
ctx := context.Background()
308307
distOpts := localOpts
309-
distOpts.DisableFallback = !query.expectFallback
310308
for _, instantTS := range instantTSs {
311309
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
312310
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))

engine/engine.go

+13-51
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import (
1111
"sort"
1212
"time"
1313

14-
"github.com/thanos-io/promql-engine/execution/telemetry"
15-
1614
"github.com/efficientgo/core/errors"
1715
"github.com/prometheus/client_golang/prometheus"
1816
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -27,6 +25,7 @@ import (
2725
"github.com/thanos-io/promql-engine/execution/function"
2826
"github.com/thanos-io/promql-engine/execution/model"
2927
"github.com/thanos-io/promql-engine/execution/parse"
28+
"github.com/thanos-io/promql-engine/execution/telemetry"
3029
"github.com/thanos-io/promql-engine/execution/warnings"
3130
"github.com/thanos-io/promql-engine/extlabels"
3231
"github.com/thanos-io/promql-engine/logicalplan"
@@ -39,7 +38,7 @@ type QueryType int
3938

4039
type engineMetrics struct {
4140
currentQueries prometheus.Gauge
42-
queries *prometheus.CounterVec
41+
totalQueries prometheus.Counter
4342
}
4443

4544
const (
@@ -50,16 +49,16 @@ const (
5049
stepsBatch = 10
5150
)
5251

52+
// IsUnimplemented reports whether err matches parse.ErrNotSupportedExpr or
// parse.ErrNotImplemented, as determined by errors.Is.
func IsUnimplemented(err error) bool {
53+
return errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented)
54+
}
55+
5356
type Opts struct {
5457
promql.EngineOpts
5558

5659
// LogicalOptimizers are optimizers that are run if the value is not nil. If it is nil then the default optimizers are run. Default optimizer list is available in the logicalplan package.
5760
LogicalOptimizers []logicalplan.Optimizer
5861

59-
// DisableFallback enables mode where engine returns error if some expression of feature is not yet implemented
60-
// in the new engine, instead of falling back to prometheus engine.
61-
DisableFallback bool
62-
6362
// ExtLookbackDelta specifies what time range to use to determine valid previous sample for extended range functions.
6463
// Defaults to 1 hour if not specified.
6564
ExtLookbackDelta time.Duration
@@ -71,9 +70,6 @@ type Opts struct {
7170
// This will default to false.
7271
EnableXFunctions bool
7372

74-
// FallbackEngine
75-
Engine promql.QueryEngine
76-
7773
// EnableAnalysis enables query analysis.
7874
EnableAnalysis bool
7975

@@ -177,23 +173,16 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
177173
Help: "The current number of queries being executed or waiting.",
178174
},
179175
),
180-
queries: promauto.With(opts.Reg).NewCounterVec(
176+
totalQueries: promauto.With(opts.Reg).NewCounter(
181177
prometheus.CounterOpts{
182178
Namespace: namespace,
183179
Subsystem: subsystem,
184180
Name: "queries_total",
185181
Help: "Number of PromQL queries.",
186-
}, []string{"fallback"},
182+
},
187183
),
188184
}
189185

190-
var engine promql.QueryEngine
191-
if opts.Engine == nil {
192-
engine = promql.NewEngine(opts.EngineOpts)
193-
} else {
194-
engine = opts.Engine
195-
}
196-
197186
decodingConcurrency := opts.DecodingConcurrency
198187
if opts.DecodingConcurrency < 1 {
199188
decodingConcurrency = runtime.GOMAXPROCS(0) / 2
@@ -208,13 +197,11 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
208197
}
209198

210199
return &Engine{
211-
prom: engine,
212200
functions: functions,
213201
scanners: scanners,
214202
activeQueryTracker: queryTracker,
215203

216204
disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks,
217-
disableFallback: opts.DisableFallback,
218205

219206
logger: opts.Logger,
220207
lookbackDelta: opts.LookbackDelta,
@@ -240,13 +227,11 @@ var (
240227
)
241228

242229
type Engine struct {
243-
prom promql.QueryEngine
244230
functions map[string]*parser.Function
245231
scanners engstorage.Scanners
246232
activeQueryTracker promql.QueryTracker
247233

248234
disableDuplicateLabelChecks bool
249-
disableFallback bool
250235

251236
logger *slog.Logger
252237
lookbackDelta time.Duration
@@ -290,14 +275,10 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
290275
ctx = warnings.NewContext(ctx)
291276
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
292277
exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts)
293-
if e.triggerFallback(err) {
294-
e.metrics.queries.WithLabelValues("true").Inc()
295-
return e.prom.NewInstantQuery(ctx, q, opts, qs, ts)
296-
}
297-
e.metrics.queries.WithLabelValues("false").Inc()
298278
if err != nil {
299279
return nil, err
300280
}
281+
e.metrics.totalQueries.Inc()
301282
return &compatibilityQuery{
302283
Query: &Query{exec: exec, opts: opts},
303284
engine: e,
@@ -338,14 +319,10 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
338319
}
339320

340321
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
341-
if e.triggerFallback(err) {
342-
e.metrics.queries.WithLabelValues("true").Inc()
343-
return e.prom.NewInstantQuery(ctx, q, opts, root.String(), ts)
344-
}
345-
e.metrics.queries.WithLabelValues("false").Inc()
346322
if err != nil {
347323
return nil, err
348324
}
325+
e.metrics.totalQueries.Inc()
349326

350327
return &compatibilityQuery{
351328
Query: &Query{exec: exec, opts: opts},
@@ -396,14 +373,10 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
396373
}
397374

398375
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
399-
if e.triggerFallback(err) {
400-
e.metrics.queries.WithLabelValues("true").Inc()
401-
return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step)
402-
}
403-
e.metrics.queries.WithLabelValues("false").Inc()
404376
if err != nil {
405377
return nil, err
406378
}
379+
e.metrics.totalQueries.Inc()
407380

408381
return &compatibilityQuery{
409382
Query: &Query{exec: exec, opts: opts},
@@ -442,14 +415,11 @@ func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable
442415
ctx = warnings.NewContext(ctx)
443416
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
444417
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
445-
if e.triggerFallback(err) {
446-
e.metrics.queries.WithLabelValues("true").Inc()
447-
return e.prom.NewRangeQuery(ctx, q, opts, lplan.Root().String(), start, end, step)
448-
}
449-
e.metrics.queries.WithLabelValues("false").Inc()
450418
if err != nil {
451419
return nil, err
452420
}
421+
e.metrics.totalQueries.Inc()
422+
453423
return &compatibilityQuery{
454424
Query: &Query{exec: exec, opts: opts},
455425
engine: e,
@@ -516,14 +486,6 @@ func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Optio
516486
return e.scanners, nil
517487
}
518488

519-
func (e *Engine) triggerFallback(err error) bool {
520-
if e.disableFallback {
521-
return false
522-
}
523-
524-
return errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented)
525-
}
526-
527489
type Query struct {
528490
exec model.VectorOperator
529491
opts promql.QueryOpts

0 commit comments

Comments
 (0)