Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type conversionOpts struct {
rowGroupCount int
downloadConcurrency int
encodingConcurrency int
skipRedundant bool

tempDir string
}
Expand All @@ -76,6 +77,7 @@ func (opts *conversionOpts) registerFlags(cmd *kingpin.CmdClause) {
cmd.Flag("convert.sorting.label", "label to sort by").Default("__name__").StringsVar(&opts.sortLabels)
cmd.Flag("convert.download.concurrency", "concurrency for downloading tsdb blocks").Default("4").IntVar(&opts.downloadConcurrency)
cmd.Flag("convert.encoding.concurrency", "concurrency for encoding chunks").Default("4").IntVar(&opts.encodingConcurrency)
cmd.Flag("convert.skip-redundant", "skip conversion of blocks that have already been converted").Default("true").BoolVar(&opts.skipRedundant)
}

func (opts *bucketOpts) registerConvertParquetFlags(cmd *kingpin.CmdClause) {
Expand Down Expand Up @@ -204,7 +206,7 @@ func advanceConversion(
parquetMetas := parquetDiscoverer.Metas()
tsdbMetas := tsdbDiscoverer.Metas()

plan, ok := convert.NewPlanner(time.Now().Add(-opts.gracePeriod)).Plan(tsdbMetas, parquetMetas)
plan, ok := convert.NewPlannerWithOptions(time.Now().Add(-opts.gracePeriod), opts.skipRedundant).Plan(tsdbMetas, parquetMetas)
if !ok {
log.Info("Nothing to do")
return nil
Expand Down
20 changes: 16 additions & 4 deletions convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ func ConvertTSDBBlock(
}
defer rr.Close()

// Extract source block ULIDs
sourceBlocks := make([]string, len(blks))
for i, blk := range blks {
sourceBlocks[i] = blk.Meta().ULID.String()
}

converter := newConverter(
name,
start.UnixMilli(),
Expand All @@ -176,6 +182,7 @@ func ConvertTSDBBlock(
cfg.chunkbufferPool,
cfg.labelPageBufferSize,
cfg.chunkPageBufferSize,
sourceBlocks,
)

if err := converter.convert(ctx); err != nil {
Expand Down Expand Up @@ -207,6 +214,8 @@ type converter struct {

labelPageBufferSize int
chunkPageBufferSize int

sourceBlocks []string // ULIDs of source TSDB blocks
}

func newConverter(
Expand All @@ -223,6 +232,7 @@ func newConverter(
chunkBufferPool parquet.BufferPool,
labelPageBufferSize int,
chunkPageBufferSize int,
sourceBlocks []string,

) *converter {
return &converter{
Expand All @@ -242,6 +252,7 @@ func newConverter(

labelPageBufferSize: labelPageBufferSize,
chunkPageBufferSize: chunkPageBufferSize,
sourceBlocks: sourceBlocks,
}
}

Expand Down Expand Up @@ -283,10 +294,11 @@ func (c *converter) convert(ctx context.Context) error {

func (c *converter) writeMetaFile(ctx context.Context) error {
meta := &metapb.Metadata{
Version: schema.V2,
Mint: c.mint,
Maxt: c.maxt,
Shards: int64(c.currentShard) + 1,
Version: schema.V2,
Mint: c.mint,
Maxt: c.maxt,
Shards: int64(c.currentShard) + 1,
SourceBlocks: c.sourceBlocks,
}

metaBytes, err := proto.Marshal(meta)
Expand Down
141 changes: 141 additions & 0 deletions convert/lineage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

package convert

import (
"log/slog"
"slices"
"time"

"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/thanos-io/thanos-parquet-gateway/internal/util"
"github.com/thanos-io/thanos-parquet-gateway/schema"
)

// BlockLineageChecker analyzes block relationships to prevent redundant conversions.
// It compares TSDB block ULIDs (and, for compacted blocks, their parents' ULIDs)
// against the SourceBlocks lineage recorded in existing parquet block metadata.
type BlockLineageChecker struct {
	// logger receives Info/Debug messages explaining each redundancy decision.
	logger *slog.Logger
}

// NewBlockLineageChecker returns a checker that reports its redundancy
// decisions through the supplied logger.
func NewBlockLineageChecker(logger *slog.Logger) *BlockLineageChecker {
	checker := &BlockLineageChecker{
		logger: logger,
	}
	return checker
}

// IsRedundantConversion reports whether tsdbBlock's data has already been
// converted to parquet, based on the source-block lineage recorded in the
// existing parquet metadata.
//
// A block is redundant when either:
//   - its own ULID appears in some parquet block's SourceBlocks (day is used
//     only for logging in this direct check), or
//   - it is a compacted block and all of its parents have already been
//     converted for the given day (see isCompactedBlockRedundant).
func (blc *BlockLineageChecker) IsRedundantConversion(
	tsdbBlock metadata.Meta,
	day time.Time,
	parquetMetas map[string]schema.Meta,
) bool {
	ulidStr := tsdbBlock.ULID.String()

	// Direct lineage: this exact block was already used as a conversion source.
	for _, pm := range parquetMetas {
		if !slices.Contains(pm.SourceBlocks, ulidStr) {
			continue
		}
		blc.logger.Info("Block already converted",
			"tsdb_block", ulidStr,
			"parquet_block", pm.Name,
			"day", day.Format("2006-01-02"))
		return true
	}

	// Indirect lineage: a compacted block may be redundant via its parents.
	if len(tsdbBlock.BlockMeta.Compaction.Parents) == 0 {
		return false
	}
	return blc.isCompactedBlockRedundant(tsdbBlock, day, parquetMetas)
}

// isCompactedBlockRedundant reports whether every parent of compactedBlock is
// already recorded as a source of some parquet block that overlaps day.
//
// NOTE(review): the overlap test treats Mint/Maxt as inclusive; if Maxt follows
// the Prometheus exclusive-max convention, a parquet block ending exactly at
// dayStart would still count as overlapping — confirm intended semantics.
func (blc *BlockLineageChecker) isCompactedBlockRedundant(
	compactedBlock metadata.Meta,
	day time.Time,
	parquetMetas map[string]schema.Meta,
) bool {
	// Track each parent ULID; the value flips to true once the parent is seen
	// as a source of an overlapping parquet block.
	converted := make(map[string]bool, len(compactedBlock.BlockMeta.Compaction.Parents))
	for _, p := range compactedBlock.BlockMeta.Compaction.Parents {
		converted[p.ULID.String()] = false
	}

	dayStart := util.BeginOfDay(day)
	dayEnd := util.EndOfDay(day)

	for _, pm := range parquetMetas {
		// Skip parquet blocks that do not overlap the day under consideration.
		start := time.UnixMilli(pm.Mint)
		end := time.UnixMilli(pm.Maxt)
		if end.Before(dayStart) || start.After(dayEnd) {
			continue
		}
		// Mark any of our parents that this parquet block lists as a source.
		for _, src := range pm.SourceBlocks {
			if _, ok := converted[src]; ok {
				converted[src] = true
			}
		}
	}

	// Count parents still missing from the parquet lineage, logging each one.
	missing := 0
	for parent, found := range converted {
		if found {
			continue
		}
		missing++
		blc.logger.Debug("Parent block not yet converted",
			"parent_block", parent,
			"compacted_block", compactedBlock.ULID.String(),
			"day", day.Format("2006-01-02"))
	}

	if missing == 0 && len(converted) > 0 {
		blc.logger.Info("Compacted block is redundant - all parent blocks already converted",
			"compacted_block", compactedBlock.ULID.String(),
			"parent_count", len(converted),
			"day", day.Format("2006-01-02"))
		return true
	}

	return false
}

// GetConflictingParquetBlocks returns the names of parquet blocks that overlap
// day and already list tsdbBlock's ULID among their conversion sources —
// i.e. the blocks a fresh conversion of tsdbBlock would conflict with.
// Order follows map iteration and is therefore unspecified.
func (blc *BlockLineageChecker) GetConflictingParquetBlocks(
	tsdbBlock metadata.Meta,
	day time.Time,
	parquetMetas map[string]schema.Meta,
) []string {
	ulidStr := tsdbBlock.ULID.String()
	dayStart := util.BeginOfDay(day)
	dayEnd := util.EndOfDay(day)

	var conflicts []string
	for name, pm := range parquetMetas {
		// Ignore parquet blocks outside the day window.
		start := time.UnixMilli(pm.Mint)
		end := time.UnixMilli(pm.Maxt)
		if end.Before(dayStart) || start.After(dayEnd) {
			continue
		}
		// A block sourced from the same TSDB block is a conflict.
		if slices.Contains(pm.SourceBlocks, ulidStr) {
			conflicts = append(conflicts, name)
		}
	}

	return conflicts
}
149 changes: 149 additions & 0 deletions convert/lineage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

package convert

import (
"log/slog"
"os"
"testing"
"time"

"github.com/oklog/ulid/v2"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/thanos-io/thanos-parquet-gateway/schema"
)

// TestBlockLineageChecker_IsRedundantConversion covers the three lineage
// cases: a block converted directly, a compacted block whose parents were all
// converted, and a brand-new block with no recorded lineage.
func TestBlockLineageChecker_IsRedundantConversion(t *testing.T) {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	checker := NewBlockLineageChecker(logger)

	// Distinct ULIDs for the source blocks, the compacted block, and a block
	// that never appears in any parquet lineage.
	block1ULID := ulid.MustNew(ulid.Timestamp(time.Now()), nil)
	block2ULID := ulid.MustNew(ulid.Timestamp(time.Now().Add(time.Minute)), nil)
	compactedULID := ulid.MustNew(ulid.Timestamp(time.Now().Add(2*time.Minute)), nil)
	newULID := ulid.MustNew(ulid.Timestamp(time.Now().Add(5*time.Minute)), nil)

	testDay := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

	// dayMetas builds a single-day parquet meta map listing the given sources.
	dayMetas := func(sources ...string) map[string]schema.Meta {
		return map[string]schema.Meta{
			"2024/01/01": {
				Name:         "2024/01/01",
				Mint:         testDay.UnixMilli(),
				Maxt:         testDay.Add(24 * time.Hour).UnixMilli(),
				SourceBlocks: sources,
			},
		}
	}

	cases := []struct {
		name   string
		block  metadata.Meta
		metas  map[string]schema.Meta
		want   bool
		errMsg string
	}{
		{
			name:   "direct_conversion_already_exists",
			block:  metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1ULID}},
			metas:  dayMetas(block1ULID.String()),
			want:   true,
			errMsg: "Expected redundant conversion detection for already converted block",
		},
		{
			name: "compacted_block_all_parents_converted",
			block: metadata.Meta{
				BlockMeta: tsdb.BlockMeta{
					ULID: compactedULID,
					Compaction: tsdb.BlockMetaCompaction{
						Parents: []tsdb.BlockDesc{
							{ULID: block1ULID},
							{ULID: block2ULID},
						},
					},
				},
			},
			metas:  dayMetas(block1ULID.String(), block2ULID.String()),
			want:   true,
			errMsg: "Expected redundant conversion detection for compacted block with all parents converted",
		},
		{
			name:   "new_block_not_converted",
			block:  metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: newULID}},
			metas:  dayMetas(block1ULID.String()),
			want:   false,
			errMsg: "Expected no redundancy detection for new unconverted block",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if checker.IsRedundantConversion(tc.block, testDay, tc.metas) != tc.want {
				t.Error(tc.errMsg)
			}
		})
	}
}

// TestPlannerWithLineageChecking verifies that the planner skips a compacted
// block whose parents are already converted when lineage checking is enabled,
// and still schedules it when lineage checking is disabled.
func TestPlannerWithLineageChecking(t *testing.T) {
	testDay := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	nextDay := time.Date(2024, 1, 2, 0, 0, 0, 0, time.UTC)

	parentULID := ulid.MustNew(ulid.Timestamp(time.Now()), nil)
	compactedULID := ulid.MustNew(ulid.Timestamp(time.Now().Add(time.Minute)), nil)

	// One compacted TSDB block on day 2, built from a single parent.
	tsdbMetas := map[string]metadata.Meta{
		compactedULID.String(): {
			BlockMeta: tsdb.BlockMeta{
				ULID:    compactedULID,
				MinTime: nextDay.UnixMilli(),
				MaxTime: nextDay.Add(24 * time.Hour).UnixMilli(),
				Compaction: tsdb.BlockMetaCompaction{
					Parents: []tsdb.BlockDesc{
						{ULID: parentULID},
					},
				},
			},
		},
	}

	// The parent was already converted into the day-1 parquet block.
	parquetMetas := map[string]schema.Meta{
		"2024/01/01": {
			Name:         "2024/01/01",
			Mint:         testDay.UnixMilli(),
			Maxt:         testDay.Add(24 * time.Hour).UnixMilli(),
			SourceBlocks: []string{parentULID.String()},
		},
	}

	// Lineage checking enabled: the redundant block must yield no plan.
	if _, ok := NewPlannerWithOptions(time.Now().Add(time.Hour), true).Plan(tsdbMetas, parquetMetas); ok {
		t.Error("Expected no conversion plan when compacted block's parents are already converted")
	}

	// Lineage checking disabled: the block must still be planned for download.
	plan, ok := NewPlannerWithOptions(time.Now().Add(time.Hour), false).Plan(tsdbMetas, parquetMetas)
	if !ok || len(plan.Download) == 0 {
		t.Error("Expected conversion plan when lineage checking is disabled")
	}
}
Loading
Loading