Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
EnableAnalysis: e.enableAnalysis,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
SampleTracker: query.NewSampleTracker(e.maxSamplesPerQuery),
SampleLimiter: telemetry.NewSampleLimiter(e.maxSamplesPerQuery, start.UnixMilli(), end.UnixMilli(), telemetry.StepTrackingInterval(step)),
}

if opts == nil {
Expand All @@ -459,7 +459,6 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
if opts.EnablePerStepStats() {
res.EnablePerStepStats = opts.EnablePerStepStats()
}

if opts.DecodingConcurrency != 0 {
res.DecodingConcurrency = opts.DecodingConcurrency
}
Expand Down
7 changes: 3 additions & 4 deletions engine/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/telemetry"
"github.com/thanos-io/promql-engine/logicalplan"

"github.com/prometheus/prometheus/promql"
)
Expand Down Expand Up @@ -64,10 +63,10 @@ func (a *AnalyzeOutputNode) aggregateSamples() {
childPeak := child.PeakSamples()
a.peakSamples = max(a.peakSamples, childPeak)

switch a.OperatorTelemetry.LogicalNode().(type) {
case *logicalplan.Subquery:
switch {
case a.OperatorTelemetry.IsSubquery():
// Skip aggregating samples for subquery
case *logicalplan.StepInvariantExpr:
case a.OperatorTelemetry.IsStepInvariant():
childSamples := child.TotalSamples()
for i := range a.totalSamplesPerStep {
a.totalSamples += childSamples
Expand Down
2 changes: 1 addition & 1 deletion execution/aggregate/count_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewCountValues(next model.VectorOperator, param string, by bool, grouping [
by: by,
grouping: grouping,
}
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts), op)
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), op)
}

func (c *countValuesOperator) Explain() []model.VectorOperator {
Expand Down
2 changes: 1 addition & 1 deletion execution/aggregate/hashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewHashAggregate(
params: make([]float64, opts.StepsBatch),
}

return telemetry.NewOperator(telemetry.NewTelemetry(a, opts), a), nil
return telemetry.NewOperator(telemetry.NewTelemetry(a, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), a), nil
}

func (a *aggregate) String() string {
Expand Down
2 changes: 1 addition & 1 deletion execution/aggregate/khashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewKHashAggregate(
stepsBatch: opts.StepsBatch,
}

return telemetry.NewOperator(telemetry.NewTelemetry(op, opts), op), nil
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), op), nil
}

func (a *kAggregate) Next(ctx context.Context, buf []model.StepVector) (int, error) {
Expand Down
2 changes: 1 addition & 1 deletion execution/binary/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewScalar(
stepsBatch: opts.StepsBatch,
}

return telemetry.NewOperator(telemetry.NewTelemetry(op, opts), op), nil
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), op), nil
}

func (o *scalarOperator) Explain() (next []model.VectorOperator) {
Expand Down
2 changes: 1 addition & 1 deletion execution/binary/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewVectorOperator(
stepsBatch: opts.StepsBatch,
}

return telemetry.NewOperator(telemetry.NewTelemetry(op, opts), op), nil
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), op), nil
}

