Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,27 @@ func advanceConversion(
if len(candidates) == 0 {
continue
}

// Get the metadata for blocks being converted for this date
candidateULIDs := make(map[string]bool)
for _, candidate := range candidates {
candidateULIDs[candidate.Meta().ULID.String()] = true
}

relevantMetas := make([]metadata.Meta, 0)
for _, meta := range plan.Download {
if candidateULIDs[meta.ULID.String()] {
relevantMetas = append(relevantMetas, meta)
}
}

convOpts := []convert.ConvertOption{
convert.SortBy(opts.sortLabels...),
convert.RowGroupSize(opts.rowGroupSize),
convert.RowGroupCount(opts.rowGroupCount),
convert.EncodingConcurrency(opts.encodingConcurrency),
convert.ChunkBufferPool(parquet.NewFileBufferPool(bufferDir, "chunkbuf-*")),
convert.UpdateTSDBMeta(tsdbBkt, relevantMetas),
}
if err := convert.ConvertTSDBBlock(ctx, parquetBkt, next, candidates, convOpts...); err != nil {
return fmt.Errorf("unable to convert blocks for date %q: %s", next, err)
Expand Down
73 changes: 73 additions & 0 deletions convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (

"github.com/alecthomas/units"
"github.com/efficientgo/core/errcapture"
jsoniter "github.com/json-iterator/go"
"github.com/parquet-go/parquet-go"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/util/zeropool"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"google.golang.org/protobuf/proto"

"github.com/thanos-io/thanos-parquet-gateway/internal/encoding"
Expand All @@ -31,6 +33,13 @@ import (
"github.com/thanos-io/thanos-parquet-gateway/schema"
)

const (
// ParquetMigratedExtensionKey is the key used in TSDB block metadata Extensions
// to indicate that the block has been migrated to Parquet format.
// TODO: Import this constant from Thanos when it becomes available in their metadata package.
ParquetMigratedExtensionKey = "parquet_migrated"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)

type Convertible interface {
Index() (tsdb.IndexReader, error)
Chunks() (tsdb.ChunkReader, error)
Expand All @@ -51,6 +60,10 @@ type convertOpts struct {

labelPageBufferSize int
chunkPageBufferSize int

updateTSDBMeta bool
tsdbBucket objstore.Bucket
tsdbMetas []metadata.Meta
}

func (cfg convertOpts) buildBloomfilterColumns() []parquet.BloomFilterColumn {
Expand Down Expand Up @@ -128,6 +141,15 @@ func EncodingConcurrency(c int) ConvertOption {
}
}

// UpdateTSDBMeta returns a ConvertOption that turns on the update of the
// original TSDB block metadata. The option stores the bucket holding the TSDB
// blocks and the metas of the blocks whose metadata should be updated.
func UpdateTSDBMeta(tsdbBucket objstore.Bucket, tsdbMetas []metadata.Meta) ConvertOption {
	enable := func(cfg *convertOpts) {
		cfg.tsdbBucket, cfg.tsdbMetas = tsdbBucket, tsdbMetas
		cfg.updateTSDBMeta = true
	}
	return enable
}

func ConvertTSDBBlock(
ctx context.Context,
bkt objstore.Bucket,
Expand Down Expand Up @@ -183,6 +205,14 @@ func ConvertTSDBBlock(
return fmt.Errorf("unable to convert block: %w", err)
}

if cfg.updateTSDBMeta {
for _, meta := range cfg.tsdbMetas {
if err := markBlockAsMigrated(ctx, cfg.tsdbBucket, meta); err != nil {
return fmt.Errorf("unable to mark block %s as migrated: %w", meta.ULID, err)
}
}
}

lastSuccessfulConvertTime.SetToCurrentTime()

return nil
Expand Down Expand Up @@ -623,3 +653,46 @@ func (b *bufferedReader) readRows() {
}
}
}

// markBlockAsMigrated rewrites the meta.json of a single TSDB block so that its
// Thanos extensions contain ParquetMigratedExtensionKey set to true.
//
// The block is identified by meta.ULID. The current "<ULID>/meta.json" is
// fetched from bkt and decoded, the flag is added, and the re-encoded document
// is uploaded back to the same path. Extensions that decode as a
// map[string]any are kept and extended; a nil or differently shaped value is
// replaced by a fresh map holding only the flag.
//
// The fetched reader is handed to errcapture.ExhaustClose together with the
// named result rerr when the function returns.
func markBlockAsMigrated(ctx context.Context, bkt objstore.Bucket, meta metadata.Meta) (rerr error) {
	path := meta.ULID.String() + "/meta.json"
	codec := jsoniter.ConfigCompatibleWithStandardLibrary

	reader, err := bkt.Get(ctx, path)
	if err != nil {
		return fmt.Errorf("unable to get meta.json: %w", err)
	}
	defer errcapture.ExhaustClose(&rerr, reader, "meta.json close")

	var current metadata.Meta
	if err := codec.NewDecoder(reader).Decode(&current); err != nil {
		return fmt.Errorf("unable to decode meta.json: %w", err)
	}

	// A nil Extensions value fails the assertion as well, so a single fallback
	// covers both "absent" and "not a map".
	extensions, isMap := current.Thanos.Extensions.(map[string]any)
	if !isMap {
		extensions = make(map[string]any)
		current.Thanos.Extensions = extensions
	}
	extensions[ParquetMigratedExtensionKey] = true

	var payload bytes.Buffer
	enc := codec.NewEncoder(&payload)
	enc.SetIndent("", "\t")
	if err := enc.Encode(&current); err != nil {
		return fmt.Errorf("unable to encode meta.json: %w", err)
	}

	if err := bkt.Upload(ctx, path, &payload); err != nil {
		return fmt.Errorf("unable to upload meta.json: %w", err)
	}
	return nil
}
158 changes: 158 additions & 0 deletions convert/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package convert
import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
Expand All @@ -16,11 +17,15 @@ import (
"time"

"github.com/alecthomas/units"
jsoniter "github.com/json-iterator/go"
"github.com/oklog/ulid/v2"
"github.com/parquet-go/parquet-go"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
"github.com/thanos-io/thanos/pkg/block/metadata"
"go.uber.org/goleak"

"github.com/thanos-io/thanos-parquet-gateway/internal/util"
Expand Down Expand Up @@ -238,3 +243,156 @@ func nameColumnValuesAreIncreasing(pf *parquet.File) error {
}
return nil
}

func TestUpdateTSDBBlockMetadata(t *testing.T) {
ctx := context.Background()
bkt, err := filesystem.NewBucket(t.TempDir())
if err != nil {
t.Fatalf("unable to create bucket: %s", err)
}
t.Cleanup(func() { _ = bkt.Close() })

mockMeta := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(ulid.Now(), rand.Reader),
},
Thanos: metadata.Thanos{
Version: 1,
Labels: map[string]string{"cluster": "test"},
},
}

metaPath := mockMeta.ULID.String() + "/meta.json"
var buf bytes.Buffer
encoder := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(&buf)
encoder.SetIndent("", "\t")
if err := encoder.Encode(&mockMeta); err != nil {
t.Fatalf("unable to encode initial meta.json: %s", err)
}
if err := bkt.Upload(ctx, metaPath, &buf); err != nil {
t.Fatalf("unable to upload initial meta.json: %s", err)
}

err = markBlockAsMigrated(ctx, bkt, mockMeta)
if err != nil {
t.Fatalf("unexpected error updating metadata: %s", err)
}

rc, err := bkt.Get(ctx, metaPath)
if err != nil {
t.Fatalf("unable to get updated meta.json: %s", err)
}
defer rc.Close()

var updatedMeta metadata.Meta
if err := jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(rc).Decode(&updatedMeta); err != nil {
t.Fatalf("unable to decode updated meta.json: %s", err)
}

if updatedMeta.Thanos.Extensions == nil {
t.Fatal("Extensions field is nil after update")
}

extensionsMap, ok := updatedMeta.Thanos.Extensions.(map[string]any)
if !ok {
t.Fatal("Extensions field is not a map[string]any")
}

migratedFlag, exists := extensionsMap["parquet_migrated"]
if !exists {
t.Fatal("parquet_migrated flag not found in Extensions")
}

migratedBool, ok := migratedFlag.(bool)
if !ok {
t.Fatalf("parquet_migrated flag is not a bool, got type: %T", migratedFlag)
}

if !migratedBool {
t.Fatal("parquet_migrated flag should be true")
}

if updatedMeta.ULID != mockMeta.ULID {
t.Fatal("ULID was not preserved")
}
if updatedMeta.Thanos.Version != mockMeta.Thanos.Version {
t.Fatal("Thanos version was not preserved")
}
if !maps.Equal(updatedMeta.Thanos.Labels, mockMeta.Thanos.Labels) {
t.Fatal("Thanos labels were not preserved")
}
}

func TestUpdateTSDBBlockMetadataWithExistingExtensions(t *testing.T) {
ctx := context.Background()
bkt, err := filesystem.NewBucket(t.TempDir())
if err != nil {
t.Fatalf("unable to create bucket: %s", err)
}
t.Cleanup(func() { _ = bkt.Close() })

// Create a mock TSDB block metadata file with existing extensions
existingExtensions := map[string]any{
"existing_key": "existing_value",
"number_key": 42,
}

mockMeta := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(ulid.Now(), rand.Reader),
},
Thanos: metadata.Thanos{
Version: 1,
Labels: map[string]string{"cluster": "test"},
Extensions: existingExtensions,
},
}

