From 7fe9eb57e100d1406522e507c9756f447c5c9fda Mon Sep 17 00:00:00 2001 From: Philip Gough Date: Tue, 29 Apr 2025 15:28:33 +0100 Subject: [PATCH 1/8] tools: Add min and max time to bucket cleanup command --- cmd/thanos/tools_bucket.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 399bae1673..0922440470 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -149,6 +149,7 @@ type bucketCleanupConfig struct { consistencyDelay time.Duration blockSyncConcurrency int deleteDelay time.Duration + filterConf *store.FilterConfig } type bucketRetentionConfig struct { @@ -278,11 +279,17 @@ func (tbc *bucketMarkBlockConfig) registerBucketMarkBlockFlag(cmd extkingpin.Fla } func (tbc *bucketCleanupConfig) registerBucketCleanupFlag(cmd extkingpin.FlagClause) *bucketCleanupConfig { + tbc.filterConf = &store.FilterConfig{} + cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from bucket.").Default("48h").DurationVar(&tbc.deleteDelay) cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)). Default("30m").DurationVar(&tbc.consistencyDelay) cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). Default("20").IntVar(&tbc.blockSyncConcurrency) + cmd.Flag("min-time", "Start of time range limit to cleanup blocks. Thanos Tools will cleanup blocks, which were created later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("0000-01-01T00:00:00Z").SetValue(&tbc.filterConf.MinTime) + cmd.Flag("max-time", "End of time range limit to cleanup. 
Thanos Tools will cleanup only blocks, which were created earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("9999-12-31T23:59:59Z").SetValue(&tbc.filterConf.MaxTime) return tbc } @@ -885,6 +892,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat } cf := baseMetaFetcher.NewMetaFetcher( extprom.WrapRegistererWithPrefix(extpromPrefix, reg), []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(tbc.filterConf.MinTime, tbc.filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), block.NewConsistencyDelayMetaFilter(logger, tbc.consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)), ignoreDeletionMarkFilter, From 97f38f6d10811f10f3b63d9e655039f7c3efe8c6 Mon Sep 17 00:00:00 2001 From: Philip Gough Date: Thu, 1 May 2025 09:20:53 +0100 Subject: [PATCH 2/8] docs: Add docs for bucket tools cleanup --- docs/components/tools.md | 81 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/docs/components/tools.md b/docs/components/tools.md index 7e28a4af48..dd409fe52a 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -797,6 +797,87 @@ Flags: ``` +### Bucket cleanup + +`tools bucket cleanup` can be used to manually clean up blocks in the bucket which have been marked for deletion. + +```$ mdox-exec="thanos tools bucket cleanup --help" +usage: thanos tools bucket cleanup [] + +Cleans up all blocks marked for deletion. + +Flags: + --auto-gomemlimit.ratio=0.9 + The ratio of reserved GOMEMLIMIT memory to the + detected maximum container or system memory. + --block-sync-concurrency=20 + Number of goroutines to use when syncing block + metadata from object storage. + --consistency-delay=30m Minimum age of fresh (non-compacted) + blocks before they are being processed. 
+ Malformed blocks older than the maximum of + consistency-delay and 48h0m0s will be removed. + --delete-delay=48h Time before a block marked for deletion is + deleted from bucket. + --enable-auto-gomemlimit Enable go runtime to automatically limit memory + consumption. + -h, --help Show context-sensitive help (also try + --help-long and --help-man). + --log.format=logfmt Log format to use. Possible options: logfmt or + json. + --log.level=info Log filtering level. + --max-time=9999-12-31T23:59:59Z + End of time range limit to cleanup. Thanos + Tools will cleanup only blocks, which were + created earlier than this value. Option can be a + constant time in RFC3339 format or time duration + relative to current time, such as -1d or 2h45m. + Valid duration units are ms, s, m, h, d, w, y. + --min-time=0000-01-01T00:00:00Z + Start of time range limit to cleanup blocks. + Thanos Tools will cleanup blocks, which were + created later than this value. Option can be a + constant time in RFC3339 format or time duration + relative to current time, such as -1d or 2h45m. + Valid duration units are ms, s, m, h, d, w, y. + --objstore.config= + Alternative to 'objstore.config-file' + flag (mutually exclusive). Content of + YAML file that contains object store + configuration. See format details: + https://thanos.io/tip/thanos/storage.md/#configuration + --objstore.config-file= + Path to YAML file that contains object + store configuration. See format details: + https://thanos.io/tip/thanos/storage.md/#configuration + --selector.relabel-config= + Alternative to 'selector.relabel-config-file' + flag (mutually exclusive). Content of YAML + file with relabeling configuration that allows + selecting blocks to act on based on their + external labels. It follows thanos sharding + relabel-config syntax. 
For format details see: + https://thanos.io/tip/thanos/sharding.md/#relabelling + --selector.relabel-config-file= + Path to YAML file with relabeling + configuration that allows selecting blocks + to act on based on their external labels. + It follows thanos sharding relabel-config + syntax. For format details see: + https://thanos.io/tip/thanos/sharding.md/#relabelling + --tracing.config= + Alternative to 'tracing.config-file' flag + (mutually exclusive). Content of YAML file + with tracing configuration. See format details: + https://thanos.io/tip/thanos/tracing.md/#configuration + --tracing.config-file= + Path to YAML file with tracing + configuration. See format details: + https://thanos.io/tip/thanos/tracing.md/#configuration + --version Show application version. + +``` + ### Bucket mark `tools bucket mark` can be used to manually mark block for deletion. From acb6982b72777c2e7fb5f38c2b19e2e247be0e19 Mon Sep 17 00:00:00 2001 From: Philip Gough Date: Thu, 1 May 2025 09:37:35 +0100 Subject: [PATCH 3/8] Add meta filters for compaction and resolution levels --- pkg/block/fetcher.go | 67 ++++++++++++++++++++ pkg/block/fetcher_test.go | 126 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 193 insertions(+) diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index d0f3154530..1b6ebdb876 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -1063,6 +1063,73 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL return nil } +var _ MetadataFilter = &ResolutionMetaFilter{} + +// ResolutionMetaFilter is a BaseFetcher filter that filters out blocks that are not in the specified resolution range. +// Not go-routine safe. +type ResolutionMetaFilter struct { + resolutions []int64 + logger log.Logger +} + +// NewResolutionMetaFilter creates ResolutionMetaFilter. 
+func NewResolutionMetaFilter(logger log.Logger, resolutions []int64) *ResolutionMetaFilter { + return &ResolutionMetaFilter{resolutions: resolutions, logger: logger} +} + +// Filter filters out blocks that are not in the specified resolution range. +func (f *ResolutionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { + if len(f.resolutions) == 0 { + return nil + } + for id, m := range metas { + var found bool + for _, resolution := range f.resolutions { + if m.Thanos.Downsample.Resolution == resolution { + found = true + break + } + } + if !found { + delete(metas, id) + } + } + return nil +} + +// CompactionMetaFilter is a BaseFetcher filter that filters out blocks that are not in the specified compaction range. +// Not go-routine safe. +type CompactionMetaFilter struct { + compactions []int + logger log.Logger +} + +// NewCompactionMetaFilter creates CompactionMetaFilter. +func NewCompactionMetaFilter(logger log.Logger, compactions []int) *CompactionMetaFilter { + return &CompactionMetaFilter{compactions: compactions, logger: logger} +} + +// Filter filters out blocks that are not in the specified compaction range. 
+func (f *CompactionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { + if len(f.compactions) == 0 { + return nil + } + + for id, m := range metas { + var found bool + for _, compactionLevel := range f.compactions { + if m.Compaction.Level == compactionLevel { + found = true + break + } + } + if !found { + delete(metas, id) + } + } + return nil +} + var ( SelectorSupportedRelabelActions = map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}, relabel.HashMod: {}} ) diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 5e24e26538..2997c9c90a 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -1211,3 +1211,129 @@ func Test_ParseRelabelConfig(t *testing.T) { testutil.NotOk(t, err) testutil.Equals(t, "unsupported relabel action: labelmap", err.Error()) } + +func TestCompactionMetaFilter_Filter(t *testing.T) { + testCases := []struct { + name string + compactionLevels []int + input map[ulid.ULID]*metadata.Meta + expected map[ulid.ULID]*metadata.Meta + }{ + { + name: "filters out blocks not in specified compaction range", + compactionLevels: []int{1, 2}, + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 1}}}, + ULID(2): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 2}}}, + ULID(3): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 3}}}, + ULID(4): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 4}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{ + ULID(1): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 1}}}, + ULID(2): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 2}}}, + }, + }, + { + name: "handles empty input", + compactionLevels: []int{1, 2}, + input: map[ulid.ULID]*metadata.Meta{}, + expected: map[ulid.ULID]*metadata.Meta{}, + }, + { + name: "filters out all 
blocks if no matching compaction levels", + compactionLevels: []int{5, 6}, + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 1}}}, + ULID(2): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 2}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{}, + }, + { + name: "keeps all blocks if all match compaction levels", + compactionLevels: []int{1, 2, 3}, + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 1}}}, + ULID(2): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 2}}}, + ULID(3): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 3}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{ + ULID(1): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 1}}}, + ULID(2): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 2}}}, + ULID(3): {BlockMeta: tsdb.BlockMeta{Compaction: tsdb.BlockMetaCompaction{Level: 3}}}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + filter := NewCompactionMetaFilter(log.NewNopLogger(), tc.compactionLevels) + + m := newTestFetcherMetrics() + testutil.Ok(t, filter.Filter(context.Background(), tc.input, m.Synced, nil)) + testutil.Equals(t, tc.expected, tc.input) + }) + } +} + +func TestResolutionMetaFilter_Filter(t *testing.T) { + testCases := []struct { + name string + resolutions []int64 + input map[ulid.ULID]*metadata.Meta + expected map[ulid.ULID]*metadata.Meta + }{ + { + name: "filters out blocks not matching resolution", + resolutions: []int64{30000, 60000}, + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 30000}}}, + ULID(2): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 60000}}}, + ULID(3): {Thanos: metadata.Thanos{Downsample: 
metadata.ThanosDownsample{Resolution: 15000}}}, + ULID(4): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 0}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 30000}}}, + ULID(2): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 60000}}}, + }, + }, + { + name: "keeps all blocks when all match resolution", + resolutions: []int64{15000, 30000, 60000}, + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 15000}}}, + ULID(2): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 30000}}}, + ULID(3): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 60000}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 15000}}}, + ULID(2): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 30000}}}, + ULID(3): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 60000}}}, + }, + }, + { + name: "handles empty input gracefully", + resolutions: []int64{30000, 60000}, + input: map[ulid.ULID]*metadata.Meta{}, + expected: map[ulid.ULID]*metadata.Meta{}, + }, + { + name: "filters out all blocks when no resolution matches", + resolutions: []int64{90000, 120000}, + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 30000}}}, + ULID(2): {Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: 60000}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + filter := NewResolutionMetaFilter(log.NewNopLogger(), tc.resolutions) + + m := newTestFetcherMetrics() + testutil.Ok(t, filter.Filter(context.Background(), tc.input, m.Synced, nil)) + 
testutil.Equals(t, tc.expected, tc.input) + }) + } +} From f0a8a2168aaed72d0e1fe9d6a876dd0ef413d4d9 Mon Sep 17 00:00:00 2001 From: Philip Gough Date: Thu, 1 May 2025 09:39:07 +0100 Subject: [PATCH 4/8] tools: ls add filter for compaction and resolutions --- cmd/thanos/tools_bucket.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 0922440470..754a44d94e 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -114,6 +114,8 @@ type bucketLsConfig struct { excludeDelete bool selectorRelabelConf extflag.PathOrContent filterConf *store.FilterConfig + resolutions []time.Duration + compactions []int timeout time.Duration } @@ -197,6 +199,8 @@ func (tbc *bucketLsConfig) registerBucketLsFlag(cmd extkingpin.FlagClause) *buck cmd.Flag("max-time", "End of time range limit to list. Thanos Tools will list only blocks, which were created earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("9999-12-31T23:59:59Z").SetValue(&tbc.filterConf.MaxTime) cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").DurationVar(&tbc.timeout) + cmd.Flag("resolution", "Only blocks with these resolutions will be listed. Defaults to all resolutions. Repeated flag.").Default("0s", "5m", "1h").HintAction(listResLevel).DurationListVar(&tbc.resolutions) + cmd.Flag("compaction", "If set, only blocks with these compaction levels will be listed. 
Repeated flag.").Default().IntsVar(&tbc.compactions) return tbc } @@ -455,6 +459,18 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo block.NewTimePartitionMetaFilter(tbc.filterConf.MinTime, tbc.filterConf.MaxTime), } + if len(tbc.compactions) > 0 { + filters = append(filters, block.NewCompactionMetaFilter(logger, tbc.compactions)) + } + + if len(tbc.resolutions) > 0 { + var resolutionLevels []int64 + for _, lvl := range tbc.resolutions { + resolutionLevels = append(resolutionLevels, lvl.Milliseconds()) + } + filters = append(filters, block.NewResolutionMetaFilter(logger, resolutionLevels)) + } + if tbc.excludeDelete { ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency) filters = append(filters, ignoreDeletionMarkFilter) From 63302ea72265ce6a43a7a0401c43f27ffd838195 Mon Sep 17 00:00:00 2001 From: Philip Gough Date: Thu, 1 May 2025 09:41:10 +0100 Subject: [PATCH 5/8] docs: Generate docs for tools ls cmd --- docs/components/tools.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/components/tools.md b/docs/components/tools.md index dd409fe52a..dfb93098fe 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -463,6 +463,9 @@ Flags: --auto-gomemlimit.ratio=0.9 The ratio of reserved GOMEMLIMIT memory to the detected maximum container or system memory. + --compaction=COMPACTION ... + If set, only blocks with these compaction levels + will be listed. Repeated flag. --enable-auto-gomemlimit Enable go runtime to automatically limit memory consumption. --exclude-delete Exclude blocks marked for deletion. @@ -498,6 +501,9 @@ Flags: -o, --output="" Optional format in which to print each block's information. Options are 'json', 'wide' or a custom template. + --resolution=0s... ... Only blocks with these resolutions will be + listed. Defaults to all resolutions. Repeated + flag. 
--selector.relabel-config= Alternative to 'selector.relabel-config-file' flag (mutually exclusive). Content of YAML From 7ac6414b6808afddbaccce64efb27b3807304443 Mon Sep 17 00:00:00 2001 From: Philip Gough Date: Fri, 2 May 2025 10:37:32 +0100 Subject: [PATCH 6/8] tools: Add ability to filter by block id on inspect and cleanup commands --- cmd/thanos/tools_bucket.go | 66 ++++++++++++++++++++++++---------- pkg/block/fetcher.go | 74 +++++++++++++++++++++++++++----------- pkg/block/fetcher_test.go | 62 ++++++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 39 deletions(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 754a44d94e..127fa37a66 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -100,6 +100,7 @@ type bucketRewriteConfig struct { type bucketInspectConfig struct { selector []string sortBy []string + blockIDs []string timeout time.Duration } @@ -152,6 +153,7 @@ type bucketCleanupConfig struct { blockSyncConcurrency int deleteDelay time.Duration filterConf *store.FilterConfig + blockIDs []string } type bucketRetentionConfig struct { @@ -209,6 +211,7 @@ func (tbc *bucketInspectConfig) registerBucketInspectFlag(cmd extkingpin.FlagCla PlaceHolder("=\\\"\\\"").StringsVar(&tbc.selector) cmd.Flag("sort-by", "Sort by columns. It's also possible to sort by multiple columns, e.g. '--sort-by FROM --sort-by UNTIL'. I.e., if the 'FROM' value is equal the rows are then further sorted by the 'UNTIL' value."). Default("FROM", "UNTIL").EnumsVar(&tbc.sortBy, inspectColumns...) + cmd.Flag("id", "ID (ULID) of the blocks to be marked for inspection (repeated flag). 
If none are specified, all matching blocks will be inspected.").Default().StringsVar(&tbc.blockIDs) cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").DurationVar(&tbc.timeout) return tbc @@ -294,6 +297,7 @@ func (tbc *bucketCleanupConfig) registerBucketCleanupFlag(cmd extkingpin.FlagCla Default("0000-01-01T00:00:00Z").SetValue(&tbc.filterConf.MinTime) cmd.Flag("max-time", "End of time range limit to cleanup. Thanos Tools will cleanup only blocks, which were created earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("9999-12-31T23:59:59Z").SetValue(&tbc.filterConf.MaxTime) + cmd.Flag("id", "ID (ULID) of the blocks to be marked for cleanup (repeated flag)").Default().StringsVar(&tbc.blockIDs) return tbc } @@ -350,6 +354,11 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path return err } + ids, err := parseBlockIDS(tbc.ids) + if err != nil { + return err + } + bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String(), nil) if err != nil { return err @@ -387,7 +396,10 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path } // We ignore any block that has the deletion marker file. 
- filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)} + filters := []block.MetadataFilter{ + block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency), + block.NewIDMetaFilter(logger, ids), + } baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters) if err != nil { @@ -395,13 +407,9 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path } var idMatcher func(ulid.ULID) bool = nil - if len(tbc.ids) > 0 { - idsMap := map[string]struct{}{} - for _, bid := range tbc.ids { - id, err := ulid.Parse(bid) - if err != nil { - return errors.Wrap(err, "invalid ULID found in --id flag") - } + if len(ids) > 0 { + idsMap := make(map[string]struct{}, len(ids)) + for _, id := range ids { idsMap[id.String()] = struct{}{} } @@ -567,6 +575,15 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat return errors.Wrap(err, "error parsing selector flag") } + ids, err := parseBlockIDS(tbc.blockIDs) + if err != nil { + return err + } + + filters := []block.MetadataFilter{ + block.NewIDMetaFilter(logger, ids), + } + confContentYaml, err := objStoreConfig.Content() if err != nil { return err @@ -579,7 +596,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name())) baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) - fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) + fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", 
extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters) if err != nil { return err } @@ -599,9 +616,6 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat } blockMetas := make([]*metadata.Meta, 0, len(metas)) - for _, meta := range metas { - blockMetas = append(blockMetas, meta) - } var opPrinter tablePrinter op := outputType(*output) @@ -890,6 +904,11 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + ids, err := parseBlockIDS(tbc.blockIDs) + if err != nil { + return err + } + // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. // This is to make sure compactor will not accidentally perform compactions with gap instead. @@ -913,6 +932,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat block.NewConsistencyDelayMetaFilter(logger, tbc.consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)), ignoreDeletionMarkFilter, duplicateBlocksFilter, + block.NewIDMetaFilter(logger, ids), }, ) sy, err = compact.NewMetaSyncer( @@ -1255,13 +1275,9 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat return errors.New("rewrite configuration should be provided") } - var ids []ulid.ULID - for _, id := range tbc.blockIDs { - u, err := ulid.Parse(id) - if err != nil { - return errors.Errorf("id is not a valid block ULID, got: %v", id) - } - ids = append(ids, u) + ids, err := parseBlockIDS(tbc.blockIDs) + if err != nil { + return err } if err := os.RemoveAll(tbc.tmpDir); err != nil { @@ -1542,3 +1558,15 @@ func registerBucketUploadBlocks(app extkingpin.AppClause, objStoreConfig *extfla return nil }) } + +func parseBlockIDS(ids []string) ([]ulid.ULID, error) { + var ulids 
[]ulid.ULID + for _, id := range ids { + u, err := ulid.Parse(id) + if err != nil { + return nil, errors.Errorf("block.id is not a valid ULID, got: %v", id) + } + ulids = append(ulids, u) + } + return ulids, nil +} diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 1b6ebdb876..79ce64714f 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -1068,68 +1068,102 @@ var _ MetadataFilter = &ResolutionMetaFilter{} // ResolutionMetaFilter is a BaseFetcher filter that filters out blocks that are not in the specified resolution range. // Not go-routine safe. type ResolutionMetaFilter struct { - resolutions []int64 + resolutions map[int64]struct{} logger log.Logger } // NewResolutionMetaFilter creates ResolutionMetaFilter. func NewResolutionMetaFilter(logger log.Logger, resolutions []int64) *ResolutionMetaFilter { - return &ResolutionMetaFilter{resolutions: resolutions, logger: logger} + resolutionLevels := make(map[int64]struct{}, len(resolutions)) + for _, resolutionLevel := range resolutions { + resolutionLevels[resolutionLevel] = struct{}{} + } + return &ResolutionMetaFilter{resolutions: resolutionLevels, logger: logger} } // Filter filters out blocks that are not in the specified resolution range. +// If no resolutions are specified, all blocks are kept. func (f *ResolutionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { if len(f.resolutions) == 0 { return nil } + for id, m := range metas { - var found bool - for _, resolution := range f.resolutions { - if m.Thanos.Downsample.Resolution == resolution { - found = true - break - } - } - if !found { + if _, ok := f.resolutions[m.Thanos.Downsample.Resolution]; !ok { delete(metas, id) } } + return nil } +var _ MetadataFilter = &CompactionMetaFilter{} + // CompactionMetaFilter is a BaseFetcher filter that filters out blocks that are not in the specified compaction range. // Not go-routine safe. 
type CompactionMetaFilter struct { - compactions []int + compactions map[int]struct{} logger log.Logger } // NewCompactionMetaFilter creates CompactionMetaFilter. func NewCompactionMetaFilter(logger log.Logger, compactions []int) *CompactionMetaFilter { - return &CompactionMetaFilter{compactions: compactions, logger: logger} + compactionLevels := make(map[int]struct{}, len(compactions)) + for _, compactionLevel := range compactions { + compactionLevels[compactionLevel] = struct{}{} + } + return &CompactionMetaFilter{compactions: compactionLevels, logger: logger} } // Filter filters out blocks that are not in the specified compaction range. +// If no compactions are specified, all blocks are kept. func (f *CompactionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { if len(f.compactions) == 0 { return nil } for id, m := range metas { - var found bool - for _, compactionLevel := range f.compactions { - if m.Compaction.Level == compactionLevel { - found = true - break - } - } - if !found { + if _, ok := f.compactions[m.Compaction.Level]; !ok { delete(metas, id) } } return nil } +var _ MetadataFilter = &ResolutionMetaFilter{} + +// IDMetaFilter BlockIDMetaFilter is a BaseFetcher filter that filters out blocks that are not in the list of specified ID. +// Not go-routine safe. +type IDMetaFilter struct { + ids map[ulid.ULID]struct{} + logger log.Logger +} + +// NewIDMetaFilter creates BlockIDMetaFilter. +func NewIDMetaFilter(logger log.Logger, ids []ulid.ULID) *IDMetaFilter { + metaIDS := make(map[ulid.ULID]struct{}, len(ids)) + for _, id := range ids { + metaIDS[id] = struct{}{} + } + return &IDMetaFilter{ids: metaIDS, logger: logger} +} + +// Filter filters out blocks that are not in the specified ID list. +// If no IDs are specified, all blocks are kept. 
+func (f *IDMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { + if len(f.ids) == 0 { + return nil + } + + for metaID, _ := range metas { + if _, ok := f.ids[metaID]; !ok { + delete(metas, metaID) + } + } + + return nil +} + var ( SelectorSupportedRelabelActions = map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}, relabel.HashMod: {}} ) diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 2997c9c90a..aac5c5da9d 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -1337,3 +1337,65 @@ func TestResolutionMetaFilter_Filter(t *testing.T) { }) } } + +func TestIDMetaFilter_Filter(t *testing.T) { + testCases := []struct { + name string + ids []ulid.ULID + input map[ulid.ULID]*metadata.Meta + expected map[ulid.ULID]*metadata.Meta + }{ + { + name: "filters out blocks not in ID list", + ids: []ulid.ULID{ULID(1), ULID(2)}, + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"key1": "value1"}}}, + ULID(2): {Thanos: metadata.Thanos{Labels: map[string]string{"key2": "value2"}}}, + ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"key3": "value3"}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"key1": "value1"}}}, + ULID(2): {Thanos: metadata.Thanos{Labels: map[string]string{"key2": "value2"}}}, + }, + }, + { + name: "keeps all blocks when all IDs match", + ids: []ulid.ULID{ULID(1), ULID(2), ULID(3)}, + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"key1": "value1"}}}, + ULID(2): {Thanos: metadata.Thanos{Labels: map[string]string{"key2": "value2"}}}, + ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"key3": "value3"}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"key1": "value1"}}}, + ULID(2): {Thanos: 
metadata.Thanos{Labels: map[string]string{"key2": "value2"}}}, + ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"key3": "value3"}}}, + }, + }, + { + name: "handles empty input gracefully", + ids: []ulid.ULID{ULID(1), ULID(2)}, + input: map[ulid.ULID]*metadata.Meta{}, + expected: map[ulid.ULID]*metadata.Meta{}, + }, + { + name: "filters out all blocks when no IDs match", + ids: []ulid.ULID{ULID(4), ULID(5)}, + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"key1": "value1"}}}, + ULID(2): {Thanos: metadata.Thanos{Labels: map[string]string{"key2": "value2"}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + filter := NewIDMetaFilter(log.NewNopLogger(), tc.ids) + + m := newTestFetcherMetrics() + testutil.Ok(t, filter.Filter(context.Background(), tc.input, m.Synced, nil)) + testutil.Equals(t, tc.expected, tc.input) + }) + } +} From 7b5b54fd6c8430bdd6d7edb7c91fc339ca4bbb1a Mon Sep 17 00:00:00 2001 From: Philip Gough Date: Fri, 2 May 2025 10:38:43 +0100 Subject: [PATCH 7/8] make: Docs for tools cmd --- docs/components/tools.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/components/tools.md b/docs/components/tools.md index dfb93098fe..2847cf7180 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -556,6 +556,10 @@ Flags: consumption. -h, --help Show context-sensitive help (also try --help-long and --help-man). + --id=ID ... ID (ULID) of the blocks to be marked for + inspection (repeated flag). If none are + specified, all matching blocks will be + inspected. --log.format=logfmt Log format to use. Possible options: logfmt or json. --log.level=info Log filtering level. @@ -829,6 +833,8 @@ Flags: consumption. -h, --help Show context-sensitive help (also try --help-long and --help-man). + --id=ID ... 
ID (ULID) of the blocks to be marked for cleanup + (repeated flag) --log.format=logfmt Log format to use. Possible options: logfmt or json. --log.level=info Log filtering level. From ce4d503c11808066eddd5fc7b2869106a79191f0 Mon Sep 17 00:00:00 2001 From: Philip Gough Date: Tue, 6 May 2025 09:18:21 +0100 Subject: [PATCH 8/8] Cleanup dry run --- cmd/thanos/tools_bucket.go | 28 ++++++++++++++---- pkg/block/fetcher.go | 6 ++-- pkg/compact/blocks_cleaner.go | 56 +++++++++++++++++++++++++++++++---- pkg/compact/clean.go | 52 ++++++++++++++++++++++++-------- pkg/compact/compact.go | 2 ++ 5 files changed, 118 insertions(+), 26 deletions(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 127fa37a66..c41da760c3 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -154,6 +154,7 @@ type bucketCleanupConfig struct { deleteDelay time.Duration filterConf *store.FilterConfig blockIDs []string + dryRun bool } type bucketRetentionConfig struct { @@ -297,7 +298,8 @@ func (tbc *bucketCleanupConfig) registerBucketCleanupFlag(cmd extkingpin.FlagCla Default("0000-01-01T00:00:00Z").SetValue(&tbc.filterConf.MinTime) cmd.Flag("max-time", "End of time range limit to cleanup. Thanos Tools will cleanup only blocks, which were created earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("9999-12-31T23:59:59Z").SetValue(&tbc.filterConf.MaxTime) - cmd.Flag("id", "ID (ULID) of the blocks to be marked for cleanup (repeated flag)").Default().StringsVar(&tbc.blockIDs) + cmd.Flag("id", "ID (ULID) of the blocks to be marked for cleanup (repeated flag). If none provided no block will be filtered by ID").Default().StringsVar(&tbc.blockIDs) + cmd.Flag("dry-run", "Prints the blocks to be cleaned, that match any provided filters instead of cleaning them. 
Defaults to true, for user to double check").Default("false").BoolVar(&tbc.dryRun) return tbc } @@ -914,27 +916,26 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat // This is to make sure compactor will not accidentally perform compactions with gap instead. ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency) duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency) - blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, tbc.deleteDelay, stubCounter, stubCounter) ctx := context.Background() - var sy *compact.Syncer { baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) - baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "/tmp", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") } cf := baseMetaFetcher.NewMetaFetcher( extprom.WrapRegistererWithPrefix(extpromPrefix, reg), []block.MetadataFilter{ + block.NewIDMetaFilter(logger, ids), block.NewTimePartitionMetaFilter(tbc.filterConf.MinTime, tbc.filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), block.NewConsistencyDelayMetaFilter(logger, tbc.consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)), ignoreDeletionMarkFilter, duplicateBlocksFilter, - block.NewIDMetaFilter(logger, ids), }, ) + sy, err = compact.NewMetaSyncer( logger, reg, @@ -955,9 +956,24 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat if err := sy.SyncMetas(ctx); err != nil { return errors.Wrap(err, "sync blocks") } - level.Info(logger).Log("msg", "synced blocks done") + var filteredIDS []ulid.ULID + for i, _ := range sy.Metas() { + 
filteredIDS = append(filteredIDS, i) + } + + if tbc.dryRun { + compact.DryRunCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt) + blocksCleaner := compact.NewDryRunBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, tbc.deleteDelay, filteredIDS...) + if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil { + return errors.Wrap(err, "error cleaning blocks") + } + level.Info(logger).Log("msg", "finished cleanup dry-run") + return nil + } + + blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, tbc.deleteDelay, stubCounter, stubCounter, filteredIDS...) compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, stubCounter, stubCounter, stubCounter) if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil { return errors.Wrap(err, "error cleaning blocks") diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 79ce64714f..f530270287 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -299,8 +299,8 @@ type GaugeVec interface { WithLabelValues(lvs ...string) prometheus.Gauge } -// Filter allows filtering or modifying metas from the provided map or returns error. type MetadataFilter interface { + // Filter allows filtering or modifying metas from the provided map or returns error. 
Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error } @@ -1154,13 +1154,13 @@ func (f *IDMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.M if len(f.ids) == 0 { return nil } - + level.Debug(f.logger).Log("msg", "count before block ID meta filter ", "count", len(metas)) for metaID, _ := range metas { if _, ok := f.ids[metaID]; !ok { delete(metas, metaID) } } - + level.Debug(f.logger).Log("msg", "count after block ID meta filter", "count", len(metas)) return nil } diff --git a/pkg/compact/blocks_cleaner.go b/pkg/compact/blocks_cleaner.go index 5ae9120a8c..21a8893ce2 100644 --- a/pkg/compact/blocks_cleaner.go +++ b/pkg/compact/blocks_cleaner.go @@ -9,6 +9,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" @@ -16,6 +17,9 @@ import ( "github.com/thanos-io/thanos/pkg/block" ) +type deleteBlockFn func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) +type deleteBlockAndErrorFn func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error + // BlocksCleaner is a struct that deletes blocks from bucket which are marked for deletion. type BlocksCleaner struct { logger log.Logger @@ -24,10 +28,37 @@ type BlocksCleaner struct { deleteDelay time.Duration blocksCleaned prometheus.Counter blockCleanupFailures prometheus.Counter + deleteBlockFn deleteBlockAndErrorFn + // cleanByIDs is a list of block IDs to be cleaned. If empty, all blocks marked for deletion will be cleaned. + cleanByIDs map[ulid.ULID]struct{} } // NewBlocksCleaner creates a new BlocksCleaner. 
-func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, deleteDelay time.Duration, blocksCleaned, blockCleanupFailures prometheus.Counter) *BlocksCleaner { +func NewBlocksCleaner( + logger log.Logger, + bkt objstore.Bucket, + ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, + deleteDelay time.Duration, + blocksCleaned, blockCleanupFailures prometheus.Counter, + cleanByID ...ulid.ULID) *BlocksCleaner { + + var idFilter = map[ulid.ULID]struct{}{} + if len(cleanByID) > 0 { + for _, id := range cleanByID { + idFilter[id] = struct{}{} + } + } + + deleteBlock := func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error { + if err := block.Delete(ctx, logger, bkt, id); err != nil { + blockCleanupFailures.Inc() + return errors.Wrap(err, "delete block") + } + blocksCleaned.Inc() + level.Info(logger).Log("msg", "deleted block marked for deletion", "block", id) + return nil + } + return &BlocksCleaner{ logger: logger, ignoreDeletionMarkFilter: ignoreDeletionMarkFilter, @@ -35,7 +66,19 @@ func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMark deleteDelay: deleteDelay, blocksCleaned: blocksCleaned, blockCleanupFailures: blockCleanupFailures, + deleteBlockFn: deleteBlock, + cleanByIDs: idFilter, + } +} + +func NewDryRunBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, deleteDelay time.Duration, cleanByID ...ulid.ULID) *BlocksCleaner { + cleaner := NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, nil, nil, cleanByID...) 
+ level.Info(logger).Log("msg", "dry-run block cleaning enabled") + cleaner.deleteBlockFn = func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error { + level.Info(logger).Log("msg", "would delete block marked for deletion", "block", id) + return nil } + return cleaner } // DeleteMarkedBlocks uses ignoreDeletionMarkFilter to gather the blocks that are marked for deletion and deletes those @@ -43,15 +86,18 @@ func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMark func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) error { level.Info(s.logger).Log("msg", "started cleaning of blocks marked for deletion") + filterByID := len(s.cleanByIDs) > 0 deletionMarkMap := s.ignoreDeletionMarkFilter.DeletionMarkBlocks() for _, deletionMark := range deletionMarkMap { + if filterByID { + if _, ok := s.cleanByIDs[deletionMark.ID]; !ok { + continue + } + } if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > s.deleteDelay.Seconds() { - if err := block.Delete(ctx, s.logger, s.bkt, deletionMark.ID); err != nil { - s.blockCleanupFailures.Inc() + if err := s.deleteBlockFn(ctx, s.logger, s.bkt, deletionMark.ID); err != nil { return errors.Wrap(err, "delete block") } - s.blocksCleaned.Inc() - level.Info(s.logger).Log("msg", "deleted block marked for deletion", "block", deletionMark.ID) } } diff --git a/pkg/compact/clean.go b/pkg/compact/clean.go index a398ba9e3c..da7608355b 100644 --- a/pkg/compact/clean.go +++ b/pkg/compact/clean.go @@ -30,6 +30,44 @@ func BestEffortCleanAbortedPartialUploads( deleteAttempts prometheus.Counter, blockCleanups prometheus.Counter, blockCleanupFailures prometheus.Counter, +) { + deleteFn := func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) { + deleteAttempts.Inc() + level.Info(logger).Log("msg", "found partially uploaded block; marking for deletion", "block", id) + + if err := block.Delete(ctx, logger, bkt, id); err != nil { + 
blockCleanupFailures.Inc() + level.Warn(logger).Log("msg", "failed to delete aborted partial upload; will retry in next iteration", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err) + return + } + blockCleanups.Inc() + level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge) + } + + bestEffortCleanAbortedPartialUploads(ctx, logger, partial, bkt, deleteFn) +} + +func DryRunCleanAbortedPartialUploads( + ctx context.Context, + logger log.Logger, + partial map[ulid.ULID]error, + bkt objstore.Bucket, +) { + level.Info(logger).Log("msg", "dry-run best effort clean aborted partial uploads enabled") + + deleteFn := func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) { + level.Info(logger).Log("msg", "found partially uploaded block; marking for deletion", "block", id) + level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge) + } + bestEffortCleanAbortedPartialUploads(ctx, logger, partial, bkt, deleteFn) +} + +func bestEffortCleanAbortedPartialUploads( + ctx context.Context, + logger log.Logger, + partial map[ulid.ULID]error, + bkt objstore.Bucket, + deleteBlockFn deleteBlockFn, ) { level.Info(logger).Log("msg", "started cleaning of aborted partial uploads") @@ -45,18 +83,8 @@ func BestEffortCleanAbortedPartialUploads( continue } - deleteAttempts.Inc() - level.Info(logger).Log("msg", "found partially uploaded block; marking for deletion", "block", id) - // We don't gather any information about deletion marks for partial blocks, so let's simply remove it. We waited - // long PartialUploadThresholdAge already. - // TODO(bwplotka): Fix some edge cases: https://github.com/thanos-io/thanos/issues/2470 . 
- if err := block.Delete(ctx, logger, bkt, id); err != nil { - blockCleanupFailures.Inc() - level.Warn(logger).Log("msg", "failed to delete aborted partial upload; will retry in next iteration", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err) - continue - } - blockCleanups.Inc() - level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge) + deleteBlockFn(ctx, logger, bkt, id) + } level.Info(logger).Log("msg", "cleaning of aborted partial uploads done") } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index a20544b2f4..c84ac6ff06 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -167,7 +167,9 @@ func (s *Syncer) SyncMetas(ctx context.Context) error { } s.mtx.Lock() s.blocks = container.(metasContainer).metas + level.Debug(s.logger).Log("msg", "syncing metas", "count", len(s.blocks)) s.partial = container.(metasContainer).partial + level.Debug(s.logger).Log("msg", "syncing partial", "count", len(s.partial)) s.mtx.Unlock() return nil }