Skip to content

Commit dc73094

Browse files
committed
Fix rate combine when a middle split has no samples
1 parent 267ca09 commit dc73094

File tree

2 files changed

+128
-2
lines changed

2 files changed

+128
-2
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// SPDX-License-Identifier: AGPL-3.0-only
2+
3+
package functions
4+
5+
import (
6+
"context"
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/grafana/mimir/pkg/mimirpb"
12+
"github.com/grafana/mimir/pkg/streamingpromql/types"
13+
)
14+
15+
func TestResultGetter_OutOfOrderAccess(t *testing.T) {
16+
const totalSeries = 3
17+
callCount := 0
18+
19+
nextSeriesFunc := func(ctx context.Context) ([]string, error) {
20+
callCount++
21+
if callCount > totalSeries {
22+
// Simulate the "no more series to pop" panic
23+
return nil, types.EOS
24+
}
25+
return []string{string(rune('A' + callCount - 1))}, nil
26+
}
27+
28+
getter := NewResultGetter(nextSeriesFunc)
29+
30+
result, err := getter.GetResultsAtIdx(context.Background(), 2)
31+
require.NoError(t, err)
32+
require.Equal(t, []string{"C"}, result)
33+
34+
require.Equal(t, 3, callCount)
35+
36+
result, err = getter.GetResultsAtIdx(context.Background(), 0)
37+
require.NoError(t, err)
38+
require.Equal(t, []string{"A"}, result)
39+
40+
result, err = getter.GetResultsAtIdx(context.Background(), 1)
41+
require.NoError(t, err)
42+
require.Equal(t, []string{"B"}, result)
43+
}
44+
45+
func TestResultGetter_SequentialAccess(t *testing.T) {
46+
callCount := 0
47+
48+
nextSeriesFunc := func(ctx context.Context) ([]string, error) {
49+
callCount++
50+
if callCount > 3 {
51+
return nil, types.EOS
52+
}
53+
return []string{string(rune('A' + callCount - 1))}, nil
54+
}
55+
56+
getter := NewResultGetter(nextSeriesFunc)
57+
58+
result0, err := getter.GetResultsAtIdx(context.Background(), 0)
59+
require.NoError(t, err)
60+
require.Equal(t, []string{"A"}, result0)
61+
require.Equal(t, 1, callCount)
62+
63+
result1, err := getter.GetResultsAtIdx(context.Background(), 1)
64+
require.NoError(t, err)
65+
require.Equal(t, []string{"B"}, result1)
66+
require.Equal(t, 2, callCount)
67+
68+
result2, err := getter.GetResultsAtIdx(context.Background(), 2)
69+
require.NoError(t, err)
70+
require.Equal(t, []string{"C"}, result2)
71+
require.Equal(t, 3, callCount)
72+
}
73+
74+
func TestRateCombineFloat_WithEmptySplits(t *testing.T) {
75+
splits := []RateIntermediate{
76+
// Split 0: has samples
77+
{
78+
FirstSample: &mimirpb.Sample{
79+
TimestampMs: 1000,
80+
Value: 100.0,
81+
},
82+
LastSample: &mimirpb.Sample{
83+
TimestampMs: 2000,
84+
Value: 110.0,
85+
},
86+
Delta: 10.0,
87+
SampleCount: 5,
88+
IsHistogram: false,
89+
SplitRangeStart: 0,
90+
SplitRangeEnd: 3000,
91+
},
92+
// Split 1: empty (no samples)
93+
{
94+
FirstSample: nil,
95+
LastSample: nil,
96+
Delta: 0,
97+
SampleCount: 0,
98+
IsHistogram: false,
99+
SplitRangeStart: 3000,
100+
SplitRangeEnd: 6000,
101+
},
102+
// Split 2: has samples
103+
{
104+
FirstSample: &mimirpb.Sample{
105+
TimestampMs: 6000,
106+
Value: 120.0,
107+
},
108+
LastSample: &mimirpb.Sample{
109+
TimestampMs: 7000,
110+
Value: 130.0,
111+
},
112+
Delta: 10.0,
113+
SampleCount: 5,
114+
IsHistogram: false,
115+
SplitRangeStart: 6000,
116+
SplitRangeEnd: 9000,
117+
},
118+
}
119+
120+
result, hasResult, err := rateCombineFloat(splits, true)
121+
require.NoError(t, err)
122+
require.True(t, hasResult)
123+
require.NotZero(t, result)
124+
}

pkg/streamingpromql/operators/functions/split_rate_functions.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,16 +185,17 @@ func rateCombineFloat(splits []RateIntermediate, isRate bool) (float64, bool, er
185185
totalDelta := firstPiece.Delta
186186
totalCount := int(firstPiece.SampleCount)
187187

188+
lastSplitIdx := startIdx
188189
for i := startIdx + 1; i < len(splits); i++ {
189190
split := splits[i]
190191
if split.SampleCount == 0 {
191192
continue
192193
}
193194

194-
if split.FirstSample.Value < splits[i-1].LastSample.Value {
195+
if split.FirstSample.Value < splits[lastSplitIdx].LastSample.Value {
195196
totalDelta += split.FirstSample.Value
196197
} else {
197-
interSplitDelta := split.FirstSample.Value - splits[i-1].LastSample.Value
198+
interSplitDelta := split.FirstSample.Value - splits[lastSplitIdx].LastSample.Value
198199
totalDelta += interSplitDelta
199200
}
200201

@@ -205,6 +206,7 @@ func rateCombineFloat(splits []RateIntermediate, isRate bool) (float64, bool, er
205206
T: split.LastSample.TimestampMs,
206207
F: split.LastSample.Value,
207208
}
209+
lastSplitIdx = i
208210
}
209211

210212
// Need at least 2 samples total to calculate rate

0 commit comments

Comments
 (0)