@@ -11,7 +11,10 @@ import (
1111 "fmt"
1212 "time"
1313
14+ "github.com/go-kit/log"
15+ "github.com/go-kit/log/level"
1416 "github.com/grafana/mimir/pkg/streamingpromql/optimize/plan/querysplitting/cache"
17+ "github.com/grafana/mimir/pkg/util/spanlogger"
1518 promts "github.com/prometheus/prometheus/model/timestamp"
1619 "github.com/prometheus/prometheus/promql"
1720 "github.com/prometheus/prometheus/promql/parser/posrange"
@@ -53,6 +56,9 @@ type FunctionOverRangeVectorSplit[T any] struct {
5356 // seriesToSplits is ordered the same way as SeriesMetadata
5457 seriesToSplits [][]SplitSeries
5558 currentSeriesIdx int
59+
60+ logger log.Logger
61+ cacheStats * cache.CacheStats
5662}
5763
5864var _ types.InstantVectorOperator = (* FunctionOverRangeVectorSplit [any ])(nil )
@@ -73,6 +79,7 @@ func NewSplittingFunctionOverRangeVector[T any](
7379 annotations * annotations.Annotations ,
7480 memoryConsumptionTracker * limiter.MemoryConsumptionTracker ,
7581 enableDelayedNameRemoval bool ,
82+ logger log.Logger ,
7683) (* FunctionOverRangeVectorSplit [T ], error ) {
7784 if ! timeRange .IsInstant {
7885 return nil , fmt .Errorf ("FunctionOverRangeVectorSplit only supports instant queries" )
@@ -95,6 +102,8 @@ func NewSplittingFunctionOverRangeVector[T any](
95102 expressionPosition : expressionPosition ,
96103 timeRange : timeRange ,
97104 enableDelayedNameRemoval : enableDelayedNameRemoval ,
105+ logger : logger ,
106+ cacheStats : & cache.CacheStats {},
98107 }
99108
100109 if funcDef .SeriesValidationFuncFactory != nil {
@@ -141,7 +150,7 @@ func (m *FunctionOverRangeVectorSplit[T]) createSplits(ctx context.Context) ([]S
141150 for _ , splitRange := range m .splitRanges {
142151 // For cacheable (aligned) ranges, check the cache
143152 if splitRange .Cacheable {
144- cacheEntry , found , err := cache .NewReadEntry [T ](m .cache , m .codec , ctx , int32 (m .FuncId ), m .innerCacheKey , splitRange .Start , splitRange .End )
153+ cacheEntry , found , err := cache .NewReadEntry [T ](m .cache , m .codec , ctx , int32 (m .FuncId ), m .innerCacheKey , splitRange .Start , splitRange .End , m . cacheStats )
145154 if err != nil {
146155 return nil , err
147156 }
@@ -375,12 +384,41 @@ func (m *FunctionOverRangeVectorSplit[T]) emitAnnotation(generator types.Annotat
375384}
376385
377386func (m * FunctionOverRangeVectorSplit [T ]) Finalize (ctx context.Context ) error {
387+ logger := spanlogger .FromContext (ctx , m .logger )
388+
389+ // Count cached vs uncached splits
390+ var cachedCount , uncachedCount int
391+ for _ , split := range m .splits {
392+ if split .IsCached () {
393+ cachedCount ++
394+ } else {
395+ uncachedCount ++
396+ }
397+ }
398+
399+ // Finalize all splits to ensure cache writes are complete
378400 for _ , split := range m .splits {
379401 if err := split .Finalize (ctx ); err != nil {
380402 return err
381403 }
382404 }
383405
406+ // TODO: currently at info level while testing, may also modify and remove some stats post tests
407+ level .Info (logger ).Log (
408+ "msg" , "query splitting stats" ,
409+ "function" , m .FuncId .PromQLName (),
410+ "inner_cache_key" , m .innerCacheKey ,
411+ "query_start_ms" , m .queryTimeRange .StartT ,
412+ "query_end_ms" , m .queryTimeRange .EndT ,
413+ "splits_total" , len (m .splits ),
414+ "splits_cached" , cachedCount ,
415+ "splits_uncached" , uncachedCount ,
416+ "cache_entries_cached" , m .cacheStats .CachedEntries ,
417+ "cache_entries_uncached" , m .cacheStats .UncachedEntries ,
418+ "total_series" , m .cacheStats .TotalSeries ,
419+ "total_cache_bytes" , m .cacheStats .TotalBytes ,
420+ )
421+
384422 return nil
385423}
386424
@@ -396,6 +434,7 @@ type Split[T any] interface {
396434 ReadResultsAt (ctx context.Context , idx int ) ([]T , error )
397435 Finalize (ctx context.Context ) error
398436 Close ()
437+ IsCached () bool
399438}
400439
401440type SplitSeries struct {
@@ -440,6 +479,10 @@ func (c *CachedSplit[T]) Finalize(ctx context.Context) error {
440479func (c * CachedSplit [T ]) Close () {
441480}
442481
482+ func (c * CachedSplit [T ]) IsCached () bool {
483+ return true
484+ }
485+
443486type UncachedSplit [T any ] struct {
444487 ranges []Range
445488 operator types.RangeVectorOperator
@@ -466,7 +509,7 @@ func NewUncachedSplit[T any](
466509 continue
467510 }
468511
469- cacheEntries [i ], err = cache .NewWriteEntry [T ](parent .cache , parent .codec , ctx , int32 (parent .FuncId ), parent .innerCacheKey , splitRange .Start , splitRange .End )
512+ cacheEntries [i ], err = cache .NewWriteEntry [T ](parent .cache , parent .codec , ctx , int32 (parent .FuncId ), parent .innerCacheKey , splitRange .Start , splitRange .End , parent . cacheStats )
470513 if err != nil {
471514 return nil , err
472515 }
@@ -568,6 +611,10 @@ func (p *UncachedSplit[T]) Close() {
568611 }
569612}
570613
614+ func (p * UncachedSplit [T ]) IsCached () bool {
615+ return false
616+ }
617+
571618type ResultGetter [T any ] struct {
572619 resultBuffer map [int ][]T
573620 nextSeriesIdx int
0 commit comments