Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 322 additions & 64 deletions cmd/convert.go

Large diffs are not rendered by default.

420 changes: 420 additions & 0 deletions cmd/convert_test.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func registerServeApp(app *kingpin.Application) (*kingpin.CmdClause, func(contex

db := cfdb.NewDB(
syncer,
log,
cfdb.ExternalLabels(labels.FromMap(opts.query.externalLabels)),
)

Expand Down
48 changes: 38 additions & 10 deletions convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func ConvertTSDBBlock(
ctx context.Context,
bkt objstore.Bucket,
day util.Date,
partition *util.Partition,
extLabelsHash schema.ExternalLabelsHash,
blks []Convertible,
opts ...ConvertOption,
Expand All @@ -192,7 +193,14 @@ func ConvertTSDBBlock(
for i := range opts {
opts[i](cfg)
}
start, end := day.MinT(), day.MaxT()

// Determine time range based on partition or date
var start, end int64
if partition != nil {
start, end = partition.MinT(), partition.MaxT()
} else {
start, end = day.MinT(), day.MaxT()
}

shardedRowReaders, err := shardedIndexRowReader(ctx, start, end, blks, *cfg)
if err != nil {
Expand All @@ -209,6 +217,7 @@ func ConvertTSDBBlock(
errGroup.Go(func() error {
converter := newConverter(
day,
partition,
shard,
extLabelsHash,
rr,
Expand All @@ -235,7 +244,7 @@ func ConvertTSDBBlock(
return fmt.Errorf("failed to convert shards in parallel: %w", err)
}

if err := writeMetaFile(ctx, day, extLabelsHash, int64(len(shardedRowReaders)), bkt); err != nil {
if err := writeMetaFile(ctx, day, partition, extLabelsHash, int64(len(shardedRowReaders)), bkt); err != nil {
return fmt.Errorf("failed to write meta file: %w", err)
}

Expand All @@ -258,19 +267,27 @@ type blockSeries struct {
labels labels.Labels
}

func writeMetaFile(ctx context.Context, day util.Date, extLabelsHash schema.ExternalLabelsHash, numShards int64, bkt objstore.Bucket) error {
func writeMetaFile(ctx context.Context, day util.Date, partition *util.Partition, extLabelsHash schema.ExternalLabelsHash, numShards int64, bkt objstore.Bucket) error {
// Determine time range based on partition or date
var mint, maxt int64
if partition != nil {
mint, maxt = partition.MinT(), partition.MaxT()
} else {
mint, maxt = day.MinT(), day.MaxT()
}

meta := &metapb.Metadata{
Version: schema.V2,
Mint: day.MinT(),
Maxt: day.MaxT(),
Mint: mint,
Maxt: maxt,
Shards: numShards,
}

metaBytes, err := proto.Marshal(meta)
if err != nil {
return fmt.Errorf("unable to marshal meta bytes: %w", err)
}
if err := bkt.Upload(ctx, schema.MetaFileNameForBlock(day, extLabelsHash), bytes.NewReader(metaBytes)); err != nil {
if err := bkt.Upload(ctx, schema.MetaFileNameForBlock(day, partition, extLabelsHash), bytes.NewReader(metaBytes)); err != nil {
return fmt.Errorf("unable to upload meta file: %w", err)
}

Expand Down Expand Up @@ -524,6 +541,7 @@ func compareBySortedLabelsFunc(sortedLabels []string) func(a, b labels.Labels) i

type converter struct {
date util.Date
partition *util.Partition
mint, maxt int64

shard int
Expand All @@ -546,6 +564,7 @@ type converter struct {

func newConverter(
date util.Date,
partition *util.Partition,
shard int,
extLabelsHash schema.ExternalLabelsHash,
rr *indexRowReader,
Expand All @@ -560,10 +579,19 @@ func newConverter(
chunkPageBufferSize int,

) *converter {
// Determine time range based on partition or date
var mint, maxt int64
if partition != nil {
mint, maxt = partition.MinT(), partition.MaxT()
} else {
mint, maxt = date.MinT(), date.MaxT()
}

return &converter{
date: date,
mint: date.MinT(),
maxt: date.MaxT(),
partition: partition,
mint: mint,
maxt: maxt,
shard: shard,
extLabelsHash: extLabelsHash,

Expand Down Expand Up @@ -616,11 +644,11 @@ func (c *converter) convertShard(ctx context.Context) (_ bool, rerr error) {
s := c.rr.Schema()

w, err := newSplitFileWriter(ctx, c.bkt, s, map[string]writerConfig{
schema.LabelsPfileNameForShard(c.extLabelsHash, c.date, c.shard): {
schema.LabelsPfileNameForShard(c.extLabelsHash, c.date, c.partition, c.shard): {
s: schema.WithCompression(schema.LabelsProjection(s)),
opts: c.labelWriterOptions(),
},
schema.ChunksPfileNameForShard(c.extLabelsHash, c.date, c.shard): {
schema.ChunksPfileNameForShard(c.extLabelsHash, c.date, c.partition, c.shard): {
s: schema.WithCompression(schema.ChunkProjection(s)),
opts: c.chunkWriterOptions(),
},
Expand Down
8 changes: 4 additions & 4 deletions convert/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestConverter(t *testing.T) {
RowGroupCount(2),
LabelPageBufferSize(units.KiB), // results in 2 pages
}
if err := ConvertTSDBBlock(t.Context(), bkt, d, schema.ExternalLabelsHash(0), []Convertible{&HeadBlock{Head: h}}, opts...); err != nil {
if err := ConvertTSDBBlock(t.Context(), bkt, d, nil, schema.ExternalLabelsHash(0), []Convertible{&HeadBlock{Head: h}}, opts...); err != nil {
t.Fatalf("unable to convert tsdb block: %s", err)
}

Expand All @@ -91,11 +91,11 @@ func TestConverter(t *testing.T) {

totalRows := int64(0)
for i := range int(meta.Shards) {
lf, err := loadParquetFile(t, bkt, schema.LabelsPfileNameForShard(schema.ExternalLabelsHash(0), meta.Date, i))
lf, err := loadParquetFile(t, bkt, schema.LabelsPfileNameForShard(schema.ExternalLabelsHash(0), meta.Date, nil, i))
if err != nil {
t.Fatalf("unable to load label parquet file for shard %d: %s", i, err)
}
cf, err := loadParquetFile(t, bkt, schema.ChunksPfileNameForShard(schema.ExternalLabelsHash(0), meta.Date, i))
cf, err := loadParquetFile(t, bkt, schema.ChunksPfileNameForShard(schema.ExternalLabelsHash(0), meta.Date, nil, i))
if err != nil {
t.Fatalf("unable to load chunk parquet file for shard %d: %s", i, err)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestConverterIndexWithManyLabelNames(t *testing.T) {
SortBy(labels.MetricName),
LabelPageBufferSize(units.KiB), // results in 2 pages
}
if err := ConvertTSDBBlock(t.Context(), bkt, d, schema.ExternalLabelsHash(0), []Convertible{&HeadBlock{h}}, opts...); err != nil {
if err := ConvertTSDBBlock(t.Context(), bkt, d, nil, schema.ExternalLabelsHash(0), []Convertible{&HeadBlock{h}}, opts...); err != nil {
t.Fatalf("unable to convert tsdb block: %s", err)
}
}
Expand Down
1 change: 0 additions & 1 deletion convert/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ var (

func RegisterMetrics(reg prometheus.Registerer) error {
lastSuccessfulConvertTime.Set(0)

return errors.Join(
reg.Register(lastSuccessfulConvertTime),
)
Expand Down
Loading