Commit 54b94b6

Update max_samples tracking to selector operators
Signed-off-by: Paurush Garg <[email protected]>
1 parent 744e9ad · commit 54b94b6

10 files changed: +91 −197 lines changed
engine/engine.go

Lines changed: 2 additions & 17 deletions

@@ -102,9 +102,6 @@ type QueryOpts struct {
 
 	// LogicalOptimizers can be used to override the LogicalOptimizers engine setting.
 	LogicalOptimizers []logicalplan.Optimizer
-
-	// MaxSamples can be used to override the MaxSamples engine setting.
-	MaxSamples int
 }
 
 	func (opts QueryOpts) LookbackDelta() time.Duration { return opts.LookbackDeltaParam }

@@ -453,12 +450,11 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
 		EnableAnalysis:           e.enableAnalysis,
 		NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
 		DecodingConcurrency:      e.decodingConcurrency,
-		MaxSamples:               e.maxSamples,
 	}
 
 	// Initialize SampleTracker if MaxSamples is set
-	if res.MaxSamples > 0 {
-		res.SampleTracker = query.NewSampleTracker(res.MaxSamples)
+	if e.maxSamples > 0 {
+		res.SampleTracker = query.NewSampleTracker(e.maxSamples)
 	}
 
 	if opts == nil {

@@ -476,12 +472,6 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
 		res.DecodingConcurrency = opts.DecodingConcurrency
 	}
 
-	// Allow per-query override of MaxSamples
-	if opts.MaxSamples > 0 {
-		res.MaxSamples = opts.MaxSamples
-		res.SampleTracker = query.NewSampleTracker(res.MaxSamples)
-	}
-
 	return res
 }

@@ -757,10 +747,5 @@ func recoverEngine(logger *slog.Logger, plan logicalplan.Plan, errp *error) {
 
 		logger.Error("runtime panic in engine", "expr", plan.Root().String(), "err", e, "stacktrace", string(buf))
 		*errp = errors.Wrap(err, "unexpected error")
-	case error:
-		// Handle regular errors (like maxSamples exceeded)
-		*errp = err
-	default:
-		*errp = errors.Newf("unexpected panic: %v", e)
 	}
 }
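Note on the engine change: the per-query MaxSamples override is gone, so the limit now comes only from the engine-level maxSamples setting, and makeQueryOpts exposes it to operators as a shared query.Options.SampleTracker. A minimal sketch of that wiring follows; newQueryOptions is a hypothetical stand-in for the engine's own makeQueryOpts, not code from this commit.

package example

import "github.com/thanos-io/promql-engine/query"

// newQueryOptions mirrors the new wiring: when the engine-level limit is set,
// one SampleTracker is created per query and shared by every operator
// through query.Options.
func newQueryOptions(maxSamples int) *query.Options {
	opts := &query.Options{}
	if maxSamples > 0 {
		opts.SampleTracker = query.NewSampleTracker(maxSamples)
	}
	return opts
}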

execution/exchange/concurrent.go

Lines changed: 0 additions & 9 deletions

@@ -119,15 +119,6 @@ func (c *concurrencyOperator) Next(ctx context.Context, buf []model.StepVector)
 
 func (c *concurrencyOperator) pull(ctx context.Context) {
 	defer close(c.buffer)
-	defer func() {
-		if r := recover(); r != nil {
-			if err, ok := r.(error); ok {
-				c.buffer <- maybeStepVector{err: err}
-			} else {
-				c.buffer <- maybeStepVector{err: fmt.Errorf("%v", r)}
-			}
-		}
-	}()
 
	for {
		select {

execution/model/vector.go

Lines changed: 0 additions & 59 deletions

@@ -8,7 +8,6 @@ import (
 
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
-	"github.com/thanos-io/promql-engine/query"
 )
 
 type Series struct {

@@ -25,12 +24,6 @@ type StepVector struct {
 
 	HistogramIDs []uint64
 	Histograms   []*histogram.FloatHistogram
-
-	tracker *query.SampleTracker
-}
-
-func (s *StepVector) SetTracker(tracker *query.SampleTracker) {
-	s.tracker = tracker
 }
 
 // Reset resets the StepVector to the given timestamp while preserving slice capacity.

@@ -69,13 +62,6 @@ func (s *StepVector) AppendSampleWithSizeHint(id uint64, val float64, hint int)
 	}
 	s.SampleIDs = append(s.SampleIDs, id)
 	s.Samples = append(s.Samples, val)
-
-	// Track sample added
-	if s.tracker != nil {
-		if err := s.tracker.Add(1); err != nil {
-			panic(err)
-		}
-	}
 }
 
 func (s *StepVector) AppendSamples(ids []uint64, vals []float64) {

@@ -84,23 +70,11 @@ func (s *StepVector) AppendSamples(ids []uint64, vals []float64) {
 	}
 	s.SampleIDs = append(s.SampleIDs, ids...)
 	s.Samples = append(s.Samples, vals...)
-
-	// Track samples added
-	if s.tracker != nil {
-		if err := s.tracker.Add(len(vals)); err != nil {
-			panic(err)
-		}
-	}
 }
 
 func (s *StepVector) RemoveSample(index int) {
 	s.Samples = slices.Delete(s.Samples, index, index+1)
 	s.SampleIDs = slices.Delete(s.SampleIDs, index, index+1)
-
-	// Track sample removed
-	if s.tracker != nil {
-		s.tracker.Remove(1)
-	}
 }
 
 func (s *StepVector) AppendHistogram(histogramID uint64, h *histogram.FloatHistogram) {

@@ -122,15 +96,6 @@ func (s *StepVector) AppendHistogramWithSizeHint(histogramID uint64, h *histogra
 	}
 	s.HistogramIDs = append(s.HistogramIDs, histogramID)
 	s.Histograms = append(s.Histograms, h)
-
-	// Track histogram added (histograms count as multiple samples based on bucket count)
-	if s.tracker != nil && h != nil {
-		// Count histogram buckets as samples
-		count := len(h.PositiveBuckets) + len(h.NegativeBuckets) + 2 // +2 for count and sum
-		if err := s.tracker.Add(count); err != nil {
-			panic(err)
-		}
-	}
 }
 
 func (s *StepVector) AppendHistograms(histogramIDs []uint64, hs []*histogram.FloatHistogram) {

@@ -139,33 +104,9 @@ func (s *StepVector) AppendHistograms(histogramIDs []uint64, hs []*histogram.Flo
 	}
 	s.HistogramIDs = append(s.HistogramIDs, histogramIDs...)
 	s.Histograms = append(s.Histograms, hs...)
-
-	// Track histograms added
-	if s.tracker != nil {
-		totalCount := 0
-		for _, h := range hs {
-			if h != nil {
-				totalCount += len(h.PositiveBuckets) + len(h.NegativeBuckets) + 2
-			}
-		}
-		if totalCount > 0 {
-			if err := s.tracker.Add(totalCount); err != nil {
-				panic(err)
-			}
-		}
-	}
 }
 
 func (s *StepVector) RemoveHistogram(index int) {
-	// Track histogram being removed
-	if s.tracker != nil && index < len(s.Histograms) {
-		h := s.Histograms[index]
-		if h != nil {
-			count := len(h.PositiveBuckets) + len(h.NegativeBuckets) + 2
-			s.tracker.Remove(count)
-		}
-	}
-
 	s.Histograms = slices.Delete(s.Histograms, index, index+1)
 	s.HistogramIDs = slices.Delete(s.HistogramIDs, index, index+1)
 }

execution/scan/subquery.go

Lines changed: 19 additions & 1 deletion

@@ -52,6 +52,8 @@ type subqueryOperator struct {
 	paramBuf  []model.StepVector
 	param2Buf []model.StepVector
 	tempBuf   []model.StepVector
+
+	lastTrackedSamples int
 }
 
 func NewSubqueryOperator(next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {

@@ -207,6 +209,22 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
 		o.currentStep += o.step
 	}
 
+	if o.opts.SampleTracker != nil {
+		totalSamplesInBatch := 0
+		for _, b := range o.buffers {
+			totalSamplesInBatch += b.SampleCount()
+		}
+		if totalSamplesInBatch > o.lastTrackedSamples {
+			o.opts.SampleTracker.Add(totalSamplesInBatch - o.lastTrackedSamples)
+		} else if totalSamplesInBatch < o.lastTrackedSamples {
+			o.opts.SampleTracker.Remove(o.lastTrackedSamples - totalSamplesInBatch)
+		}
+		o.lastTrackedSamples = totalSamplesInBatch
+		if err := o.opts.SampleTracker.CheckLimit(); err != nil {
+			return 0, err
+		}
+	}
+
 	return n, nil
 }

@@ -277,7 +295,7 @@ func (o *subqueryOperator) initSeries(ctx context.Context) error {
 	o.series = make([]labels.Labels, len(series))
 	o.buffers = make([]*ringbuffer.GenericRingBuffer, len(series))
 	for i := range o.buffers {
-		o.buffers[i] = ringbuffer.New(ctx, 8, o.subQuery.Range.Milliseconds(), o.subQuery.Offset.Milliseconds(), o.call, o.opts)
+		o.buffers[i] = ringbuffer.New(ctx, 8, o.subQuery.Range.Milliseconds(), o.subQuery.Offset.Milliseconds(), o.call)
 	}
 	var b labels.ScratchBuilder
 	for i, s := range series {
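Note on the operator change: tracking in the subquery operator is now delta-based. After each batch it sums the samples currently buffered (via SampleCount() on each ring buffer), adds or removes only the difference against the previously tracked total, and lets CheckLimit turn an overshoot into a returned error rather than a panic. Below is a hedged sketch of the same pattern factored into a reusable helper for other selector operators; the helper name and signature are illustrative, not part of this commit.

package example

import "github.com/thanos-io/promql-engine/query"

// trackDelta reconciles the tracker with the operator's current in-memory
// sample count and reports a limit violation as an ordinary error, mirroring
// the logic added to subqueryOperator.Next.
func trackDelta(st *query.SampleTracker, lastTracked *int, current int) error {
	if st == nil {
		return nil
	}
	switch {
	case current > *lastTracked:
		st.Add(current - *lastTracked)
	case current < *lastTracked:
		st.Remove(*lastTracked - current)
	}
	*lastTracked = current
	return st.CheckLimit()
}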

query/options.go

Lines changed: 1 addition & 2 deletions

@@ -18,8 +18,7 @@ type Options struct {
 	NoStepSubqueryIntervalFn func(time.Duration) time.Duration
 	EnableAnalysis           bool
 	DecodingConcurrency      int
-	MaxSamples               int            // Maximum samples allowed in query execution
-	SampleTracker            *SampleTracker // Tracks current samples in memory (internal)
+	SampleTracker            *SampleTracker // Tracks current samples in memory
 }
 
 // TotalSteps returns the total number of steps in the query, regardless of batching.

query/sample_tracker.go

Lines changed: 25 additions & 28 deletions

@@ -4,52 +4,49 @@
 package query
 
 import (
+	"fmt"
 	"sync/atomic"
-
-	"github.com/efficientgo/core/errors"
 )
 
-// SampleTracker tracks the number of samples currently in memory during query execution.
-// It enforces a maximum sample limit to prevent out-of-memory errors.
 type SampleTracker struct {
-	current atomic.Int64 // Current samples in memory
-	limit   int64        // Maximum samples allowed (0 = no limit)
+	current atomic.Int64
+	limit   int64
 }
 
-// NewSampleTracker creates a new sample tracker with the given limit.
 func NewSampleTracker(maxSamples int) *SampleTracker {
	return &SampleTracker{
		limit: int64(maxSamples),
	}
 }
 
-// Add increments the current sample count and checks against the limit.
-// Returns an error if adding these samples would exceed the limit.
-func (st *SampleTracker) Add(count int) error {
-	if count <= 0 {
-		return nil
-	}
-
-	newCurrent := st.current.Add(int64(count))
-
-	// Check limit
-	if st.limit > 0 && newCurrent > st.limit {
-		return errors.Newf("query exceeded maximum samples limit: current=%d, limit=%d", newCurrent, st.limit)
-	}
-
-	return nil
+func (st *SampleTracker) Add(count int) {
+	st.current.Add(int64(count))
 }
 
-// Remove decrements the current sample count.
-// This should be called when samples are freed from memory.
 func (st *SampleTracker) Remove(count int) {
-	if count <= 0 {
-		return
-	}
	st.current.Add(-int64(count))
 }
 
-// Current returns the current number of samples in memory.
 func (st *SampleTracker) Current() int64 {
	return st.current.Load()
 }
+
+func (st *SampleTracker) CheckLimit() error {
+	if st.limit <= 0 {
+		return nil
+	}
+	current := st.current.Load()
+	if current > st.limit {
+		return ErrMaxSamplesExceeded{Current: current, Limit: st.limit}
+	}
+	return nil
+}
+
+type ErrMaxSamplesExceeded struct {
+	Current int64
+	Limit   int64
+}
+
+func (e ErrMaxSamplesExceeded) Error() string {
+	return fmt.Sprintf("query exceeded maximum samples limit: current=%d, limit=%d", e.Current, e.Limit)
+}
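Note on the tracker change: Add no longer fails; the limit is only evaluated when an operator calls CheckLimit, which returns the typed ErrMaxSamplesExceeded. A minimal, self-contained usage sketch based on the API above; the limit value and print statements are illustrative.

package main

import (
	"errors"
	"fmt"

	"github.com/thanos-io/promql-engine/query"
)

func main() {
	st := query.NewSampleTracker(1000) // limit of 1000 samples

	st.Add(700)
	st.Add(700) // Add never errors, even past the limit

	// The limit is only enforced when CheckLimit is called.
	if err := st.CheckLimit(); err != nil {
		var limitErr query.ErrMaxSamplesExceeded
		if errors.As(err, &limitErr) {
			fmt.Printf("over limit: current=%d limit=%d\n", limitErr.Current, limitErr.Limit)
		}
	}

	st.Remove(700) // freeing samples brings the count back under the limit
	fmt.Println("current samples:", st.Current())
}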

ringbuffer/generic.go

Lines changed: 3 additions & 48 deletions

@@ -9,7 +9,6 @@ import (
 
 	"github.com/thanos-io/promql-engine/execution/telemetry"
 	"github.com/thanos-io/promql-engine/warnings"
-	"github.com/thanos-io/promql-engine/query"
 
 	"github.com/prometheus/prometheus/model/histogram"
 )

@@ -47,27 +46,20 @@ type GenericRingBuffer struct {
 	selectRange int64
 	extLookback int64
 	call        FunctionCall
-
-	tracker *query.SampleTracker
 }
 
-func New(ctx context.Context, size int, selectRange, offset int64, call FunctionCall, opts *query.Options) *GenericRingBuffer {
-	return NewWithExtLookback(ctx, size, selectRange, offset, 0, call, opts)
+func New(ctx context.Context, size int, selectRange, offset int64, call FunctionCall) *GenericRingBuffer {
+	return NewWithExtLookback(ctx, size, selectRange, offset, 0, call)
 }
 
-func NewWithExtLookback(ctx context.Context, size int, selectRange, offset, extLookback int64, call FunctionCall, opts *query.Options) *GenericRingBuffer {
-	var tracker *query.SampleTracker
-	if opts != nil {
-		tracker = opts.SampleTracker
-	}
+func NewWithExtLookback(ctx context.Context, size int, selectRange, offset, extLookback int64, call FunctionCall) *GenericRingBuffer {
	return &GenericRingBuffer{
		ctx:         ctx,
		items:       make([]Sample, 0, size),
		selectRange: selectRange,
		offset:      offset,
		extLookback: extLookback,
		call:        call,
-		tracker:     tracker,
	}
 }

@@ -118,35 +110,11 @@ func (r *GenericRingBuffer) Push(t int64, v Value) {
 	} else {
 		r.items[n].V.H = nil
 	}
-
-	// Track samples added (count histogram buckets like Prometheus)
-	if r.tracker != nil {
-		samplesAdded := 1
-		if v.H != nil {
-			// Count histogram size in 16-byte units (same as Prometheus)
-			samplesAdded = (v.H.Size() + 8) / 16
-		}
-		if err := r.tracker.Add(samplesAdded); err != nil {
-			panic(err)
-		}
-	}
 }
 
 func (r *GenericRingBuffer) Reset(mint int64, evalt int64) {
 	r.currentStep = evalt
 	if r.extLookback == 0 && (len(r.items) == 0 || r.items[len(r.items)-1].T < mint) {
-		// Track samples removed (count histogram sizes like Prometheus)
-		if r.tracker != nil && len(r.items) > 0 {
-			samplesRemoved := 0
-			for _, item := range r.items {
-				if item.V.H != nil {
-					samplesRemoved += (item.V.H.Size() + 8) / 16
-				} else {
-					samplesRemoved++
-				}
-			}
-			r.tracker.Remove(samplesRemoved)
-		}
 		r.items = r.items[:0]
 		return
 	}

@@ -157,19 +125,6 @@ func (r *GenericRingBuffer) Reset(mint int64, evalt int64) {
 		drop--
 	}
 
-	// Track samples removed (count histogram sizes like Prometheus)
-	if r.tracker != nil && drop > 0 {
-		samplesRemoved := 0
-		for i := 0; i < drop; i++ {
-			if r.items[i].V.H != nil {
-				samplesRemoved += (r.items[i].V.H.Size() + 8) / 16
-			} else {
-				samplesRemoved++
-			}
-		}
-		r.tracker.Remove(samplesRemoved)
-	}
-
 	keep := len(r.items) - drop
 	r.tail = resize(r.tail, drop)
 	copy(r.tail, r.items[:drop])