
Conversation

@fionaliao (Contributor) commented Nov 11, 2025

What this PR does

Rough implementation for splitting instant queries for functions over range vectors and caching the partial results. Currently this only works for sum_over_time() (though it doesn't do Kahan summation properly) and does not work for subqueries.

Initially I tried getting something working for both range and instant queries. I got something to pass tests (see these commits), but the code got very complex, so I simplified to just instant queries for now.

This isn't in a state to be merged; the PR is just opened to show the current progress and allow for initial comments.

Overview:

  • Added query splitting optimizer (ref) which goes through the nodes. If it finds a sum_over_time() function with a range vector selector that can be split with at least one split being cacheable, it'll create a new SplittableFunctionCall node over the FunctionCall node.
  • Materializing SplittableFunctionCall creates a newly introduced FunctionOverRangeVectorSplit operator (ref). This should give the same results as FunctionOverRangeVector, but with the query splitting and caching logic.
    • The split operator has a reference to the inner range vector selector node (instead of inner materialized operator).
    • Most computation happens in the SeriesMetadata() functions at the moment. I don't think SeriesMetadata is the best place to do all the work; it was just the simplest thing to implement for now.
    • SeriesMetadata() will load all the cached results to see what gaps there are. Uncached splits are merged if they're contiguous, and a RangeVectorSelector operator is materialized for each split. When SeriesMetadata() is called for each uncached split, the samples are loaded and the intermediate result is calculated (this part could be deferred later). A rough sketch of this split/merge idea is shown after this list.
    • The FunctionOverRangeVectorSplit keeps track of a series -> split mapping. When NextSeries() is called, it just moves to the next series in the mapping, gets the results for each split and merges them.
    • The current code assumes series from the inner operator aren't sorted lexicographically. For range vector selectors I think we can assume they are sorted, but possibly not with subqueries.
    • Storing results back into the cache is done in FunctionOverRangeVectorSplit.Finalize().
  • The splits are calculated based on the query time range at the moment. To be more effective, they should be calculated based on the time ranges of the blocks loaded from storage (i.e. take into account the offset and @ modifiers).
  • Query splitting tests here. They all pass. Also ran Mimir locally in ingest storage mode and logs show splitting and caching is happening.
  • This implementation is not very memory efficient at the moment - all the intermediate results are held in memory until Finalize() is called for FunctionOverRangeVectorSplit. This includes the series metadata for each split.
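As a rough illustration of the splitting and merging described above, here is a minimal sketch. It is not the PR's actual code; the names (split, computeSplits, mergeContiguousUncached, isCached) and the exact alignment rules are assumptions:

```go
// Minimal sketch of the split/merge idea - hypothetical names, not the PR's code.
package querysplitting

import "time"

type split struct {
	start, end time.Time // [start, end) covered by this split
	cached     bool      // true if an intermediate result was found in the cache
}

// computeSplits divides a selector's time range into aligned, fixed-size splits,
// marks which ones already have cached intermediate results, and merges the
// contiguous uncached ones so only one selector is materialized per gap.
func computeSplits(start, end time.Time, splitDuration time.Duration, isCached func(start, end time.Time) bool) []split {
	var splits []split

	// Align the first boundary so overlapping queries produce identical splits
	// (and therefore identical cache keys).
	cur := start.Truncate(splitDuration)
	if cur.Before(start) {
		cur = cur.Add(splitDuration)
	}

	// Leading partial split: unaligned, so never cached.
	if start.Before(cur) {
		leadingEnd := cur
		if end.Before(leadingEnd) {
			leadingEnd = end
		}
		splits = append(splits, split{start: start, end: leadingEnd})
	}

	// Full, aligned splits: these are the cacheable ones.
	for !cur.Add(splitDuration).After(end) {
		s := split{start: cur, end: cur.Add(splitDuration)}
		s.cached = isCached(s.start, s.end)
		splits = append(splits, s)
		cur = cur.Add(splitDuration)
	}

	// Trailing partial split: unaligned, so never cached.
	if cur.Before(end) {
		splits = append(splits, split{start: cur, end: end})
	}

	return mergeContiguousUncached(splits)
}

// mergeContiguousUncached merges adjacent uncached splits into a single split.
func mergeContiguousUncached(splits []split) []split {
	var merged []split
	for _, s := range splits {
		if len(merged) > 0 && !s.cached && !merged[len(merged)-1].cached {
			merged[len(merged)-1].end = s.end
			continue
		}
		merged = append(merged, s)
	}
	return merged
}
```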

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]. If changelog entry is not needed, please add the changelog-not-needed label to the PR.
  • about-versioning.md updated with experimental features.

@fionaliao changed the title from "Add cache and sum over time splitting" to "wip: splitting and caching instant queries for functions over range operators" on Nov 11, 2025
@fionaliao (Contributor Author):
Updated to track memory consumption for series metadata in 82f5024 (tracker was previously erroring) and added a few more TODOs 😅

if opts.EnableNarrowBinarySelectors {
	planner.RegisterQueryPlanOptimizationPass(plan.NewNarrowSelectorsOptimizationPass(opts.Logger))
}

// TODO: figure out how query splitting interacts with other optimisation passes
if opts.InstantQuerySplitting.Enabled {
Contributor:

If it's possible, I'd recommend running this before CSE - CSE is the point at which the query plan can change from a tree to a DAG, so all optimisation passes after CSE need to be aware of this which adds extra complexity.

Contributor Author:

Moved in 3026f30

// CacheKey generates a unique cache key for a planning node for use with intermediate result caching.
// Currently only supports MatrixSelector nodes (range vector selectors).
// For other node types, this function panics.
func CacheKey(node Node) string {
Contributor:

I imagine this would become a method on the Node interface.

Contributor Author:

I've refactored to make a SplittableNode interface with a QuerySplittingCacheKey() method for now instead of a method on Node, since at the moment, only MatrixSelector supports query splitting. Also this PR is getting quite large so I want to avoid making changes across all nodes if possible. commit: d09f20e

If we extend to support subqueries, we can move the method to Node since all nodes will have to be able to return a cache key at that point.

When looking into the cache key more, one concern I have is what happens if new fields are added to the node that might affect the results being returned. As an example - right now we need to include the SkipHistogramBuckets field in the cache key, since if that's true then histograms will have empty buckets. It's possible newer optimisation passes will do something similar. There's no automatic way of updating the cache key; people have to remember to update the QuerySplittingCacheKey() method.

I was considering just serializing the node protobuf (plus its children's protobuf) and using that as the cache key, but that includes the ExpressionPosition field which can vary if multiple queries have overlapping time ranges/splits that can be cached.
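For reference, a minimal sketch of what this interface might look like - the method name matches the one described above, but everything else here is an assumption rather than the PR's actual code, and Node refers to the existing planning Node interface:

```go
// Hypothetical sketch only - the PR's actual definition may differ.
type SplittableNode interface {
	Node

	// QuerySplittingCacheKey returns a key identifying the data this node selects,
	// independent of where the expression appears in the query, so overlapping
	// queries map to the same cache entries. Any field that can change the returned
	// samples (e.g. SkipHistogramBuckets) must be included in the key.
	QuerySplittingCacheKey() string
}
```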

Contributor:

> I've refactored to make a SplittableNode interface with a QuerySplittingCacheKey() method for now instead of a method on Node, since at the moment, only MatrixSelector supports query splitting. Also this PR is getting quite large so I want to avoid making changes across all nodes if possible. commit: d09f20e
>
> If we extend to support subqueries, we can move the method to Node since all nodes will have to be able to return a cache key at that point.

Sounds good to me.

> When looking into the cache key more, one concern I have is what happens if new fields are added to the node that might affect the results being returned. As an example - right now we need to include the SkipHistogramBuckets field in the cache key, since if that's true then histograms will have empty buckets. It's possible newer optimisation passes will do something similar. There's no automatic way of updating the cache key; people have to remember to update the QuerySplittingCacheKey() method.
>
> I was considering just serializing the node protobuf (plus its children's protobuf) and using that as the cache key, but that includes the ExpressionPosition field which can vary if multiple queries have overlapping time ranges/splits that can be cached.

EquivalentToIgnoringHintsAndChildren is another method that has a similar problem - it's very easy to forget when adding a new field. Ditto for MergeHints. One thing I've long wondered but never got round to implementing is some kind of code generation for these methods, so new fields aren't missed - maybe this is the thing that would force us to do this?

We could likely also generate Child, ChildCount, SetChildren, ReplaceChildren and possibly ChildrenLabels as well.

Comment on lines 183 to 184
// hacky way to get the planning nodes to the operator
PlanningNodes []Node
Contributor:

Have a look at how remote execution handles this - it might provide some inspiration for what to do here.

I'm imagining the materializer would pass the nodes and the Materializer to the operator, and then it can decide (in Prepare) whether or not to materialize the nodes based on whether or not there are cache hits for that time range.

Contributor Author:

The PlanningNodes field wasn't actually being used; it was a remnant from a previous version that should be removed. I have updated the materialization process to be a bit more like the remote exec one though in b6a77ff - so there's a new query splitting materializer, and the materializer now holds the reference to the cache rather than it being passed as an operator parameter, since only the splittable function nodes need the cache.

@@ -118,6 +118,7 @@ func FunctionOverRangeVectorOperatorFactory(
	name string,
	f FunctionOverRangeVectorDefinition,
) FunctionOperatorFactory {
	f.Name = name
Contributor:

Why is this needed?

Contributor Author:

Do you mean why we need to store the function name? It forms part of the cache key - we need the function name alongside the inner node cache key (and time range) since the intermediate result is specifically for the function rather than just the inner node.

We could set the function name directly in the function definition rather than on each operator factory call though.

Contributor:

> We could set the function name directly in the function definition rather than on each operator factory call though.

This makes sense to me.

Another option could be using the Function value from operators/functions/functions.proto, as this will just be an integer rather than a string, which might save a little space in the key and be cheaper to hash.
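As a rough illustration of the kind of key being discussed (not the PR's actual code - the function name, parameter names and separator below are assumptions), the key could combine the Function enum value, the inner node's cache key and the split's time range:

```go
package querysplitting

import "fmt"

// Hypothetical illustration only - not the PR's code. fn is assumed to be the
// integer Function enum value from operators/functions/functions.proto,
// innerNodeKey the inner selector's QuerySplittingCacheKey(), and startMs/endMs
// the split's time range.
func intermediateResultCacheKey(fn int32, innerNodeKey string, startMs, endMs int64) string {
	return fmt.Sprintf("%d|%s|%d|%d", fn, innerNodeKey, startMs, endMs)
}
```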

}

func (m *FunctionOverRangeVectorSplit) Prepare(ctx context.Context, params *types.PrepareParams) error {
	return nil
Contributor:

We'll need to call Prepare on all the child operators here - so some of the logic in SeriesMetadata will need to move here.

Contributor Author:

Does this mean we need to materialize the child operators here? I.e. in Prepare(), we would need to query the cache to determine which splits are cached or uncached, and materialize range vector operators for the uncached portions?

This could make it tricky to work with the cached data if we use memcached:

  • If we load all cached results for all splits and keep them in memory from Prepare() onwards, it can take up a lot of memory (a rough illustration of this option follows below).
  • An alternative is to query the cache multiple times for the same keys (e.g. Prepare() checks the cache for which keys are present, SeriesMetadata() reads the cache data). This may fail if entries are evicted between the calls though. I don't think there's a way in memcached to avoid this.
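To make the first option concrete, here is a minimal sketch of fetching every cached split in one multi-get. It uses the gomemcache client purely for illustration (the PR uses Mimir's own cache client), and the function and parameter names are assumptions:

```go
package querysplitting

import "github.com/bradfitz/gomemcache/memcache"

// Illustration only - loadCachedSplits is a hypothetical helper, not the PR's
// code. It shows the first option above: fetch every cached split in one
// multi-get during Prepare(), so later calls don't depend on the same entries
// still being present in memcached.
func loadCachedSplits(client *memcache.Client, splitKeys []string) (map[string][]byte, error) {
	items, err := client.GetMulti(splitKeys) // single round trip; missing keys are simply absent from the result
	if err != nil {
		return nil, err
	}
	cached := make(map[string][]byte, len(items))
	for key, item := range items {
		cached[key] = item.Value
	}
	return cached, nil
}
```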

Contributor Author:

I've moved creating splits into Prepare() and also call Prepare() on the child operators there (38c1bdf), though this currently means all cached results are completely loaded within Prepare().

Contributor:

I think if we can figure out a way to stream the cache entries out of Memcached, then that mitigates the issue of loading everything upfront.


func (m *FunctionOverRangeVectorSplit) materializeOperatorForTimeRange(start int64, end int64) (types.RangeVectorOperator, error) {
	subRange := time.Duration(end-start) * time.Millisecond
	subNode := m.innerNode.CreateNodeForSubRange(subRange)
Contributor:

Rather than creating a new node like this, what if we passed this overriding range to ConvertNodeToOperator?

Then, when we're materializing the node, we use that when creating the operator (if the range is set).

Most node materializers would ignore this, but range vector selectors and subqueries could adjust the range on the created operator to match.

One possible wrinkle: for an expression like max_over_time(foo[12h]) - min_over_time(foo[12h]) as an instant query, CSE will identify the foo[12h] as common and introduce a Duplicate node, so we'd likely also need to handle this there. If we change the key for Materializer.operatorFactories to include the time range and overridden range, then we'll get different operators for each corresponding split range, so things would then work fine. With this in place, we'll benefit from CSE if both functions have identical uncached splits (which would be the common case), and if only one function has a given uncached split, then it'll still behave correctly.
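A minimal sketch of the factory-cache key being suggested here - the struct and field names are assumptions, not the actual Materializer code:

```go
package querysplitting

import "time"

// Hypothetical sketch only. Keying the materializer's operator cache on the node
// plus the split's time range and overridden selector range means that, after CSE,
// the two foo[12h] selectors in max_over_time(foo[12h]) - min_over_time(foo[12h])
// share a single operator per identical uncached split, while different splits
// still get distinct operators.
type operatorFactoryKey struct {
	node            any           // the planning node being materialized (the planning Node interface in the real code)
	startMs, endMs  int64         // the split's query time range (Unix milliseconds)
	overriddenRange time.Duration // overridden selector range; zero when no override is applied
}
```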

Contributor Author:

That makes sense, but does this mean that we need to run the query splitting optimisation pass after CSE? So disregard this comment: #13472 (comment)

If we didn't merge uncached splits then we could run query splitting before CSE and still get the deduplication done, but then there's the issue of having a lot more calls to ingesters/store-gateways if all splits are uncached.

Contributor:

The query splitting optimisation pass can still run before CSE - materialization happens after planning is finished (or even later in Prepare).

Then the uncached splits can still be merged, and any duplicate expressions with the same uncached range(s) can still be deduplicated.

Contributor Author:

Updated as per suggestion in 1730442 - TestQuerySplitting_WithCSE checks that storage is only called once per deduped split (Code needs to be refactored a bit though)

Includes a big refactor of moving the operator into the querysplitting package and pre-calculating ranges.

Also fixes split ranges so they will align with block boundaries.
@fionaliao force-pushed the intermediate-cache-new branch from 6d70d9c to c3f4923 on November 28, 2025 16:24
@fionaliao force-pushed the intermediate-cache-new branch from c3eaf9f to 86ffbe1 on November 28, 2025 18:40
@fionaliao (Contributor Author) commented Nov 28, 2025

Since the initial PR was put up, the major updates have been:

  • Getting query splitting to work in conjunction with CSE
  • Improved polymorphism - FunctionOverRangeVectorSplit is now generic. Each splittable function now defines its own SplittableOperatorFactory, including its combine and generate functions and also how to serialize/deserialize its results from the cache (a rough sketch of this shape is shown after this list). The cache entry proto now just has results with bytes as its type, and separate protos are defined for each result type.
  • Additional functions implemented: (count|min|max)_over_time and rate/increase. Testing for these does need to be more comprehensive though.
  • Adjusted the time ranges queried so that splits align better with each other and with block boundaries, accounting for the offset and @ modifiers. See these comments and also these ones.
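A minimal sketch of what such a per-function factory might look like - the interface name matches the one above, but the method names, signatures, type parameter and the RangeVectorStepData placeholder are assumptions rather than the PR's actual code:

```go
// Hypothetical sketch only - the actual interface in the PR may differ.
package querysplitting

// RangeVectorStepData stands in here for whatever per-split sample data the
// operator passes to the factory; it is a placeholder, not the real type.
type RangeVectorStepData struct{}

type SplittableOperatorFactory[R any] interface {
	// Generate computes the intermediate result for one split from that split's raw samples.
	Generate(samples RangeVectorStepData) (R, error)

	// Combine merges the intermediate results of all splits for one series into the final value.
	Combine(results []R) (float64, error)

	// Marshal and Unmarshal convert the intermediate result to and from the bytes
	// stored in the cache entry's results field, using a per-result-type proto.
	Marshal(result R) ([]byte, error)
	Unmarshal(data []byte) (R, error)
}
```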

Next steps

  • Add stats around cache entry size and the number of series per split, then run with some real workloads to see how well it works and how much memory it uses, and iterate on the cache entry format.
  • Testing histograms
  • Handling cases where query splitting is worse (e.g. results too big to fit in a cache entry). An initial version could just be to cache which queries are "problematic" and fall back to non-split execution for them.
  • Probably some renaming
  • Not caching/reading from the cache for ranges in the OOO window
  • Annotation handling (possibly in a later PR)
  • Subquery support (in later PRs)
  • Other function support (in later PRs)

This PR itself will be split up into more PRs before being set as ready for review.

@fionaliao force-pushed the intermediate-cache-new branch from ce0a143 to 90ad2bd on December 11, 2025 11:30
@fionaliao force-pushed the intermediate-cache-new branch from 90ad2bd to a61a9bd on December 11, 2025 11:44