Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions engine/user_defined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ func (c logicalVectorSelector) MakeExecutionOperator(_ context.Context, vectors
maxt: opts.End.UnixMilli(),
step: opts.Step.Milliseconds(),
currentStep: opts.Start.UnixMilli(),

shard: c.VectorSelector.Shard,
numShards: c.VectorSelector.NumShards,
}

return oper, nil
Expand All @@ -103,6 +106,9 @@ type vectorSelectorOperator struct {
maxt int64
step int64
currentStep int64

shard int
numShards int
}

func (c *vectorSelectorOperator) Next(ctx context.Context) ([]model.StepVector, error) {
Expand All @@ -112,8 +118,12 @@ func (c *vectorSelectorOperator) Next(ctx context.Context) ([]model.StepVector,
vectors := c.vectors.GetVectorBatch()
for i := 0; i < c.stepsBatch && c.currentStep <= c.maxt; i++ {
vector := c.vectors.GetStepVector(c.currentStep)
vector.AppendSample(c.vectors, 1, 7)
vector.AppendSample(c.vectors, 2, 7)
if c.shard == 0 {
vector.AppendSample(c.vectors, 1, 7)
}
if c.shard == 1 {
vector.AppendSample(c.vectors, 2, 7)
}
vectors = append(vectors, vector)
c.currentStep += c.step
}
Expand Down
14 changes: 14 additions & 0 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, o

func newOperator(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
switch e := expr.(type) {
case *logicalplan.Coalesce:
return newCoalesce(ctx, e, storage, opts, hints)
case *logicalplan.NumberLiteral:
return scan.NewNumberLiteralSelector(model.NewVectorPool(opts.StepsBatch), opts, e.Val), nil
case *logicalplan.VectorSelector:
Expand Down Expand Up @@ -86,6 +88,18 @@ func newOperator(ctx context.Context, expr logicalplan.Node, storage storage.Sca
}
}

func newCoalesce(ctx context.Context, e *logicalplan.Coalesce, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
operators := make([]model.VectorOperator, len(e.Exprs))
for i, expr := range e.Exprs {
operator, err := newOperator(ctx, expr, scanners, opts, hints)
if err != nil {
return nil, err
}
operators[i] = operator
}
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, 0, operators...), nil
}

func newVectorSelector(ctx context.Context, e *logicalplan.VectorSelector, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
start, end := getTimeRangesForVectorSelector(e, opts, 0)
hints.Start = start
Expand Down
20 changes: 13 additions & 7 deletions execution/function/absent.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,19 @@ func (o *absentOperator) loadSeries() {

// https://github.com/prometheus/prometheus/blob/df1b4da348a7c2f8c0b294ffa1f05db5f6641278/promql/functions.go#L1857
var lm []*labels.Matcher
switch n := o.funcExpr.Args[0].(type) {
case *logicalplan.VectorSelector:
lm = append(n.LabelMatchers, n.Filters...)
case *logicalplan.MatrixSelector:
v := n.VectorSelector
lm = append(v.LabelMatchers, v.Filters...)
default:
logicalplan.TraverseBottomUp(nil, &o.funcExpr.Args[0], func(parent, node *logicalplan.Node) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be in the logical plan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sector labels of the vector sector of the absent function? I think we could put it into the functioncall node and populate it in the plan yeah

switch n := (*node).(type) {
case *logicalplan.VectorSelector:
lm = append(n.LabelMatchers, n.Filters...)
case *logicalplan.CheckDuplicateLabels, *logicalplan.Coalesce, *logicalplan.MatrixSelector:
// ignore logical nodes between the absent function and the first vectorselector
default:
lm = lm[:0]
}
return false
})

if len(lm) == 0 {
o.series = []labels.Labels{labels.EmptyLabels()}
return
}
Expand Down
69 changes: 69 additions & 0 deletions logicalplan/coalesce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package logicalplan

import (
"github.com/prometheus/prometheus/util/annotations"

Check failure on line 7 in logicalplan/coalesce.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

File is not properly formatted (gci)

"github.com/thanos-io/promql-engine/query"
)

type CoalesceOptimizer struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the optimizer be called something like ConcurrentDecodeOptimizer?

Copy link
Collaborator

@fpetkovski fpetkovski Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the idea is to allow for things like

sum(
  coalesce(
    sum(shard_a), 
    sum(shard_b)
  )
)

down the line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should a different optimizer should be responsible for sharding aggregations?


func (c CoalesceOptimizer) Optimize(expr Node, opts *query.Options) (Node, annotations.Annotations) {
numShards := opts.NumShards()

TraverseBottomUp(nil, &expr, func(parent, e *Node) bool {
switch t := (*e).(type) {
case *VectorSelector:
if parent != nil {
// we coalesce matrix selectors in a different branch
if _, ok := (*parent).(*MatrixSelector); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about subqueries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can parent of vectorselector be a subquery? I guess you could have last_over_time(http_request_total[10m:5s]).

return false
}
}
exprs := make([]Node, numShards)
for i := range numShards {
vs := t.Clone().(*VectorSelector)
vs.Shard = i
vs.NumShards = numShards
exprs[i] = vs
}
*e = &Coalesce{Exprs: exprs}
case *MatrixSelector:
// handled in *parser.Call branch
return false
case *FunctionCall:
// non-recursively handled in execution.go
if t.Func.Name == "absent_over_time" {
return true
}
var (
ms *MatrixSelector
marg int
)
for i := range t.Args {
if arg, ok := t.Args[i].(*MatrixSelector); ok {
ms = arg
marg = i
}
}
if ms == nil {
return false
}
exprs := make([]Node, numShards)
for i := range numShards {
aux := ms.Clone().(*MatrixSelector)
aux.VectorSelector.Shard = i
aux.VectorSelector.NumShards = numShards
f := t.Clone().(*FunctionCall)
f.Args[marg] = aux
exprs[i] = f
}
*e = &Coalesce{Exprs: exprs}
}
return true
})
return expr, nil
}
13 changes: 13 additions & 0 deletions logicalplan/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,19 @@ func unmarshalNode(data []byte) (Node, error) {
return nil, err
}
return u, nil
case CoalesceNode:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this cause remote execution to be planned in the central node?

n := &Coalesce{}
if err := json.Unmarshal(t.Data, n); err != nil {
return nil, err
}
for _, c := range t.Children {
child, err := unmarshalNode(c)
if err != nil {
return nil, err
}
n.Exprs = append(n.Exprs, child)
}
return n, nil
}
return nil, nil
}
42 changes: 41 additions & 1 deletion logicalplan/logical_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ const (
NumberLiteralNode = "number_literal"
StringLiteralNode = "string_literal"
SubqueryNode = "subquery"
CheckDuplicateNode = "check_duplicate"
StepInvariantNode = "step_invariant"
ParensNode = "parens"
UnaryNode = "unary"

RemoteExecutionNode = "remote_exec"
DeduplicateNode = "dedup"
NoopNode = "noop"

CheckDuplicateNode = "check_duplicate"
CoalesceNode = "coalesce"
)

type Cloneable interface {
Expand Down Expand Up @@ -78,6 +80,9 @@ type VectorSelector struct {
// CounterResetHint, Count and Sum values populated. Histogram buckets and spans
// will not be used during query evaluation.
DecodeNativeHistogramStats bool

Shard int
NumShards int
}

func (f *VectorSelector) Clone() Node {
Expand All @@ -94,6 +99,11 @@ func (f *VectorSelector) Clone() Node {
clone.Timestamp = &ts
}

clone.Shard = f.Shard
clone.NumShards = f.NumShards
clone.DecodeNativeHistogramStats = f.DecodeNativeHistogramStats
clone.BatchSize = f.BatchSize

return &clone
}

Expand Down Expand Up @@ -570,3 +580,33 @@ func isAvgAggregation(expr *Node) bool {
}
return false
}

type Coalesce struct {
// We assume to always have at least one expression
Exprs []Node `json:"-"`
}

func (c *Coalesce) ReturnType() parser.ValueType { return parser.ValueTypeVector }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should do Exprs[0].ReturnType()?


func (c *Coalesce) Type() NodeType { return CoalesceNode }

func (c *Coalesce) Clone() Node {
clone := *c
clone.Exprs = make([]Node, 0, len(c.Exprs))
for _, arg := range c.Exprs {
clone.Exprs = append(clone.Exprs, arg.Clone())
}
return &clone
}

func (c *Coalesce) Children() []*Node {
children := make([]*Node, 0, len(c.Exprs))
for i := range c.Exprs {
children = append(children, &c.Exprs[i])
}
return children
}

func (c *Coalesce) String() string {
return c.Exprs[0].String()
}
9 changes: 6 additions & 3 deletions logicalplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var DefaultOptimizers = []Optimizer{
SortMatchers{},
MergeSelectsOptimizer{},
DetectHistogramStatsOptimizer{},
CoalesceOptimizer{},
}

type Plan interface {
Expand Down Expand Up @@ -255,20 +256,22 @@ func replacePrometheusNodes(plan parser.Expr) Node {
return &MatrixSelector{
VectorSelector: &VectorSelector{
VectorSelector: t.VectorSelector.(*parser.VectorSelector),
Shard: 0,
NumShards: 1,
},
Range: t.Range,
OriginalString: t.String(),
}
case *parser.VectorSelector:
return &VectorSelector{VectorSelector: t}
return &VectorSelector{VectorSelector: t, Shard: 0, NumShards: 1}

// TODO: we dont yet have logical nodes for these, keep traversing here but set fields in-place
case *parser.Call:
if t.Func.Name == "timestamp" {
// pushed-down timestamp function
switch v := UnwrapParens(t.Args[0]).(type) {
case *parser.VectorSelector:
return &VectorSelector{VectorSelector: v, SelectTimestamp: true}
return &VectorSelector{VectorSelector: v, SelectTimestamp: true, Shard: 0, NumShards: 1}
case *parser.StepInvariantExpr:
vs, ok := UnwrapParens(v.Expr).(*parser.VectorSelector)
if ok {
Expand All @@ -277,7 +280,7 @@ func replacePrometheusNodes(plan parser.Expr) Node {
vs.OriginalOffset = 0
}
return &StepInvariantExpr{
Expr: &VectorSelector{VectorSelector: vs, SelectTimestamp: true},
Expr: &VectorSelector{VectorSelector: vs, SelectTimestamp: true, Shard: 0, NumShards: 1},
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions query/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type Options struct {
DecodingConcurrency int
}

func (o *Options) NumShards() int {
return max(o.DecodingConcurrency, 1)
}
Copy link
Contributor

@yeya24 yeya24 Jun 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking this shard size could be different for different operators? I wonder if the default value works for all operators

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default is numCores / 2?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I meant if different operators should use the same concurrency


func (o *Options) NumSteps() int {
// Instant evaluation is executed as a range evaluation with one step.
if o.Step.Milliseconds() == 0 {
Expand Down
65 changes: 27 additions & 38 deletions storage/prometheus/scanners.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,17 @@ func (p Scanners) NewVectorSelector(
selector = newHistogramStatsSelector(selector)
}

operators := make([]model.VectorOperator, 0, opts.DecodingConcurrency)
for i := range opts.DecodingConcurrency {
operator := exchange.NewConcurrent(
NewVectorSelector(
model.NewVectorPool(opts.StepsBatch),
selector,
opts,
logicalNode.Offset,
logicalNode.BatchSize,
logicalNode.SelectTimestamp,
i,
opts.DecodingConcurrency,
), 2, opts)
operators = append(operators, operator)
}

return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, logicalNode.BatchSize*int64(opts.DecodingConcurrency), operators...), nil
return exchange.NewConcurrent(
NewVectorSelector(
model.NewVectorPool(opts.StepsBatch),
selector,
opts,
logicalNode.Offset,
logicalNode.BatchSize,
logicalNode.SelectTimestamp,
logicalNode.Shard,
logicalNode.NumShards,
), 2, opts), nil
}

func (p Scanners) NewMatrixSelector(
Expand Down Expand Up @@ -126,28 +120,23 @@ func (p Scanners) NewMatrixSelector(
selector = newHistogramStatsSelector(selector)
}

operators := make([]model.VectorOperator, 0, opts.DecodingConcurrency)
for i := range opts.DecodingConcurrency {
operator, err := NewMatrixSelector(
model.NewVectorPool(opts.StepsBatch),
selector,
call.Func.Name,
arg,
arg2,
opts,
logicalNode.Range,
vs.Offset,
vs.BatchSize,
i,
opts.DecodingConcurrency,
)
if err != nil {
return nil, err
}
operators = append(operators, exchange.NewConcurrent(operator, 2, opts))
mat, err := NewMatrixSelector(
model.NewVectorPool(opts.StepsBatch),
selector,
call.Func.Name,
arg,
arg2,
opts,
logicalNode.Range,
vs.Offset,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could simplify this by passing in the entire vs object.

vs.BatchSize,
vs.Shard,
vs.NumShards,
)
if err != nil {
return nil, err
}

return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, vs.BatchSize*int64(opts.DecodingConcurrency), operators...), nil
return exchange.NewConcurrent(mat, 2, opts), nil
}

type histogramStatsSelector struct {
Expand Down
Loading