Skip to content

Commit 90ad2bd

Browse files
committed
Add more metrics and logging
1 parent fe7c664 commit 90ad2bd

File tree

8 files changed

+181
-20
lines changed

8 files changed

+181
-20
lines changed

pkg/streamingpromql/operators/functions/split_model.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package functions
44

55
import (
6+
"github.com/go-kit/log"
67
"github.com/grafana/mimir/pkg/streamingpromql/optimize/plan/querysplitting/cache"
78
"github.com/grafana/mimir/pkg/streamingpromql/planning"
89
"github.com/grafana/mimir/pkg/streamingpromql/types"
@@ -49,6 +50,7 @@ type SplittableOperatorFactory func(
4950
annotations *annotations.Annotations,
5051
memoryConsumptionTracker *limiter.MemoryConsumptionTracker,
5152
enableDelayedNameRemoval bool,
53+
logger log.Logger,
5254
) (types.Operator, error)
5355

5456
func NewSplitOperatorFactory[T any](
@@ -69,6 +71,7 @@ func NewSplitOperatorFactory[T any](
6971
annotations *annotations.Annotations,
7072
memoryConsumptionTracker *limiter.MemoryConsumptionTracker,
7173
enableDelayedNameRemoval bool,
74+
logger log.Logger,
7275
) (types.Operator, error) {
7376
return NewSplittingFunctionOverRangeVector[T](
7477
innerNode,
@@ -86,6 +89,7 @@ func NewSplitOperatorFactory[T any](
8689
annotations,
8790
memoryConsumptionTracker,
8891
enableDelayedNameRemoval,
92+
logger,
8993
)
9094
}
9195
}

pkg/streamingpromql/operators/functions/split_operator.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import (
1111
"fmt"
1212
"time"
1313

14+
"github.com/go-kit/log"
15+
"github.com/go-kit/log/level"
1416
"github.com/grafana/mimir/pkg/streamingpromql/optimize/plan/querysplitting/cache"
17+
"github.com/grafana/mimir/pkg/util/spanlogger"
1518
promts "github.com/prometheus/prometheus/model/timestamp"
1619
"github.com/prometheus/prometheus/promql"
1720
"github.com/prometheus/prometheus/promql/parser/posrange"
@@ -53,6 +56,9 @@ type FunctionOverRangeVectorSplit[T any] struct {
5356
// seriesToSplits is ordered the same way as SeriesMetadata
5457
seriesToSplits [][]SplitSeries
5558
currentSeriesIdx int
59+
60+
logger log.Logger
61+
cacheStats *cache.CacheStats
5662
}
5763

5864
var _ types.InstantVectorOperator = (*FunctionOverRangeVectorSplit[any])(nil)
@@ -73,6 +79,7 @@ func NewSplittingFunctionOverRangeVector[T any](
7379
annotations *annotations.Annotations,
7480
memoryConsumptionTracker *limiter.MemoryConsumptionTracker,
7581
enableDelayedNameRemoval bool,
82+
logger log.Logger,
7683
) (*FunctionOverRangeVectorSplit[T], error) {
7784
if !timeRange.IsInstant {
7885
return nil, fmt.Errorf("FunctionOverRangeVectorSplit only supports instant queries")
@@ -95,6 +102,8 @@ func NewSplittingFunctionOverRangeVector[T any](
95102
expressionPosition: expressionPosition,
96103
timeRange: timeRange,
97104
enableDelayedNameRemoval: enableDelayedNameRemoval,
105+
logger: logger,
106+
cacheStats: &cache.CacheStats{},
98107
}
99108

100109
if funcDef.SeriesValidationFuncFactory != nil {
@@ -141,7 +150,7 @@ func (m *FunctionOverRangeVectorSplit[T]) createSplits(ctx context.Context) ([]S
141150
for _, splitRange := range m.splitRanges {
142151
// For cacheable (aligned) ranges, check the cache
143152
if splitRange.Cacheable {
144-
cacheEntry, found, err := cache.NewReadEntry[T](m.cache, m.codec, ctx, int32(m.FuncId), m.innerCacheKey, splitRange.Start, splitRange.End)
153+
cacheEntry, found, err := cache.NewReadEntry[T](m.cache, m.codec, ctx, int32(m.FuncId), m.innerCacheKey, splitRange.Start, splitRange.End, m.cacheStats)
145154
if err != nil {
146155
return nil, err
147156
}
@@ -375,12 +384,41 @@ func (m *FunctionOverRangeVectorSplit[T]) emitAnnotation(generator types.Annotat
375384
}
376385

377386
func (m *FunctionOverRangeVectorSplit[T]) Finalize(ctx context.Context) error {
387+
logger := spanlogger.FromContext(ctx, m.logger)
388+
389+
// Count cached vs uncached splits
390+
var cachedCount, uncachedCount int
391+
for _, split := range m.splits {
392+
if split.IsCached() {
393+
cachedCount++
394+
} else {
395+
uncachedCount++
396+
}
397+
}
398+
399+
// Finalize all splits to ensure cache writes are complete
378400
for _, split := range m.splits {
379401
if err := split.Finalize(ctx); err != nil {
380402
return err
381403
}
382404
}
383405

406+
// TODO: currently at info level while testing, may also modify and remove some stats post tests
407+
level.Info(logger).Log(
408+
"msg", "query splitting stats",
409+
"function", m.FuncId.PromQLName(),
410+
"inner_cache_key", m.innerCacheKey,
411+
"query_start_ms", m.queryTimeRange.StartT,
412+
"query_end_ms", m.queryTimeRange.EndT,
413+
"splits_total", len(m.splits),
414+
"splits_cached", cachedCount,
415+
"splits_uncached", uncachedCount,
416+
"cache_entries_cached", m.cacheStats.CachedEntries,
417+
"cache_entries_uncached", m.cacheStats.UncachedEntries,
418+
"total_series", m.cacheStats.TotalSeries,
419+
"total_cache_bytes", m.cacheStats.TotalBytes,
420+
)
421+
384422
return nil
385423
}
386424

@@ -396,6 +434,7 @@ type Split[T any] interface {
396434
ReadResultsAt(ctx context.Context, idx int) ([]T, error)
397435
Finalize(ctx context.Context) error
398436
Close()
437+
IsCached() bool
399438
}
400439

401440
type SplitSeries struct {
@@ -440,6 +479,10 @@ func (c *CachedSplit[T]) Finalize(ctx context.Context) error {
440479
func (c *CachedSplit[T]) Close() {
441480
}
442481

482+
// IsCached reports whether this split's results were served from the cache.
// A CachedSplit is by construction backed by a cache read, so this is
// unconditionally true.
func (c *CachedSplit[T]) IsCached() bool {
	return true
}
485+
443486
type UncachedSplit[T any] struct {
444487
ranges []Range
445488
operator types.RangeVectorOperator
@@ -466,7 +509,7 @@ func NewUncachedSplit[T any](
466509
continue
467510
}
468511

469-
cacheEntries[i], err = cache.NewWriteEntry[T](parent.cache, parent.codec, ctx, int32(parent.FuncId), parent.innerCacheKey, splitRange.Start, splitRange.End)
512+
cacheEntries[i], err = cache.NewWriteEntry[T](parent.cache, parent.codec, ctx, int32(parent.FuncId), parent.innerCacheKey, splitRange.Start, splitRange.End, parent.cacheStats)
470513
if err != nil {
471514
return nil, err
472515
}
@@ -568,6 +611,10 @@ func (p *UncachedSplit[T]) Close() {
568611
}
569612
}
570613

614+
// IsCached reports whether this split's results were served from the cache.
// An UncachedSplit is always computed from the inner operator, so this is
// unconditionally false.
func (p *UncachedSplit[T]) IsCached() bool {
	return false
}
617+
571618
type ResultGetter[T any] struct {
572619
resultBuffer map[int][]T
573620
nextSeriesIdx int

pkg/streamingpromql/optimize/plan/querysplitting/cache/cache.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/grafana/dskit/tenant"
1313
"github.com/prometheus/client_golang/prometheus"
14+
"github.com/prometheus/client_golang/prometheus/promauto"
1415

1516
"github.com/go-kit/log"
1617
"github.com/go-kit/log/level"
@@ -29,9 +30,28 @@ type Backend interface {
2930

3031
type Cache struct {
3132
backend Backend
33+
metrics *resultsCacheMetrics
3234
logger log.Logger
3335
}
3436

37+
// resultsCacheMetrics holds the Prometheus counters for the intermediate
// results cache.
type resultsCacheMetrics struct {
	cacheRequests prometheus.Counter // total lookups, hit or miss
	cacheHits     prometheus.Counter // lookups that were served from the cache
}

// newResultsCacheMetrics registers the results-cache counters with reg and
// returns them. The hit ratio can be derived as cacheHits / cacheRequests.
func newResultsCacheMetrics(reg prometheus.Registerer) *resultsCacheMetrics {
	return &resultsCacheMetrics{
		cacheRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "mimir_query_engine_intermediate_result_cache_requests_total",
			Help: "Total number of requests (or partial requests) looked up in the results cache.",
		}),
		cacheHits: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "mimir_query_engine_intermediate_result_cache_hits_total",
			Help: "Total number of requests (or partial requests) fetched from the results cache.",
		}),
	}
}
54+
3555
func NewResultsCache(cfg Config, logger log.Logger, reg prometheus.Registerer) (*Cache, error) {
3656
client, err := cache.CreateClient("intermediate-result-cache", cfg.BackendConfig, logger, prometheus.WrapRegistererWithPrefix("mimir_", reg))
3757
if err != nil {
@@ -46,12 +66,13 @@ func NewResultsCache(cfg Config, logger log.Logger, reg prometheus.Registerer) (
4666
)
4767

4868
logger.Log("msg", "intermediate results cache initialized", "backend", cfg.Backend)
49-
return NewResultsCacheWithBackend(backend, logger), nil
69+
return NewResultsCacheWithBackend(backend, reg, logger), nil
5070
}
5171

52-
func NewResultsCacheWithBackend(backend Backend, logger log.Logger) *Cache {
72+
func NewResultsCacheWithBackend(backend Backend, reg prometheus.Registerer, logger log.Logger) *Cache {
5373
return &Cache{
5474
backend: backend,
75+
metrics: newResultsCacheMetrics(reg),
5576
logger: logger,
5677
}
5778
}
@@ -90,12 +111,14 @@ func NewReadEntry[T any](
90111
function int32,
91112
innerKey string,
92113
start, end int64,
114+
stats *CacheStats,
93115
) (ReadEntry[T], bool, error) {
94116
tenant, err := user.ExtractOrgID(ctx)
95117
if err != nil {
96118
return nil, false, err
97119
}
98120

121+
c.metrics.cacheRequests.Inc()
99122
cacheKey := generateCacheKey(tenant, function, innerKey, start, end)
100123
hashedKey := cacheHashKey(cacheKey)
101124

@@ -107,19 +130,25 @@ func NewReadEntry[T any](
107130

108131
var cached CachedSeries
109132
if err := cached.Unmarshal(data); err != nil {
133+
level.Warn(c.logger).Log("msg", "failed to decode cached result", "hashed_cache_key", hashedKey, "cache_key", cacheKey, "err", err)
110134
return nil, false, nil
111135
}
112136

113137
if cached.CacheKey != cacheKey {
138+
level.Warn(c.logger).Log("msg", "skipped cached result because a cache key collision has been found", "hashed_cache_key", hashedKey, "cache_key", cacheKey)
139+
114140
return nil, false, nil
115141
}
116142

143+
c.metrics.cacheHits.Inc()
144+
level.Debug(c.logger).Log("msg", "cache hit", "tenant", tenant, "function", function, "innerKey", innerKey, "start", start, "end", end)
145+
117146
reader, err := codec.NewReader(cached.Results)
118147
if err != nil {
119148
return nil, false, err
120149
}
121150

122-
level.Debug(c.logger).Log("msg", "cache hit", "tenant", tenant, "function", function, "innerKey", innerKey, "start", start, "end", end)
151+
stats.AddCachedEntryStat(len(cached.Series), len(data))
123152

124153
return &bufferedReadEntry[T]{
125154
cached: cached,
@@ -134,6 +163,7 @@ func NewWriteEntry[T any](
134163
function int32,
135164
innerKey string,
136165
start, end int64,
166+
stats *CacheStats,
137167
) (WriteEntry[T], error) {
138168
tenant, err := user.ExtractOrgID(ctx)
139169
if err != nil {
@@ -151,6 +181,7 @@ func NewWriteEntry[T any](
151181
},
152182
finalized: false,
153183
logger: c.logger,
184+
stats: stats,
154185
}
155186

156187
writer, err := codec.NewWriter(func(data []byte) {
@@ -195,6 +226,7 @@ type bufferedWriteEntry[T any] struct {
195226
cached CachedSeries
196227
writer SplitWriter[T]
197228
finalized bool
229+
stats *CacheStats
198230
logger log.Logger
199231
}
200232

@@ -231,6 +263,8 @@ func (e *bufferedWriteEntry[T]) Finalize() error {
231263

232264
level.Debug(e.logger).Log("msg", "cache entry written", "cache_key", e.cached.CacheKey, "series_count", len(e.cached.Series), "entry_size", len(data))
233265

266+
e.stats.AddUncachedEntryStat(len(e.cached.Series), len(data))
267+
234268
e.finalized = true
235269
return nil
236270
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package cache
2+
3+
// CacheStats accumulates per-query statistics about intermediate-result cache
// usage: how many entries were read from vs written to the cache, plus
// series-count and byte-size totals and per-entry min/max.
// TODO: this may just be temporary during testing
type CacheStats struct {
	CachedEntries   int // Entries read from cache
	UncachedEntries int // Entries written to cache
	TotalSeries     int // not deduped
	MinSeries       int
	MaxSeries       int
	TotalBytes      int
	MinBytes        int
	MaxBytes        int
}

// AddCachedEntryStat records one entry that was read from the cache.
func (c *CacheStats) AddCachedEntryStat(seriesCount, size int) {
	c.CachedEntries++
	c.addEntryStat(seriesCount, size)
}

// AddUncachedEntryStat records one entry that was written to the cache.
func (c *CacheStats) AddUncachedEntryStat(seriesCount, size int) {
	c.UncachedEntries++
	c.addEntryStat(seriesCount, size)
}

// addEntryStat folds a single entry's series count and encoded size into the
// running totals and min/max trackers. It factors out the logic previously
// duplicated verbatim in AddCachedEntryStat and AddUncachedEntryStat.
// Zero doubles as the "unset" sentinel for the minimums, so a genuine
// zero-valued entry pins the corresponding minimum at zero.
func (c *CacheStats) addEntryStat(seriesCount, size int) {
	c.TotalSeries += seriesCount
	if c.MinSeries == 0 || seriesCount < c.MinSeries {
		c.MinSeries = seriesCount
	}
	if seriesCount > c.MaxSeries {
		c.MaxSeries = seriesCount
	}
	c.TotalBytes += size
	if c.MinBytes == 0 || size < c.MinBytes {
		c.MinBytes = size
	}
	if size > c.MaxBytes {
		c.MaxBytes = size
	}
}

pkg/streamingpromql/optimize/plan/querysplitting/node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ func (m Materializer) Materialize(n planning.Node, materializer *planning.Materi
183183
params.Annotations,
184184
params.MemoryConsumptionTracker,
185185
params.EnableDelayedNameRemoval,
186+
params.Logger,
186187
)
187188
if err != nil {
188189
return nil, err

0 commit comments

Comments
 (0)