Skip to content

Commit 15d9715

Browse files
committed
storage: use reader pattern to consume iterators
Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 4a3bf4b commit 15d9715

File tree

2 files changed

+92
-28
lines changed

2 files changed

+92
-28
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright (c) The Thanos Community Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package prometheus
5+
6+
import (
7+
"io"
8+
9+
"github.com/prometheus/prometheus/tsdb/chunkenc"
10+
"github.com/thanos-io/promql-engine/ringbuffer"
11+
)
12+
13+
type BatchIterator struct {
14+
it chunkenc.Iterator
15+
}
16+
17+
func NewBatchIterator(it chunkenc.Iterator) *BatchIterator {
18+
return &BatchIterator{it: it}
19+
}
20+
21+
func (b *BatchIterator) NextBatch(buf []ringbuffer.Sample) (int, error) {
22+
n := 0
23+
for n < len(buf) {
24+
valType := b.it.Next()
25+
if valType == chunkenc.ValNone {
26+
break
27+
}
28+
29+
switch valType {
30+
case chunkenc.ValFloat:
31+
buf[n].T, buf[n].V.F = b.it.At()
32+
buf[n].V.H = nil
33+
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
34+
buf[n].T, buf[n].V.H = b.it.AtFloatHistogram(buf[n].V.H)
35+
buf[n].V.F = 0
36+
}
37+
n++
38+
}
39+
if err := b.it.Err(); err != nil {
40+
return n, err
41+
}
42+
if n < len(buf) {
43+
return n, io.EOF
44+
}
45+
return n, nil
46+
}

storage/prometheus/matrix_selector.go

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package prometheus
66
import (
77
"context"
88
"fmt"
9+
"io"
910
"math"
1011
"strings"
1112
"sync"
@@ -24,7 +25,6 @@ import (
2425
"github.com/prometheus/prometheus/model/labels"
2526
"github.com/prometheus/prometheus/model/value"
2627
"github.com/prometheus/prometheus/promql/parser/posrange"
27-
"github.com/prometheus/prometheus/tsdb/chunkenc"
2828
"github.com/prometheus/prometheus/util/annotations"
2929
)
3030

@@ -33,9 +33,14 @@ type matrixScanner struct {
3333
signature uint64
3434

3535
buffer ringbuffer.Buffer
36-
iterator chunkenc.Iterator
36+
iterator *BatchIterator
3737
lastSample ringbuffer.Sample
3838
metricAppearedTs int64
39+
40+
// Batch reading state
41+
sampleBatch [16]ringbuffer.Sample
42+
batchLen int
43+
batchPos int
3944
}
4045

4146
type matrixSelector struct {
@@ -241,7 +246,7 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error {
241246
o.scanners[i] = matrixScanner{
242247
labels: lbls,
243248
signature: s.Signature,
244-
iterator: s.Iterator(nil),
249+
iterator: NewBatchIterator(s.Iterator(nil)),
245250
lastSample: ringbuffer.Sample{T: math.MinInt64},
246251
buffer: o.newBuffer(ctx),
247252
metricAppearedTs: math.MinInt64,
@@ -345,56 +350,69 @@ func (m *matrixScanner) selectPoints(
345350
}
346351

347352
appendedPointBeforeMint := !ringbuffer.Empty(m.buffer)
348-
for valType := m.iterator.Next(); valType != chunkenc.ValNone; valType = m.iterator.Next() {
349-
switch valType {
350-
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
353+
for {
354+
if m.batchPos >= m.batchLen {
355+
n, err := m.iterator.NextBatch(m.sampleBatch[:])
356+
if err != nil && err != io.EOF {
357+
return err
358+
}
359+
if n == 0 {
360+
break
361+
}
362+
m.batchLen = n
363+
m.batchPos = 0
364+
}
365+
366+
s := &m.sampleBatch[m.batchPos]
367+
m.batchPos++
368+
369+
if s.V.H != nil {
370+
// Histogram path
351371
if isExtFunction {
352372
return ErrNativeHistogramsNotSupported
353373
}
354-
var t int64
355-
t, fh = m.iterator.AtFloatHistogram(fh)
356-
if value.IsStaleNaN(fh.Sum) || t < mint {
374+
if value.IsStaleNaN(s.V.H.Sum) || s.T < mint {
357375
continue
358376
}
359-
if t > maxt {
360-
m.lastSample.T = t
377+
if s.T > maxt {
378+
m.lastSample.T = s.T
361379
if m.lastSample.V.H == nil {
362-
m.lastSample.V.H = fh.Copy()
380+
m.lastSample.V.H = s.V.H.Copy()
363381
} else {
364-
fh.CopyTo(m.lastSample.V.H)
382+
s.V.H.CopyTo(m.lastSample.V.H)
365383
}
366384
return nil
367385
}
368-
if t > mint {
369-
m.buffer.Push(t, ringbuffer.Value{H: fh})
386+
if s.T > mint {
387+
m.buffer.Push(s.T, s.V)
370388
}
371-
case chunkenc.ValFloat:
372-
t, v := m.iterator.At()
373-
if value.IsStaleNaN(v) {
389+
} else {
390+
// Float path
391+
if value.IsStaleNaN(s.V.F) {
374392
continue
375393
}
376394
if m.metricAppearedTs == math.MinInt64 {
377-
m.metricAppearedTs = t
395+
m.metricAppearedTs = s.T
378396
}
379-
if t > maxt {
380-
m.lastSample.T, m.lastSample.V.F, m.lastSample.V.H = t, v, nil
397+
if s.T > maxt {
398+
m.lastSample.T, m.lastSample.V.F, m.lastSample.V.H = s.T, s.V.F, nil
381399
return nil
382400
}
383401
if isExtFunction {
384-
if t > mint || !appendedPointBeforeMint {
385-
m.buffer.Push(t, ringbuffer.Value{F: v})
402+
if s.T > mint || !appendedPointBeforeMint {
403+
m.buffer.Push(s.T, s.V)
386404
appendedPointBeforeMint = true
387405
} else {
388-
m.buffer.ReadIntoLast(func(s *ringbuffer.Sample) {
389-
s.T, s.V.F, s.V.H = t, v, nil
406+
m.buffer.ReadIntoLast(func(last *ringbuffer.Sample) {
407+
last.T, last.V.F, last.V.H = s.T, s.V.F, nil
390408
})
391409
}
392410
} else {
393-
if t > mint {
394-
m.buffer.Push(t, ringbuffer.Value{F: v})
411+
if s.T > mint {
412+
m.buffer.Push(s.T, s.V)
395413
}
396414
}
397415
}
398416
}
399-
return m.iterator.Err()
417+
return nil
400418
}

0 commit comments

Comments
 (0)