From 63ff68591bd188ea6093b6157966b51a347e451b Mon Sep 17 00:00:00 2001 From: Aleksandr Krivoshchekov Date: Tue, 6 Jan 2026 07:03:04 +0000 Subject: [PATCH 1/7] Add RemoteEndpointsV2/V3 that requests pruned remote engines Signed-off-by: Aleksandr Krivoshchekov --- api/remote.go | 103 ++++++++++++++++++++++++++++ engine/distributed.go | 24 +++++++ logicalplan/distribute.go | 19 ++++- logicalplan/passthrough.go | 39 ++++++----- logicalplan/plan.go | 20 +++--- storage/prometheus/scanners.go | 2 +- storage/prometheus/scanners_test.go | 2 +- 7 files changed, 177 insertions(+), 32 deletions(-) diff --git a/api/remote.go b/api/remote.go index f1d18c4fc..7c277c07b 100644 --- a/api/remote.go +++ b/api/remote.go @@ -6,6 +6,8 @@ package api import ( "context" "fmt" + "math" + "sync" "time" "github.com/prometheus/prometheus/model/labels" @@ -16,10 +18,46 @@ type RemoteQuery interface { fmt.Stringer } +// Deprecated: RemoteEndpoints will be replaced with +// RemoteEndpointsV2 / RemoteEndpointsV3 in a future breaking change. type RemoteEndpoints interface { Engines() []RemoteEngine } +// RemoteEndpointsV2 describes endpoints that accept pruning hints when +// selecting remote engines. +// +// For example implementations may use the hints to prune the TSDBInfos, but +// also may safely ignore them and return all available remote engines. +// +// NOTE(Aleksandr Krivoshchekov): +// We add a new interface as a temporary backward compatibility. +// RemoteEndpoints will be replaced with it in a future breaking change. +type RemoteEndpointsV2 interface { + EnginesV2(mint, maxt int64) []RemoteEngine +} + +type RemoteEndpointsQuery struct { + MinT int64 + MaxT int64 +} + +// RemoteEndpointsV3 describes endpoints that accept pruning hints when +// selecting remote engines. +// +// For example implementations may use the hints to prune the TSDBInfos, but +// also may safely ignore them and return all available remote engines. +// +// NOTE(Aleksandr Krivoshchekov): +// We add a new interface as a temporary backward compatibility. +// RemoteEndpoints will be replaced with it in a future breaking change. +// +// Unlike RemoteEndpointsV2, this interface can be extended with more hints +// in the future, without making any breaking changes. +type RemoteEndpointsV3 interface { + EnginesV3(query RemoteEndpointsQuery) []RemoteEngine +} + type RemoteEngine interface { MaxT() int64 MinT() int64 @@ -44,6 +82,71 @@ func (m staticEndpoints) Engines() []RemoteEngine { return m.engines } +func (m staticEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine { + return m.engines +} + +func (m staticEndpoints) EnginesV3(query RemoteEndpointsQuery) []RemoteEngine { + return m.engines +} + func NewStaticEndpoints(engines []RemoteEngine) RemoteEndpoints { return &staticEndpoints{engines: engines} } + +type cachedEndpoints struct { + endpoints RemoteEndpoints + + enginesOnce sync.Once + engines []RemoteEngine +} + +func (l *cachedEndpoints) Engines() []RemoteEngine { + return l.EnginesV3(RemoteEndpointsQuery{ + MaxT: math.MinInt64, + MinT: math.MaxInt64, + }) +} + +func (l *cachedEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine { + return l.EnginesV3(RemoteEndpointsQuery{ + MaxT: maxt, + MinT: mint, + }) +} + +func (l *cachedEndpoints) EnginesV3(query RemoteEndpointsQuery) []RemoteEngine { + l.enginesOnce.Do(func() { + l.engines = getEngines(l.endpoints, query) + }) + return l.engines +} + +func getEngines(endpoints RemoteEndpoints, query RemoteEndpointsQuery) []RemoteEngine { + if v3, ok := endpoints.(RemoteEndpointsV3); ok { + return v3.EnginesV3(query) + } + + if v2, ok := endpoints.(RemoteEndpointsV2); ok { + return v2.EnginesV2(query.MinT, query.MaxT) + } + + return endpoints.Engines() +} + +// NewCachedEndpoints returns an endpoints wrapper that +// resolves and caches engines on first access. +// +// All subsequent Engines calls return cached engines, ignoring any query +// parameters. +func NewCachedEndpoints(endpoints RemoteEndpoints) RemoteEndpoints { + if endpoints == nil { + panic("api.NewCachedEndpoints: endpoints is nil") + } + + if le, ok := endpoints.(*cachedEndpoints); ok { + return le + } + + return &cachedEndpoints{endpoints: endpoints} +} diff --git a/engine/distributed.go b/engine/distributed.go index a5f12ab75..c9fa1c691 100644 --- a/engine/distributed.go +++ b/engine/distributed.go @@ -68,6 +68,12 @@ func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q stora // Some clients might only support second precision when executing queries. ts = ts.Truncate(time.Second) + // Cache engines to give optimizers a consistent view of Engines(). + // Some RemoteEndpoints implementations also compute and cache + // MinT() / MaxT() / LabelSets() on the fly, so the cache prevents + // recomputing those fields in each optimizer. + e = api.NewCachedEndpoints(e) + qOpts := fromPromQLOpts(opts) qOpts.LogicalOptimizers = []logicalplan.Optimizer{ logicalplan.PassthroughOptimizer{Endpoints: e}, @@ -84,6 +90,12 @@ func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage end = end.Truncate(time.Second) interval = interval.Truncate(time.Second) + // Cache engines to give optimizers a consistent view of Engines(). + // Some RemoteEndpoints implementations also compute and cache + // MinT() / MaxT() / LabelSets() on the fly, so the cache prevents + // recomputing those fields in each optimizer. + e = api.NewCachedEndpoints(e) + qOpts := fromPromQLOpts(opts) qOpts.LogicalOptimizers = []logicalplan.Optimizer{ logicalplan.PassthroughOptimizer{Endpoints: e}, @@ -98,6 +110,12 @@ func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Query // Some clients might only support second precision when executing queries. ts = ts.Truncate(time.Second) + // Cache engines to give optimizers a consistent view of Engines(). + // Some RemoteEndpoints implementations also compute and cache + // MinT() / MaxT() / LabelSets() on the fly, so the cache prevents + // recomputing those fields in each optimizer. + e = api.NewCachedEndpoints(e) + qOpts := fromPromQLOpts(opts) qOpts.LogicalOptimizers = []logicalplan.Optimizer{ logicalplan.PassthroughOptimizer{Endpoints: e}, @@ -114,6 +132,12 @@ func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryab end = end.Truncate(time.Second) interval = interval.Truncate(time.Second) + // Cache engines to give optimizers a consistent view of Engines(). + // Some RemoteEndpoints implementations also compute and cache + // MinT() / MaxT() / LabelSets() on the fly, so the cache prevents + // recomputing those fields in each optimizer. + e = api.NewCachedEndpoints(e) + qOpts := fromPromQLOpts(opts) qOpts.LogicalOptimizers = []logicalplan.Optimizer{ logicalplan.PassthroughOptimizer{Endpoints: e}, diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index 446ce5d9a..655cd2c10 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -157,7 +157,7 @@ type DistributedExecutionOptimizer struct { } func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) { - engines := m.Endpoints.Engines() + engines := getRemoteEngines(m.Endpoints, plan, opts) sort.Slice(engines, func(i, j int) bool { return engines[i].MinT() < engines[j].MinT() }) @@ -858,3 +858,20 @@ func maxDuration(a, b time.Duration) time.Duration { } return b } + +func getRemoteEngines(endpoints api.RemoteEndpoints, plan Node, opts *query.Options) []api.RemoteEngine { + if v3, ok := endpoints.(api.RemoteEndpointsV3); ok { + mint, maxt := MinMaxTime(plan, opts) + return v3.EnginesV3(api.RemoteEndpointsQuery{ + MinT: mint, + MaxT: maxt, + }) + } + + if v2, ok := endpoints.(api.RemoteEndpointsV2); ok { + mint, maxt := MinMaxTime(plan, opts) + return v2.EnginesV2(mint, maxt) + } + + return endpoints.Engines() +} diff --git a/logicalplan/passthrough.go b/logicalplan/passthrough.go index c256e80ca..ec938addf 100644 --- a/logicalplan/passthrough.go +++ b/logicalplan/passthrough.go @@ -43,40 +43,45 @@ func matchingEngineTime(e api.RemoteEngine, opts *query.Options) bool { } func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) { - engines := m.Endpoints.Engines() - if len(engines) == 1 { - if !matchingEngineTime(engines[0], opts) { - return plan, nil - } - return RemoteExecution{ - Engine: engines[0], - Query: plan.Clone(), - QueryRangeStart: opts.Start, - QueryRangeEnd: opts.End, - }, nil - } - + engines := getRemoteEngines(m.Endpoints, plan, opts) if len(engines) == 0 { return plan, nil } - matchingLabelsEngines := make([]api.RemoteEngine, 0, len(engines)) + var ( + hasSelector bool + matchingEngines int + firstMatchingEngine api.RemoteEngine + ) TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) { if vs, ok := (*current).(*VectorSelector); ok { + hasSelector = true + for _, e := range engines { if !labelSetsMatch(vs.LabelMatchers, e.LabelSets()...) { continue } - matchingLabelsEngines = append(matchingLabelsEngines, e) + matchingEngines++ + if matchingEngines > 1 { + return true + } + + firstMatchingEngine = e } } return false }) - if len(matchingLabelsEngines) == 1 && matchingEngineTime(matchingLabelsEngines[0], opts) { + // Fallback to all engines. + if !hasSelector && matchingEngines == 0 { + matchingEngines = len(engines) + firstMatchingEngine = engines[0] + } + + if matchingEngines == 1 && matchingEngineTime(firstMatchingEngine, opts) { return RemoteExecution{ - Engine: matchingLabelsEngines[0], + Engine: firstMatchingEngine, Query: plan.Clone(), QueryRangeStart: opts.Start, QueryRangeEnd: opts.End, diff --git a/logicalplan/plan.go b/logicalplan/plan.go index c9cf3f8f5..3f6753301 100644 --- a/logicalplan/plan.go +++ b/logicalplan/plan.go @@ -30,7 +30,6 @@ var DefaultOptimizers = []Optimizer{ type Plan interface { Optimize([]Optimizer) (Plan, annotations.Annotations) Root() Node - MinMaxTime(*query.Options) (int64, int64) } type Optimizer interface { @@ -152,15 +151,19 @@ func extractFuncFromPath(p []*Node) string { return extractFuncFromPath(p[:len(p)-1]) } -func (p *plan) MinMaxTime(qOpts *query.Options) (int64, int64) { +func (p *plan) Root() Node { + return p.expr +} + +// MinMaxTime returns the min and max timestamp that any selector in the query +// can read. +func MinMaxTime(root Node, qOpts *query.Options) (int64, int64) { var minTimestamp, maxTimestamp int64 = math.MaxInt64, math.MinInt64 // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // The evaluation of the VectorSelector inside then evaluates the given range and unsets // the variable. var evalRange time.Duration - root := p.Root() - TraverseWithParents(nil, &root, func(parents []*Node, node *Node) { switch n := (*node).(type) { case *VectorSelector: @@ -205,10 +208,6 @@ func (p *plan) Optimize(optimizers []Optimizer) (Plan, annotations.Annotations) return &plan{expr: expr, opts: p.opts}, *annos } -func (p *plan) Root() Node { - return p.expr -} - func Traverse(expr *Node, transform func(*Node)) { children := (*expr).Children() transform(expr) @@ -230,10 +229,7 @@ func TraverseBottomUp(parent *Node, current *Node, transform func(parent *Node, for _, c := range (*current).Children() { stop = TraverseBottomUp(current, c, transform) || stop } - if stop { - return stop - } - return transform(parent, current) + return stop || transform(parent, current) } func replacePrometheusNodes(plan parser.Expr) Node { diff --git a/storage/prometheus/scanners.go b/storage/prometheus/scanners.go index dc7888f57..24831c322 100644 --- a/storage/prometheus/scanners.go +++ b/storage/prometheus/scanners.go @@ -35,7 +35,7 @@ func (s *Scanners) Close() error { func NewPrometheusScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (*Scanners, error) { var min, max int64 if lplan != nil { - min, max = lplan.MinMaxTime(qOpts) + min, max = logicalplan.MinMaxTime(lplan.Root(), qOpts) } else { min, max = qOpts.Start.UnixMilli(), qOpts.End.UnixMilli() } diff --git a/storage/prometheus/scanners_test.go b/storage/prometheus/scanners_test.go index f96cd7698..127f762db 100644 --- a/storage/prometheus/scanners_test.go +++ b/storage/prometheus/scanners_test.go @@ -80,7 +80,7 @@ func TestScannersMinMaxTime(t *testing.T) { plan, _ := logicalplan.NewFromAST(p, qOpts, logicalplan.PlanOptions{}) - min, max := plan.MinMaxTime(qOpts) + min, max := logicalplan.MinMaxTime(plan.Root(), qOpts) require.Equal(t, tcase.min, min) require.Equal(t, tcase.max, max) From ba2303e4d6144a7b28ac9c53cff61d7b4c697a01 Mon Sep 17 00:00:00 2001 From: Aleksandr Krivoshchekov Date: Wed, 21 Jan 2026 02:51:55 +0000 Subject: [PATCH 2/7] Remove RemoteEndpointsV3 Signed-off-by: Aleksandr Krivoshchekov --- api/remote.go | 50 ++++----------------------------------- logicalplan/distribute.go | 8 ------- 2 files changed, 5 insertions(+), 53 deletions(-) diff --git a/api/remote.go b/api/remote.go index 7c277c07b..df96183d1 100644 --- a/api/remote.go +++ b/api/remote.go @@ -18,8 +18,6 @@ type RemoteQuery interface { fmt.Stringer } -// Deprecated: RemoteEndpoints will be replaced with -// RemoteEndpointsV2 / RemoteEndpointsV3 in a future breaking change. type RemoteEndpoints interface { Engines() []RemoteEngine } @@ -37,27 +35,6 @@ type RemoteEndpointsV2 interface { EnginesV2(mint, maxt int64) []RemoteEngine } -type RemoteEndpointsQuery struct { - MinT int64 - MaxT int64 -} - -// RemoteEndpointsV3 describes endpoints that accept pruning hints when -// selecting remote engines. -// -// For example implementations may use the hints to prune the TSDBInfos, but -// also may safely ignore them and return all available remote engines. -// -// NOTE(Aleksandr Krivoshchekov): -// We add a new interface as a temporary backward compatibility. -// RemoteEndpoints will be replaced with it in a future breaking change. -// -// Unlike RemoteEndpointsV2, this interface can be extended with more hints -// in the future, without making any breaking changes. -type RemoteEndpointsV3 interface { - EnginesV3(query RemoteEndpointsQuery) []RemoteEngine -} - type RemoteEngine interface { MaxT() int64 MinT() int64 @@ -86,10 +63,6 @@ func (m staticEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine { return m.engines } -func (m staticEndpoints) EnginesV3(query RemoteEndpointsQuery) []RemoteEngine { - return m.engines -} - func NewStaticEndpoints(engines []RemoteEngine) RemoteEndpoints { return &staticEndpoints{engines: engines} } @@ -102,33 +75,20 @@ type cachedEndpoints struct { } func (l *cachedEndpoints) Engines() []RemoteEngine { - return l.EnginesV3(RemoteEndpointsQuery{ - MaxT: math.MinInt64, - MinT: math.MaxInt64, - }) + const mint, maxt = math.MaxInt64, math.MinInt64 + return l.EnginesV2(mint, maxt) } func (l *cachedEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine { - return l.EnginesV3(RemoteEndpointsQuery{ - MaxT: maxt, - MinT: mint, - }) -} - -func (l *cachedEndpoints) EnginesV3(query RemoteEndpointsQuery) []RemoteEngine { l.enginesOnce.Do(func() { - l.engines = getEngines(l.endpoints, query) + l.engines = getEngines(l.endpoints, mint, maxt) }) return l.engines } -func getEngines(endpoints RemoteEndpoints, query RemoteEndpointsQuery) []RemoteEngine { - if v3, ok := endpoints.(RemoteEndpointsV3); ok { - return v3.EnginesV3(query) - } - +func getEngines(endpoints RemoteEndpoints, mint, maxt int64) []RemoteEngine { if v2, ok := endpoints.(RemoteEndpointsV2); ok { - return v2.EnginesV2(query.MinT, query.MaxT) + return v2.EnginesV2(mint, maxt) } return endpoints.Engines() diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index 655cd2c10..660ff3d68 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -860,14 +860,6 @@ func maxDuration(a, b time.Duration) time.Duration { } func getRemoteEngines(endpoints api.RemoteEndpoints, plan Node, opts *query.Options) []api.RemoteEngine { - if v3, ok := endpoints.(api.RemoteEndpointsV3); ok { - mint, maxt := MinMaxTime(plan, opts) - return v3.EnginesV3(api.RemoteEndpointsQuery{ - MinT: mint, - MaxT: maxt, - }) - } - if v2, ok := endpoints.(api.RemoteEndpointsV2); ok { mint, maxt := MinMaxTime(plan, opts) return v2.EnginesV2(mint, maxt) From b51950aa555ffc9c8c1664401736f78c4621f871 Mon Sep 17 00:00:00 2001 From: Aleksandr Krivoshchekov Date: Wed, 21 Jan 2026 04:28:59 +0000 Subject: [PATCH 3/7] Fix default mint/maxt Signed-off-by: Aleksandr Krivoshchekov --- api/remote.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/remote.go b/api/remote.go index df96183d1..bb4285319 100644 --- a/api/remote.go +++ b/api/remote.go @@ -75,7 +75,7 @@ type cachedEndpoints struct { } func (l *cachedEndpoints) Engines() []RemoteEngine { - const mint, maxt = math.MaxInt64, math.MinInt64 + const mint, maxt = math.MinInt64, math.MaxInt64 return l.EnginesV2(mint, maxt) } From 75d41f3c0d2163d26ed50a6b3a6d9b903ff47828 Mon Sep 17 00:00:00 2001 From: Aleksandr Krivoshchekov Date: Wed, 21 Jan 2026 05:07:31 +0000 Subject: [PATCH 4/7] Add unit tests for CachedEndpoints Signed-off-by: Aleksandr Krivoshchekov --- api/remote_test.go | 89 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 api/remote_test.go diff --git a/api/remote_test.go b/api/remote_test.go new file mode 100644 index 000000000..6bbc5c76d --- /dev/null +++ b/api/remote_test.go @@ -0,0 +1,89 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package api + +import ( + "math" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/model/labels" +) + +func TestCachedEndpoints(t *testing.T) { + engines := remoteEndpointsV2Func(func(mint, maxt int64) []RemoteEngine { + testutil.Equals(t, int64(10), mint) + testutil.Equals(t, int64(20), maxt) + return []RemoteEngine{newEngineMock(0, 1, nil)} + }) + endpoints := NewCachedEndpoints(engines) + + es := getEngines(endpoints, 10, 20) + testutil.Equals(t, 1, len(es)) +} + +func TestCachedEndpointsAllEnginesByDefault(t *testing.T) { + engines := remoteEndpointsV2Func(func(mint, maxt int64) []RemoteEngine { + testutil.Equals(t, int64(math.MinInt64), mint) + testutil.Equals(t, int64(math.MaxInt64), maxt) + return []RemoteEngine{newEngineMock(0, 1, nil)} + }) + endpoints := NewCachedEndpoints(engines) + + es := endpoints.Engines() + testutil.Equals(t, 1, len(es)) +} + +func TestCachedEndpointsCachesEngines(t *testing.T) { + var calls int + engines := remoteEndpointsV2Func(func(mint, maxt int64) []RemoteEngine { + calls++ + return []RemoteEngine{ + newEngineMock(100*int64(calls), 1000*int64(calls), nil), + newEngineMock(200*int64(calls), 2000*int64(calls), nil), + } + }) + endpoints := NewCachedEndpoints(engines) + + es1 := getEngines(endpoints, 10, 10000) + testutil.Equals(t, 2, len(es1)) + + es2 := getEngines(endpoints, 20, 20000) + testutil.Equals(t, 2, len(es2)) + + testutil.Equals(t, 1, calls) + testutil.Equals(t, es1, es2) + + // Engines must be mutable. + es1[0].(*engineMock).maxT = 1337 + testutil.Equals(t, int64(1337), es1[0].MaxT()) + testutil.Equals(t, int64(1337), es2[0].MaxT()) +} + +type remoteEndpointsV2Func func(mint, maxt int64) []RemoteEngine + +func (f remoteEndpointsV2Func) Engines() []RemoteEngine { + panic("Engines not implemented") +} + +func (f remoteEndpointsV2Func) EnginesV2(mint, maxt int64) []RemoteEngine { + return f(mint, maxt) +} + +type engineMock struct { + RemoteEngine + minT int64 + maxT int64 + labelSets []labels.Labels + partitionLabelSets []labels.Labels +} + +func (e engineMock) MaxT() int64 { return e.maxT } +func (e engineMock) MinT() int64 { return e.minT } +func (e engineMock) LabelSets() []labels.Labels { return e.labelSets } +func (e engineMock) PartitionLabelSets() []labels.Labels { return e.partitionLabelSets } + +func newEngineMock(mint, maxt int64, labelSets []labels.Labels) *engineMock { + return &engineMock{minT: mint, maxT: maxt, labelSets: labelSets, partitionLabelSets: labelSets} +} From 808c854390a34a6a27ff86cf0b4e368808086e8f Mon Sep 17 00:00:00 2001 From: Aleksandr Krivoshchekov Date: Wed, 21 Jan 2026 05:31:13 +0000 Subject: [PATCH 5/7] Replace RemoteEndpoints with RemoteEndpointsV2 Signed-off-by: Aleksandr Krivoshchekov --- api/remote.go | 37 +++++++++---------------------------- api/remote_test.go | 31 +++++++------------------------ logicalplan/distribute.go | 8 ++------ 3 files changed, 18 insertions(+), 58 deletions(-) diff --git a/api/remote.go b/api/remote.go index bb4285319..3a8dcae15 100644 --- a/api/remote.go +++ b/api/remote.go @@ -6,7 +6,6 @@ package api import ( "context" "fmt" - "math" "sync" "time" @@ -18,11 +17,7 @@ type RemoteQuery interface { fmt.Stringer } -type RemoteEndpoints interface { - Engines() []RemoteEngine -} - -// RemoteEndpointsV2 describes endpoints that accept pruning hints when +// RemoteEndpoints describes endpoints that accept pruning hints when // selecting remote engines. // // For example implementations may use the hints to prune the TSDBInfos, but @@ -31,8 +26,11 @@ type RemoteEndpoints interface { // NOTE(Aleksandr Krivoshchekov): // We add a new interface as a temporary backward compatibility. // RemoteEndpoints will be replaced with it in a future breaking change. -type RemoteEndpointsV2 interface { - EnginesV2(mint, maxt int64) []RemoteEngine +type RemoteEndpoints interface { + // TODO comment. + // Should call with: + // const mint, maxt = math.MinInt64, math.MaxInt64 + Engines(mint, maxt int64) []RemoteEngine } type RemoteEngine interface { @@ -55,11 +53,7 @@ type staticEndpoints struct { engines []RemoteEngine } -func (m staticEndpoints) Engines() []RemoteEngine { - return m.engines -} - -func (m staticEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine { +func (m staticEndpoints) Engines(mint, maxt int64) []RemoteEngine { return m.engines } @@ -74,26 +68,13 @@ type cachedEndpoints struct { engines []RemoteEngine } -func (l *cachedEndpoints) Engines() []RemoteEngine { - const mint, maxt = math.MinInt64, math.MaxInt64 - return l.EnginesV2(mint, maxt) -} - -func (l *cachedEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine { +func (l *cachedEndpoints) Engines(mint, maxt int64) []RemoteEngine { l.enginesOnce.Do(func() { - l.engines = getEngines(l.endpoints, mint, maxt) + l.engines = l.endpoints.Engines(mint, maxt) }) return l.engines } -func getEngines(endpoints RemoteEndpoints, mint, maxt int64) []RemoteEngine { - if v2, ok := endpoints.(RemoteEndpointsV2); ok { - return v2.EnginesV2(mint, maxt) - } - - return endpoints.Engines() -} - // NewCachedEndpoints returns an endpoints wrapper that // resolves and caches engines on first access. // diff --git a/api/remote_test.go b/api/remote_test.go index 6bbc5c76d..b4bd4502d 100644 --- a/api/remote_test.go +++ b/api/remote_test.go @@ -4,7 +4,6 @@ package api import ( - "math" "testing" "github.com/efficientgo/core/testutil" @@ -12,32 +11,20 @@ import ( ) func TestCachedEndpoints(t *testing.T) { - engines := remoteEndpointsV2Func(func(mint, maxt int64) []RemoteEngine { + engines := remoteEndpointsFunc(func(mint, maxt int64) []RemoteEngine { testutil.Equals(t, int64(10), mint) testutil.Equals(t, int64(20), maxt) return []RemoteEngine{newEngineMock(0, 1, nil)} }) endpoints := NewCachedEndpoints(engines) - es := getEngines(endpoints, 10, 20) - testutil.Equals(t, 1, len(es)) -} - -func TestCachedEndpointsAllEnginesByDefault(t *testing.T) { - engines := remoteEndpointsV2Func(func(mint, maxt int64) []RemoteEngine { - testutil.Equals(t, int64(math.MinInt64), mint) - testutil.Equals(t, int64(math.MaxInt64), maxt) - return []RemoteEngine{newEngineMock(0, 1, nil)} - }) - endpoints := NewCachedEndpoints(engines) - - es := endpoints.Engines() + es := endpoints.Engines(10, 20) testutil.Equals(t, 1, len(es)) } func TestCachedEndpointsCachesEngines(t *testing.T) { var calls int - engines := remoteEndpointsV2Func(func(mint, maxt int64) []RemoteEngine { + engines := remoteEndpointsFunc(func(mint, maxt int64) []RemoteEngine { calls++ return []RemoteEngine{ newEngineMock(100*int64(calls), 1000*int64(calls), nil), @@ -46,10 +33,10 @@ func TestCachedEndpointsCachesEngines(t *testing.T) { }) endpoints := NewCachedEndpoints(engines) - es1 := getEngines(endpoints, 10, 10000) + es1 := endpoints.Engines(10, 10000) testutil.Equals(t, 2, len(es1)) - es2 := getEngines(endpoints, 20, 20000) + es2 := endpoints.Engines(20, 20000) testutil.Equals(t, 2, len(es2)) testutil.Equals(t, 1, calls) @@ -61,13 +48,9 @@ func TestCachedEndpointsCachesEngines(t *testing.T) { testutil.Equals(t, int64(1337), es2[0].MaxT()) } -type remoteEndpointsV2Func func(mint, maxt int64) []RemoteEngine - -func (f remoteEndpointsV2Func) Engines() []RemoteEngine { - panic("Engines not implemented") -} +type remoteEndpointsFunc func(mint, maxt int64) []RemoteEngine -func (f remoteEndpointsV2Func) EnginesV2(mint, maxt int64) []RemoteEngine { +func (f remoteEndpointsFunc) Engines(mint, maxt int64) []RemoteEngine { return f(mint, maxt) } diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index 660ff3d68..3b6c568a3 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -860,10 +860,6 @@ func maxDuration(a, b time.Duration) time.Duration { } func getRemoteEngines(endpoints api.RemoteEndpoints, plan Node, opts *query.Options) []api.RemoteEngine { - if v2, ok := endpoints.(api.RemoteEndpointsV2); ok { - mint, maxt := MinMaxTime(plan, opts) - return v2.EnginesV2(mint, maxt) - } - - return endpoints.Engines() + mint, maxt := MinMaxTime(plan, opts) + return endpoints.Engines(mint, maxt) } From 88405887566241a16c2e623ca25ab3dc22fef679 Mon Sep 17 00:00:00 2001 From: Aleksandr Krivoshchekov Date: Wed, 21 Jan 2026 05:33:47 +0000 Subject: [PATCH 6/7] Simplify Signed-off-by: Aleksandr Krivoshchekov --- logicalplan/distribute.go | 7 +------ logicalplan/passthrough.go | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index 3b6c568a3..d8e2d37ac 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -157,7 +157,7 @@ type DistributedExecutionOptimizer struct { } func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) { - engines := getRemoteEngines(m.Endpoints, plan, opts) + engines := m.Endpoints.Engines(MinMaxTime(plan, opts)) sort.Slice(engines, func(i, j int) bool { return engines[i].MinT() < engines[j].MinT() }) @@ -858,8 +858,3 @@ func maxDuration(a, b time.Duration) time.Duration { } return b } - -func getRemoteEngines(endpoints api.RemoteEndpoints, plan Node, opts *query.Options) []api.RemoteEngine { - mint, maxt := MinMaxTime(plan, opts) - return endpoints.Engines(mint, maxt) -} diff --git a/logicalplan/passthrough.go b/logicalplan/passthrough.go index ec938addf..e75aa1e43 100644 --- a/logicalplan/passthrough.go +++ b/logicalplan/passthrough.go @@ -43,7 +43,7 @@ func matchingEngineTime(e api.RemoteEngine, opts *query.Options) bool { } func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) { - engines := getRemoteEngines(m.Endpoints, plan, opts) + engines := m.Endpoints.Engines(MinMaxTime(plan, opts)) if len(engines) == 0 { return plan, nil } From e975db89c0cb3f3cf594ef06ceddadad09b17b89 Mon Sep 17 00:00:00 2001 From: Aleksandr Krivoshchekov Date: Wed, 21 Jan 2026 06:37:04 +0000 Subject: [PATCH 7/7] Update doc comments Signed-off-by: Aleksandr Krivoshchekov --- api/remote.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/api/remote.go b/api/remote.go index 3a8dcae15..21b7c2724 100644 --- a/api/remote.go +++ b/api/remote.go @@ -17,19 +17,19 @@ type RemoteQuery interface { fmt.Stringer } -// RemoteEndpoints describes endpoints that accept pruning hints when -// selecting remote engines. +// RemoteEndpoints returns remote engines. // -// For example implementations may use the hints to prune the TSDBInfos, but -// also may safely ignore them and return all available remote engines. +// Implementations should use mint and maxt to prune engine metadata +// (e.g., filter TSDBInfos to only those overlapping the time range), +// reducing unnecessary computations in subsequent calls to methods like +// RemoteEngine.LabelSets(). // -// NOTE(Aleksandr Krivoshchekov): -// We add a new interface as a temporary backward compatibility. -// RemoteEndpoints will be replaced with it in a future breaking change. +// All available engines should be returned regardless of pruning. type RemoteEndpoints interface { - // TODO comment. - // Should call with: - // const mint, maxt = math.MinInt64, math.MaxInt64 + // Engines returns remote engines. + // + // If mint and/or maxt of the query is unknown, the caller must pass + // math.MinInt64 and math.MaxInt64 respectively to retrieve unpruned engines. Engines(mint, maxt int64) []RemoteEngine }