Skip to content

feat(v2): time-based compaction #4098

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 38 additions & 21 deletions pkg/experiment/metastore/compaction/compactor/compaction_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/grafana/pyroscope/pkg/util"
)

const defaultBlockBatchSize = 20

type compactionKey struct {
// Order of the fields is not important.
// Can be generalized.
Expand All @@ -21,14 +23,14 @@ type compactionKey struct {
}

type compactionQueue struct {
strategy Strategy
config Config
registerer prometheus.Registerer
levels []*blockQueue
}

// blockQueue stages blocks as they are being added. Once a batch of blocks
// within the compaction key reaches a certain size, it is pushed to the linked
// list in the arrival order and to the compaction key queue.
// within the compaction key reaches a certain size or age, it is pushed to
// the linked list in the arrival order and to the compaction key queue.
//
// This allows to iterate over the blocks in the order of arrival within the
// compaction dimension, while maintaining an ability to remove blocks from the
Expand All @@ -38,7 +40,7 @@ type compactionQueue struct {
// the queue is through explicit removal. Batch and block iterators provide
// the read access.
type blockQueue struct {
strategy Strategy
config Config
registerer prometheus.Registerer
staged map[compactionKey]*stagedBlocks
// Batches ordered by arrival.
Expand Down Expand Up @@ -97,11 +99,12 @@ type batch struct {
// Links to the local batch queue items:
// batches that share the same compaction key.
next, prev *batch
createdAt int64
}

func newCompactionQueue(strategy Strategy, registerer prometheus.Registerer) *compactionQueue {
func newCompactionQueue(config Config, registerer prometheus.Registerer) *compactionQueue {
return &compactionQueue{
strategy: strategy,
config: config,
registerer: registerer,
}
}
Expand All @@ -125,11 +128,11 @@ func (q *compactionQueue) push(e compaction.BlockEntry) bool {
shard: e.Shard,
level: e.Level,
})
staged.updatedAt = e.AppendedAt
pushed := staged.push(blockEntry{
id: e.ID,
index: e.Index,
})
staged.updatedAt = e.AppendedAt
heap.Fix(level.updates, staged.heapIndex)
level.flushOldest(e.AppendedAt)
return pushed
Expand All @@ -142,15 +145,15 @@ func (q *compactionQueue) blockQueue(l uint32) *blockQueue {
}
level := q.levels[l]
if level == nil {
level = newBlockQueue(q.strategy, q.registerer)
level = newBlockQueue(q.config, q.registerer)
q.levels[l] = level
}
return level
}

func newBlockQueue(strategy Strategy, registerer prometheus.Registerer) *blockQueue {
func newBlockQueue(config Config, registerer prometheus.Registerer) *blockQueue {
return &blockQueue{
strategy: strategy,
config: config,
registerer: registerer,
staged: make(map[compactionKey]*stagedBlocks),
updates: new(priorityBlockQueue),
Expand Down Expand Up @@ -201,14 +204,17 @@ func (s *stagedBlocks) push(block blockEntry) bool {
}
s.refs[block.id] = blockRef{batch: s.batch, index: len(s.batch.blocks)}
s.batch.blocks = append(s.batch.blocks, block)
if s.batch.size == 0 {
s.batch.createdAt = s.updatedAt
}
s.batch.size++
s.stats.blocks.Add(1)
if s.queue.strategy.flush(s.batch) && !s.flush() {
// An attempt to flush the same batch twice.
// Should not be possible.
return false
if !s.queue.config.exceedsMaxSize(s.batch) &&
!s.queue.config.exceedsMaxAge(s.batch, s.updatedAt) {
// The batch is still valid.
return true
}
return true
return s.flush()
}

func (s *stagedBlocks) flush() (flushed bool) {
Expand All @@ -221,7 +227,6 @@ func (s *stagedBlocks) flush() (flushed bool) {
}

func (s *stagedBlocks) resetBatch() {
// TODO(kolesnikovae): get from pool.
s.batch = &batch{
blocks: make([]blockEntry, 0, defaultBlockBatchSize),
staged: s,
Expand All @@ -244,7 +249,6 @@ func (s *stagedBlocks) delete(block string) blockEntry {
s.stats.blocks.Add(-1)
if ref.batch.size == 0 {
s.queue.removeBatch(ref.batch)
// TODO(kolesnikovae): return to pool.
}
delete(s.refs, block)
if len(s.refs) == 0 {
Expand Down Expand Up @@ -317,7 +321,7 @@ func (q *blockQueue) flushOldest(now int64) {
return
}
oldest := (*q.updates)[0]
if !q.strategy.flushByAge(oldest.batch, now) {
if !q.config.exceedsMaxAge(oldest.batch, now) {
return
}
heap.Pop(q.updates)
Expand Down Expand Up @@ -369,6 +373,8 @@ func (i *batchIter) next() (*batch, bool) {
return b, b != nil
}

func (i *batchIter) reset(b *batch) { i.batch = b }

// batchIter iterates over the batches in the queue, in the order of arrival
// within the compaction key. It's guaranteed that returned blocks are unique
// across all batched.
Expand All @@ -393,7 +399,14 @@ func (it *blockIter) setBatch(b *batch) {
it.i = 0
}

func (it *blockIter) next() (string, bool) {
func (it *blockIter) more() bool {
if it.batch == nil {
return false
}
return it.i < len(it.batch.blocks)
}

func (it *blockIter) peek() (string, bool) {
for it.batch != nil {
if it.i >= len(it.batch.blocks) {
it.setBatch(it.batch.next)
Expand All @@ -404,9 +417,13 @@ func (it *blockIter) next() (string, bool) {
it.i++
continue
}
it.visited[entry.id] = struct{}{}
it.i++
return entry.id, true
}
return "", false
}

func (it *blockIter) advance() {
entry := it.batch.blocks[it.i]
it.visited[entry.id] = struct{}{}
it.i++
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,18 @@ package compactor
import (
"strconv"
"testing"
"time"

"github.com/grafana/pyroscope/pkg/experiment/metastore/compaction"
)

func BenchmarkCompactionQueue_Push(b *testing.B) {
s := Strategy{
MaxBlocksPerLevel: []uint{20, 10, 10},
MaxBlocksDefault: defaultBlockBatchSize,
MaxBatchAge: defaultMaxBlockBatchAge,
}

q := newCompactionQueue(s, nil)
const (
tenants = 1
levels = 1
shards = 64
)

q := newCompactionQueue(DefaultConfig(), nil)
keys := make([]compactionKey, levels*tenants*shards)
for i := range keys {
keys[i] = compactionKey{
Expand All @@ -31,28 +24,18 @@ func BenchmarkCompactionQueue_Push(b *testing.B) {
}
}

writes := make([]int64, len(keys))
now := time.Now().UnixNano()
for i := range writes {
writes[i] = now
}

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
for j, key := range keys {
q.push(compaction.BlockEntry{
Index: uint64(j),
AppendedAt: writes[j],
ID: strconv.Itoa(j),
Tenant: key.tenant,
Shard: key.shard,
Level: key.level,
})
}
for j := range writes {
writes[j] += int64(time.Millisecond * 500)
}
k := keys[i%len(keys)]
q.push(compaction.BlockEntry{
Index: uint64(i),
AppendedAt: int64(i),
ID: strconv.Itoa(i),
Tenant: k.tenant,
Shard: k.shard,
Level: k.level,
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func testBlockEntry(id int) blockEntry { return blockEntry{id: strconv.Itoa(id)} }

func TestBlockQueue_Push(t *testing.T) {
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
key := compactionKey{tenant: "t", shard: 1}

result := q.stagedBlocks(key).push(testBlockEntry(1))
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestBlockQueue_Push(t *testing.T) {
}

func TestBlockQueue_DuplicateBlock(t *testing.T) {
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
key := compactionKey{tenant: "t", shard: 1}

require.True(t, q.stagedBlocks(key).push(testBlockEntry(1)))
Expand All @@ -56,7 +56,7 @@ func TestBlockQueue_DuplicateBlock(t *testing.T) {
}

func TestBlockQueue_Remove(t *testing.T) {
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
key := compactionKey{tenant: "t", shard: 1}
q.stagedBlocks(key).push(testBlockEntry(1))
q.stagedBlocks(key).push(testBlockEntry(2))
Expand All @@ -73,7 +73,7 @@ func TestBlockQueue_Remove(t *testing.T) {
}

func TestBlockQueue_RemoveNotFound(t *testing.T) {
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
key := compactionKey{tenant: "t", shard: 1}
remove(q, key, "1")
q.stagedBlocks(key).push(testBlockEntry(1))
Expand All @@ -85,7 +85,7 @@ func TestBlockQueue_RemoveNotFound(t *testing.T) {
}

func TestBlockQueue_Linking(t *testing.T) {
q := newBlockQueue(Strategy{MaxBlocksDefault: 2}, nil)
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 2}}}, nil)
key := compactionKey{tenant: "t", shard: 1}

q.stagedBlocks(key).push(testBlockEntry(1))
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestBlockQueue_EmptyQueue(t *testing.T) {
numBlocksPerKey = 100
)

q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
keys := make([]compactionKey, numKeys)
for i := 0; i < numKeys; i++ {
keys[i] = compactionKey{
Expand Down Expand Up @@ -165,9 +165,11 @@ func TestBlockQueue_EmptyQueue(t *testing.T) {
}

func TestBlockQueue_FlushByAge(t *testing.T) {
s := Strategy{
MaxBlocksDefault: 5,
MaxBatchAge: 1,
s := Config{
Levels: []LevelConfig{
{MaxBlocks: 3, MaxAge: 1},
{MaxBlocks: 5, MaxAge: 1},
},
}

c := newCompactionQueue(s, nil)
Expand Down Expand Up @@ -200,7 +202,7 @@ func TestBlockQueue_FlushByAge(t *testing.T) {
}

func TestBlockQueue_BatchIterator(t *testing.T) {
q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
keys := []compactionKey{
{tenant: "t-1", shard: 1},
{tenant: "t-2", shard: 2},
Expand Down
16 changes: 4 additions & 12 deletions pkg/experiment/metastore/compaction/compactor/compactor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package compactor

import (
"flag"
"time"

"github.com/hashicorp/raft"
Expand Down Expand Up @@ -31,15 +30,6 @@ type BlockQueueStore interface {
CreateBuckets(*bbolt.Tx) error
}

type Config struct {
Strategy
}

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
c.Strategy = DefaultStrategy()
// TODO
}

type Compactor struct {
config Config
queue *compactionQueue
Expand All @@ -53,7 +43,7 @@ func NewCompactor(
tombstones Tombstones,
reg prometheus.Registerer,
) *Compactor {
queue := newCompactionQueue(config.Strategy, reg)
queue := newCompactionQueue(config, reg)
return &Compactor{
config: config,
queue: queue,
Expand All @@ -67,7 +57,7 @@ func NewStore() *store.BlockQueueStore {
}

func (c *Compactor) Compact(tx *bbolt.Tx, entry compaction.BlockEntry) error {
if uint(entry.Level) >= c.config.MaxLevel {
if int(entry.Level) >= len(c.config.Levels) {
return nil
}
if err := c.store.StoreEntry(tx, entry); err != nil {
Expand All @@ -82,12 +72,14 @@ func (c *Compactor) enqueue(e compaction.BlockEntry) bool {
}

func (c *Compactor) NewPlan(cmd *raft.Log) compaction.Plan {
now := cmd.AppendedAt.UnixNano()
before := cmd.AppendedAt.Add(-c.config.CleanupDelay)
tombstones := c.tombstones.ListTombstones(before)
return &plan{
compactor: c,
tombstones: tombstones,
blocks: newBlockIter(),
now: now,
}
}

Expand Down
Loading
Loading