Skip to content

Commit 0788d96

Browse files
authored
Add hints support to GroupedVectorVectorBinaryOperation (#15137)
#### What this PR does This PR adds hint support to GroupedVectorVectorBinaryOperation and ensures that the narrower side loads first so we can use that to generate the hints for the wider side. In queries like `many_metric * on(env) group_left() one_metric`, if `one_metric` has 10 env values but `many_metric` has 1k series across 100 env values, the optimisation reduces the many-side fetch by 90% before a single sample is processed. #### Which issue(s) this PR fixes or relates to Part of grafana/mimir-squad#2915 #### Checklist - [x] Tests updated. - [ ] Documentation added. - [x] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR. - [ ] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features.
1 parent 8d9bec6 commit 0788d96

7 files changed

Lines changed: 440 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@
201201
* `-ingest-storage.kafka.ingestion-concurrency-target-flushes-per-shard` from `80` to `40`
202202
* `-ingest-storage.kafka.max-buffered-bytes` from `100MB` to `1GB`
203203
* [ENHANCEMENT] MQE: Enable narrow selectors optimisation and hints passing for `and`/`unless` binary operation. #15096
204+
* [ENHANCEMENT] MQE: Use series selected for one side to reduce data selected on the other side in one-to-many and many-to-one binary operations (eg. `group_left` and `group_right`). #15137
204205
* [BUGFIX] Tracing: Respect `OTEL_TRACES_SAMPLER` and `OTEL_TRACES_SAMPLER_ARG` environment variables in `NewOTelFromEnv()`. Previously, the sampler was always hardcoded to `AlwaysSample()` when no Jaeger remote sampler was configured, making it impossible to control trace volume through standard OpenTelemetry configuration. #15128
205206
* [BUGFIX] API: Scope activity tracking middleware to query routes only, preventing it from rejecting write requests that have an unexpected `Content-Type` header with HTTP 500. #15129
206207
* [BUGFIX] Ingester: enforce a minimum 10s delay between TSDB head compaction iterations when an iteration approaches or exceeds the configured `-blocks-storage.tsdb.head-compaction-interval`, so ingestion is not starved by back-to-back compactions. #15061

pkg/streamingpromql/benchmarks/benchmarks.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,15 @@ func TestCases(metricSizes []int) []BenchCase {
230230
{
231231
Expr: "h_X * on(l) group_left() a_X",
232232
},
233+
// Test the hint-based narrowing for group_left/group_right: the one side selects only 2
234+
// out of 2000 distinct "l" values, so the optimizer passes {l=~"1|3"} to the many side
235+
// and reduces the series it needs to fetch from ~12000 down to ~12.
236+
{
237+
Expr: `h_2000 * on(l) group_left() a_2000{l=~"[13]"}`,
238+
},
239+
{
240+
Expr: `a_2000{l=~"[13]"} * on(l) group_right() h_2000`,
241+
},
233242
// Test the case where one side of a binary operation has many more series than the other.
234243
{
235244
Expr: `a_100{l=~"[13579]."} - b_100`,

pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"slices"
1313
"sort"
1414

15+
"github.com/go-kit/log"
1516
"github.com/prometheus/common/model"
1617
"github.com/prometheus/prometheus/model/labels"
1718
"github.com/prometheus/prometheus/promql/parser"
@@ -21,6 +22,7 @@ import (
2122
"github.com/grafana/mimir/pkg/streamingpromql/operators"
2223
"github.com/grafana/mimir/pkg/streamingpromql/types"
2324
"github.com/grafana/mimir/pkg/util/limiter"
25+
"github.com/grafana/mimir/pkg/util/spanlogger"
2426
)
2527

2628
var errMultipleMatchesOnManySide = errors.New("multiple matches for labels: grouping labels must ensure unique matches")
@@ -39,6 +41,8 @@ type GroupedVectorVectorBinaryOperation struct {
3941
expressionPosition posrange.PositionRange
4042
annotations *annotations.Annotations
4143
timeRange types.QueryTimeRange
44+
hints *Hints
45+
logger log.Logger
4246

4347
evaluator vectorVectorBinaryOperationEvaluator
4448
remainingSeries []*groupedBinaryOperationOutputSeries
@@ -151,6 +155,8 @@ func NewGroupedVectorVectorBinaryOperation(
151155
annotations *annotations.Annotations,
152156
expressionPosition posrange.PositionRange,
153157
timeRange types.QueryTimeRange,
158+
hints *Hints,
159+
logger log.Logger,
154160
) (*GroupedVectorVectorBinaryOperation, error) {
155161
e, err := newVectorVectorBinaryOperationEvaluator(op, returnBool, memoryConsumptionTracker, annotations, expressionPosition)
156162
if err != nil {
@@ -169,6 +175,8 @@ func NewGroupedVectorVectorBinaryOperation(
169175
expressionPosition: expressionPosition,
170176
annotations: annotations,
171177
timeRange: timeRange,
178+
hints: hints,
179+
logger: logger,
172180
}
173181

174182
switch g.VectorMatching.Card {
@@ -244,8 +252,17 @@ func (g *GroupedVectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Cont
244252
// We retain the series labels for later so we can use them to generate error messages.
245253
// We'll return them to the pool in Close().
246254

255+
// Load the "one" side first: it is the smaller side, and once we have its metadata
256+
// we can use it to build hint-based matchers for the "many" side.
257+
//
258+
// Labels in VectorMatching.Include come from the many side, so any outer matchers for
259+
// those labels must not be forwarded to the one side: the one side won't have them and
260+
// would be incorrectly over-filtered. Split them out and keep them to forward to the
261+
// many side instead.
262+
oneSideMatchers, includeMatchers := separateIncludeLabelMatchers(matchers, g.VectorMatching.Include)
263+
247264
var err error
248-
g.oneSideMetadata, err = g.oneSide.SeriesMetadata(ctx, matchers)
265+
g.oneSideMetadata, err = g.oneSide.SeriesMetadata(ctx, oneSideMatchers)
249266
if err != nil {
250267
return false, err
251268
}
@@ -255,7 +272,25 @@ func (g *GroupedVectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Cont
255272
return false, nil
256273
}
257274

258-
g.manySideMetadata, err = g.manySide.SeriesMetadata(ctx, matchers)
275+
// Use the "one" side series to narrow the data we need to fetch on the "many" side.
276+
// When hints have been set by the optimization pass, build matchers from the "one" side
277+
// metadata and merge them with any outer matchers for included labels (which belong to the
278+
// many side). Otherwise fall back to the full, unsplit set of outer matchers.
279+
manySideMatchers := matchers
280+
if g.hints != nil {
281+
ignored := matchers
282+
manySideMatchers = append(BuildMatchers(g.oneSideMetadata, g.hints), includeMatchers...)
283+
284+
sl := spanlogger.FromContext(ctx, g.logger)
285+
sl.DebugLog(
286+
"msg", "binary operator passing additional matchers to many side",
287+
"fields", g.hints.Include,
288+
"hint_matchers", len(manySideMatchers),
289+
"ignored_matchers", len(ignored),
290+
)
291+
}
292+
293+
g.manySideMetadata, err = g.manySide.SeriesMetadata(ctx, manySideMatchers)
259294
if err != nil {
260295
return false, err
261296
}
@@ -758,6 +793,30 @@ func (g *GroupedVectorVectorBinaryOperation) mergeManySide(data []types.InstantV
758793
return merged, nil
759794
}
760795

796+
// separateIncludeLabelMatchers partitions matchers into two groups: those whose label name
797+
// appears in includeLabels (extra labels sourced from the many side), and all others.
798+
// If includeLabels is empty or matchers is empty, the original slice is returned unchanged
799+
// and includeMatchers is nil.
800+
func separateIncludeLabelMatchers(matchers types.Matchers, includeLabels []string) (otherMatchers, includeMatchers types.Matchers) {
801+
if len(matchers) == 0 || len(includeLabels) == 0 {
802+
return matchers, nil
803+
}
804+
805+
includeSet := make(map[string]struct{}, len(includeLabels))
806+
for _, l := range includeLabels {
807+
includeSet[l] = struct{}{}
808+
}
809+
810+
for _, m := range matchers {
811+
if _, ok := includeSet[m.Name]; ok {
812+
includeMatchers = append(includeMatchers, m)
813+
} else {
814+
otherMatchers = append(otherMatchers, m)
815+
}
816+
}
817+
return otherMatchers, includeMatchers
818+
}
819+
761820
func (g *GroupedVectorVectorBinaryOperation) oneSideHandedness() string {
762821
switch g.VectorMatching.Card {
763822
case parser.CardOneToMany:

0 commit comments

Comments
 (0)