Skip to content

Commit 76cfaa9

Browse files
committed
feat(v2): time-based compaction
1 parent 078ab1b commit 76cfaa9

File tree

9 files changed

+371
-174
lines changed

9 files changed

+371
-174
lines changed

Diff for: pkg/experiment/metastore/compaction/compactor/compaction_queue.go

+38-21
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/grafana/pyroscope/pkg/util"
1313
)
1414

15+
const defaultBlockBatchSize = 20
16+
1517
type compactionKey struct {
1618
// Order of the fields is not important.
1719
// Can be generalized.
@@ -21,14 +23,14 @@ type compactionKey struct {
2123
}
2224

2325
type compactionQueue struct {
24-
strategy Strategy
26+
config Config
2527
registerer prometheus.Registerer
2628
levels []*blockQueue
2729
}
2830

2931
// blockQueue stages blocks as they are being added. Once a batch of blocks
30-
// within the compaction key reaches a certain size, it is pushed to the linked
31-
// list in the arrival order and to the compaction key queue.
32+
// within the compaction key reaches a certain size or age, it is pushed to
33+
// the linked list in the arrival order and to the compaction key queue.
3234
//
3335
// This allows iterating over the blocks in the order of arrival within the
3436
// compaction dimension, while maintaining an ability to remove blocks from the
@@ -38,7 +40,7 @@ type compactionQueue struct {
3840
// the queue is through explicit removal. Batch and block iterators provide
3941
// the read access.
4042
type blockQueue struct {
41-
strategy Strategy
43+
config Config
4244
registerer prometheus.Registerer
4345
staged map[compactionKey]*stagedBlocks
4446
// Batches ordered by arrival.
@@ -97,11 +99,12 @@ type batch struct {
9799
// Links to the local batch queue items:
98100
// batches that share the same compaction key.
99101
next, prev *batch
102+
createdAt int64
100103
}
101104

102-
func newCompactionQueue(strategy Strategy, registerer prometheus.Registerer) *compactionQueue {
105+
func newCompactionQueue(config Config, registerer prometheus.Registerer) *compactionQueue {
103106
return &compactionQueue{
104-
strategy: strategy,
107+
config: config,
105108
registerer: registerer,
106109
}
107110
}
@@ -125,11 +128,11 @@ func (q *compactionQueue) push(e compaction.BlockEntry) bool {
125128
shard: e.Shard,
126129
level: e.Level,
127130
})
131+
staged.updatedAt = e.AppendedAt
128132
pushed := staged.push(blockEntry{
129133
id: e.ID,
130134
index: e.Index,
131135
})
132-
staged.updatedAt = e.AppendedAt
133136
heap.Fix(level.updates, staged.heapIndex)
134137
level.flushOldest(e.AppendedAt)
135138
return pushed
@@ -142,15 +145,15 @@ func (q *compactionQueue) blockQueue(l uint32) *blockQueue {
142145
}
143146
level := q.levels[l]
144147
if level == nil {
145-
level = newBlockQueue(q.strategy, q.registerer)
148+
level = newBlockQueue(q.config, q.registerer)
146149
q.levels[l] = level
147150
}
148151
return level
149152
}
150153

151-
func newBlockQueue(strategy Strategy, registerer prometheus.Registerer) *blockQueue {
154+
func newBlockQueue(config Config, registerer prometheus.Registerer) *blockQueue {
152155
return &blockQueue{
153-
strategy: strategy,
156+
config: config,
154157
registerer: registerer,
155158
staged: make(map[compactionKey]*stagedBlocks),
156159
updates: new(priorityBlockQueue),
@@ -201,14 +204,17 @@ func (s *stagedBlocks) push(block blockEntry) bool {
201204
}
202205
s.refs[block.id] = blockRef{batch: s.batch, index: len(s.batch.blocks)}
203206
s.batch.blocks = append(s.batch.blocks, block)
207+
if s.batch.size == 0 {
208+
s.batch.createdAt = s.updatedAt
209+
}
204210
s.batch.size++
205211
s.stats.blocks.Add(1)
206-
if s.queue.strategy.flush(s.batch) && !s.flush() {
207-
// An attempt to flush the same batch twice.
208-
// Should not be possible.
209-
return false
212+
if !s.queue.config.exceedsMaxSize(s.batch) &&
213+
!s.queue.config.exceedsMaxAge(s.batch, s.updatedAt) {
214+
// The batch is still valid.
215+
return true
210216
}
211-
return true
217+
return s.flush()
212218
}
213219

214220
func (s *stagedBlocks) flush() (flushed bool) {
@@ -221,7 +227,6 @@ func (s *stagedBlocks) flush() (flushed bool) {
221227
}
222228

223229
func (s *stagedBlocks) resetBatch() {
224-
// TODO(kolesnikovae): get from pool.
225230
s.batch = &batch{
226231
blocks: make([]blockEntry, 0, defaultBlockBatchSize),
227232
staged: s,
@@ -244,7 +249,6 @@ func (s *stagedBlocks) delete(block string) blockEntry {
244249
s.stats.blocks.Add(-1)
245250
if ref.batch.size == 0 {
246251
s.queue.removeBatch(ref.batch)
247-
// TODO(kolesnikovae): return to pool.
248252
}
249253
delete(s.refs, block)
250254
if len(s.refs) == 0 {
@@ -317,7 +321,7 @@ func (q *blockQueue) flushOldest(now int64) {
317321
return
318322
}
319323
oldest := (*q.updates)[0]
320-
if !q.strategy.flushByAge(oldest.batch, now) {
324+
if !q.config.exceedsMaxAge(oldest.batch, now) {
321325
return
322326
}
323327
heap.Pop(q.updates)
@@ -369,6 +373,8 @@ func (i *batchIter) next() (*batch, bool) {
369373
return b, b != nil
370374
}
371375

376+
func (i *batchIter) reset(b *batch) { i.batch = b }
377+
372378
// batchIter iterates over the batches in the queue, in the order of arrival
373379
// within the compaction key. It's guaranteed that returned blocks are unique
374380
// across all batches.
@@ -393,7 +399,14 @@ func (it *blockIter) setBatch(b *batch) {
393399
it.i = 0
394400
}
395401

396-
func (it *blockIter) next() (string, bool) {
402+
func (it *blockIter) more() bool {
403+
if it.batch == nil {
404+
return false
405+
}
406+
return it.i < len(it.batch.blocks)
407+
}
408+
409+
func (it *blockIter) peek() (string, bool) {
397410
for it.batch != nil {
398411
if it.i >= len(it.batch.blocks) {
399412
it.setBatch(it.batch.next)
@@ -404,9 +417,13 @@ func (it *blockIter) next() (string, bool) {
404417
it.i++
405418
continue
406419
}
407-
it.visited[entry.id] = struct{}{}
408-
it.i++
409420
return entry.id, true
410421
}
411422
return "", false
412423
}
424+
425+
func (it *blockIter) advance() {
426+
entry := it.batch.blocks[it.i]
427+
it.visited[entry.id] = struct{}{}
428+
it.i++
429+
}

Diff for: pkg/experiment/metastore/compaction/compactor/compaction_queue_bench_test.go

+10-27
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,18 @@ package compactor
33
import (
44
"strconv"
55
"testing"
6-
"time"
76

87
"github.com/grafana/pyroscope/pkg/experiment/metastore/compaction"
98
)
109

1110
func BenchmarkCompactionQueue_Push(b *testing.B) {
12-
s := Strategy{
13-
MaxBlocksPerLevel: []uint{20, 10, 10},
14-
MaxBlocksDefault: defaultBlockBatchSize,
15-
MaxBatchAge: defaultMaxBlockBatchAge,
16-
}
17-
18-
q := newCompactionQueue(s, nil)
1911
const (
2012
tenants = 1
2113
levels = 1
2214
shards = 64
2315
)
2416

17+
q := newCompactionQueue(DefaultConfig(), nil)
2518
keys := make([]compactionKey, levels*tenants*shards)
2619
for i := range keys {
2720
keys[i] = compactionKey{
@@ -31,28 +24,18 @@ func BenchmarkCompactionQueue_Push(b *testing.B) {
3124
}
3225
}
3326

34-
writes := make([]int64, len(keys))
35-
now := time.Now().UnixNano()
36-
for i := range writes {
37-
writes[i] = now
38-
}
39-
4027
b.ResetTimer()
4128
b.ReportAllocs()
4229

4330
for i := 0; i < b.N; i++ {
44-
for j, key := range keys {
45-
q.push(compaction.BlockEntry{
46-
Index: uint64(j),
47-
AppendedAt: writes[j],
48-
ID: strconv.Itoa(j),
49-
Tenant: key.tenant,
50-
Shard: key.shard,
51-
Level: key.level,
52-
})
53-
}
54-
for j := range writes {
55-
writes[j] += int64(time.Millisecond * 500)
56-
}
31+
k := keys[i%len(keys)]
32+
q.push(compaction.BlockEntry{
33+
Index: uint64(i),
34+
AppendedAt: int64(i),
35+
ID: strconv.Itoa(i),
36+
Tenant: k.tenant,
37+
Shard: k.shard,
38+
Level: k.level,
39+
})
5740
}
5841
}

Diff for: pkg/experiment/metastore/compaction/compactor/compaction_queue_test.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
func testBlockEntry(id int) blockEntry { return blockEntry{id: strconv.Itoa(id)} }
1616

1717
func TestBlockQueue_Push(t *testing.T) {
18-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
18+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
1919
key := compactionKey{tenant: "t", shard: 1}
2020

2121
result := q.stagedBlocks(key).push(testBlockEntry(1))
@@ -46,7 +46,7 @@ func TestBlockQueue_Push(t *testing.T) {
4646
}
4747

4848
func TestBlockQueue_DuplicateBlock(t *testing.T) {
49-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
49+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
5050
key := compactionKey{tenant: "t", shard: 1}
5151

5252
require.True(t, q.stagedBlocks(key).push(testBlockEntry(1)))
@@ -56,7 +56,7 @@ func TestBlockQueue_DuplicateBlock(t *testing.T) {
5656
}
5757

5858
func TestBlockQueue_Remove(t *testing.T) {
59-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
59+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
6060
key := compactionKey{tenant: "t", shard: 1}
6161
q.stagedBlocks(key).push(testBlockEntry(1))
6262
q.stagedBlocks(key).push(testBlockEntry(2))
@@ -73,7 +73,7 @@ func TestBlockQueue_Remove(t *testing.T) {
7373
}
7474

7575
func TestBlockQueue_RemoveNotFound(t *testing.T) {
76-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
76+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
7777
key := compactionKey{tenant: "t", shard: 1}
7878
remove(q, key, "1")
7979
q.stagedBlocks(key).push(testBlockEntry(1))
@@ -85,7 +85,7 @@ func TestBlockQueue_RemoveNotFound(t *testing.T) {
8585
}
8686

8787
func TestBlockQueue_Linking(t *testing.T) {
88-
q := newBlockQueue(Strategy{MaxBlocksDefault: 2}, nil)
88+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 2}}}, nil)
8989
key := compactionKey{tenant: "t", shard: 1}
9090

9191
q.stagedBlocks(key).push(testBlockEntry(1))
@@ -125,7 +125,7 @@ func TestBlockQueue_EmptyQueue(t *testing.T) {
125125
numBlocksPerKey = 100
126126
)
127127

128-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
128+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
129129
keys := make([]compactionKey, numKeys)
130130
for i := 0; i < numKeys; i++ {
131131
keys[i] = compactionKey{
@@ -165,9 +165,11 @@ func TestBlockQueue_EmptyQueue(t *testing.T) {
165165
}
166166

167167
func TestBlockQueue_FlushByAge(t *testing.T) {
168-
s := Strategy{
169-
MaxBlocksDefault: 5,
170-
MaxBatchAge: 1,
168+
s := Config{
169+
Levels: []LevelConfig{
170+
{MaxBlocks: 3, MaxAge: 1},
171+
{MaxBlocks: 5, MaxAge: 1},
172+
},
171173
}
172174

173175
c := newCompactionQueue(s, nil)
@@ -200,7 +202,7 @@ func TestBlockQueue_FlushByAge(t *testing.T) {
200202
}
201203

202204
func TestBlockQueue_BatchIterator(t *testing.T) {
203-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
205+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
204206
keys := []compactionKey{
205207
{tenant: "t-1", shard: 1},
206208
{tenant: "t-2", shard: 2},

Diff for: pkg/experiment/metastore/compaction/compactor/compactor.go

+4-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package compactor
22

33
import (
4-
"flag"
54
"time"
65

76
"github.com/hashicorp/raft"
@@ -31,15 +30,6 @@ type BlockQueueStore interface {
3130
CreateBuckets(*bbolt.Tx) error
3231
}
3332

34-
type Config struct {
35-
Strategy
36-
}
37-
38-
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
39-
c.Strategy = DefaultStrategy()
40-
// TODO
41-
}
42-
4333
type Compactor struct {
4434
config Config
4535
queue *compactionQueue
@@ -53,7 +43,7 @@ func NewCompactor(
5343
tombstones Tombstones,
5444
reg prometheus.Registerer,
5545
) *Compactor {
56-
queue := newCompactionQueue(config.Strategy, reg)
46+
queue := newCompactionQueue(config, reg)
5747
return &Compactor{
5848
config: config,
5949
queue: queue,
@@ -67,7 +57,7 @@ func NewStore() *store.BlockQueueStore {
6757
}
6858

6959
func (c *Compactor) Compact(tx *bbolt.Tx, entry compaction.BlockEntry) error {
70-
if uint(entry.Level) >= c.config.MaxLevel {
60+
if int(entry.Level) >= len(c.config.Levels) {
7161
return nil
7262
}
7363
if err := c.store.StoreEntry(tx, entry); err != nil {
@@ -82,12 +72,14 @@ func (c *Compactor) enqueue(e compaction.BlockEntry) bool {
8272
}
8373

8474
func (c *Compactor) NewPlan(cmd *raft.Log) compaction.Plan {
75+
now := cmd.AppendedAt.UnixNano()
8576
before := cmd.AppendedAt.Add(-c.config.CleanupDelay)
8677
tombstones := c.tombstones.ListTombstones(before)
8778
return &plan{
8879
compactor: c,
8980
tombstones: tombstones,
9081
blocks: newBlockIter(),
82+
now: now,
9183
}
9284
}
9385

0 commit comments

Comments
 (0)