Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type conversionOpts struct {

gracePeriod time.Duration
maxDays int
maxDaysBack int
recompress bool
sortLabels []string
rowGroupSize int
Expand Down Expand Up @@ -76,6 +77,7 @@ func (opts *conversionOpts) registerFlags(cmd *kingpin.CmdClause) {
cmd.Flag("convert.recompress", "recompress chunks").Default("true").BoolVar(&opts.recompress)
cmd.Flag("convert.grace-period", "dont convert for dates younger than this").Default("48h").DurationVar(&opts.gracePeriod)
cmd.Flag("convert.max-plan-days", "soft limit for the number of days to plan conversions for").Default("2").IntVar(&opts.maxDays)
cmd.Flag("convert.max-days-back", "limit conversion to this many days back (0 = no limit)").Default("0").IntVar(&opts.maxDaysBack)

cmd.Flag("convert.rowgroup.size", "size of rowgroups").Default("1_000_000").IntVar(&opts.rowGroupSize)
cmd.Flag("convert.rowgroup.count", "rowgroups per shard").Default("6").IntVar(&opts.rowGroupCount)
Expand Down Expand Up @@ -200,7 +202,12 @@ func advanceConversion(
parquetMetas := parquetDiscoverer.Metas()
tsdbMetas := tsdbDiscoverer.Metas()

plan := convert.NewPlanner(time.Now().Add(-opts.gracePeriod), opts.maxDays).Plan(tsdbMetas, parquetMetas)
notAfter := time.Now().Add(-opts.gracePeriod)
var notBefore time.Time
if opts.maxDaysBack > 0 {
notBefore = time.Now().AddDate(0, 0, -opts.maxDaysBack).Truncate(24 * time.Hour)
}
plan := convert.NewPlanner(notAfter, notBefore, opts.maxDays).Plan(tsdbMetas, parquetMetas)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could values of notAfter, notBefore be logged, please?

if len(plan.Steps) == 0 {
log.Info("Nothing to do")
return nil
Expand Down
11 changes: 9 additions & 2 deletions convert/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ type Plan struct {
type Planner struct {
// Do not create parquet blocks that are younger than this.
notAfter time.Time
// Do not create parquet blocks that are older than this.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/then/than/

notBefore time.Time
// Maximum number of days to plan conversions for. Planner might still produce a plan with more days
// if it would be converting a TSDB block that spans over more days, to avoid re-downloading that block
// for the next plan.
maxDays int
}

func NewPlanner(notAfter time.Time, maxDays int) Planner {
return Planner{notAfter: notAfter, maxDays: maxDays}
// NewPlanner builds a Planner from the given time window and day budget.
//
// notAfter is the upper bound: parquet blocks younger than it are not created.
// notBefore is the lower bound: Plan skips dates that fall before it.
// maxDays is the soft limit on the number of days a single plan covers.
//
// NOTE(review): a zero notBefore appears to disable the lower bound, since no
// realistic date is before the zero time — confirm against Plan.
func NewPlanner(notAfter, notBefore time.Time, maxDays int) Planner {
	var p Planner
	p.notAfter = notAfter
	p.notBefore = notBefore
	p.maxDays = maxDays
	return p
}

func (p Planner) Plan(tsdbMetas map[string]metadata.Meta, parquetMetas map[string]schema.Meta) Plan {
Expand Down Expand Up @@ -79,6 +81,11 @@ func (p Planner) Plan(tsdbMetas map[string]metadata.Meta, parquetMetas map[strin
continue
}

if date.ToTime().Before(p.notBefore) {
// Ignore TSDB blocks that are too old.
continue
}

if _, ok := pqDates[date]; ok {
// This date is already covered by a parquet block.
continue
Expand Down
63 changes: 62 additions & 1 deletion convert/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestPlanner(t *testing.T) {
name string

notAfter time.Time
notBefore time.Time
maxDays int
tsdbMetas map[string]metadata.Meta
parquetMetas map[string]schema.Meta
Expand Down Expand Up @@ -510,9 +511,69 @@ func TestPlanner(t *testing.T) {
},
},
},
{
name: "we dont convert blocks that are too old",
notAfter: time.UnixMilli(math.MaxInt64),
notBefore: time.Date(2020, time.January, 6, 0, 0, 0, 0, time.UTC),
maxDays: 7,
tsdbMetas: map[string]metadata.Meta{
"01JT0DPYGA1HPW5RBZ1KBXCNXK": {
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse("01JT0DPYGA1HPW5RBZ1KBXCNXK"),
MinTime: time.Date(2020, time.January, 4, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTime: time.Date(2020, time.January, 8, 0, 0, 0, 0, time.UTC).UnixMilli(),
},
},
},
parquetMetas: map[string]schema.Meta{},
expectedPlan: Plan{
Steps: []Step{
{
Date: util.NewDate(2020, time.January, 7),
Sources: mockBlocks("01JT0DPYGA1HPW5RBZ1KBXCNXK"),
},
{
Date: util.NewDate(2020, time.January, 6),
Sources: mockBlocks("01JT0DPYGA1HPW5RBZ1KBXCNXK"),
},
},
},
},
{
name: "block spanning old and new dates only converts new dates",
notAfter: time.Date(2026, time.January, 8, 0, 0, 0, 0, time.UTC),
notBefore: time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC), // max-days-back=7 from 1/8/2026
maxDays: 10,
tsdbMetas: map[string]metadata.Meta{
"01JT0DPYGA1HPW5RBZ1KBXCNXK": {
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse("01JT0DPYGA1HPW5RBZ1KBXCNXK"),
MinTime: time.Date(2025, time.December, 25, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTime: time.Date(2026, time.January, 4, 0, 0, 0, 0, time.UTC).UnixMilli(),
},
},
},
parquetMetas: map[string]schema.Meta{},
expectedPlan: Plan{
Steps: []Step{
{
Date: util.NewDate(2026, time.January, 3),
Sources: mockBlocks("01JT0DPYGA1HPW5RBZ1KBXCNXK"),
},
{
Date: util.NewDate(2026, time.January, 2),
Sources: mockBlocks("01JT0DPYGA1HPW5RBZ1KBXCNXK"),
},
{
Date: util.NewDate(2026, time.January, 1),
Sources: mockBlocks("01JT0DPYGA1HPW5RBZ1KBXCNXK"),
},
},
},
},
} {
t.Run(tc.name, func(tt *testing.T) {
plan := NewPlanner(tc.notAfter, tc.maxDays).Plan(tc.tsdbMetas, tc.parquetMetas)
plan := NewPlanner(tc.notAfter, tc.notBefore, tc.maxDays).Plan(tc.tsdbMetas, tc.parquetMetas)

if diff := cmp.Diff(tc.expectedPlan, plan,
cmpopts.IgnoreUnexported(),
Expand Down