Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ type conversionOpts struct {
blockDownloadConcurrency int
encodingConcurrency int
writeConcurrency int

tempDir string
minTimeDate string
maxTimeDate string
tempDir string
}

func (opts *convertOpts) registerFlags(cmd *kingpin.CmdClause) {
Expand All @@ -81,7 +82,8 @@ func (opts *conversionOpts) registerFlags(cmd *kingpin.CmdClause) {
cmd.Flag("convert.recompress", "recompress chunks").Default("true").BoolVar(&opts.recompress)
cmd.Flag("convert.grace-period", "don't convert for dates younger than this").Default("48h").DurationVar(&opts.gracePeriod)
cmd.Flag("convert.max-plan-days", "soft limit for the number of days to plan conversions for").Default("2").IntVar(&opts.maxDays)

cmd.Flag("convert.min-time", "start date for conversion window in YYYY-MM-DD format (e.g. 2025-12-20). Conversion rounds to day start (00:00:00 UTC)").Default("").StringVar(&opts.minTimeDate)
cmd.Flag("convert.max-time", "end date for conversion window in YYYY-MM-DD format (e.g. 2025-12-23). Conversion rounds to next day start (exclusive)").Default("").StringVar(&opts.maxTimeDate)
cmd.Flag("convert.rowgroup.size", "size of rowgroups").Default("1_000_000").IntVar(&opts.rowGroupSize)
cmd.Flag("convert.rowgroup.count", "rowgroups per shard").Default("6").IntVar(&opts.rowGroupCount)
cmd.Flag("convert.sorting.label", "label to sort by").Default("__name__").StringsVar(&opts.sortLabels)
Expand Down Expand Up @@ -234,7 +236,7 @@ func advanceConversion(
streamHashMap[discoveredStream.ExternalLabels.Hash()] = discoveredStream.ExternalLabels
}

plan := planner.Plan(tsdbMetas, parquetStreams)
plan := convert.NewPlanner(time.Now().Add(-opts.gracePeriod), opts.maxDays).Plan(tsdbMetas, parquetStreams, opts.minTimeDate, opts.maxTimeDate)
if len(plan.Steps) == 0 {
log.Info("Nothing to do")
return nil
Expand Down
59 changes: 55 additions & 4 deletions convert/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,70 @@ func NewPlanner(notAfter time.Time, maxDays int) Planner {
return Planner{notAfter: notAfter, maxDays: maxDays}
}

func (p Planner) planStream(tsdb schema.TSDBBlocksStream, parquet schema.ParquetBlocksStream) Plan {
func (p Planner) planStream(tsdb schema.TSDBBlocksStream, parquet schema.ParquetBlocksStream, minTimeDate string, maxTimeDate string) Plan {

var (
windowActive bool
winStartDay time.Time
winEndDay time.Time
)

if minTimeDate != "" || maxTimeDate != "" {
windowActive = true

// Parse minDate and round to start of day (00:00:00 UTC, inclusive)
if minTimeDate != "" {
t, err := time.Parse("2006-01-02", minTimeDate)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same constant 2006-01-02 could be reused. Also, could we move validation one layer up - the caller of this function could check if the inputs are correct and just pass time.Time, no? If a non-empty string is passed then I would expect it to be checked and error out if it doesn't follow the format that is expected.

if err != nil {
// Invalid date format, disable window filtering
windowActive = false
} else {
winStartDay = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}
}

// Parse maxDate and round to start of next day (00:00:00 UTC, exclusive)
if maxTimeDate != "" {
t, err := time.Parse("2006-01-02", maxTimeDate)
if err != nil {
// Invalid date format, disable window filtering
windowActive = false
} else {
// Add 1 day to make the boundary exclusive
winEndDay = time.Date(t.Year(), t.Month(), t.Day()+1, 0, 0, 0, 0, time.UTC)
}
}
}
// Make a list of days covered by TSDB blocks.
tsdbDates := map[util.Date][]metadata.Meta{}
for _, tsdb := range tsdb.Metas {
for _, partialDate := range util.SplitIntoDates(tsdb.MinTime, tsdb.MaxTime) {
if windowActive {
pt := partialDate.ToTime()
if minTimeDate != "" && pt.Before(winStartDay) {
continue
}
if maxTimeDate != "" && !pt.Before(winEndDay) {
continue
}
}
tsdbDates[partialDate] = append(tsdbDates[partialDate], tsdb)
}
}

// Make a list of days covered by parquet blocks.
pqDates := map[util.Date]struct{}{}
for _, pq := range parquet.Metas {
for _, partialDate := range util.SplitIntoDates(pq.Mint, pq.Maxt) {

if windowActive {
pt := partialDate.ToTime()
if minTimeDate != "" && pt.Before(winStartDay) {
continue
}
if maxTimeDate != "" && !pt.Before(winEndDay) {
continue
}
}
pqDates[partialDate] = struct{}{}
}
}
Expand Down Expand Up @@ -113,7 +164,7 @@ func (p Planner) planStream(tsdb schema.TSDBBlocksStream, parquet schema.Parquet
return Plan{Steps: steps}
}

func (p Planner) Plan(tsdbStreams map[schema.ExternalLabelsHash]schema.TSDBBlocksStream, parquetStreams map[schema.ExternalLabelsHash]schema.ParquetBlocksStream) Plan {
func (p Planner) Plan(tsdbStreams map[schema.ExternalLabelsHash]schema.TSDBBlocksStream, parquetStreams map[schema.ExternalLabelsHash]schema.ParquetBlocksStream, minTimeDate string, maxTimeDate string) Plan {
outPlan := Plan{Steps: []Step{}}

for tsdbEH := range tsdbStreams {
Expand All @@ -122,7 +173,7 @@ func (p Planner) Plan(tsdbStreams map[schema.ExternalLabelsHash]schema.TSDBBlock
parquet = schema.ParquetBlocksStream{}
}

streamPlan := p.planStream(tsdbStreams[tsdbEH], parquet)
streamPlan := p.planStream(tsdbStreams[tsdbEH], parquet, minTimeDate, maxTimeDate)
for i := range streamPlan.Steps {
streamPlan.Steps[i].ExternalLabels = tsdbStreams[tsdbEH].ExternalLabels
}
Expand Down
135 changes: 134 additions & 1 deletion convert/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,140 @@ func TestPlanner(t *testing.T) {
},
} {
t.Run(tc.name, func(tt *testing.T) {
plan := NewPlanner(tc.notAfter, tc.maxDays).Plan(tc.tsdbStreams, tc.parquetStreams)
plan := NewPlanner(tc.notAfter, tc.maxDays).Plan(tc.tsdbStreams, tc.parquetStreams, "", "")

if diff := cmp.Diff(tc.expectedPlan, plan,
cmpopts.IgnoreUnexported(),
cmpopts.EquateComparable(util.Date{}),
cmpopts.IgnoreFields(tsdb.BlockMeta{}, "MinTime", "MaxTime"),
); diff != "" {
tt.Errorf("plan mismatch (-want +got):\n%s", diff)
}
})
}
}

func TestPlannerWithTimeWindow(t *testing.T) {
var (
defaultExternalLabels = schema.ExternalLabels{"stream": "eu-west-1"}
defaultHash = defaultExternalLabels.Hash()
)
mockBlocks := func(ulids ...string) []metadata.Meta {
metas := make([]metadata.Meta, 0, len(ulids))
for _, v := range ulids {
metas = append(metas, metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: ulid.MustParse(v)},
})
}
return metas
}
now := time.Now().UTC()
now = now.Truncate(24 * time.Hour)

for _, tc := range []struct {
name string

notAfter time.Time
maxDays int
minDate string
maxDate string
tsdbStreams map[schema.ExternalLabelsHash]schema.TSDBBlocksStream
parquetStreams map[schema.ExternalLabelsHash]schema.ParquetBlocksStream

expectedPlan Plan
}{
{
name: "zero time offsets means no filtering - all dates included",
notAfter: time.UnixMilli(math.MaxInt64),
maxDays: 10,
minDate: "",
maxDate: "",
tsdbStreams: map[schema.ExternalLabelsHash]schema.TSDBBlocksStream{
defaultHash: {
StreamDescriptor: schema.StreamDescriptor{ExternalLabels: defaultExternalLabels},
Metas: []metadata.Meta{
{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse("01JT0DPYGA1HPW5RBZ1KBXCNXA"),
MinTime: time.Date(2026, time.February, 20, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTime: time.Date(2026, time.February, 21, 0, 0, 0, 0, time.UTC).UnixMilli(),
},
},
{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse("01JT0DPYGA1HPW5RBZ1KBXCNXB"),
MinTime: time.Date(2026, time.February, 15, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTime: time.Date(2026, time.February, 16, 0, 0, 0, 0, time.UTC).UnixMilli(),
},
},
},
},
},
parquetStreams: map[schema.ExternalLabelsHash]schema.ParquetBlocksStream{},
expectedPlan: Plan{
Steps: []Step{
{
Date: util.NewDate(2026, time.February, 20),
Sources: mockBlocks("01JT0DPYGA1HPW5RBZ1KBXCNXA"),
ExternalLabels: defaultExternalLabels,
},
{
Date: util.NewDate(2026, time.February, 15),
Sources: mockBlocks("01JT0DPYGA1HPW5RBZ1KBXCNXB"),
ExternalLabels: defaultExternalLabels,
},
},
},
},

{
name: "date window filters only blocks within range",
notAfter: time.UnixMilli(math.MaxInt64),
maxDays: 10,
minDate: "2026-02-17",
maxDate: "2026-02-19",
tsdbStreams: map[schema.ExternalLabelsHash]schema.TSDBBlocksStream{
defaultHash: {
StreamDescriptor: schema.StreamDescriptor{ExternalLabels: defaultExternalLabels},
Metas: []metadata.Meta{
{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse("01JT0DPYGA1HPW5RBZ1KBXCNXA"),
MinTime: time.Date(2026, time.February, 20, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTime: time.Date(2026, time.February, 21, 0, 0, 0, 0, time.UTC).UnixMilli(),
},
},
{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse("01JT0DPYGA1HPW5RBZ1KBXCNXB"),
MinTime: time.Date(2026, time.February, 15, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTime: time.Date(2026, time.February, 16, 0, 0, 0, 0, time.UTC).UnixMilli(),
},
},
{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse("01JT0DPYGA1HPW5RBZ1KBXCNXC"),
MinTime: time.Date(2026, time.February, 18, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTime: time.Date(2026, time.February, 19, 0, 0, 0, 0, time.UTC).UnixMilli(),
},
},
},
},
},
parquetStreams: map[schema.ExternalLabelsHash]schema.ParquetBlocksStream{},
expectedPlan: Plan{
Steps: []Step{
{
Date: util.NewDate(2026, time.February, 18),
Sources: mockBlocks("01JT0DPYGA1HPW5RBZ1KBXCNXC"),
ExternalLabels: defaultExternalLabels,
},
},
},
},
} {
t.Run(tc.name, func(tt *testing.T) {
plan := NewPlanner(tc.notAfter, tc.maxDays).Plan(tc.tsdbStreams, tc.parquetStreams, tc.minDate, tc.maxDate)

if diff := cmp.Diff(tc.expectedPlan, plan,
cmpopts.IgnoreUnexported(),
Expand Down