46 changes: 41 additions & 5 deletions engine/engine.go
@@ -81,6 +81,18 @@ type Opts struct {
// This check can produce false positives when querying time-series data which does not conform to the Prometheus data model,
// and can be disabled if it leads to false positives.
DisableDuplicateLabelChecks bool

// EnableHighOverlapBatching reduces memory usage for queries whose range selectors
// overlap many steps, such as long lookback windows evaluated with a small step.
// Exceeding the 64-step batch limit additionally requires DisableDuplicateLabelChecks=true.
EnableHighOverlapBatching bool

// HighOverlapBatchSize is the series batch size for high-overlap queries.
// Defaults to 1000.
HighOverlapBatchSize int64

// HighOverlapThreshold is the number of overlapping steps above which the optimization is applied.
// Defaults to 100.
HighOverlapThreshold int64
}

// QueryOpts implements promql.QueryOpts but allows to override more engine default options.
@@ -173,6 +185,17 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
}
selectorBatchSize := opts.SelectorBatchSize

enableHighOverlapBatching := opts.EnableHighOverlapBatching

highOverlapBatchSize := opts.HighOverlapBatchSize
if highOverlapBatchSize == 0 {
highOverlapBatchSize = 1000
}

highOverlapThreshold := opts.HighOverlapThreshold
if highOverlapThreshold == 0 {
highOverlapThreshold = 100
}

var queryTracker promql.QueryTracker = nopQueryTracker{}
if opts.ActiveQueryTracker != nil {
queryTracker = opts.ActiveQueryTracker
@@ -198,6 +221,10 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
},
decodingConcurrency: decodingConcurrency,
selectorBatchSize: selectorBatchSize,
enableHighOverlapBatching: enableHighOverlapBatching,
highOverlapBatchSize: highOverlapBatchSize,
highOverlapThreshold: highOverlapThreshold,
}
}

@@ -227,6 +254,10 @@ type Engine struct {
selectorBatchSize int64
enableAnalysis bool
noStepSubqueryIntervalFn func(time.Duration) time.Duration

enableHighOverlapBatching bool
highOverlapBatchSize int64
highOverlapThreshold int64
}

func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
@@ -246,7 +277,7 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
resultSort := newResultSort(expr)

qOpts := e.makeQueryOpts(ts, ts, 0, opts)
if qOpts.StepsBatch > 64 {
if !e.disableDuplicateLabelChecks && qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}

@@ -292,7 +323,7 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
defer e.activeQueryTracker.Delete(idx)

qOpts := e.makeQueryOpts(ts, ts, 0, opts)
if qOpts.StepsBatch > 64 {
if !e.disableDuplicateLabelChecks && qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}
planOpts := logicalplan.PlanOptions{
@@ -344,7 +375,7 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
}
qOpts := e.makeQueryOpts(start, end, step, opts)
if qOpts.StepsBatch > 64 {
if !e.disableDuplicateLabelChecks && qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}
planOpts := logicalplan.PlanOptions{
@@ -389,7 +420,7 @@ func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable
defer e.activeQueryTracker.Delete(idx)

qOpts := e.makeQueryOpts(start, end, step, opts)
if qOpts.StepsBatch > 64 {
if !e.disableDuplicateLabelChecks && qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}
planOpts := logicalplan.PlanOptions{
@@ -474,7 +505,12 @@ func (e *Engine) getLogicalOptimizers(opts *QueryOpts) []logicalplan.Optimizer {
if opts.SelectorBatchSize != 0 {
selectorBatchSize = opts.SelectorBatchSize
}
return append(optimizers, logicalplan.SelectorBatchSize{Size: selectorBatchSize})
return append(optimizers, logicalplan.SelectorBatchSize{
DefaultBatchSize: selectorBatchSize,
EnableHighOverlapBatching: e.enableHighOverlapBatching,
HighOverlapBatchSize: e.highOverlapBatchSize,
HighOverlapThreshold: e.highOverlapThreshold,
})
}

func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {
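For context, here is a hedged sketch of how a caller could opt into the new behaviour. Only EnableHighOverlapBatching, HighOverlapBatchSize, HighOverlapThreshold, and DisableDuplicateLabelChecks come from this diff; the surrounding construction (the engine.New call, the import path, and the embedded promql.EngineOpts values) follows the existing Opts API and may look different in a real setup.

```go
package main

import (
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/thanos-io/promql-engine/engine"
)

func newEngine() *engine.Engine {
	return engine.New(engine.Opts{
		EngineOpts: promql.EngineOpts{
			Timeout:    2 * time.Minute,
			MaxSamples: 50_000_000,
		},
		// Required so that high-overlap queries may exceed the 64-step batch limit.
		DisableDuplicateLabelChecks: true,

		// New in this change; zero values fall back to the defaults of 1000 and 100.
		EnableHighOverlapBatching: true,
		HighOverlapBatchSize:      1000,
		HighOverlapThreshold:      100,
	})
}

func main() {
	_ = newEngine()
}
```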
79 changes: 75 additions & 4 deletions logicalplan/set_batch_size.go
@@ -13,14 +13,39 @@ import (
// SelectorBatchSize configures the batch size of selector based on
// aggregates present in the plan.
type SelectorBatchSize struct {
Size int64
// DefaultBatchSize is the series batch size for standard batching.
// Applied to vector selectors under aggregations.
DefaultBatchSize int64

// EnableHighOverlapBatching reduces memory for queries with long lookback windows.
EnableHighOverlapBatching bool

// HighOverlapBatchSize is the series batch size for high-overlap queries. Defaults to 1000.
HighOverlapBatchSize int64

// HighOverlapThreshold is the number of overlapping steps above which high-overlap batching is applied. Defaults to 100.
HighOverlapThreshold int64
}

// Optimize configures the batch size of selector based on the query plan.
// If any aggregate is present in the plan, the batch size is set to the configured value.
// The two exceptions where this cannot be done is if the aggregate is quantile, or
// when a binary expression precedes the aggregate.
func (m SelectorBatchSize) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) {
Contributor comment: I'm not sure this will even reach into the ring buffers; right now this only applies to vectors in direct aggregations, I think?
//
// If EnableHighOverlapBatching is true, this optimizer also detects high-overlap queries
// and switches to high-overlap batching by setting StepsBatch to TotalSteps and reducing
// the series batch size.
func (m SelectorBatchSize) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) {
if m.EnableHighOverlapBatching && opts != nil {
m.applyHighOverlapBatching(plan, opts)
}

m.applyStandardBatchSize(plan)

return plan, nil
}

func (m SelectorBatchSize) applyStandardBatchSize(plan Node) {
canBatch := false
Traverse(&plan, func(current *Node) {
switch e := (*current).(type) {
@@ -39,10 +64,56 @@ func (m SelectorBatchSize) Optimize(plan Node, _ *query.Options) (Node, annotati
canBatch = true
case *VectorSelector:
if canBatch {
e.BatchSize = m.Size
e.BatchSize = m.DefaultBatchSize
}
canBatch = false
}
})
return plan, nil
}

func (m SelectorBatchSize) applyHighOverlapBatching(plan Node, opts *query.Options) {
overlapThreshold := m.HighOverlapThreshold
if overlapThreshold == 0 {
overlapThreshold = 100
}

seriesBatchSize := m.HighOverlapBatchSize
if seriesBatchSize == 0 {
seriesBatchSize = 1000
}

vectorSelectors := make(map[*VectorSelector]bool)
shouldBatch := false

Traverse(&plan, func(current *Node) {
ms, ok := (*current).(*MatrixSelector)
if !ok {
return
}

selectRangeMs := ms.Range.Milliseconds()
stepMs := opts.Step.Milliseconds()
if stepMs == 0 {
stepMs = 1
}

overlap := (selectRangeMs-1)/stepMs + 1
totalSteps := int64(opts.TotalSteps())
if overlap > totalSteps {
overlap = totalSteps
}

if overlap > overlapThreshold {
vectorSelectors[ms.VectorSelector] = true
shouldBatch = true
}
})

if shouldBatch {
opts.StepsBatch = opts.TotalSteps()

for vs := range vectorSelectors {
vs.BatchSize = seriesBatchSize
}
}
}
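To make the heuristic above concrete, the following self-contained sketch repeats the same arithmetic with hypothetical numbers (the query shape, range, and step are illustrative and not taken from this PR). For something like sum_over_time(metric[2h]) evaluated over 24 hours at a 30s step, each evaluation window covers ceil(2h / 30s) = 240 steps, which exceeds the default threshold of 100, so the optimizer would switch the plan to high-overlap batching.

```go
package main

import "fmt"

// overlappingSteps mirrors the overlap computation in applyHighOverlapBatching:
// ceiling division of the selector range by the query step, capped at the total step count.
func overlappingSteps(selectRangeMs, stepMs, totalSteps int64) int64 {
	if stepMs == 0 {
		stepMs = 1 // instant queries are evaluated as a single step
	}
	overlap := (selectRangeMs-1)/stepMs + 1
	if overlap > totalSteps {
		overlap = totalSteps // a window cannot overlap more steps than the query has
	}
	return overlap
}

func main() {
	rangeMs := int64(2 * 60 * 60 * 1000) // a [2h] matrix selector
	stepMs := int64(30 * 1000)           // 30s resolution
	totalSteps := int64(24*60*60/30) + 1 // a 24h range query

	overlap := overlappingSteps(rangeMs, stepMs, totalSteps)
	fmt.Printf("overlap=%d (threshold 100)\n", overlap) // overlap=240, so high-overlap batching applies
}
```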
6 changes: 6 additions & 0 deletions ringbuffer/generic.go
@@ -18,6 +18,7 @@ type Buffer interface {
Reset(mint int64, evalt int64)
Eval(ctx context.Context, _, _ float64, _ int64) (float64, *histogram.FloatHistogram, bool, error)
SampleCount() int
Clear()

// to handle extlookback properly, only used by buffers that implement xincrease or xrate
ReadIntoLast(f func(*Sample))
@@ -145,6 +146,11 @@ func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, scalarA
})
}

func (r *GenericRingBuffer) Clear() {
r.items = r.items[:0]
r.currentStep = 0
}

func resize(s []Sample, n int) []Sample {
if cap(s) >= n {
return s[:n]
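The Clear method added to the Buffer interface exists so a ring buffer can be emptied and reused for the next series batch instead of being reallocated. The snippet below is a simplified stand-in rather than the real ringbuffer.Buffer interface (which has more methods than this hunk shows); it only illustrates the clear-and-reuse pattern that GenericRingBuffer.Clear enables by truncating items to length zero while keeping the allocated capacity.

```go
package main

import "fmt"

type sample struct {
	t int64
	v float64
}

// clearable loosely mirrors the reuse contract this PR adds to Buffer.
type clearable interface {
	Push(t int64, v float64)
	Clear()
}

type ring struct{ items []sample }

func (r *ring) Push(t int64, v float64) { r.items = append(r.items, sample{t: t, v: v}) }

// Clear drops the contents but keeps the backing array, mirroring
// GenericRingBuffer.Clear, which does r.items = r.items[:0].
func (r *ring) Clear() { r.items = r.items[:0] }

func main() {
	var b clearable = &ring{}
	for batch := 0; batch < 3; batch++ {
		b.Clear() // reuse the same buffer for each new series batch
		b.Push(int64(batch), float64(batch))
	}
	fmt.Println("reused one buffer across 3 batches")
}
```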
17 changes: 17 additions & 0 deletions ringbuffer/overtime.go
@@ -128,6 +128,23 @@ func (r *OverTimeBuffer) SampleCount() int {
return r.stepRanges[0].sampleCount
}

func (r *OverTimeBuffer) Clear() {
r.lastTimestamp = math.MinInt64

for i := range r.firstTimestamps {
r.firstTimestamps[i] = math.MaxInt64
}

for i := range r.stepStates {
r.stepStates[i].acc.Reset(0)
r.stepStates[i].warn = nil
}

for i := range r.stepRanges {
r.stepRanges[i].numSamples = 0
r.stepRanges[i].sampleCount = 0
}
}

func (r *OverTimeBuffer) MaxT() int64 { return r.lastTimestamp }

func (r *OverTimeBuffer) Push(t int64, v Value) {
41 changes: 41 additions & 0 deletions ringbuffer/pool.go
@@ -0,0 +1,41 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package ringbuffer

// BufferPool manages a pool of reusable ring buffers for memory efficiency.
// Buffers are pre-allocated and accessed via round-robin indexing.
type BufferPool struct {
// Pre-allocated buffers for deterministic behavior
buffers []Buffer
size int
}

// NewBufferPool creates a new buffer pool with the specified size.
// The factory function is called to create new buffers.
func NewBufferPool(size int, factory func() Buffer) *BufferPool {
if size <= 0 {
size = 1
}

buffers := make([]Buffer, size)
for i := range buffers {
buffers[i] = factory()
}

return &BufferPool{
buffers: buffers,
size: size,
}
}

// GetBuffer returns a buffer for the given index.
// Uses modulo to map any index to the pool size.
func (p *BufferPool) GetBuffer(index int) Buffer {
return p.buffers[index%p.size]
}

// Size returns the number of buffers in the pool.
func (p *BufferPool) Size() int {
return p.size
}
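A toy illustration of the index mapping behind BufferPool.GetBuffer: any index is folded onto one of the pre-allocated buffers via index % size, so the number of live buffers stays bounded by the pool size however many series are processed. The strings below stand in for real ring buffers; an actual pool would be built with NewBufferPool(size, factory), where factory returns a ringbuffer.Buffer.

```go
package main

import "fmt"

func main() {
	const poolSize = 4

	// Stand-ins for the buffers the factory would pre-allocate in NewBufferPool.
	buffers := make([]string, poolSize)
	for i := range buffers {
		buffers[i] = fmt.Sprintf("buffer-%d", i)
	}

	// Ten series share four buffers: series 4 maps back to buffer-0, series 5 to buffer-1, and so on.
	for series := 0; series < 10; series++ {
		fmt.Printf("series %d -> %s\n", series, buffers[series%poolSize])
	}
}
```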
15 changes: 15 additions & 0 deletions ringbuffer/rate.go
@@ -194,6 +194,21 @@ func (r *RateBuffer) Eval(ctx context.Context, _, _ float64, _ int64) (float64,

func (r *RateBuffer) ReadIntoLast(func(*Sample)) {}

func (r *RateBuffer) Clear() {
r.resets = r.resets[:0]
r.lastSample = Sample{T: math.MinInt64}
r.currentMint = math.MaxInt64

for i := range r.firstSamples {
r.firstSamples[i] = Sample{T: math.MaxInt64}
}

for i := range r.stepRanges {
r.stepRanges[i].numSamples = 0
r.stepRanges[i].sampleCount = 0
}
}

func querySteps(o query.Options) int64 {
// Instant evaluation is executed as a range evaluation with one step.
if o.Step.Milliseconds() == 0 {