Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 4422ae1

Browse files
committedJan 15, 2025··
engine,execution: make remote endpoints injectable
Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent 1630e99 commit 4422ae1

File tree

6 files changed

+163
-368
lines changed

6 files changed

+163
-368
lines changed
 

‎engine/distributed.go

-64
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"time"
99

1010
"github.com/thanos-io/promql-engine/api"
11-
"github.com/thanos-io/promql-engine/logicalplan"
1211

1312
"github.com/prometheus/prometheus/model/labels"
1413
"github.com/prometheus/prometheus/promql"
@@ -48,66 +47,3 @@ func (l remoteEngine) LabelSets() []labels.Labels {
4847
func (l remoteEngine) NewRangeQuery(ctx context.Context, opts promql.QueryOpts, plan api.RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error) {
4948
return l.engine.NewRangeQuery(ctx, l.q, opts, plan.String(), start, end, interval)
5049
}
51-
52-
type DistributedEngine struct {
53-
endpoints api.RemoteEndpoints
54-
remoteEngine *Engine
55-
}
56-
57-
func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) *DistributedEngine {
58-
opts.LogicalOptimizers = []logicalplan.Optimizer{
59-
logicalplan.PassthroughOptimizer{Endpoints: endpoints},
60-
logicalplan.DistributedExecutionOptimizer{Endpoints: endpoints},
61-
}
62-
63-
return &DistributedEngine{
64-
endpoints: endpoints,
65-
remoteEngine: New(opts),
66-
}
67-
}
68-
69-
func (l DistributedEngine) SetQueryLogger(log promql.QueryLogger) {}
70-
71-
func (l DistributedEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
72-
return l.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts)
73-
}
74-
75-
func (l DistributedEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
76-
return l.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, interval)
77-
}
78-
79-
func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
80-
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
81-
// Some clients might only support second precision when executing queries.
82-
ts = ts.Truncate(time.Second)
83-
84-
return l.remoteEngine.MakeInstantQueryFromPlan(ctx, q, opts, plan, ts)
85-
}
86-
87-
func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) {
88-
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
89-
// Some clients might only support second precision when executing queries.
90-
start = start.Truncate(time.Second)
91-
end = end.Truncate(time.Second)
92-
interval = interval.Truncate(time.Second)
93-
94-
return l.remoteEngine.MakeRangeQueryFromPlan(ctx, q, opts, plan, start, end, interval)
95-
}
96-
97-
func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
98-
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
99-
// Some clients might only support second precision when executing queries.
100-
ts = ts.Truncate(time.Second)
101-
102-
return l.remoteEngine.MakeInstantQuery(ctx, q, opts, qs, ts)
103-
}
104-
105-
func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
106-
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
107-
// Some clients might only support second precision when executing queries.
108-
start = start.Truncate(time.Second)
109-
end = end.Truncate(time.Second)
110-
interval = interval.Truncate(time.Second)
111-
112-
return l.remoteEngine.MakeRangeQuery(ctx, q, opts, qs, start, end, interval)
113-
}

‎engine/distributed_test.go

+8-49
Original file line numberDiff line numberDiff line change
@@ -299,14 +299,15 @@ func TestDistributedAggregations(t *testing.T) {
299299
))
300300
}
301301

302+
endpoints := api.NewStaticEndpoints(remoteEngines)
302303
for _, queryOpts := range allQueryOpts {
303304
ctx := context.Background()
304305
distOpts := localOpts
305306
distOpts.DisableFallback = !query.expectFallback
306307
for _, instantTS := range instantTSs {
307308
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
308-
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))
309-
distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
309+
distEngine := engine.New(distOpts)
310+
distQry, err := distEngine.MakeDistributedInstantQuery(ctx, completeSeriesSet, endpoints, engine.FromPromQLOpts(queryOpts), query.query, instantTS)
310311
testutil.Ok(t, err)
311312

312313
distResult := distQry.Exec(ctx)
@@ -326,8 +327,8 @@ func TestDistributedAggregations(t *testing.T) {
326327
if test.rangeEnd == (time.Time{}) {
327328
test.rangeEnd = rangeEnd
328329
}
329-
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))
330-
distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, query.rangeStart, test.rangeEnd, rangeStep)
330+
distEngine := engine.New(distOpts)
331+
distQry, err := distEngine.MakeDistributedRangeQuery(ctx, completeSeriesSet, endpoints, engine.FromPromQLOpts(queryOpts), query.query, query.rangeStart, test.rangeEnd, rangeStep)
331332
testutil.Ok(t, err)
332333

333334
distResult := distQry.Exec(ctx)
@@ -363,58 +364,16 @@ func TestDistributedEngineWarnings(t *testing.T) {
363364
},
364365
}
365366
remote := engine.NewRemoteEngine(opts, querier, math.MinInt64, math.MaxInt64, nil)
366-
ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remote}))
367+
endpoints := api.NewStaticEndpoints([]api.RemoteEngine{remote})
368+
ng := engine.New(opts)
367369
var (
368370
start = time.UnixMilli(0)
369371
end = time.UnixMilli(600)
370372
step = 30 * time.Second
371373
)
372-
q, err := ng.NewRangeQuery(context.Background(), querier, nil, "test", start, end, step)
374+
q, err := ng.MakeDistributedRangeQuery(context.Background(), querier, endpoints, nil, "test", start, end, step)
373375
testutil.Ok(t, err)
374376

375377
res := q.Exec(context.Background())
376378
testutil.Equals(t, 1, len(res.Warnings))
377379
}
378-
379-
func TestDistributedEnginePartialResponses(t *testing.T) {
380-
t.Parallel()
381-
382-
querierErr := &storage.MockQueryable{
383-
MockQuerier: &storage.MockQuerier{
384-
SelectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
385-
return newErrorSeriesSet(errors.New("test error"))
386-
},
387-
},
388-
}
389-
querierOk := storageWithMockSeries(newMockSeries([]string{labels.MetricName, "foo", "zone", "west"}, []int64{0, 30, 60, 90}, []float64{0, 3, 4, 5}))
390-
querierNoop := &storage.MockQueryable{MockQuerier: storage.NoopQuerier()}
391-
392-
opts := engine.Opts{
393-
EnablePartialResponses: true,
394-
EngineOpts: promql.EngineOpts{
395-
MaxSamples: math.MaxInt64,
396-
Timeout: 1 * time.Minute,
397-
},
398-
}
399-
400-
remoteErr := engine.NewRemoteEngine(opts, querierErr, math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("zone", "east")})
401-
remoteOk := engine.NewRemoteEngine(opts, querierOk, math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("zone", "west")})
402-
ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remoteErr, remoteOk}))
403-
var (
404-
start = time.UnixMilli(0)
405-
end = time.UnixMilli(600 * 1000)
406-
step = 30 * time.Second
407-
)
408-
q, err := ng.NewRangeQuery(context.Background(), querierNoop, nil, "sum by (zone) (foo)", start, end, step)
409-
testutil.Ok(t, err)
410-
411-
res := q.Exec(context.Background())
412-
testutil.Ok(t, res.Err)
413-
testutil.Equals(t, 1, len(res.Warnings))
414-
testutil.Equals(t, `remote exec error [[{zone="east"}]]: test error`, res.Warnings.AsErrors()[0].Error())
415-
416-
m, err := res.Matrix()
417-
testutil.Ok(t, err)
418-
testutil.Equals(t, 1, m.Len())
419-
testutil.Equals(t, labels.FromStrings("zone", "west"), m[0].Metric)
420-
}

0 commit comments

Comments
 (0)