Skip to content

Commit 684f8ac

Browse files
committed
feat(v2): time-based compaction
1 parent 078ab1b commit 684f8ac

File tree

9 files changed

+279
-144
lines changed

9 files changed

+279
-144
lines changed

Diff for: pkg/experiment/metastore/compaction/compactor/compaction_queue.go

+29-11
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/grafana/pyroscope/pkg/util"
1313
)
1414

15+
const defaultBlockBatchSize = 20
16+
1517
type compactionKey struct {
1618
// Order of the fields is not important.
1719
// Can be generalized.
@@ -21,14 +23,14 @@ type compactionKey struct {
2123
}
2224

2325
type compactionQueue struct {
24-
strategy Strategy
26+
config Config
2527
registerer prometheus.Registerer
2628
levels []*blockQueue
2729
}
2830

2931
// blockQueue stages blocks as they are being added. Once a batch of blocks
30-
// within the compaction key reaches a certain size, it is pushed to the linked
31-
// list in the arrival order and to the compaction key queue.
32+
// within the compaction key reaches a certain size or age, it is pushed to
33+
// the linked list in the arrival order and to the compaction key queue.
3234
//
3335
// This allows iterating over the blocks in the order of arrival within the
3436
// compaction dimension, while maintaining an ability to remove blocks from the
@@ -38,7 +40,7 @@ type compactionQueue struct {
3840
// the queue is through explicit removal. Batch and block iterators provide
3941
// the read access.
4042
type blockQueue struct {
41-
strategy Strategy
43+
config Config
4244
registerer prometheus.Registerer
4345
staged map[compactionKey]*stagedBlocks
4446
// Batches ordered by arrival.
@@ -99,9 +101,9 @@ type batch struct {
99101
next, prev *batch
100102
}
101103

102-
func newCompactionQueue(strategy Strategy, registerer prometheus.Registerer) *compactionQueue {
104+
func newCompactionQueue(strategy Config, registerer prometheus.Registerer) *compactionQueue {
103105
return &compactionQueue{
104-
strategy: strategy,
106+
config: strategy,
105107
registerer: registerer,
106108
}
107109
}
@@ -142,15 +144,15 @@ func (q *compactionQueue) blockQueue(l uint32) *blockQueue {
142144
}
143145
level := q.levels[l]
144146
if level == nil {
145-
level = newBlockQueue(q.strategy, q.registerer)
147+
level = newBlockQueue(q.config, q.registerer)
146148
q.levels[l] = level
147149
}
148150
return level
149151
}
150152

151-
func newBlockQueue(strategy Strategy, registerer prometheus.Registerer) *blockQueue {
153+
func newBlockQueue(strategy Config, registerer prometheus.Registerer) *blockQueue {
152154
return &blockQueue{
153-
strategy: strategy,
155+
config: strategy,
154156
registerer: registerer,
155157
staged: make(map[compactionKey]*stagedBlocks),
156158
updates: new(priorityBlockQueue),
@@ -203,7 +205,7 @@ func (s *stagedBlocks) push(block blockEntry) bool {
203205
s.batch.blocks = append(s.batch.blocks, block)
204206
s.batch.size++
205207
s.stats.blocks.Add(1)
206-
if s.queue.strategy.flush(s.batch) && !s.flush() {
208+
if s.queue.config.exceedsMaxSize(s.batch) && !s.flush() {
207209
// An attempt to flush the same batch twice.
208210
// Should not be possible.
209211
return false
@@ -317,7 +319,7 @@ func (q *blockQueue) flushOldest(now int64) {
317319
return
318320
}
319321
oldest := (*q.updates)[0]
320-
if !q.strategy.flushByAge(oldest.batch, now) {
322+
if !q.config.exceedsMaxAge(oldest.batch, now) {
321323
return
322324
}
323325
heap.Pop(q.updates)
@@ -393,6 +395,22 @@ func (it *blockIter) setBatch(b *batch) {
393395
it.i = 0
394396
}
395397

398+
func (it *blockIter) peek() (string, bool) {
399+
for it.batch != nil {
400+
if it.i >= len(it.batch.blocks) {
401+
it.setBatch(it.batch.next)
402+
continue
403+
}
404+
entry := it.batch.blocks[it.i]
405+
if _, visited := it.visited[entry.id]; visited {
406+
it.i++
407+
continue
408+
}
409+
return entry.id, true
410+
}
411+
return "", false
412+
}
413+
396414
func (it *blockIter) next() (string, bool) {
397415
for it.batch != nil {
398416
if it.i >= len(it.batch.blocks) {

Diff for: pkg/experiment/metastore/compaction/compactor/compaction_queue_bench_test.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,7 @@ import (
99
)
1010

1111
func BenchmarkCompactionQueue_Push(b *testing.B) {
12-
s := Strategy{
13-
MaxBlocksPerLevel: []uint{20, 10, 10},
14-
MaxBlocksDefault: defaultBlockBatchSize,
15-
MaxBatchAge: defaultMaxBlockBatchAge,
16-
}
17-
18-
q := newCompactionQueue(s, nil)
12+
q := newCompactionQueue(DefaultConfig(), nil)
1913
const (
2014
tenants = 1
2115
levels = 1

Diff for: pkg/experiment/metastore/compaction/compactor/compaction_queue_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
func testBlockEntry(id int) blockEntry { return blockEntry{id: strconv.Itoa(id)} }
1616

1717
func TestBlockQueue_Push(t *testing.T) {
18-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
18+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
1919
key := compactionKey{tenant: "t", shard: 1}
2020

2121
result := q.stagedBlocks(key).push(testBlockEntry(1))
@@ -46,7 +46,7 @@ func TestBlockQueue_Push(t *testing.T) {
4646
}
4747

4848
func TestBlockQueue_DuplicateBlock(t *testing.T) {
49-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
49+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
5050
key := compactionKey{tenant: "t", shard: 1}
5151

5252
require.True(t, q.stagedBlocks(key).push(testBlockEntry(1)))
@@ -56,7 +56,7 @@ func TestBlockQueue_DuplicateBlock(t *testing.T) {
5656
}
5757

5858
func TestBlockQueue_Remove(t *testing.T) {
59-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
59+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
6060
key := compactionKey{tenant: "t", shard: 1}
6161
q.stagedBlocks(key).push(testBlockEntry(1))
6262
q.stagedBlocks(key).push(testBlockEntry(2))
@@ -73,7 +73,7 @@ func TestBlockQueue_Remove(t *testing.T) {
7373
}
7474

7575
func TestBlockQueue_RemoveNotFound(t *testing.T) {
76-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
76+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
7777
key := compactionKey{tenant: "t", shard: 1}
7878
remove(q, key, "1")
7979
q.stagedBlocks(key).push(testBlockEntry(1))
@@ -85,7 +85,7 @@ func TestBlockQueue_RemoveNotFound(t *testing.T) {
8585
}
8686

8787
func TestBlockQueue_Linking(t *testing.T) {
88-
q := newBlockQueue(Strategy{MaxBlocksDefault: 2}, nil)
88+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 2}}}, nil)
8989
key := compactionKey{tenant: "t", shard: 1}
9090

9191
q.stagedBlocks(key).push(testBlockEntry(1))
@@ -125,7 +125,7 @@ func TestBlockQueue_EmptyQueue(t *testing.T) {
125125
numBlocksPerKey = 100
126126
)
127127

128-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
128+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
129129
keys := make([]compactionKey, numKeys)
130130
for i := 0; i < numKeys; i++ {
131131
keys[i] = compactionKey{
@@ -165,10 +165,10 @@ func TestBlockQueue_EmptyQueue(t *testing.T) {
165165
}
166166

167167
func TestBlockQueue_FlushByAge(t *testing.T) {
168-
s := Strategy{
169-
MaxBlocksDefault: 5,
170-
MaxBatchAge: 1,
171-
}
168+
s := Config{Levels: []LevelConfig{
169+
{MaxBlocks: 3, MaxAge: 1},
170+
{MaxBlocks: 5, MaxAge: 1},
171+
}}
172172

173173
c := newCompactionQueue(s, nil)
174174
for _, e := range []compaction.BlockEntry{
@@ -200,7 +200,7 @@ func TestBlockQueue_FlushByAge(t *testing.T) {
200200
}
201201

202202
func TestBlockQueue_BatchIterator(t *testing.T) {
203-
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
203+
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
204204
keys := []compactionKey{
205205
{tenant: "t-1", shard: 1},
206206
{tenant: "t-2", shard: 2},

Diff for: pkg/experiment/metastore/compaction/compactor/compactor.go

+4-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package compactor
22

33
import (
4-
"flag"
54
"time"
65

76
"github.com/hashicorp/raft"
@@ -31,15 +30,6 @@ type BlockQueueStore interface {
3130
CreateBuckets(*bbolt.Tx) error
3231
}
3332

34-
type Config struct {
35-
Strategy
36-
}
37-
38-
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
39-
c.Strategy = DefaultStrategy()
40-
// TODO
41-
}
42-
4333
type Compactor struct {
4434
config Config
4535
queue *compactionQueue
@@ -53,7 +43,7 @@ func NewCompactor(
5343
tombstones Tombstones,
5444
reg prometheus.Registerer,
5545
) *Compactor {
56-
queue := newCompactionQueue(config.Strategy, reg)
46+
queue := newCompactionQueue(config, reg)
5747
return &Compactor{
5848
config: config,
5949
queue: queue,
@@ -67,7 +57,7 @@ func NewStore() *store.BlockQueueStore {
6757
}
6858

6959
func (c *Compactor) Compact(tx *bbolt.Tx, entry compaction.BlockEntry) error {
70-
if uint(entry.Level) >= c.config.MaxLevel {
60+
if int(entry.Level) >= len(c.config.Levels) {
7161
return nil
7262
}
7363
if err := c.store.StoreEntry(tx, entry); err != nil {
@@ -82,12 +72,14 @@ func (c *Compactor) enqueue(e compaction.BlockEntry) bool {
8272
}
8373

8474
func (c *Compactor) NewPlan(cmd *raft.Log) compaction.Plan {
75+
now := cmd.AppendedAt.UnixNano()
8576
before := cmd.AppendedAt.Add(-c.config.CleanupDelay)
8677
tombstones := c.tombstones.ListTombstones(before)
8778
return &plan{
8879
compactor: c,
8980
tombstones: tombstones,
9081
blocks: newBlockIter(),
82+
now: now,
9183
}
9284
}
9385

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package compactor
2+
3+
import (
4+
"flag"
5+
"time"
6+
)
7+
8+
type Config struct {
9+
Levels []LevelConfig
10+
11+
CleanupBatchSize int32
12+
CleanupDelay time.Duration
13+
CleanupJobMinLevel int32
14+
CleanupJobMaxLevel int32
15+
}
16+
17+
type LevelConfig struct {
18+
MaxBlocks uint
19+
MaxAge int64
20+
}
21+
22+
func DefaultConfig() Config {
23+
return Config{
24+
Levels: []LevelConfig{
25+
{MaxBlocks: 20, MaxAge: int64(36 * time.Second)},
26+
{MaxBlocks: 10, MaxAge: int64(360 * time.Second)},
27+
{MaxBlocks: 10, MaxAge: int64(3600 * time.Second)},
28+
},
29+
30+
CleanupBatchSize: 2,
31+
CleanupDelay: 15 * time.Minute,
32+
CleanupJobMaxLevel: 1,
33+
CleanupJobMinLevel: 0,
34+
}
35+
}
36+
37+
func (c *Config) RegisterFlagsWithPrefix(string, *flag.FlagSet) {
38+
// NOTE(kolesnikovae): I'm not sure if making this configurable
39+
// is a good idea; however, we might want to add a flag to tune
40+
// the parameters based on e.g., segment size or max duration.
41+
*c = DefaultConfig()
42+
}
43+
44+
// exceedsMaxSize is called after the block has been added to the batch.
45+
// If the function returns true, the batch is flushed to the global
46+
// queue and becomes available for compaction.
47+
func (c *Config) exceedsMaxSize(b *batch) bool {
48+
return uint(b.size) >= c.maxBlocks(b.staged.key.level)
49+
}
50+
51+
// exceedsMaxAge reports whether the batch update time is older than the
52+
// maximum age for the level threshold. The function is used in two
53+
// cases: if the batch is not flushed to the global queue and is the
54+
// oldest one, or if the batch is flushed (and available to the planner)
55+
// but the job plan is not complete yet.
56+
func (c *Config) exceedsMaxAge(b *batch, now int64) bool {
57+
if m := c.maxAge(b.staged.key.level); m > 0 && b.staged.updatedAt > 0 {
58+
return now-b.staged.updatedAt > m
59+
}
60+
return false
61+
}
62+
63+
func (c *Config) maxBlocks(l uint32) uint {
64+
if l < uint32(len(c.Levels)) {
65+
return c.Levels[l].MaxBlocks
66+
}
67+
return 0
68+
}
69+
70+
func (c *Config) maxAge(l uint32) int64 {
71+
if l < uint32(len(c.Levels)) {
72+
return c.Levels[l].MaxAge
73+
}
74+
return 0
75+
}

0 commit comments

Comments
 (0)