Skip to content

Commit f276d56

Browse files
authored
MQE: rename statsTracker to subsetStats, introduce support for serializing OperatorEvaluationStats (#15045)
#### What this PR does This PR makes two changes to MQE's support for tracking query statistics: * it renames `statsTracker` to `subsetStats`, which better conveys its purpose * it introduces support for serializing `OperatorEvaluationStats` to/from Protobuf, which is a prerequisite for supporting query stats with remote execution and range vector splitting
#### What this PR does This PR makes two changes to MQE's support for tracking query statistics: * it renames `statsTracker` to `subsetStats`, which better conveys its purpose * it introduces support for serializing `OperatorEvaluationStats` to / from Protobuf, which is a prerequisite for supporting query stats with remote execution and range vector splitting #### Which issue(s) this PR fixes or relates to (none) #### Checklist - [x] Tests updated. - [n/a] Documentation added. - [x] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR. - [n/a] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Adds a new protobuf-serialized representation for `OperatorEvaluationStats` and changes how per-subset stats are allocated/decoded, which could affect query stats correctness and memory accounting in MQE paths. > > **Overview** > Adds protobuf serialization support for MQE operator evaluation stats by introducing `EncodedOperatorEvaluationStats`/`EncodedSubsetStats` (new `stats.proto` + generated `stats.pb.go`) and `OperatorEvaluationStats.Encode()` / `Decode()` methods. > > Refactors internal tracking by renaming `statsTracker` to `subsetStats`, avoiding subset slice allocation when `subsetCount==0`, and adds tests covering round-trip encoding/decoding plus validation errors and memory-consumption tracking. Updates the changelog entry to include the additional PR reference. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 55be141. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 839ded0 commit f276d56

File tree

5 files changed

+1158
-25
lines changed

5 files changed

+1158
-25
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
* [FEATURE] Ingest storage: Add `-ingest-storage.kafka.tls*` flags to connect to Kafka using TLS. #14550
6060
* [FEATURE] Ingest storage: Add `-ingest-storage.ingestion-partition-tenant-write-shard-size` to limit the number of partitions used for writes independently from reads, allowing safely reducing the shard size without losing query coverage during the migration. #14780
6161
* [FEATURE] MQE: Add experimental support for splitting and caching intermediate results for functions over range vectors in instant queries. #13472 #14479 #14506 #14499 #14517 #14536 #14614 #14645 #14677 #14788
62-
* [FEATURE] MQE: Add experimental support for reporting the number of samples read per query. #14828 #14839 #14952
62+
* [FEATURE] MQE: Add experimental support for reporting the number of samples read per query. #14828 #14839 #14952 #15045
6363
* [FEATURE] Compactor: Add `-compactor.ooo-split-and-merge-shards` per-tenant limit to allow a separate shard count for blocks with the out-of-order external label. #14704
6464
* [FEATURE] Distributor: add experimental support for controlling OTLP metric name suffix addition and translation strategy via `X-Mimir-OTLP-AddSuffixes` and `X-Mimir-OTLP-TranslationStrategy` request headers on the OTLP push path, gated by `-api.otlp-translation-headers-enabled` (off by default). #14782
6565
* [ENHANCEMENT] Distributor: Add per-tenant `-distributor.active-series-limit-response-code` override to configure the HTTP response code returned when rejecting series due to the active series limit. Defaults to 429 (Too Many Requests). Set to 400 (Bad Request) to prevent clients from retrying rejected requests. #14981

pkg/streamingpromql/types/stats.go

Lines changed: 104 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -50,37 +50,42 @@ type OperatorEvaluationStats struct {
5050
timeRange QueryTimeRange
5151
memoryConsumptionTracker *limiter.MemoryConsumptionTracker
5252

53-
allSeries *statsTracker
54-
subsets []*statsTracker
53+
allSeries *subsetStats
54+
subsets []*subsetStats
5555
}
5656

5757
// NewOperatorEvaluationStats creates a new OperatorEvaluationStats for the given time range.
5858
//
5959
// subsetCount is the number of subsets to track. It is the caller's responsibility to track
6060
// which subset is which.
6161
func NewOperatorEvaluationStats(timeRange QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker, subsetCount int) (*OperatorEvaluationStats, error) {
62-
allSeries, err := newStatsTracker(timeRange, memoryConsumptionTracker)
62+
allSeries, err := newSubsetStats(timeRange, memoryConsumptionTracker)
6363
if err != nil {
6464
return nil, err
6565
}
6666

67-
subsets := make([]*statsTracker, 0, subsetCount)
68-
for range subsetCount {
69-
stats, err := newStatsTracker(timeRange, memoryConsumptionTracker)
70-
if err != nil {
71-
return nil, err
72-
}
73-
74-
subsets = append(subsets, stats)
75-
}
76-
77-
return &OperatorEvaluationStats{
67+
stats := &OperatorEvaluationStats{
7868
timeRange: timeRange,
7969
memoryConsumptionTracker: memoryConsumptionTracker,
8070

8171
allSeries: allSeries,
82-
subsets: subsets,
83-
}, nil
72+
}
73+
74+
if subsetCount > 0 {
75+
// Only bother allocating a slice if we actually need it.
76+
stats.subsets = make([]*subsetStats, 0, subsetCount)
77+
78+
for range subsetCount {
79+
subset, err := newSubsetStats(timeRange, memoryConsumptionTracker)
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
stats.subsets = append(stats.subsets, subset)
85+
}
86+
}
87+
88+
return stats, nil
8489
}
8590

8691
// TrackSampleForInstantVectorSelector records a sample for an instant vector selector at output timestamp stepT.
@@ -270,6 +275,54 @@ func (s *OperatorEvaluationStats) ComputeForSubquery(
270275
return result, nil
271276
}
272277

278+
// Encode returns the encoded form of this instance, suitable for serialization.
279+
// The encoded form may share memory with this instance, and so may be modified
280+
// if this instance is modified, and becomes invalid when this instance is closed.
281+
func (s *OperatorEvaluationStats) Encode() *EncodedOperatorEvaluationStats {
282+
encoded := &EncodedOperatorEvaluationStats{
283+
AllSeries: s.allSeries.Encode(),
284+
}
285+
286+
if len(s.subsets) > 0 {
287+
// Only bother allocating a slice for subsets if there are any.
288+
encoded.Subsets = make([]EncodedSubsetStats, 0, len(s.subsets))
289+
for _, subset := range s.subsets {
290+
encoded.Subsets = append(encoded.Subsets, subset.Encode())
291+
}
292+
}
293+
294+
return encoded
295+
}
296+
297+
func (e *EncodedOperatorEvaluationStats) Decode(timeRange QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) (*OperatorEvaluationStats, error) {
298+
allSeries, err := e.AllSeries.decode(timeRange, memoryConsumptionTracker)
299+
if err != nil {
300+
return nil, err
301+
}
302+
303+
decoded := &OperatorEvaluationStats{
304+
timeRange: timeRange,
305+
memoryConsumptionTracker: memoryConsumptionTracker,
306+
allSeries: allSeries,
307+
}
308+
309+
if len(e.Subsets) > 0 {
310+
// Only bother allocating a slice for subsets if there are any.
311+
decoded.subsets = make([]*subsetStats, 0, len(e.Subsets))
312+
313+
for _, encodedSubset := range e.Subsets {
314+
decodedSubset, err := encodedSubset.decode(timeRange, memoryConsumptionTracker)
315+
if err != nil {
316+
return nil, err
317+
}
318+
319+
decoded.subsets = append(decoded.subsets, decodedSubset)
320+
}
321+
}
322+
323+
return decoded, nil
324+
}
325+
273326
func (s *OperatorEvaluationStats) Close() {
274327
s.allSeries.Close()
275328

@@ -278,14 +331,14 @@ func (s *OperatorEvaluationStats) Close() {
278331
}
279332
}
280333

281-
type statsTracker struct {
334+
type subsetStats struct {
282335
samplesProcessedPerStep []int64
283336
newSamplesReadPerStep []int64
284337

285338
memoryConsumptionTracker *limiter.MemoryConsumptionTracker
286339
}
287340

288-
func newStatsTracker(timeRange QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) (*statsTracker, error) {
341+
func newSubsetStats(timeRange QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) (*subsetStats, error) {
289342
samplesProcessed, err := Int64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
290343
if err != nil {
291344
return nil, err
@@ -296,19 +349,19 @@ func newStatsTracker(timeRange QueryTimeRange, memoryConsumptionTracker *limiter
296349
return nil, err
297350
}
298351

299-
return &statsTracker{
352+
return &subsetStats{
300353
samplesProcessedPerStep: samplesProcessed[:timeRange.StepCount],
301354
newSamplesReadPerStep: newSamplesRead[:timeRange.StepCount],
302355
memoryConsumptionTracker: memoryConsumptionTracker,
303356
}, nil
304357
}
305358

306-
func (s *statsTracker) Add(pointIndex int64, samplesProcessed int64, newSamplesRead int64) {
359+
func (s *subsetStats) Add(pointIndex int64, samplesProcessed int64, newSamplesRead int64) {
307360
s.samplesProcessedPerStep[pointIndex] += samplesProcessed
308361
s.newSamplesReadPerStep[pointIndex] += newSamplesRead
309362
}
310363

311-
func (s *statsTracker) SetFromSubquery(source *statsTracker, parentIdx, firstInnerIdx, firstNewSamplesInnerIdx, lastIdx int) {
364+
func (s *subsetStats) SetFromSubquery(source *subsetStats, parentIdx, firstInnerIdx, firstNewSamplesInnerIdx, lastIdx int) {
312365
for innerIdx := firstInnerIdx; innerIdx <= lastIdx; innerIdx++ {
313366
s.samplesProcessedPerStep[parentIdx] += source.samplesProcessedPerStep[innerIdx]
314367
}
@@ -318,19 +371,46 @@ func (s *statsTracker) SetFromSubquery(source *statsTracker, parentIdx, firstInn
318371
}
319372
}
320373

321-
func (s *statsTracker) SetFromStepInvariant(samplesProcessed int64, newSamplesRead int64) {
374+
func (s *subsetStats) SetFromStepInvariant(samplesProcessed int64, newSamplesRead int64) {
322375
s.newSamplesReadPerStep[0] = newSamplesRead
323376
for idx := range s.samplesProcessedPerStep {
324377
s.samplesProcessedPerStep[idx] = samplesProcessed
325378
}
326379
}
327380

328-
func (s *statsTracker) CopyFrom(source *statsTracker) {
381+
func (s *subsetStats) CopyFrom(source *subsetStats) {
329382
copy(s.samplesProcessedPerStep, source.samplesProcessedPerStep)
330383
copy(s.newSamplesReadPerStep, source.newSamplesReadPerStep)
331384
}
332385

333-
func (s *statsTracker) Close() {
386+
func (s *subsetStats) Encode() EncodedSubsetStats {
387+
return EncodedSubsetStats{
388+
SamplesProcessedPerStep: s.samplesProcessedPerStep,
389+
NewSamplesReadPerStep: s.newSamplesReadPerStep,
390+
}
391+
}
392+
393+
func (e *EncodedSubsetStats) decode(timeRange QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) (*subsetStats, error) {
394+
if len(e.SamplesProcessedPerStep) != timeRange.StepCount {
395+
return nil, fmt.Errorf("number of samples processed steps in encoded form (%d) does not match expected (%d)", len(e.SamplesProcessedPerStep), timeRange.StepCount)
396+
}
397+
398+
if len(e.NewSamplesReadPerStep) != timeRange.StepCount {
399+
return nil, fmt.Errorf("number of new samples read steps in encoded form (%d) does not match expected (%d)", len(e.NewSamplesReadPerStep), timeRange.StepCount)
400+
}
401+
402+
if err := memoryConsumptionTracker.IncreaseMemoryConsumption(uint64(cap(e.SamplesProcessedPerStep)+cap(e.NewSamplesReadPerStep))*Int64Size, limiter.Int64Slices); err != nil {
403+
return nil, err
404+
}
405+
406+
return &subsetStats{
407+
samplesProcessedPerStep: e.SamplesProcessedPerStep,
408+
newSamplesReadPerStep: e.NewSamplesReadPerStep,
409+
memoryConsumptionTracker: memoryConsumptionTracker,
410+
}, nil
411+
}
412+
413+
func (s *subsetStats) Close() {
334414
Int64SlicePool.Put(&s.samplesProcessedPerStep, s.memoryConsumptionTracker)
335415
Int64SlicePool.Put(&s.newSamplesReadPerStep, s.memoryConsumptionTracker)
336416
}

0 commit comments

Comments
 (0)