func (o *vectorOperator) String() string {
Expand Down
2 changes: 1 addition & 1 deletion execution/exchange/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewCoalesce(opts *query.Options, batchSize int64, operators ...model.Vector
batchSize: batchSize,
}

return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (c *coalesce) Explain() (next []model.VectorOperator) {
Expand Down
2 changes: 1 addition & 1 deletion execution/exchange/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewConcurrent(next model.VectorOperator, bufferSize int, opts *query.Option
returnChan: make(chan []model.StepVector, bufferSize+2),
}

return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (c *concurrencyOperator) Explain() (next []model.VectorOperator) {
Expand Down
2 changes: 1 addition & 1 deletion execution/exchange/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewDedupOperator(next model.VectorOperator, opts *query.Options) model.Vect
oper := &dedupOperator{
next: next,
}
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (d *dedupOperator) Next(ctx context.Context, buf []model.StepVector) (int, error) {
Expand Down
2 changes: 1 addition & 1 deletion execution/exchange/duplicate_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewDuplicateLabelCheck(next model.VectorOperator, opts *query.Options) mode
oper := &duplicateLabelCheckOperator{
next: next,
}
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (d *duplicateLabelCheckOperator) Next(ctx context.Context, buf []model.StepVector) (int, error) {
Expand Down
2 changes: 1 addition & 1 deletion execution/function/absent.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func newAbsentOperator(
funcExpr: funcExpr,
next: next,
}
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (o *absentOperator) String() string {
Expand Down
2 changes: 1 addition & 1 deletion execution/function/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func newHistogramOperator(
default:
panic("unsupported function passed")
}
return telemetry.NewOperator(telemetry.NewTelemetry(o, opts), o)
return telemetry.NewOperator(telemetry.NewTelemetry(o, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), o)
}

func (o *histogramOperator) String() string {
Expand Down
4 changes: 2 additions & 2 deletions execution/function/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func newNoArgsFunctionOperator(funcExpr *logicalplan.FunctionCall, stepsBatch in
op.sampleIDs = []uint64{0}
}

return telemetry.NewOperator(telemetry.NewTelemetry(op, opts), op), nil
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), op), nil
}

// functionOperator returns []model.StepVector after processing input with desired function.
Expand Down Expand Up @@ -122,7 +122,7 @@ func newInstantVectorFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOp
// Check selector type.
switch funcExpr.Args[f.vectorIndex].ReturnType() {
case parser.ValueTypeVector, parser.ValueTypeScalar:
return telemetry.NewOperator(telemetry.NewTelemetry(f, opts), f), nil
return telemetry.NewOperator(telemetry.NewTelemetry(f, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), f), nil
default:
return nil, errors.Wrapf(parse.ErrNotImplemented, "got %s:", funcExpr.String())
}
Expand Down
2 changes: 1 addition & 1 deletion execution/function/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newRelabelOperator(
next: next,
funcExpr: funcExpr,
}
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (o *relabelOperator) String() string {
Expand Down
2 changes: 1 addition & 1 deletion execution/function/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newScalarOperator(next model.VectorOperator, opts *query.Options) model.Vec
next: next,
}

return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (o *scalarOperator) String() string {
Expand Down
2 changes: 1 addition & 1 deletion execution/function/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func newTimestampOperator(next model.VectorOperator, opts *query.Options) model.
oper := &timestampOperator{
next: next,
}
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (o *timestampOperator) Explain() (next []model.VectorOperator) {
Expand Down
8 changes: 4 additions & 4 deletions execution/remote/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ type Execution struct {
}

func NewExecution(query promql.Query, queryRangeStart, queryRangeEnd time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) model.VectorOperator {
storage := newStorageFromQuery(query, opts, engineLabels)
stor := newStorageFromQuery(query, opts, engineLabels)
oper := &Execution{
storage: storage,
storage: stor,
query: query,
opts: opts,
queryRangeStart: queryRangeStart,
queryRangeEnd: queryRangeEnd,
vectorSelector: promstorage.NewVectorSelector(storage, opts, 0, 0, false, 0, 1),
vectorSelector: promstorage.NewVectorSelector(stor, opts, 0, 0, false, 0, 1),
}

return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (e *Execution) Series(ctx context.Context) ([]labels.Labels, error) {
Expand Down
2 changes: 1 addition & 1 deletion execution/scan/literal_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewNumberLiteralSelector(opts *query.Options, val float64) model.VectorOper
val: val,
}

return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
}

func (o *numberLiteralSelector) Explain() (next []model.VectorOperator) {
Expand Down
57 changes: 4 additions & 53 deletions execution/scan/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/prometheus/prometheus/model/labels"
)

const sampleLimitCheckPercentage = 0.05

type subqueryOperator struct {
next model.VectorOperator
paramOp model.VectorOperator
Expand Down Expand Up @@ -54,9 +52,6 @@ type subqueryOperator struct {
paramBuf []model.StepVector
param2Buf []model.StepVector
tempBuf []model.StepVector

currentTrackedSamples int
lastTrackedSamples int
}

func NewSubqueryOperator(next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
Expand Down Expand Up @@ -86,7 +81,7 @@ func NewSubqueryOperator(next, paramOp, paramOp2 model.VectorOperator, opts *que
params: make([]float64, opts.StepsBatch),
params2: make([]float64, opts.StepsBatch),
}
o.telemetry = telemetry.NewSubqueryTelemetry(o, opts)
o.telemetry = telemetry.NewSubqueryTelemetry(o, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter)
return telemetry.NewOperator(o.telemetry, o), nil
}

Expand Down Expand Up @@ -155,9 +150,6 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
for _, b := range o.buffers {
b.Reset(mint, maxt+o.subQuery.Offset.Milliseconds())
}
o.currentTrackedSamples = 0
o.lastTrackedSamples = 0
checkSampleLimitCounter := 0
if len(o.lastVectors) > 0 {
for _, v := range o.lastVectors[o.lastCollected+1:] {
if v.T > maxt {
Expand All @@ -167,7 +159,6 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
o.lastCollected++
}
if o.lastCollected == len(o.lastVectors)-1 {

o.lastVectors = nil
o.lastCollected = -1
}
Expand All @@ -192,18 +183,6 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
o.collect(vector, mint)
}

checkSampleLimitCounter++
if o.shouldCheckSampleLimit(checkSampleLimitCounter) {
if err := o.checkSampleLimit(); err != nil {
return 0, err
}
checkSampleLimitCounter = 0
}
}
if checkSampleLimitCounter > 0 {
if err := o.checkSampleLimit(); err != nil {
return 0, err
}
}

buf[n].Reset(o.currentStep)
Expand All @@ -221,7 +200,9 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
buf[n].AppendSampleWithSizeHint(uint64(sampleId), f, hint)
}
}
o.telemetry.IncrementSamplesAtTimestamp(rangeSamples.SampleCount(), buf[n].T)
if err := o.telemetry.IncrementSamplesAtTimestamp(rangeSamples.SampleCount(), buf[n].T); err != nil {
return 0, err
}
}
n++
o.currentStep += o.step
Expand All @@ -230,15 +211,6 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
return n, nil
}

func (o *subqueryOperator) checkSampleLimit() error {
delta := o.currentTrackedSamples - o.lastTrackedSamples
if delta > 0 {
o.opts.SampleTracker.Add(delta)
}
o.lastTrackedSamples = o.currentTrackedSamples
return o.opts.SampleTracker.CheckLimit()
}

func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
if v.T < mint {
return
Expand All @@ -249,7 +221,6 @@ func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
continue
}
buffer.Push(v.T, ringbuffer.Value{F: s})
o.currentTrackedSamples++
}
for i, s := range v.Histograms {
buffer := o.buffers[v.HistogramIDs[i]]
Expand All @@ -275,9 +246,7 @@ func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
s.CounterResetHint = histogram.UnknownCounterReset
}
buffer.Push(v.T, ringbuffer.Value{H: s})
o.currentTrackedSamples += telemetry.CalculateHistogramSampleCount(s)
}

}

