Skip to content

Commit 0d1c95d

Browse files
authored
MQE: fix issues where "remove statically empty expressions" behaves incorrectly (#15014)
#### What this PR does This PR fixes two issues in the "remove statically empty expressions" optimisation pass: * it is ineffective when applied to queries rewritten by Grafana Cloud's Adaptive Metrics feature * it can incorrectly determine that an expression is empty if the expression is a `timestamp()` function call directly on a selector and that selector contains an offset or @ modifier #### Which issue(s) this PR fixes or relates to #14989 #### Checklist - [x] Tests updated. - [n/a] Documentation added. - [x] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR. - [n/a] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches query-planning optimization logic for `timestamp()` comparisons; mistakes could change whether parts of queries are skipped, but the change is narrowly scoped and backed by expanded tests. > > **Overview** > Fixes the MQE `RemoveStaticallyEmptyExpressions` optimization so it no longer misclassifies `timestamp()` comparisons as empty when `timestamp()` wraps a selector with `offset`/`@` (now derives the earliest possible sample time via the selector’s queried time range), while still optimizing non-selector `timestamp(<expr>)` comparisons based only on the evaluation start. > > Makes the optimization effective for Grafana Cloud Adaptive Metrics rewrites by unwrapping the reserved wrapper functions around `timestamp()` calls, adds defensive error handling, and expands unit/integration tests plus a changelog reference update. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 2342b78. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 0a8d383 commit 0d1c95d

5 files changed

Lines changed: 326 additions & 87 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@
188188
* [ENHANCEMENT] Query-frontend: Stream JSON encoding directly to the response body to avoid a full-copy allocation of the serialized payload. #14840
189189
* [ENHANCEMENT] Activity tracker: Added `activity_tracker_unfinished_activities_loaded` metric to report the number of unfinished activities detected on startup. #14860
190190
* [ENHANCEMENT] Distributor now uses record validation time as Kafka record timestamp to reduce rejections among consumers. #14921
191-
* [ENHANCEMENT] MQE: Add optimisation pass to optimise away expressions containing comparisons with `timestamp()` that can't produce results due to the query time range. #14989
191+
* [ENHANCEMENT] MQE: Add optimisation pass to optimise away expressions containing comparisons with `timestamp()` that can't produce results due to the query time range. #14989 #15014
192192
* [ENHANCEMENT] Distributor: OTLP endpoint now returns partial success (HTTP 200) instead of HTTP 429 when the usage tracker rejects some series due to the active series limit but other series are successfully ingested. The `RejectedDataPoints` field reports the count of distributor-side rejections (usage tracker filtering). #14789
193193
* [ENHANCEMENT] MQE: Account for memory consumption of labels returned by binary operations in query memory consumption estimate earlier. #15033
194194
* [ENHANCEMENT] Query-frontend: Log the number of series and samples returned for queries in `query stats` log lines. #15044

pkg/streamingpromql/config.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,14 @@ func NewTestEngineOpts() EngineOpts {
122122
Logger: log.NewNopLogger(),
123123
Limits: NewStaticQueryLimitsProvider(),
124124

125-
EnablePruneToggles: true,
126-
EnableCommonSubexpressionElimination: true,
127-
EnableSubsetSelectorElimination: true,
128-
EnableNarrowBinarySelectors: true,
129-
EnableEliminateDeduplicateAndMerge: true,
130-
EnableReduceMatchers: true,
131-
EnableProjectionPushdown: true,
132-
EnableMultiAggregation: true,
125+
EnablePruneToggles: true,
126+
EnableCommonSubexpressionElimination: true,
127+
EnableSubsetSelectorElimination: true,
128+
EnableNarrowBinarySelectors: true,
129+
EnableEliminateDeduplicateAndMerge: true,
130+
EnableReduceMatchers: true,
131+
EnableProjectionPushdown: true,
132+
EnableMultiAggregation: true,
133+
EnableRemoveStaticallyEmptyExpressions: true,
133134
}
134135
}

pkg/streamingpromql/optimize/plan/remove_statically_empty_expressions.go

Lines changed: 104 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ package plan
44

