Skip to content

Commit 7860176

Browse files
committed
add basic caching bucket config & benchmark stats to BenchmarkLabelValuesOffsetsIndexV2_WithPrefix - TODO separate benchmark & make it more interesting
1 parent 5b6e0f0 commit 7860176

File tree

3 files changed: +79 −15 lines changed

pkg/storage/indexheader/bucket_binary_reader.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@ import (
2929

3030
// BucketBinaryReader reads index header data directly from object storage.
3131
type BucketBinaryReader struct {
32-
bkt objstore.BucketReader
33-
factory *streamencoding.BucketDecbufFactory
34-
bufReaderStats *streamencoding.BufReaderStats
32+
bkt objstore.BucketReader
33+
factory *streamencoding.BucketDecbufFactory
3534

3635
toc *index.TOC
3736

@@ -81,10 +80,8 @@ func NewBucketBinaryReader(
8180
indexPath := filepath.Join(blockID.String(), block.IndexFilename)
8281

8382
r := &BucketBinaryReader{
84-
bkt: bkt,
85-
86-
factory: streamencoding.NewBucketDecbufFactory(ctx, bkt, indexPath),
87-
bufReaderStats: &streamencoding.BufReaderStats{},
83+
bkt: bkt,
84+
factory: streamencoding.NewBucketDecbufFactory(ctx, bkt, indexPath),
8885
}
8986

9087
indexAttrs, err := bkt.Attributes(ctx, indexPath)

pkg/storage/indexheader/reader_benchmarks_test.go

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import (
1313
"time"
1414

1515
"github.com/go-kit/log"
16+
"github.com/grafana/dskit/cache"
17+
"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
1618
"github.com/oklog/ulid/v2"
19+
"github.com/prometheus/client_golang/prometheus"
1720
"github.com/prometheus/prometheus/model/labels"
1821
"github.com/stretchr/testify/require"
1922
"github.com/thanos-io/objstore"
@@ -267,11 +270,23 @@ func BenchmarkLabelValuesOffsetsIndexV2_WithPrefix(b *testing.B) {
267270
require.NoError(b, bkt.Close())
268271
})
269272

270-
diskReader, err := NewStreamBinaryReader(ctx, log.NewNopLogger(), instBkt, bucketDir, blockID, 32, NewStreamBinaryReaderMetrics(nil), Config{})
273+
// We reuse cache between tests (!)
274+
cache := cache.NewMockCache()
275+
276+
const cfgName = "index"
277+
subrangeSize := int64(16000)
278+
cfg := bucketcache.NewCachingBucketConfig()
279+
cfg.CacheGetRange(cfgName, cache, func(string) bool { return true }, subrangeSize, cache, time.Hour, time.Hour, 0)
280+
281+
cacheBucketReg := prometheus.NewPedanticRegistry()
282+
cachingBucket, err := bucketcache.NewCachingBucket("test", instBkt, cfg, log.NewNopLogger(), cacheBucketReg)
283+
require.NoError(b, err)
284+
285+
diskReader, err := NewStreamBinaryReader(ctx, log.NewNopLogger(), cachingBucket, bucketDir, blockID, 32, NewStreamBinaryReaderMetrics(nil), Config{})
271286
require.NoError(b, err)
272287
b.Cleanup(func() { require.NoError(b, diskReader.Close()) })
273288

274-
bucketReader, err := NewBucketBinaryReader(ctx, log.NewNopLogger(), instBkt, bucketDir, blockID, 32, Config{})
289+
bucketReader, err := NewBucketBinaryReader(ctx, log.NewNopLogger(), cachingBucket, bucketDir, blockID, 32, Config{})
275290
require.NoError(b, err)
276291
b.Cleanup(func() { require.NoError(b, bucketReader.Close()) })
277292

@@ -296,9 +311,30 @@ func BenchmarkLabelValuesOffsetsIndexV2_WithPrefix(b *testing.B) {
296311
b.ReportAllocs()
297312
for _, lbl := range sortedTestLbls {
298313
tcs := tests[lbl]
299-
for readerName, reader := range readers {
314+
for readerName, _ := range readers {
300315
for _, tc := range tcs {
301316
b.Run(fmt.Sprintf("Reader=%s/Label=%s/Prefix='%s'/Desc=%s", readerName, lbl, tc.prefix, tc.desc), func(b *testing.B) {
317+
cacheBucketReg := prometheus.NewPedanticRegistry()
318+
cachingBucket, err := bucketcache.NewCachingBucket("test", instBkt, cfg, log.NewNopLogger(), cacheBucketReg)
319+
require.NoError(b, err)
320+
321+
var reader Reader
322+
switch readerName {
323+
case "disk":
324+
r, err := NewStreamBinaryReader(ctx, log.NewNopLogger(), cachingBucket, bucketDir, blockID, 32, NewStreamBinaryReaderMetrics(nil), Config{})
325+
require.NoError(b, err)
326+
b.Cleanup(func() { require.NoError(b, r.Close()) })
327+
reader = r
328+
case "bucket":
329+
r, err := NewBucketBinaryReader(ctx, log.NewNopLogger(), cachingBucket, bucketDir, blockID, 32, Config{})
330+
require.NoError(b, err)
331+
b.Cleanup(func() { require.NoError(b, r.Close()) })
332+
reader = r
333+
default:
334+
b.Fatalf("unknown reader: %s", readerName)
335+
}
336+
337+
b.ResetTimer()
302338
for i := 0; i < b.N; i++ {
303339
startBytesDiscarded := reader.BufReaderStats().BytesDiscarded.Load()
304340
values, err := reader.LabelValuesOffsets(context.Background(), lbl, tc.prefix, tc.filter)
@@ -309,6 +345,39 @@ func BenchmarkLabelValuesOffsetsIndexV2_WithPrefix(b *testing.B) {
309345
diff := endBytesDiscarded - startBytesDiscarded
310346
output := float64(diff)
311347
b.ReportMetric(output, "buffered-bytes-discarded/op")
348+
349+
metrics, err := cacheBucketReg.Gather()
350+
require.NoError(b, err)
351+
352+
promToBenchmarkMetrics := map[string]string{
353+
"thanos_store_bucket_cache_operation_requests_total": "get-range-ops",
354+
"thanos_store_bucket_cache_operation_hits_total": "get-range-hits",
355+
"thanos_store_bucket_cache_getrange_requested_bytes_total": "get-range-bytes-requested",
356+
"thanos_store_bucket_cache_getrange_fetched_bytes_total": "get-range-bytes-fetched",
357+
"thanos_store_bucket_cache_getrange_refetched_bytes_total": "get-range-bytes-refetched",
358+
}
359+
MetricFamilyLoop:
360+
for _, mf := range metrics {
361+
MetricLoop:
362+
for _, m := range mf.GetMetric() {
363+
name := mf.GetName()
364+
365+
benchMetricName, ok := promToBenchmarkMetrics[name]
366+
if !ok {
367+
continue MetricFamilyLoop
368+
}
369+
370+
for _, l := range m.GetLabel() {
371+
lName, lValue := l.GetName(), l.GetValue()
372+
if lName == "operation" && lValue != "get_range" {
373+
continue MetricLoop
374+
}
375+
}
376+
377+
value := m.GetCounter().GetValue()
378+
b.ReportMetric(value, benchMetricName+"/op")
379+
}
380+
}
312381
}
313382
})
314383
}

pkg/storage/indexheader/stream_binary_reader.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ func NewStreamBinaryReaderMetrics(reg prometheus.Registerer) *StreamBinaryReader
4343
}
4444

4545
type StreamBinaryReader struct {
46-
factory *streamencoding.DecbufFactory
47-
bufReaderStats *streamencoding.BufReaderStats
48-
toc *BinaryTOC
46+
factory *streamencoding.DecbufFactory
47+
toc *BinaryTOC
4948

5049
// Symbols struct that keeps only 1/postingOffsetsInMemSampling in the memory, then looks up the
5150
// rest via seeking to offsets in the index-header.
@@ -109,8 +108,7 @@ func NewFileStreamBinaryReader(ctx context.Context, binPath string, id ulid.ULID
109108
logger = log.With(logger, "id", id, "path", sparseHeadersPath, "inmem_sampling_rate", postingOffsetsInMemSampling)
110109

111110
r := &StreamBinaryReader{
112-
factory: streamencoding.NewDecbufFactory(binPath, cfg.MaxIdleFileHandles, metrics.decbufFactory),
113-
bufReaderStats: &streamencoding.BufReaderStats{},
111+
factory: streamencoding.NewDecbufFactory(binPath, cfg.MaxIdleFileHandles, metrics.decbufFactory),
114112
}
115113

116114
// Create a new raw decoding buffer with access to the entire index-header file to

Comments (0)