func (o *subqueryOperator) Series(ctx context.Context) ([]labels.Labels, error) {
Expand Down Expand Up @@ -322,21 +291,3 @@ func (o *subqueryOperator) initSeries(ctx context.Context) error {
})
return err
}

func (o *subqueryOperator) shouldCheckSampleLimit(checkSampleLimitCounter int) bool {
if len(o.series) == 0 {
return checkSampleLimitCounter >= 1
}

limit := o.opts.SampleTracker.Limit()
targetSamplesPerCheck := int(float64(limit) * sampleLimitCheckPercentage)

maxSamplesPerCall := len(o.series) * o.stepsBatch
if maxSamplesPerCall == 0 {
return checkSampleLimitCounter >= 1
}

interval := max(targetSamplesPerCheck/maxSamplesPerCall, 1)

return checkSampleLimitCounter >= interval
}
2 changes: 1 addition & 1 deletion execution/step_invariant/step_invariant.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewStepInvariantOperator(
u.cacheResult = false
}

return telemetry.NewOperator(telemetry.NewStepInvariantTelemetry(u, opts), u), nil
return telemetry.NewOperator(telemetry.NewStepInvariantTelemetry(u, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), u), nil
}

func (u *stepInvariantOperator) Series(ctx context.Context) ([]labels.Labels, error) {
Expand Down
Loading
Loading