55
import (
66
"context"
7+
"fmt"
78

89
"github.com/go-kit/log"
910
"github.com/prometheus/client_golang/prometheus"
1011
"github.com/prometheus/client_golang/prometheus/promauto"
12+
"github.com/prometheus/prometheus/model/timestamp"
1113

1214
"github.com/grafana/mimir/pkg/streamingpromql/operators/functions"
1315
"github.com/grafana/mimir/pkg/streamingpromql/planning"
@@ -60,7 +62,7 @@ func NewRemoveStaticallyEmptyExpressionsOptimizationPass(reg prometheus.Register
6062
}
6163

6264
func (s *RemoveStaticallyEmptyExpressionsOptimizationPass) Name() string {
63-
return "remove statically empty expressions"
65+
return "Remove statically empty expressions"
6466
}
6567

6668
func (s *RemoveStaticallyEmptyExpressionsOptimizationPass) Apply(ctx context.Context, plan *planning.QueryPlan, maximumSupportedQueryPlanVersion planning.QueryPlanVersion) (*planning.QueryPlan, error) {
@@ -117,7 +119,9 @@ func (s *RemoveStaticallyEmptyExpressionsOptimizationPass) apply(node planning.N
117119
}
118120
}
119121

120-
if isAlwaysEmpty(node, params) {
122+
if empty, err := isAlwaysEmpty(node, params); err != nil {
123+
return nil, false, err
124+
} else if empty {
121125
noOp := &core.NoOp{NoOpDetails: &core.NoOpDetails{}}
122126
return noOp, true, nil
123127
}
@@ -127,83 +131,134 @@ func (s *RemoveStaticallyEmptyExpressionsOptimizationPass) apply(node planning.N
127131

128132
// isAlwaysEmpty returns true if node can be statically determined to produce an empty instant
129133
// vector for the entire query time range described by params.
130-
func isAlwaysEmpty(node planning.Node, params *planning.QueryParameters) bool {
134+
func isAlwaysEmpty(node planning.Node, params *planning.QueryParameters) (bool, error) {
131135
node = unwrap(node)
132136

133137
switch node := node.(type) {
134138
case *core.NoOp:
135-
return true
139+
return true, nil
136140
case *core.BinaryExpression:
137141
return isAlwaysEmptyBinaryExpression(node, params)
138142
default:
139-
return false
143+
return false, nil
140144
}
141145
}
142146

143-
func isAlwaysEmptyBinaryExpression(node *core.BinaryExpression, params *planning.QueryParameters) bool {
144-
earliestPossibleTimestampValueInMilliseconds := float64(params.TimeRange.StartT - params.LookbackDelta.Milliseconds())
145-
147+
func isAlwaysEmptyBinaryExpression(node *core.BinaryExpression, params *planning.QueryParameters) (bool, error) {
146148
if node.ReturnBool {
147-
return false
149+
return false, nil
148150
}
149151

150152
switch node.Op {
151153
case core.BINARY_LAND:
152-
return isAlwaysEmpty(node.LHS, params) || isAlwaysEmpty(node.RHS, params)
154+
lhsEmpty, err := isAlwaysEmpty(node.LHS, params)
155+
if err != nil {
156+
return false, err
157+
}
153158

154-
case core.BINARY_LSS:
155-
// timestamp(v) < C: always empty when C <= the earliest value that timestamp() could return
156-
// timestamp() returns the value in seconds since the epoch, so we need to convert to milliseconds.
157-
if constant, ok := isTimestampComparison(node.LHS, node.RHS); ok {
158-
return constant*1000 <= earliestPossibleTimestampValueInMilliseconds
159+
if lhsEmpty {
160+
return true, nil
159161
}
160162

163+
return isAlwaysEmpty(node.RHS, params)
164+
165+
case core.BINARY_LSS:
166+
// Check for timestamp(v) < C.
167+
return isAlwaysEmptyTimestampComparison(node.LHS, node.RHS, false, params)
168+
161169
case core.BINARY_LTE:
162-
// timestamp(v) <= C: always empty when C < the earliest value that timestamp() could return
163-
if constant, ok := isTimestampComparison(node.LHS, node.RHS); ok {
164-
return constant*1000 < earliestPossibleTimestampValueInMilliseconds
165-
}
170+
// Check for timestamp(v) <= C.
171+
return isAlwaysEmptyTimestampComparison(node.LHS, node.RHS, true, params)
166172

167173
case core.BINARY_GTR:
168-
// C > timestamp(v): equivalent to timestamp(v) < C.
169-
if constant, ok := isTimestampComparison(node.RHS, node.LHS); ok {
170-
return constant*1000 <= earliestPossibleTimestampValueInMilliseconds
171-
}
174+
// Check for C > timestamp(v), equivalent to timestamp(v) < C.
175+
return isAlwaysEmptyTimestampComparison(node.RHS, node.LHS, false, params)
172176

173177
case core.BINARY_GTE:
174-
// C >= timestamp(v): equivalent to timestamp(v) <= C.
175-
if constant, ok := isTimestampComparison(node.RHS, node.LHS); ok {
176-
return constant*1000 < earliestPossibleTimestampValueInMilliseconds
177-
}
178+
// Check for C >= timestamp(v), equivalent to timestamp(v) <= C.
179+
return isAlwaysEmptyTimestampComparison(node.RHS, node.LHS, true, params)
178180
}
179181

180-
return false
182+
return false, nil
181183
}
182184

183-
// isTimestampComparison checks whether timestampSide is (or wraps) a timestamp()
184-
// function call and constantSide is (or wraps) a NumberLiteral. If so, it returns the constant
185-
// value and true.
186-
func isTimestampComparison(timestampSide, constantSide planning.Node) (float64, bool) {
187-
if !isTimestampCall(timestampSide) {
188-
return 0, false
185+
// isAlwaysEmptyTimestampComparison returns true if timestampSide and constantSide represent
186+
// a timestamp(...) invocation and number literal respectively, and the value of constantSide
187+
// is such that the expression timestampSide < constantSide (inclusive=false) or
188+
// timestampSide <= constantSide (inclusive=true) can never return any results.
189+
func isAlwaysEmptyTimestampComparison(timestampSide, constantSide planning.Node, inclusive bool, params *planning.QueryParameters) (bool, error) {
190+
timestampCall, ok := findTimestampCall(timestampSide)
191+
if !ok {
192+
return false, nil
189193
}
190194

191-
constantSide = unwrap(constantSide)
192-
literal, ok := constantSide.(*core.NumberLiteral)
195+
constant, ok := findConstant(constantSide)
193196
if !ok {
194-
return 0, false
197+
return false, nil
198+
}
199+
200+
if len(timestampCall.Args) < 1 {
201+
// Should never happen, but check to avoid panicking here.
202+
return false, fmt.Errorf("expected at least one argument in call to timestamp(), got %d", len(timestampCall.Args))
195203
}
196204

197-
return literal.Value, true
205+
selector, timestampWrapsSelector := timestampCall.Args[0].(*core.VectorSelector)
206+
207+
// The expression timestamp(X) < C is guaranteed to return no results if the lowest possible
208+
// value of timestamp(X) is greater than or equal to C.
209+
//
210+
// The expression timestamp(X) <= C is guaranteed to return no results if the lowest possible
211+
// value of timestamp is greater than C.
212+
//
213+
// If X is a selector, then timestamp(X) will return the timestamps of the underlying samples, so we need to check
214+
// the time range queried to account for the lookback window, offsets and @ modifiers.
215+
//
216+
// If X is not a selector, then timestamp(X) can only return timestamps of the steps in the query time range.
217+
218+
var earliestPossibleTimestampValueInMilliseconds float64
219+
if timestampWrapsSelector {
220+
timeRange, err := selector.QueriedTimeRange(params.TimeRange, params.LookbackDelta)
221+
if err != nil {
222+
return false, err
223+
}
224+
earliestPossibleTimestampValueInMilliseconds = float64(timestamp.FromTime(timeRange.MinT))
225+
} else {
226+
earliestPossibleTimestampValueInMilliseconds = float64(params.TimeRange.StartT)
227+
}
228+
229+
constantInMilliseconds := constant.Value * 1000
230+
231+
if inclusive {
232+
return earliestPossibleTimestampValueInMilliseconds > constantInMilliseconds, nil
233+
}
234+
235+
return earliestPossibleTimestampValueInMilliseconds >= constantInMilliseconds, nil
198236
}
199237

200-
// isTimestampCall returns true if node is (or wraps) a FunctionCall for the timestamp() function.
238+
// findTimestampCall returns the function node and true if node is (or wraps) a FunctionCall for the timestamp() function.
201239
// It unwraps DeduplicateAndMerge, DropName, and StepInvariantExpression layers transparently.
202-
func isTimestampCall(node planning.Node) bool {
240+
func findTimestampCall(node planning.Node) (*core.FunctionCall, bool) {
203241
node = unwrap(node)
204242

205-
fc, ok := node.(*core.FunctionCall)
206-
return ok && fc.Function == functions.FUNCTION_TIMESTAMP
243+
f, ok := node.(*core.FunctionCall)
244+
if !ok {
245+
return nil, false
246+
}
247+
248+
if f.Function == functions.FUNCTION_TIMESTAMP {
249+
return f, true
250+
}
251+
252+
return nil, false
253+
}
254+
255+
// findConstant returns the number literal node and true if node is (or wraps) a NumberLiteral.
256+
// It unwraps DeduplicateAndMerge, DropName, and StepInvariantExpression layers transparently.
257+
func findConstant(node planning.Node) (*core.NumberLiteral, bool) {
258+
node = unwrap(node)
259+
260+
literal, ok := node.(*core.NumberLiteral)
261+
return literal, ok
207262
}
208263

209264
// unwrap removes transparent wrapper nodes returning the innermost non-wrapper node.
@@ -216,6 +271,13 @@ func unwrap(node planning.Node) planning.Node {
216271
node = n.Inner
217272
case *core.StepInvariantExpression:
218273
node = n.Inner
274+
case *core.FunctionCall:
275+
if (n.Function == functions.FUNCTION_ADAPTIVE_METRICS_RESERVED_1 || n.Function == functions.FUNCTION_ADAPTIVE_METRICS_RESERVED_2) && len(n.Args) > 0 {
276+
// The Adaptive Metrics query rewriting can wrap a timestamp() call (eg. expression becomes wrapper(timestamp(...)) < T), so unwrap it.
277+
node = n.Args[0]
278+
} else {
279+
return node
280+
}
219281
default:
220282
return node
221283
}

0 commit comments

Comments
 (0)