diff --git a/pkg/experiment/metastore/compaction/compactor/compaction_queue.go b/pkg/experiment/metastore/compaction/compactor/compaction_queue.go
index 9fade46d2f..72be959aa5 100644
--- a/pkg/experiment/metastore/compaction/compactor/compaction_queue.go
+++ b/pkg/experiment/metastore/compaction/compactor/compaction_queue.go
@@ -12,6 +12,8 @@ import (
     "github.com/grafana/pyroscope/pkg/util"
 )
 
+const defaultBlockBatchSize = 20
+
 type compactionKey struct {
     // Order of the fields is not important.
     // Can be generalized.
@@ -21,14 +23,14 @@ type compactionKey struct {
 }
 
 type compactionQueue struct {
-    strategy   Strategy
+    config     Config
     registerer prometheus.Registerer
     levels     []*blockQueue
 }
 
 // blockQueue stages blocks as they are being added. Once a batch of blocks
-// within the compaction key reaches a certain size, it is pushed to the linked
-// list in the arrival order and to the compaction key queue.
+// within the compaction key reaches a certain size or age, it is pushed to
+// the linked list in the arrival order and to the compaction key queue.
 //
 // This allows to iterate over the blocks in the order of arrival within the
 // compaction dimension, while maintaining an ability to remove blocks from the
@@ -38,7 +40,7 @@ type compactionQueue struct {
 // the queue is through explicit removal. Batch and block iterators provide
 // the read access.
 type blockQueue struct {
-    strategy   Strategy
+    config     Config
     registerer prometheus.Registerer
     staged     map[compactionKey]*stagedBlocks
     // Batches ordered by arrival.
@@ -97,11 +99,12 @@ type batch struct {
     // Links to the local batch queue items:
     // batches that share the same compaction key.
     next, prev *batch
+    createdAt  int64
 }
 
-func newCompactionQueue(strategy Strategy, registerer prometheus.Registerer) *compactionQueue {
+func newCompactionQueue(config Config, registerer prometheus.Registerer) *compactionQueue {
     return &compactionQueue{
-        strategy:   strategy,
+        config:     config,
         registerer: registerer,
     }
 }
@@ -125,11 +128,11 @@ func (q *compactionQueue) push(e compaction.BlockEntry) bool {
         shard:  e.Shard,
         level:  e.Level,
     })
+    staged.updatedAt = e.AppendedAt
     pushed := staged.push(blockEntry{
         id:    e.ID,
         index: e.Index,
     })
-    staged.updatedAt = e.AppendedAt
     heap.Fix(level.updates, staged.heapIndex)
     level.flushOldest(e.AppendedAt)
     return pushed
@@ -142,15 +145,15 @@ func (q *compactionQueue) blockQueue(l uint32) *blockQueue {
     }
     level := q.levels[l]
     if level == nil {
-        level = newBlockQueue(q.strategy, q.registerer)
+        level = newBlockQueue(q.config, q.registerer)
         q.levels[l] = level
     }
     return level
 }
 
-func newBlockQueue(strategy Strategy, registerer prometheus.Registerer) *blockQueue {
+func newBlockQueue(config Config, registerer prometheus.Registerer) *blockQueue {
     return &blockQueue{
-        strategy:   strategy,
+        config:     config,
         registerer: registerer,
         staged:     make(map[compactionKey]*stagedBlocks),
         updates:    new(priorityBlockQueue),
@@ -201,14 +204,17 @@ func (s *stagedBlocks) push(block blockEntry) bool {
     }
     s.refs[block.id] = blockRef{batch: s.batch, index: len(s.batch.blocks)}
     s.batch.blocks = append(s.batch.blocks, block)
+    if s.batch.size == 0 {
+        s.batch.createdAt = s.updatedAt
+    }
     s.batch.size++
     s.stats.blocks.Add(1)
-    if s.queue.strategy.flush(s.batch) && !s.flush() {
-        // An attempt to flush the same batch twice.
-        // Should not be possible.
-        return false
+    if !s.queue.config.exceedsMaxSize(s.batch) &&
+        !s.queue.config.exceedsMaxAge(s.batch, s.updatedAt) {
+        // The batch is still valid.
+        return true
     }
-    return true
+    return s.flush()
 }
 
 func (s *stagedBlocks) flush() (flushed bool) {
@@ -221,7 +227,6 @@ func (s *stagedBlocks) flush() (flushed bool) {
 }
 
 func (s *stagedBlocks) resetBatch() {
-    // TODO(kolesnikovae): get from pool.
     s.batch = &batch{
         blocks: make([]blockEntry, 0, defaultBlockBatchSize),
         staged: s,
@@ -244,7 +249,6 @@ func (s *stagedBlocks) delete(block string) blockEntry {
     s.stats.blocks.Add(-1)
     if ref.batch.size == 0 {
         s.queue.removeBatch(ref.batch)
-        // TODO(kolesnikovae): return to pool.
     }
     delete(s.refs, block)
     if len(s.refs) == 0 {
@@ -317,7 +321,7 @@ func (q *blockQueue) flushOldest(now int64) {
         return
     }
     oldest := (*q.updates)[0]
-    if !q.strategy.flushByAge(oldest.batch, now) {
+    if !q.config.exceedsMaxAge(oldest.batch, now) {
         return
     }
     heap.Pop(q.updates)
@@ -369,6 +373,8 @@ func (i *batchIter) next() (*batch, bool) {
     return b, b != nil
 }
 
+func (i *batchIter) reset(b *batch) { i.batch = b }
+
 // batchIter iterates over the batches in the queue, in the order of arrival
 // within the compaction key. It's guaranteed that returned blocks are unique
 // across all batched.
@@ -393,7 +399,14 @@ func (it *blockIter) setBatch(b *batch) {
     it.i = 0
 }
 
-func (it *blockIter) next() (string, bool) {
+func (it *blockIter) more() bool {
+    if it.batch == nil {
+        return false
+    }
+    return it.i < len(it.batch.blocks)
+}
+
+func (it *blockIter) peek() (string, bool) {
     for it.batch != nil {
         if it.i >= len(it.batch.blocks) {
             it.setBatch(it.batch.next)
@@ -404,9 +417,13 @@ func (it *blockIter) next() (string, bool) {
             it.i++
             continue
         }
-        it.visited[entry.id] = struct{}{}
-        it.i++
         return entry.id, true
     }
     return "", false
 }
+
+func (it *blockIter) advance() {
+    entry := it.batch.blocks[it.i]
+    it.visited[entry.id] = struct{}{}
+    it.i++
+}
diff --git a/pkg/experiment/metastore/compaction/compactor/compaction_queue_bench_test.go b/pkg/experiment/metastore/compaction/compactor/compaction_queue_bench_test.go
index e17581058a..f6632fbb9f 100644
--- a/pkg/experiment/metastore/compaction/compactor/compaction_queue_bench_test.go
+++ b/pkg/experiment/metastore/compaction/compactor/compaction_queue_bench_test.go
@@ -3,25 +3,18 @@ package compactor
 import (
     "strconv"
     "testing"
-    "time"
 
     "github.com/grafana/pyroscope/pkg/experiment/metastore/compaction"
 )
 
 func BenchmarkCompactionQueue_Push(b *testing.B) {
-    s := Strategy{
-        MaxBlocksPerLevel: []uint{20, 10, 10},
-        MaxBlocksDefault:  defaultBlockBatchSize,
-        MaxBatchAge:       defaultMaxBlockBatchAge,
-    }
-
-    q := newCompactionQueue(s, nil)
     const (
         tenants = 1
         levels  = 1
         shards  = 64
     )
 
+    q := newCompactionQueue(DefaultConfig(), nil)
     keys := make([]compactionKey, levels*tenants*shards)
     for i := range keys {
         keys[i] = compactionKey{
@@ -31,28 +24,18 @@ func BenchmarkCompactionQueue_Push(b *testing.B) {
         }
     }
 
-    writes := make([]int64, len(keys))
-    now := time.Now().UnixNano()
-    for i := range writes {
-        writes[i] = now
-    }
-
     b.ResetTimer()
     b.ReportAllocs()
 
     for i := 0; i < b.N; i++ {
-        for j, key := range keys {
-            q.push(compaction.BlockEntry{
-                Index:      uint64(j),
-                AppendedAt: writes[j],
-                ID:         strconv.Itoa(j),
-                Tenant:     key.tenant,
-                Shard:      key.shard,
-                Level:      key.level,
-            })
-        }
-        for j := range writes {
-            writes[j] += int64(time.Millisecond * 500)
-        }
+        k := keys[i%len(keys)]
+        q.push(compaction.BlockEntry{
+            Index:      uint64(i),
+            AppendedAt: int64(i),
+            ID:         strconv.Itoa(i),
+            Tenant:     k.tenant,
+            Shard:      k.shard,
+            Level:      k.level,
+        })
     }
 }
diff --git a/pkg/experiment/metastore/compaction/compactor/compaction_queue_test.go b/pkg/experiment/metastore/compaction/compactor/compaction_queue_test.go
index 150fd43cfd..8336260b29 100644
--- a/pkg/experiment/metastore/compaction/compactor/compaction_queue_test.go
+++ b/pkg/experiment/metastore/compaction/compactor/compaction_queue_test.go
@@ -15,7 +15,7 @@ import (
 func testBlockEntry(id int) blockEntry { return blockEntry{id: strconv.Itoa(id)} }
 
 func TestBlockQueue_Push(t *testing.T) {
-    q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
+    q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
     key := compactionKey{tenant: "t", shard: 1}
 
     result := q.stagedBlocks(key).push(testBlockEntry(1))
@@ -46,7 +46,7 @@ func TestBlockQueue_Push(t *testing.T) {
 }
 
 func TestBlockQueue_DuplicateBlock(t *testing.T) {
-    q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
+    q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
     key := compactionKey{tenant: "t", shard: 1}
 
     require.True(t, q.stagedBlocks(key).push(testBlockEntry(1)))
@@ -56,7 +56,7 @@ func TestBlockQueue_DuplicateBlock(t *testing.T) {
 }
 
 func TestBlockQueue_Remove(t *testing.T) {
-    q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
+    q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
     key := compactionKey{tenant: "t", shard: 1}
     q.stagedBlocks(key).push(testBlockEntry(1))
     q.stagedBlocks(key).push(testBlockEntry(2))
@@ -73,7 +73,7 @@ func TestBlockQueue_Remove(t *testing.T) {
 }
 
 func TestBlockQueue_RemoveNotFound(t *testing.T) {
-    q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
+    q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
     key := compactionKey{tenant: "t", shard: 1}
     remove(q, key, "1")
     q.stagedBlocks(key).push(testBlockEntry(1))
@@ -85,7 +85,7 @@ func TestBlockQueue_RemoveNotFound(t *testing.T) {
 }
 
 func TestBlockQueue_Linking(t *testing.T) {
-    q := newBlockQueue(Strategy{MaxBlocksDefault: 2}, nil)
+    q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 2}}}, nil)
     key := compactionKey{tenant: "t", shard: 1}
 
     q.stagedBlocks(key).push(testBlockEntry(1))
@@ -125,7 +125,7 @@ func TestBlockQueue_EmptyQueue(t *testing.T) {
         numBlocksPerKey = 100
     )
 
-    q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
+    q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
     keys := make([]compactionKey, numKeys)
     for i := 0; i < numKeys; i++ {
         keys[i] = compactionKey{
@@ -165,9 +165,11 @@ func TestBlockQueue_EmptyQueue(t *testing.T) {
 }
 
 func TestBlockQueue_FlushByAge(t *testing.T) {
-    s := Strategy{
-        MaxBlocksDefault: 5,
-        MaxBatchAge:      1,
+    s := Config{
+        Levels: []LevelConfig{
+            {MaxBlocks: 3, MaxAge: 1},
+            {MaxBlocks: 5, MaxAge: 1},
+        },
     }
 
     c := newCompactionQueue(s, nil)
@@ -200,7 +202,7 @@ func TestBlockQueue_FlushByAge(t *testing.T) {
 }
 
 func TestBlockQueue_BatchIterator(t *testing.T) {
-    q := newBlockQueue(Strategy{MaxBlocksDefault: 3}, nil)
+    q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
     keys := []compactionKey{
         {tenant: "t-1", shard: 1},
         {tenant: "t-2", shard: 2},
diff --git a/pkg/experiment/metastore/compaction/compactor/compactor.go b/pkg/experiment/metastore/compaction/compactor/compactor.go
index 3b2b588ec0..886860e2de 100644
--- a/pkg/experiment/metastore/compaction/compactor/compactor.go
+++ b/pkg/experiment/metastore/compaction/compactor/compactor.go
@@ -1,7 +1,6 @@
 package compactor
 
 import (
-    "flag"
     "time"
 
     "github.com/hashicorp/raft"
@@ -31,15 +30,6 @@ type BlockQueueStore interface {
     CreateBuckets(*bbolt.Tx) error
 }
 
-type Config struct {
-    Strategy
-}
-
-func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
-    c.Strategy = DefaultStrategy()
-    // TODO
-}
-
 type Compactor struct {
     config Config
     queue  *compactionQueue
@@ -53,7 +43,7 @@ func NewCompactor(
     tombstones Tombstones,
     reg prometheus.Registerer,
 ) *Compactor {
-    queue := newCompactionQueue(config.Strategy, reg)
+    queue := newCompactionQueue(config, reg)
     return &Compactor{
         config: config,
         queue:  queue,
@@ -67,7 +57,7 @@ func NewStore() *store.BlockQueueStore {
 }
 
 func (c *Compactor) Compact(tx *bbolt.Tx, entry compaction.BlockEntry) error {
-    if uint(entry.Level) >= c.config.MaxLevel {
+    if int(entry.Level) >= len(c.config.Levels) {
         return nil
     }
     if err := c.store.StoreEntry(tx, entry); err != nil {
@@ -82,12 +72,14 @@ func (c *Compactor) enqueue(e compaction.BlockEntry) bool {
 }
 
 func (c *Compactor) NewPlan(cmd *raft.Log) compaction.Plan {
+    now := cmd.AppendedAt.UnixNano()
     before := cmd.AppendedAt.Add(-c.config.CleanupDelay)
     tombstones := c.tombstones.ListTombstones(before)
     return &plan{
         compactor:  c,
         tombstones: tombstones,
         blocks:     newBlockIter(),
+        now:        now,
     }
 }
 
diff --git a/pkg/experiment/metastore/compaction/compactor/compactor_config.go b/pkg/experiment/metastore/compaction/compactor/compactor_config.go
new file mode 100644
index 0000000000..62500c0cdf
--- /dev/null
+++ b/pkg/experiment/metastore/compaction/compactor/compactor_config.go
@@ -0,0 +1,81 @@
+package compactor
+
+import (
+    "flag"
+    "time"
+)
+
+type Config struct {
+    Levels []LevelConfig
+
+    CleanupBatchSize   int32
+    CleanupDelay       time.Duration
+    CleanupJobMinLevel int32
+    CleanupJobMaxLevel int32
+}
+
+type LevelConfig struct {
+    MaxBlocks uint
+    MaxAge    int64
+}
+
+func DefaultConfig() Config {
+    return Config{
+        Levels: []LevelConfig{
+            {MaxBlocks: 20, MaxAge: int64(1 * 36 * time.Second)},
+            {MaxBlocks: 10, MaxAge: int64(2 * 360 * time.Second)},
+            {MaxBlocks: 10, MaxAge: int64(3 * 3600 * time.Second)},
+        },
+
+        CleanupBatchSize:   2,
+        CleanupDelay:       15 * time.Minute,
+        CleanupJobMaxLevel: 1,
+        CleanupJobMinLevel: 0,
+    }
+}
+
+func (c *Config) RegisterFlagsWithPrefix(string, *flag.FlagSet) {
+    // NOTE(kolesnikovae): I'm not sure if making this configurable
+    // is a good idea; however, we might want to add a flag to tune
+    // the parameters based on e.g., segment size or max duration.
+    *c = DefaultConfig()
+}
+
+// exceedsMaxSize is called after the block has been added to the batch.
+// If the function returns true, the batch is flushed to the global
+// queue and becomes available for compaction.
+func (c *Config) exceedsMaxSize(b *batch) bool {
+    return uint(b.size) >= c.maxBlocks(b.staged.key.level)
+}
+
+// exceedsMaxAge reports whether the batch creation time is older than
+// the maximum age configured for the level. The function is used in two
+// cases: if the batch is not flushed to the global queue and is the
+// oldest one, or if the batch is flushed (and available to the planner)
+// but the job plan is not complete yet.
+func (c *Config) exceedsMaxAge(b *batch, now int64) bool {
+    if m := c.maxAge(b.staged.key.level); m > 0 {
+        age := now - b.createdAt
+        return age > m
+    }
+    return false
+}
+
+func (c *Config) maxBlocks(l uint32) uint {
+    if l < uint32(len(c.Levels)) {
+        return c.Levels[l].MaxBlocks
+    }
+    return 0
+}
+
+func (c *Config) maxAge(l uint32) int64 {
+    if l < uint32(len(c.Levels)) {
+        return c.Levels[l].MaxAge
+    }
+    return 0
+}
+
+func (c *Config) maxLevel() uint32 {
+    // Assuming that there is at least one level.
+    return uint32(len(c.Levels) - 1)
+}
diff --git a/pkg/experiment/metastore/compaction/compactor/compactor_strategy.go b/pkg/experiment/metastore/compaction/compactor/compactor_strategy.go
deleted file mode 100644
index 2f3f951399..0000000000
--- a/pkg/experiment/metastore/compaction/compactor/compactor_strategy.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package compactor
-
-import (
-    "flag"
-    "time"
-)
-
-const (
-    defaultBlockBatchSize   = 20
-    defaultMaxBlockBatchAge = int64(15 * time.Minute)
-)
-
-// TODO: Almost everything here should be level specific.
-
-type Strategy struct {
-    MaxBlocksPerLevel []uint
-    MaxBatchAge       int64
-    MaxLevel          uint
-
-    CleanupBatchSize int32
-    CleanupDelay     time.Duration
-
-    MaxBlocksDefault   uint
-    CleanupJobMinLevel int32
-    CleanupJobMaxLevel int32
-}
-
-func DefaultStrategy() Strategy {
-    return Strategy{
-        MaxBlocksPerLevel:  []uint{20, 10, 10},
-        MaxBlocksDefault:   10,
-        MaxLevel:           3,
-        MaxBatchAge:        defaultMaxBlockBatchAge,
-        CleanupBatchSize:   2,
-        CleanupDelay:       15 * time.Minute,
-        CleanupJobMaxLevel: 1,
-    }
-}
-
-func (s *Strategy) RegisterFlags(prefix string, f *flag.FlagSet) {}
-
-// compact is called after the block has been added to the batch.
-// If the function returns true, the batch is flushed to the global
-// queue and becomes available for compaction.
-func (s Strategy) flush(b *batch) bool {
-    return uint(b.size) >= s.maxBlocks(b.staged.key.level)
-}
-
-func (s Strategy) flushByAge(b *batch, now int64) bool {
-    if s.MaxBatchAge > 0 && b.staged.updatedAt > 0 {
-        age := now - b.staged.updatedAt
-        return age > s.MaxBatchAge
-    }
-    return false
-}
-
-// complete is called after the block is added to the job plan.
-// If the function returns true, the job plan is considered complete
-// and the job should be scheduled for execution.
-func (s Strategy) complete(j *jobPlan) bool {
-    return uint(len(j.blocks)) >= s.maxBlocks(j.level)
-}
-
-func (s Strategy) maxBlocks(l uint32) uint {
-    if l >= uint32(len(s.MaxBlocksPerLevel)) || len(s.MaxBlocksPerLevel) == 0 {
-        return s.MaxBlocksDefault
-    }
-    return s.MaxBlocksPerLevel[l]
-}
diff --git a/pkg/experiment/metastore/compaction/compactor/plan.go b/pkg/experiment/metastore/compaction/compactor/plan.go
index 021e6f104f..45aaa6feb0 100644
--- a/pkg/experiment/metastore/compaction/compactor/plan.go
+++ b/pkg/experiment/metastore/compaction/compactor/plan.go
@@ -2,7 +2,7 @@ package compactor
 
 import (
     "fmt"
-    "slices"
+    "math"
     "strconv"
     "strings"
 
@@ -11,6 +11,7 @@ import (
     metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
     "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1/raft_log"
     "github.com/grafana/pyroscope/pkg/iter"
+    "github.com/grafana/pyroscope/pkg/util"
 )
 
 // plan should be used to prepare the compaction plan update.
@@ -23,6 +24,7 @@ type plan struct {
     compactor *Compactor
     batches   *batchIter
     blocks    *blockIter
+    now       int64
 }
 
 func (p *plan) CreateJob() (*raft_log.CompactionJobPlan, error) {
@@ -43,7 +45,10 @@ func (p *plan) CreateJob() (*raft_log.CompactionJobPlan, error) {
 
 type jobPlan struct {
     compactionKey
+    config     *Config
     name       string
+    minT       int64
+    maxT       int64
     tombstones []*metastorev1.Tombstones
     blocks     []string
 }
@@ -55,7 +60,7 @@
 // removed). Therefore, we navigate to the next batch with the same
 // compaction key in this case.
 func (p *plan) nextJob() *jobPlan {
-    var job jobPlan
+    job := p.newJob()
     for p.level < uint32(len(p.compactor.queue.levels)) {
         if p.batches == nil {
             level := p.compactor.queue.levels[p.level]
@@ -79,34 +84,120 @@ func (p *plan) nextJob() *jobPlan {
         // Job levels are zero based: L0 job means that it includes blocks
         // with compaction level 0. This can be altered (1-based levels):
         // job.level++
-        job.compactionKey = b.staged.key
-        job.blocks = slices.Grow(job.blocks, defaultBlockBatchSize)[:0]
+        job.reset(b.staged.key)
         p.blocks.setBatch(b)
 
-        // Once we finish with the current batch blocks, the iterator moves
+        // Once we finish with the current block batch, the iterator moves
         // to the next batch–with-the-same-compaction-key, which is not
         // necessarily the next in-order-batch from the batch iterator.
         for {
-            block, ok := p.blocks.next()
-            if !ok {
+            if block, ok := p.blocks.peek(); ok {
+                if job.tryAdd(block) {
+                    p.blocks.advance()
+                    if !job.isComplete() {
+                        // Try to add more blocks.
+                        continue
+                    }
+                }
+            } else {
                 // No more blocks with this compaction key at the level.
-                // The current job plan is to be cancelled, and we move
-                // on to the next in-order batch.
-                break
+                // We may want to force compaction even if the current job
+                // is incomplete: e.g., if the blocks remain in the
+                // queue for too long. Note that we do not check the block
+                // timestamps: we only care when the batch was created.
+                if !p.compactor.config.exceedsMaxAge(b, p.now) {
+                    // The current job plan is to be cancelled, and
+                    // we move on to the next in-order batch.
+                    break
+                }
             }
-
-            job.blocks = append(job.blocks, block)
-            if p.compactor.config.complete(&job) {
-                nameJob(&job)
-                p.getTombstones(&job)
-                return &job
+            // Typically, we want to proceed to the next compaction key,
+            // but if the batch is not empty (i.e., we could not put all
+            // the blocks into the job), we must finish it first.
+            if p.blocks.more() {
+                p.batches.reset(b)
+            }
+            if len(job.blocks) == 0 {
+                // Should not be possible.
+                break
             }
+            p.getTombstones(job)
+            job.finalize()
+            return job
         }
     }
 
     return nil
 }
 
+func (p *plan) getTombstones(job *jobPlan) {
+    if int32(p.level) > p.compactor.config.CleanupJobMaxLevel {
+        return
+    }
+    if int32(p.level) < p.compactor.config.CleanupJobMinLevel {
+        return
+    }
+    s := int(p.compactor.config.CleanupBatchSize)
+    for i := 0; i < s && p.tombstones.Next(); i++ {
+        job.tombstones = append(job.tombstones, p.tombstones.At())
+    }
+}
+
+func (p *plan) newJob() *jobPlan {
+    return &jobPlan{
+        config: &p.compactor.config,
+        blocks: make([]string, 0, defaultBlockBatchSize),
+        minT:   math.MaxInt64,
+        maxT:   math.MinInt64,
+    }
+}
+
+func (job *jobPlan) reset(k compactionKey) {
+    job.compactionKey = k
+    job.blocks = job.blocks[:0]
+    job.minT = math.MaxInt64
+    job.maxT = math.MinInt64
+}
+
+// We may not want to add a block to the job if it extends the
+// compacted block time range beyond the desired limit.
+func (job *jobPlan) tryAdd(block string) bool {
+    t := util.ULIDStringUnixNano(block)
+    if len(job.blocks) > 0 && !job.isInAllowedTimeRange(t) {
+        return false
+    }
+    job.blocks = append(job.blocks, block)
+    job.maxT = max(job.maxT, t)
+    job.minT = min(job.minT, t)
+    return true
+}
+
+func (job *jobPlan) isInAllowedTimeRange(t int64) bool {
+    if age := job.config.maxAge(job.config.maxLevel()); age > 0 {
+        //        minT        maxT
+        // --t------|===========|------t--
+        //          |---------a--------|
+        // |---------b--------|
+        a := t - job.minT
+        b := job.maxT - t
+        if a > age || b > age {
+            return false
+        }
+    }
+    return true
+}
+
+func (job *jobPlan) isComplete() bool {
+    return uint(len(job.blocks)) >= job.config.maxBlocks(job.level)
+}
+
+func (job *jobPlan) finalize() {
+    nameJob(job)
+    job.minT = 0
+    job.maxT = 0
+    job.config = nil
+}
+
 // Job name is a variable length string that should be globally unique
 // and is used as a tiebreaker in the compaction job queue ordering.
 func nameJob(plan *jobPlan) {
@@ -128,16 +219,3 @@ func nameJob(plan *jobPlan) {
     name.WriteString(strconv.FormatUint(uint64(plan.level), 10))
     plan.name = name.String()
 }
-
-func (p *plan) getTombstones(job *jobPlan) {
-    if int32(p.level) > p.compactor.config.CleanupJobMaxLevel {
-        return
-    }
-    if int32(p.level) < p.compactor.config.CleanupJobMinLevel {
-        return
-    }
-    s := int(p.compactor.config.CleanupBatchSize)
-    for i := 0; i < s && p.tombstones.Next(); i++ {
-        job.tombstones = append(job.tombstones, p.tombstones.At())
-    }
-}
diff --git a/pkg/experiment/metastore/compaction/compactor/plan_test.go b/pkg/experiment/metastore/compaction/compactor/plan_test.go
index be2a182fe3..5060810aea 100644
--- a/pkg/experiment/metastore/compaction/compactor/plan_test.go
+++ b/pkg/experiment/metastore/compaction/compactor/plan_test.go
@@ -3,18 +3,19 @@ package compactor
 import (
     "strconv"
     "testing"
+    "time"
 
     "github.com/stretchr/testify/assert"
 
     "github.com/grafana/pyroscope/pkg/experiment/metastore/compaction"
+    "github.com/grafana/pyroscope/pkg/test"
 )
 
 var testConfig = Config{
-    Strategy: Strategy{
-        MaxBlocksPerLevel: []uint{3, 2, 2},
-        MaxBlocksDefault:  2,
-        MaxBatchAge:       0,
-        MaxLevel:          3,
+    Levels: []LevelConfig{
+        {MaxBlocks: 3},
+        {MaxBlocks: 2},
+        {MaxBlocks: 2},
     },
 }
 
@@ -253,7 +254,7 @@ func TestPlan_deleted_blocks(t *testing.T) {
 
 func TestPlan_deleted_batch(t *testing.T) {
     c := NewCompactor(testConfig, nil, nil, nil)
-    for i, e := range []compaction.BlockEntry{{}, {}, {}} {
+    for i, e := range make([]compaction.BlockEntry, 3) {
         e.Index = uint64(i)
         e.ID = strconv.Itoa(i)
         c.enqueue(e)
@@ -264,3 +265,99 @@ func TestPlan_deleted_batch(t *testing.T) {
     p := &plan{compactor: c, blocks: newBlockIter()}
     assert.Nil(t, p.nextJob())
 }
+
+func TestPlan_compact_by_time(t *testing.T) {
+    c := NewCompactor(Config{
+        Levels: []LevelConfig{
+            {MaxBlocks: 5, MaxAge: 5},
+            {MaxBlocks: 5, MaxAge: 5},
+        },
+    }, nil, nil, nil)
+
+    for _, e := range []compaction.BlockEntry{
+        {Tenant: "A", Shard: 1, Level: 0, Index: 1, AppendedAt: 10, ID: "1"},
+        {Tenant: "B", Shard: 0, Level: 0, Index: 2, AppendedAt: 20, ID: "2"},
+        {Tenant: "A", Shard: 1, Level: 0, Index: 3, AppendedAt: 30, ID: "3"},
+    } {
+        c.enqueue(e)
+    }
+
+    // Third block remains in the queue as
+    // we need another push to evict it.
+    expected := []*jobPlan{
+        {
+            compactionKey: compactionKey{tenant: "A", shard: 1, level: 0},
+            name:          "b7b41276360564d4-TA-S1-L0",
+            blocks:        []string{"1"},
+        },
+        {
+            compactionKey: compactionKey{tenant: "B", shard: 0, level: 0},
+            name:          "6021b5621680598b-TB-S0-L0",
+            blocks:        []string{"2"},
+        },
+    }
+
+    p := &plan{
+        compactor: c,
+        blocks:    newBlockIter(),
+        now:       40,
+    }
+
+    planned := make([]*jobPlan, 0, len(expected))
+    for j := p.nextJob(); j != nil; j = p.nextJob() {
+        planned = append(planned, j)
+    }
+
+    assert.Equal(t, expected, planned)
+}
+
+func TestPlan_time_split(t *testing.T) {
+    s := DefaultConfig()
+    // To skip tombstones for simplicity.
+    s.CleanupBatchSize = 0
+    c := NewCompactor(s, nil, nil, nil)
+    now := test.Time("2024-09-23T00:00:00Z")
+
+    for i := 0; i < 10; i++ {
+        now = now.Add(15 * time.Second)
+        e := compaction.BlockEntry{
+            Index:      uint64(i),
+            AppendedAt: now.UnixNano(),
+            Tenant:     "A",
+            Shard:      1,
+            Level:      0,
+            ID:         test.ULID(now.Format(time.RFC3339)),
+        }
+        c.enqueue(e)
+    }
+
+    now = now.Add(time.Hour * 6)
+    for i := 0; i < 5; i++ {
+        now = now.Add(15 * time.Second)
+        e := compaction.BlockEntry{
+            Index:      uint64(i),
+            AppendedAt: now.UnixNano(),
+            Tenant:     "A",
+            Shard:      1,
+            Level:      0,
+            ID:         test.ULID(now.Format(time.RFC3339)),
+        }
+        c.enqueue(e)
+    }
+
+    p := &plan{
+        compactor: c,
+        blocks:    newBlockIter(),
+        now:       now.UnixNano(),
+    }
+
+    var i int
+    var n int
+    for j := p.nextJob(); j != nil; j = p.nextJob() {
+        i++
+        n += len(j.blocks)
+    }
+
+    assert.Equal(t, 2, i)
+    assert.Equal(t, 15, n)
+}
diff --git a/pkg/util/ulid.go b/pkg/util/ulid.go
new file mode 100644
index 0000000000..c755eac51f
--- /dev/null
+++ b/pkg/util/ulid.go
@@ -0,0 +1,16 @@
+package util
+
+import (
+    "unsafe"
+
+    "github.com/oklog/ulid"
+)
+
+func ULIDStringUnixNano(s string) int64 {
+    var u ulid.ULID
+    b := unsafe.Slice(unsafe.StringData(s), len(s))
+    if err := u.UnmarshalText(b); err == nil {
+        return int64(u.Time()) * 1e6
+    }
+    return -1
+}