Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2387,6 +2387,43 @@ or
}
}

// TestDuplicateLabelCheckWithBatching verifies that the duplicate-labelset
// check still fires when batching splits conflicting series across batches.
// changes() drops __name__, so metric0 and metric1 collapse to identical
// output labels; with DecodingConcurrency=1 (a single shard) and
// SelectorBatchSize=1 the two series arrive in separate batches.
func TestDuplicateLabelCheckWithBatching(t *testing.T) {
	t.Parallel()

	const series = `load 30s
metric0{src="a",dst="b"} 0+1x40
metric1{src="a",dst="b"} 1+1x40`

	st := promqltest.LoadedStorage(t, series)
	defer st.Close()

	ng := engine.New(engine.Opts{
		EngineOpts: promql.EngineOpts{
			Timeout:    1 * time.Hour,
			MaxSamples: 1e10,
		},
		LogicalOptimizers:   logicalplan.AllOptimizers,
		SelectorBatchSize:   1,
		DecodingConcurrency: 1,
	})

	ctx := context.Background()
	q, err := ng.NewInstantQuery(ctx, st, nil, `sum(changes({__name__=~"metric0|metric1"}[1m]))`, time.Unix(60, 0))
	testutil.Ok(t, err)
	defer q.Close()

	res := q.Exec(ctx)
	testutil.Assert(t, res.Err != nil, "expected duplicate label set error, got result: %v", res.Value)
	testutil.Assert(t, strings.Contains(res.Err.Error(), "same labelset"),
		"expected duplicate label set error, got: %v", res.Err)
}

// mergeWithSampleDedup merges samples from series with the same labels,
// removing samples with identical timestamps.
func mergeWithSampleDedup(series []*mockSeries) []storage.Series {
Expand Down
27 changes: 13 additions & 14 deletions execution/exchange/duplicate_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ type duplicateLabelCheckOperator struct {
once sync.Once
next model.VectorOperator

p []pair
c []uint64
p []pair
c []uint64
lastTs int64
}

func NewDuplicateLabelCheck(next model.VectorOperator, opts *query.Options) model.VectorOperator {
Expand Down Expand Up @@ -51,19 +52,18 @@ func (d *duplicateLabelCheckOperator) Next(ctx context.Context, buf []model.Step
return 0, nil
}

// TODO: currently there is a bug, we need to reset 'd.c's state
// if the current timestamp changes. With configured BatchSize we
// dont see all samples for a timestamp in the same batch, but this
// logic relies on that.
if len(d.p) > 0 {
for i := range d.p {
d.c[d.p[i].a] = 0
d.c[d.p[i].b] = 0
ts := buf[0].T
if ts != d.lastTs {
d.lastTs = ts
for i := range d.p {
d.c[d.p[i].a] = 0
d.c[d.p[i].b] = 0
}
}
for i := range n {
sv := &buf[i]
for _, sid := range sv.SampleIDs {
d.c[sid] |= 2 << i
for _, sid := range buf[i].SampleIDs {
d.c[sid] |= 1 << i
}
}
for i := range d.p {
Expand Down Expand Up @@ -105,7 +105,6 @@ func (d *duplicateLabelCheckOperator) init(ctx context.Context) error {
}
m := make(map[uint64]int, len(series))
p := make([]pair, 0)
c := make([]uint64, len(series))
for i := range series {
h := series[i].Hash()
if j, ok := m[h]; ok {
Expand All @@ -115,7 +114,7 @@ func (d *duplicateLabelCheckOperator) init(ctx context.Context) error {
}
}
d.p = p
d.c = c
d.c = make([]uint64, len(series))
})
return err
}
3 changes: 3 additions & 0 deletions logicalplan/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func renderExprTree(expr Node) string {
if t.BatchSize > 0 {
base += fmt.Sprintf("[batch=%d]", t.BatchSize)
}
if t.SelectTimestamp {
base += "[timestamp]"
}
if t.Projection != nil {
sort.Strings(t.Projection.Labels)
if t.Projection.Include {
Expand Down
16 changes: 12 additions & 4 deletions logicalplan/set_batch_size.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import (
"github.com/prometheus/prometheus/util/annotations"
)

// aggregationLikeFuncs lists PromQL function names whose output labelset or
// series count can differ from their input; selectors feeding such functions
// must not be batched.
var aggregationLikeFuncs = map[string]struct{}{
	"absent":             {},
	"absent_over_time":   {},
	"histogram_fraction": {},
	"histogram_quantile": {},
	"scalar":             {},
}

// SelectorBatchSize configures the batch size of selector based on
// aggregates present in the plan.
type SelectorBatchSize struct {
Expand All @@ -25,10 +34,9 @@ func (m SelectorBatchSize) Optimize(plan Node, _ *query.Options) (Node, annotati
Traverse(&plan, func(current *Node) {
switch e := (*current).(type) {
case *FunctionCall:
//TODO: calls can reduce the labelset of the input; think histogram_quantile reducing
// multiple "le" labels into one output. We cannot handle this in batching. Revisit
// what is safe here.
canBatch = false
if _, aggregationLike := aggregationLikeFuncs[e.Func.Name]; aggregationLike {
canBatch = false
}
case *Binary:
canBatch = false
case *Aggregation:
Expand Down
54 changes: 54 additions & 0 deletions logicalplan/set_batch_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,60 @@ func TestSetBatchSize(t *testing.T) {
expr: `histogram_quantile(scalar(max(quantile)), http_requests_total)`,
expected: `histogram_quantile(scalar(max(quantile[batch=10])), http_requests_total)`,
},
// Range vector functions should allow batching to propagate
{
name: "sum with rate",
expr: `sum(rate(http_requests_total[5m]))`,
expected: `sum(rate(http_requests_total[batch=10][5m0s]))`,
},
{
name: "avg with increase",
expr: `avg(increase(http_requests_total[5m]))`,
expected: `avg(increase(http_requests_total[batch=10][5m0s]))`,
},
// Label-preserving functions should allow batching to propagate
{
name: "sum with abs",
expr: `sum(abs(metric))`,
expected: `sum(abs(metric[batch=10]))`,
},
{
name: "avg with ceil",
expr: `avg(ceil(metric))`,
expected: `avg(ceil(metric[batch=10]))`,
},
{
name: "max with clamp_max",
expr: `max(clamp_max(metric, 100))`,
expected: `max(clamp_max(metric[batch=10], 100))`,
},
{
name: "nested math functions",
expr: `sum(abs(floor(metric)))`,
expected: `sum(abs(floor(metric[batch=10])))`,
},
{
name: "aggregation with timestamp",
expr: `sum(timestamp(metric))`,
// timestamp() is pushed down into VectorSelector with SelectTimestamp=true
expected: `sum(metric[batch=10][timestamp])`,
},
// Label-modifying functions should disable batching
{
name: "label_replace disables batching",
expr: `sum(label_replace(metric, "dst", "$1", "src", "(.*)"))`,
expected: `sum(label_replace(metric[batch=10], "dst", "$1", "src", "(.*)"))`,
},
{
name: "label_join disables batching",
expr: `sum(label_join(metric, "dst", ",", "src"))`,
expected: `sum(label_join(metric[batch=10], "dst", ",", "src"))`,
},
{
name: "histogram_fraction disables batching",
expr: `sum(histogram_fraction(0, 0.5, metric))`,
expected: `sum(histogram_fraction(0, 0.5, metric))`,
},
}

optimizers := append([]Optimizer{SelectorBatchSize{Size: 10}}, DefaultOptimizers...)
Expand Down
Loading