Skip to content

Commit 31bfaeb

Browse files
engine: make logical optimizers configurable at querytime (#520)
* engine: make logical optimizers configurable at querytime, remove distributed engine Signed-off-by: Michael Hoffmann <[email protected]> * engine: add distributed engine back Signed-off-by: Michael Hoffmann <[email protected]> --------- Signed-off-by: Michael Hoffmann <[email protected]> Co-authored-by: Michael Hoffmann <[email protected]>
1 parent 4230034 commit 31bfaeb

File tree

8 files changed

+106
-153
lines changed

8 files changed

+106
-153
lines changed

engine/distributed.go

+35-28
Original file line numberDiff line numberDiff line change
@@ -50,64 +50,71 @@ func (l remoteEngine) NewRangeQuery(ctx context.Context, opts promql.QueryOpts,
5050
}
5151

5252
type DistributedEngine struct {
53-
endpoints api.RemoteEndpoints
54-
remoteEngine *Engine
53+
engine *Engine
5554
}
5655

57-
func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) *DistributedEngine {
58-
opts.LogicalOptimizers = []logicalplan.Optimizer{
59-
logicalplan.PassthroughOptimizer{Endpoints: endpoints},
60-
logicalplan.DistributedExecutionOptimizer{Endpoints: endpoints},
61-
}
62-
56+
func NewDistributedEngine(opts Opts) *DistributedEngine {
6357
return &DistributedEngine{
64-
endpoints: endpoints,
65-
remoteEngine: New(opts),
58+
engine: New(opts),
6659
}
6760
}
6861

69-
func (l DistributedEngine) SetQueryLogger(log promql.QueryLogger) {}
70-
71-
func (l DistributedEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
72-
return l.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts)
73-
}
74-
75-
func (l DistributedEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
76-
return l.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, interval)
77-
}
78-
79-
func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
62+
func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
8063
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
8164
// Some clients might only support second precision when executing queries.
8265
ts = ts.Truncate(time.Second)
8366

84-
return l.remoteEngine.MakeInstantQueryFromPlan(ctx, q, opts, plan, ts)
67+
qOpts := fromPromQLOpts(opts)
68+
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
69+
logicalplan.PassthroughOptimizer{Endpoints: e},
70+
logicalplan.DistributedExecutionOptimizer{Endpoints: e},
71+
}
72+
73+
return l.engine.MakeInstantQueryFromPlan(ctx, q, qOpts, plan, ts)
8574
}
8675

87-
func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) {
76+
func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) {
8877
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
8978
// Some clients might only support second precision when executing queries.
9079
start = start.Truncate(time.Second)
9180
end = end.Truncate(time.Second)
9281
interval = interval.Truncate(time.Second)
9382

94-
return l.remoteEngine.MakeRangeQueryFromPlan(ctx, q, opts, plan, start, end, interval)
83+
qOpts := fromPromQLOpts(opts)
84+
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
85+
logicalplan.PassthroughOptimizer{Endpoints: e},
86+
logicalplan.DistributedExecutionOptimizer{Endpoints: e},
87+
}
88+
89+
return l.engine.MakeRangeQueryFromPlan(ctx, q, qOpts, plan, start, end, interval)
9590
}
9691

97-
func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
92+
func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
9893
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
9994
// Some clients might only support second precision when executing queries.
10095
ts = ts.Truncate(time.Second)
10196

102-
return l.remoteEngine.MakeInstantQuery(ctx, q, opts, qs, ts)
97+
qOpts := fromPromQLOpts(opts)
98+
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
99+
logicalplan.PassthroughOptimizer{Endpoints: e},
100+
logicalplan.DistributedExecutionOptimizer{Endpoints: e},
101+
}
102+
103+
return l.engine.MakeInstantQuery(ctx, q, qOpts, qs, ts)
103104
}
104105

