Skip to content

Commit baca0ca

Browse files
committed
feat: dynamic partition conversion
1 parent 740fc4b commit baca0ca

13 files changed

Lines changed: 1428 additions & 168 deletions

File tree

cmd/convert.go

Lines changed: 369 additions & 107 deletions
Large diffs are not rendered by default.

convert/convert.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ func ConvertTSDBBlock(
173173
ctx context.Context,
174174
bkt objstore.Bucket,
175175
day util.Date,
176+
partition *util.Partition,
176177
extLabelsHash schema.ExternalLabelsHash,
177178
blks []Convertible,
178179
opts ...ConvertOption,
@@ -192,7 +193,14 @@ func ConvertTSDBBlock(
192193
for i := range opts {
193194
opts[i](cfg)
194195
}
195-
start, end := day.MinT(), day.MaxT()
196+
197+
// Determine time range based on partition or date
198+
var start, end int64
199+
if partition != nil {
200+
start, end = partition.MinT(), partition.MaxT()
201+
} else {
202+
start, end = day.MinT(), day.MaxT()
203+
}
196204

197205
shardedRowReaders, err := shardedIndexRowReader(ctx, start, end, blks, *cfg)
198206
if err != nil {
@@ -209,6 +217,7 @@ func ConvertTSDBBlock(
209217
errGroup.Go(func() error {
210218
converter := newConverter(
211219
day,
220+
partition,
212221
shard,
213222
extLabelsHash,
214223
rr,
@@ -235,7 +244,7 @@ func ConvertTSDBBlock(
235244
return fmt.Errorf("failed to convert shards in parallel: %w", err)
236245
}
237246

238-
if err := writeMetaFile(ctx, day, extLabelsHash, int64(len(shardedRowReaders)), bkt); err != nil {
247+
if err := writeMetaFile(ctx, day, partition, extLabelsHash, int64(len(shardedRowReaders)), bkt); err != nil {
239248
return fmt.Errorf("failed to write meta file: %w", err)
240249
}
241250

@@ -258,19 +267,27 @@ type blockSeries struct {
258267
labels labels.Labels
259268
}
260269

261-
func writeMetaFile(ctx context.Context, day util.Date, extLabelsHash schema.ExternalLabelsHash, numShards int64, bkt objstore.Bucket) error {
270+
func writeMetaFile(ctx context.Context, day util.Date, partition *util.Partition, extLabelsHash schema.ExternalLabelsHash, numShards int64, bkt objstore.Bucket) error {
271+
// Determine time range based on partition or date
272+
var mint, maxt int64
273+
if partition != nil {
274+
mint, maxt = partition.MinT(), partition.MaxT()
275+
} else {
276+
mint, maxt = day.MinT(), day.MaxT()
277+
}
278+
262279
meta := &metapb.Metadata{
263280
Version: schema.V2,
264-
Mint: day.MinT(),
265-
Maxt: day.MaxT(),
281+
Mint: mint,
282+
Maxt: maxt,
266283
Shards: numShards,
267284
}
268285

269286
metaBytes, err := proto.Marshal(meta)
270287
if err != nil {
271288
return fmt.Errorf("unable to marshal meta bytes: %w", err)
272289
}
273-
if err := bkt.Upload(ctx, schema.MetaFileNameForBlock(day, extLabelsHash), bytes.NewReader(metaBytes)); err != nil {
290+
if err := bkt.Upload(ctx, schema.MetaFileNameForBlock(day, partition, extLabelsHash), bytes.NewReader(metaBytes)); err != nil {
274291
return fmt.Errorf("unable to upload meta file: %w", err)
275292
}
276293

@@ -524,6 +541,7 @@ func compareBySortedLabelsFunc(sortedLabels []string) func(a, b labels.Labels) i
524541

525542
type converter struct {
526543
date util.Date
544+
partition *util.Partition
527545
mint, maxt int64
528546

529547
shard int
@@ -546,6 +564,7 @@ type converter struct {
546564

547565
func newConverter(
548566
date util.Date,
567+
partition *util.Partition,
549568
shard int,
550569
extLabelsHash schema.ExternalLabelsHash,
551570
rr *indexRowReader,
@@ -560,10 +579,19 @@ func newConverter(
560579
chunkPageBufferSize int,
561580

562581
) *converter {
582+
// Determine time range based on partition or date
583+
var mint, maxt int64
584+
if partition != nil {
585+
mint, maxt = partition.MinT(), partition.MaxT()
586+
} else {
587+
mint, maxt = date.MinT(), date.MaxT()
588+
}
589+
563590
return &converter{
564591
date: date,
565-
mint: date.MinT(),
566-
maxt: date.MaxT(),
592+
partition: partition,
593+
mint: mint,
594+
maxt: maxt,
567595
shard: shard,
568596
extLabelsHash: extLabelsHash,
569597

@@ -616,11 +644,11 @@ func (c *converter) convertShard(ctx context.Context) (_ bool, rerr error) {
616644
s := c.rr.Schema()
617645

618646
w, err := newSplitFileWriter(ctx, c.bkt, s, map[string]writerConfig{
619-
schema.LabelsPfileNameForShard(c.extLabelsHash, c.date, c.shard): {
647+
schema.LabelsPfileNameForShard(c.extLabelsHash, c.date, c.partition, c.shard): {
620648
s: schema.WithCompression(schema.LabelsProjection(s)),
621649
opts: c.labelWriterOptions(),
622650
},
623-
schema.ChunksPfileNameForShard(c.extLabelsHash, c.date, c.shard): {
651+
schema.ChunksPfileNameForShard(c.extLabelsHash, c.date, c.partition, c.shard): {
624652
s: schema.WithCompression(schema.ChunkProjection(s)),
625653
opts: c.chunkWriterOptions(),
626654
},

convert/convert_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestConverter(t *testing.T) {
7070
RowGroupCount(2),
7171
LabelPageBufferSize(units.KiB), // results in 2 pages
7272
}
73-
if err := ConvertTSDBBlock(t.Context(), bkt, d, schema.ExternalLabelsHash(0), []Convertible{&HeadBlock{Head: h}}, opts...); err != nil {
73+
if err := ConvertTSDBBlock(t.Context(), bkt, d, nil, schema.ExternalLabelsHash(0), []Convertible{&HeadBlock{Head: h}}, opts...); err != nil {
7474
t.Fatalf("unable to convert tsdb block: %s", err)
7575
}
7676

@@ -91,11 +91,11 @@ func TestConverter(t *testing.T) {
9191

9292
totalRows := int64(0)
9393
for i := range int(meta.Shards) {
94-
lf, err := loadParquetFile(t, bkt, schema.LabelsPfileNameForShard(schema.ExternalLabelsHash(0), meta.Date, i))
94+
lf, err := loadParquetFile(t, bkt, schema.LabelsPfileNameForShard(schema.ExternalLabelsHash(0), meta.Date, nil, i))
9595
if err != nil {
9696
t.Fatalf("unable to load label parquet file for shard %d: %s", i, err)
9797
}
98-
cf, err := loadParquetFile(t, bkt, schema.ChunksPfileNameForShard(schema.ExternalLabelsHash(0), meta.Date, i))
98+
cf, err := loadParquetFile(t, bkt, schema.ChunksPfileNameForShard(schema.ExternalLabelsHash(0), meta.Date, nil, i))
9999
if err != nil {
100100
t.Fatalf("unable to load chunk parquet file for shard %d: %s", i, err)
101101
}
@@ -163,7 +163,7 @@ func TestConverterIndexWithManyLabelNames(t *testing.T) {
163163
SortBy(labels.MetricName),
164164
LabelPageBufferSize(units.KiB), // results in 2 pages
165165
}
166-
if err := ConvertTSDBBlock(t.Context(), bkt, d, schema.ExternalLabelsHash(0), []Convertible{&HeadBlock{h}}, opts...); err != nil {
166+
if err := ConvertTSDBBlock(t.Context(), bkt, d, nil, schema.ExternalLabelsHash(0), []Convertible{&HeadBlock{h}}, opts...); err != nil {
167167
t.Fatalf("unable to convert tsdb block: %s", err)
168168
}
169169
}

0 commit comments

Comments
 (0)