metaPath := mockMeta.ULID.String() + "/meta.json"
var buf bytes.Buffer
encoder := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(&buf)
encoder.SetIndent("", "\t")
if err := encoder.Encode(&mockMeta); err != nil {
t.Fatalf("unable to encode initial meta.json: %s", err)
}
if err := bkt.Upload(ctx, metaPath, &buf); err != nil {
t.Fatalf("unable to upload initial meta.json: %s", err)
}

err = markBlockAsMigrated(ctx, bkt, mockMeta)
if err != nil {
t.Fatalf("unexpected error updating metadata: %s", err)
}

rc, err := bkt.Get(ctx, metaPath)
if err != nil {
t.Fatalf("unable to get updated meta.json: %s", err)
}
defer rc.Close()

var updatedMeta metadata.Meta
if err := jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(rc).Decode(&updatedMeta); err != nil {
t.Fatalf("unable to decode updated meta.json: %s", err)
}

extensionsMap, ok := updatedMeta.Thanos.Extensions.(map[string]any)
if !ok {
t.Fatal("Extensions field is not a map[string]any")
}

migratedFlag, exists := extensionsMap["parquet_migrated"]
if !exists {
t.Fatal("parquet_migrated flag not found in Extensions")
}

if migratedBool, ok := migratedFlag.(bool); !ok || !migratedBool {
t.Fatal("parquet_migrated flag should be true")
}

if extensionsMap["existing_key"] != "existing_value" {
t.Fatal("existing_key was not preserved")
}
if extensionsMap["number_key"] != float64(42) {
t.Fatal("number_key was not preserved")
}
}
Loading