105-
func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
106+
func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
106107
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
107108
// Some clients might only support second precision when executing queries.
108109
start = start.Truncate(time.Second)
109110
end = end.Truncate(time.Second)
110111
interval = interval.Truncate(time.Second)
111112

112-
return l.remoteEngine.MakeRangeQuery(ctx, q, opts, qs, start, end, interval)
113+
qOpts := fromPromQLOpts(opts)
114+
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
115+
logicalplan.PassthroughOptimizer{Endpoints: e},
116+
logicalplan.DistributedExecutionOptimizer{Endpoints: e},
117+
}
118+
119+
return l.engine.MakeRangeQuery(ctx, q, qOpts, qs, start, end, interval)
113120
}

engine/distributed_test.go

+16-63
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func TestDistributedAggregations(t *testing.T) {
273273
completeSeriesSet := storageWithSeries(mergeWithSampleDedup(allSeries)...)
274274
t.Run(test.name, func(t *testing.T) {
275275
for _, lookbackDelta := range lookbackDeltas {
276-
localOpts := engine.Opts{
276+
opts := engine.Opts{
277277
EngineOpts: promql.EngineOpts{
278278
Timeout: 1 * time.Hour,
279279
MaxSamples: 1e10,
@@ -285,7 +285,7 @@ func TestDistributedAggregations(t *testing.T) {
285285

286286
for _, s := range test.seriesSets {
287287
remoteEngines = append(remoteEngines, engine.NewRemoteEngine(
288-
localOpts,
288+
opts,
289289
storageWithMockSeries(s.series...),
290290
s.mint(),
291291
s.maxt(),
@@ -294,25 +294,24 @@ func TestDistributedAggregations(t *testing.T) {
294294
}
295295
if len(test.timeOverlap.series) > 0 {
296296
remoteEngines = append(remoteEngines, engine.NewRemoteEngine(
297-
localOpts,
297+
opts,
298298
storageWithMockSeries(test.timeOverlap.series...),
299299
test.timeOverlap.mint(),
300300
test.timeOverlap.maxt(),
301301
test.timeOverlap.extLset,
302302
))
303303
}
304-
304+
endpoints := api.NewStaticEndpoints(remoteEngines)
305305
for _, queryOpts := range allQueryOpts {
306306
ctx := context.Background()
307-
distOpts := localOpts
308307
for _, instantTS := range instantTSs {
309308
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
310-
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))
311-
distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
309+
distEngine := engine.NewDistributedEngine(opts)
310+
distQry, err := distEngine.MakeInstantQuery(ctx, completeSeriesSet, endpoints, queryOpts, query.query, instantTS)
312311
testutil.Ok(t, err)
313312

314313
distResult := distQry.Exec(ctx)
315-
promEngine := promql.NewEngine(localOpts.EngineOpts)
314+
promEngine := promql.NewEngine(opts.EngineOpts)
316315
promQry, err := promEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
317316
testutil.Ok(t, err)
318317
promResult := promQry.Exec(ctx)
@@ -328,12 +327,12 @@ func TestDistributedAggregations(t *testing.T) {
328327
if test.rangeEnd == (time.Time{}) {
329328
test.rangeEnd = rangeEnd
330329
}
331-
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))
332-
distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, query.rangeStart, test.rangeEnd, rangeStep)
330+
distEngine := engine.NewDistributedEngine(opts)
331+
distQry, err := distEngine.MakeRangeQuery(ctx, completeSeriesSet, endpoints, queryOpts, query.query, query.rangeStart, test.rangeEnd, rangeStep)
333332
testutil.Ok(t, err)
334333

335334
distResult := distQry.Exec(ctx)
336-
promEngine := promql.NewEngine(localOpts.EngineOpts)
335+
promEngine := promql.NewEngine(opts.EngineOpts)
337336
promQry, err := promEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, query.rangeStart, test.rangeEnd, rangeStep)
338337
testutil.Ok(t, err)
339338
promResult := promQry.Exec(ctx)
@@ -350,73 +349,27 @@ func TestDistributedAggregations(t *testing.T) {
350349

351350
func TestDistributedEngineWarnings(t *testing.T) {
352351
t.Parallel()
353-
querier := &storage.MockQueryable{
354-
MockQuerier: &storage.MockQuerier{
355-
SelectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
356-
return newWarningsSeriesSet(annotations.New().Add(errors.New("test warning")))
357-
},
358-
},
359-
}
360352

361353
opts := engine.Opts{
362354
EngineOpts: promql.EngineOpts{
363355
MaxSamples: math.MaxInt64,
364356
Timeout: 1 * time.Minute,
365357
},
366358
}
367-
remote := engine.NewRemoteEngine(opts, querier, math.MinInt64, math.MaxInt64, nil)
368-
ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remote}))
369-
var (
370-
start = time.UnixMilli(0)
371-
end = time.UnixMilli(600)
372-
step = 30 * time.Second
373-
)
374-
q, err := ng.NewRangeQuery(context.Background(), querier, nil, "test", start, end, step)
375-
testutil.Ok(t, err)
376359

377-
res := q.Exec(context.Background())
378-
testutil.Equals(t, 1, len(res.Warnings))
379-
}
380-
381-
func TestDistributedEnginePartialResponses(t *testing.T) {
382-
t.Parallel()
383-
384-
querierErr := &storage.MockQueryable{
360+
querier := &storage.MockQueryable{
385361
MockQuerier: &storage.MockQuerier{
386362
SelectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
387-
return newErrorSeriesSet(errors.New("test error"))
363+
return newWarningsSeriesSet(annotations.New().Add(errors.New("test warning")))
388364
},
389365
},
390366
}
391-
querierOk := storageWithMockSeries(newMockSeries([]string{labels.MetricName, "foo", "zone", "west"}, []int64{0, 30, 60, 90}, []float64{0, 3, 4, 5}))
392-
querierNoop := &storage.MockQueryable{MockQuerier: storage.NoopQuerier()}
393-
394-
opts := engine.Opts{
395-
EnablePartialResponses: true,
396-
EngineOpts: promql.EngineOpts{
397-
MaxSamples: math.MaxInt64,
398-
Timeout: 1 * time.Minute,
399-
},
400-
}
401-
402-
remoteErr := engine.NewRemoteEngine(opts, querierErr, math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("zone", "east")})
403-
remoteOk := engine.NewRemoteEngine(opts, querierOk, math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("zone", "west")})
404-
ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remoteErr, remoteOk}))
405-
var (
406-
start = time.UnixMilli(0)
407-
end = time.UnixMilli(600 * 1000)
408-
step = 30 * time.Second
409-
)
410-
q, err := ng.NewRangeQuery(context.Background(), querierNoop, nil, "sum by (zone) (foo)", start, end, step)
367+
remote := engine.NewRemoteEngine(opts, querier, math.MinInt64, math.MaxInt64, nil)
368+
endpoints := api.NewStaticEndpoints([]api.RemoteEngine{remote})
369+
ng := engine.NewDistributedEngine(opts)
370+
q, err := ng.MakeInstantQuery(context.Background(), querier, endpoints, nil, "test", time.UnixMilli(0))
411371
testutil.Ok(t, err)
412372

413373
res := q.Exec(context.Background())
414-
testutil.Ok(t, res.Err)
415374
testutil.Equals(t, 1, len(res.Warnings))
416-
testutil.Equals(t, `remote exec error [[{zone="east"}]]: test error`, res.Warnings.AsErrors()[0].Error())
417-
418-
m, err := res.Matrix()
419-
testutil.Ok(t, err)
420-
testutil.Equals(t, 1, m.Len())
421-
testutil.Equals(t, labels.FromStrings("zone", "west"), m[0].Metric)
422375
}

0 commit comments

Comments
 (0)