Skip to content

Bucket cleanup min max #8235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 91 additions & 23 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type bucketRewriteConfig struct {
// bucketInspectConfig holds the CLI flag values for the `tools bucket inspect` command.
type bucketInspectConfig struct {
selector []string // label selector pairs (<name>="<value>") restricting which blocks are shown
sortBy []string // output columns to sort by, applied in order (e.g. FROM, then UNTIL as tie-breaker)
blockIDs []string // block ULIDs to inspect; empty means all matching blocks are inspected
timeout time.Duration // timeout for downloading metadata from remote storage
}

Expand All @@ -114,6 +115,8 @@ type bucketLsConfig struct {
excludeDelete bool
selectorRelabelConf extflag.PathOrContent
filterConf *store.FilterConfig
resolutions []time.Duration
compactions []int
timeout time.Duration
}

Expand Down Expand Up @@ -149,6 +152,9 @@ type bucketCleanupConfig struct {
consistencyDelay time.Duration
blockSyncConcurrency int
deleteDelay time.Duration
filterConf *store.FilterConfig
blockIDs []string
dryRun bool
}

type bucketRetentionConfig struct {
Expand Down Expand Up @@ -196,6 +202,8 @@ func (tbc *bucketLsConfig) registerBucketLsFlag(cmd extkingpin.FlagClause) *buck
cmd.Flag("max-time", "End of time range limit to list. Thanos Tools will list only blocks, which were created earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z").SetValue(&tbc.filterConf.MaxTime)
cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").DurationVar(&tbc.timeout)
cmd.Flag("resolution", "Only blocks with these resolutions will be listed. Defaults to all resolutions. Repeated flag.").Default("0s", "5m", "1h").HintAction(listResLevel).DurationListVar(&tbc.resolutions)
cmd.Flag("compaction", "If set, only blocks with these compaction levels will be listed. Repeated flag.").Default().IntsVar(&tbc.compactions)
return tbc
}

Expand All @@ -204,6 +212,7 @@ func (tbc *bucketInspectConfig) registerBucketInspectFlag(cmd extkingpin.FlagCla
PlaceHolder("<name>=\\\"<value>\\\"").StringsVar(&tbc.selector)
cmd.Flag("sort-by", "Sort by columns. It's also possible to sort by multiple columns, e.g. '--sort-by FROM --sort-by UNTIL'. I.e., if the 'FROM' value is equal the rows are then further sorted by the 'UNTIL' value.").
Default("FROM", "UNTIL").EnumsVar(&tbc.sortBy, inspectColumns...)
cmd.Flag("id", "ID (ULID) of the blocks to be marked for inspection (repeated flag). If none are specified, all matching blocks will be inspected.").Default().StringsVar(&tbc.blockIDs)
cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").DurationVar(&tbc.timeout)

return tbc
Expand Down Expand Up @@ -278,11 +287,19 @@ func (tbc *bucketMarkBlockConfig) registerBucketMarkBlockFlag(cmd extkingpin.Fla
}

// registerBucketCleanupFlag wires the CLI flags for `tools bucket cleanup`
// onto cmd, initializes the time-range filter config, and returns tbc for chaining.
func (tbc *bucketCleanupConfig) registerBucketCleanupFlag(cmd extkingpin.FlagClause) *bucketCleanupConfig {
	tbc.filterConf = &store.FilterConfig{}

	cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from bucket.").Default("48h").DurationVar(&tbc.deleteDelay)
	cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)).
		Default("30m").DurationVar(&tbc.consistencyDelay)
	cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
		Default("20").IntVar(&tbc.blockSyncConcurrency)
	cmd.Flag("min-time", "Start of time range limit to cleanup blocks. Thanos Tools will cleanup blocks, which were created later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
		Default("0000-01-01T00:00:00Z").SetValue(&tbc.filterConf.MinTime)
	cmd.Flag("max-time", "End of time range limit to cleanup. Thanos Tools will cleanup only blocks, which were created earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
		Default("9999-12-31T23:59:59Z").SetValue(&tbc.filterConf.MaxTime)
	// Help text fixed: was ungrammatical and unpunctuated.
	cmd.Flag("id", "ID (ULID) of the blocks to be marked for cleanup (repeated flag). If none are provided, no blocks are filtered by ID.").Default().StringsVar(&tbc.blockIDs)
	// Help text fixed: previously claimed "Defaults to true" while the actual default below is "false".
	cmd.Flag("dry-run", "Prints the blocks to be cleaned that match any provided filters, instead of cleaning them. Defaults to false; enable it to double check what would be removed.").Default("false").BoolVar(&tbc.dryRun)
	return tbc
}

Expand Down Expand Up @@ -339,6 +356,11 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
return err
}

ids, err := parseBlockIDS(tbc.ids)
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, component.Bucket.String(), nil)
if err != nil {
return err
Expand Down Expand Up @@ -376,21 +398,20 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
}

// We ignore any block that has the deletion marker file.
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
filters := []block.MetadataFilter{
block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency),
block.NewIDMetaFilter(logger, ids),
}
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
}

var idMatcher func(ulid.ULID) bool = nil
if len(tbc.ids) > 0 {
idsMap := map[string]struct{}{}
for _, bid := range tbc.ids {
id, err := ulid.Parse(bid)
if err != nil {
return errors.Wrap(err, "invalid ULID found in --id flag")
}
if len(ids) > 0 {
idsMap := make(map[string]struct{}, len(ids))
for _, id := range ids {
idsMap[id.String()] = struct{}{}
}

Expand Down Expand Up @@ -448,6 +469,18 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
block.NewTimePartitionMetaFilter(tbc.filterConf.MinTime, tbc.filterConf.MaxTime),
}

if len(tbc.compactions) > 0 {
filters = append(filters, block.NewCompactionMetaFilter(logger, tbc.compactions))
}

if len(tbc.resolutions) > 0 {
var resolutionLevels []int64
for _, lvl := range tbc.resolutions {
resolutionLevels = append(resolutionLevels, lvl.Milliseconds())
}
filters = append(filters, block.NewResolutionMetaFilter(logger, resolutionLevels))
}

if tbc.excludeDelete {
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
Expand Down Expand Up @@ -544,6 +577,15 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return errors.Wrap(err, "error parsing selector flag")
}

ids, err := parseBlockIDS(tbc.blockIDs)
if err != nil {
return err
}

filters := []block.MetadataFilter{
block.NewIDMetaFilter(logger, ids),
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand All @@ -556,7 +598,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
}
Expand All @@ -576,9 +618,6 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}

blockMetas := make([]*metadata.Meta, 0, len(metas))
for _, meta := range metas {
blockMetas = append(blockMetas, meta)
}

var opPrinter tablePrinter
op := outputType(*output)
Expand Down Expand Up @@ -867,30 +906,36 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat

stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

ids, err := parseBlockIDS(tbc.blockIDs)
if err != nil {
return err
}

// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency)
blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, tbc.deleteDelay, stubCounter, stubCounter)

ctx := context.Background()

var sy *compact.Syncer
{
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "/tmp", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
cf := baseMetaFetcher.NewMetaFetcher(
extprom.WrapRegistererWithPrefix(extpromPrefix, reg), []block.MetadataFilter{
block.NewIDMetaFilter(logger, ids),
block.NewTimePartitionMetaFilter(tbc.filterConf.MinTime, tbc.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, tbc.consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
},
)

sy, err = compact.NewMetaSyncer(
logger,
reg,
Expand All @@ -911,9 +956,24 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync blocks")
}

level.Info(logger).Log("msg", "synced blocks done")

var filteredIDS []ulid.ULID
for i, _ := range sy.Metas() {
filteredIDS = append(filteredIDS, i)
}

if tbc.dryRun {
compact.DryRunCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt)
blocksCleaner := compact.NewDryRunBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, tbc.deleteDelay, filteredIDS...)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}
level.Info(logger).Log("msg", "finished cleanup dry-run")
return nil
}

blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, tbc.deleteDelay, stubCounter, stubCounter, filteredIDS...)
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, stubCounter, stubCounter, stubCounter)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
Expand Down Expand Up @@ -1231,13 +1291,9 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return errors.New("rewrite configuration should be provided")
}

var ids []ulid.ULID
for _, id := range tbc.blockIDs {
u, err := ulid.Parse(id)
if err != nil {
return errors.Errorf("id is not a valid block ULID, got: %v", id)
}
ids = append(ids, u)
ids, err := parseBlockIDS(tbc.blockIDs)
if err != nil {
return err
}

if err := os.RemoveAll(tbc.tmpDir); err != nil {
Expand Down Expand Up @@ -1518,3 +1574,15 @@ func registerBucketUploadBlocks(app extkingpin.AppClause, objStoreConfig *extfla
return nil
})
}

// parseBlockIDS parses each entry of ids as a block ULID.
// It returns the parsed ULIDs in input order, or an error identifying the
// first invalid entry. The underlying parse error is wrapped so callers can
// see why the value was rejected.
func parseBlockIDS(ids []string) ([]ulid.ULID, error) {
	ulids := make([]ulid.ULID, 0, len(ids))
	for _, id := range ids {
		u, err := ulid.Parse(id)
		if err != nil {
			// Fixed: the message previously said "UUID", but block IDs are ULIDs.
			return nil, errors.Wrapf(err, "block.id is not a valid ULID, got: %v", id)
		}
		ulids = append(ulids, u)
	}
	return ulids, nil
}
93 changes: 93 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ Flags:
--auto-gomemlimit.ratio=0.9
The ratio of reserved GOMEMLIMIT memory to the
detected maximum container or system memory.
--compaction=COMPACTION ...
If set, only blocks with these compaction levels
will be listed. Repeated flag.
--enable-auto-gomemlimit Enable go runtime to automatically limit memory
consumption.
--exclude-delete Exclude blocks marked for deletion.
Expand Down Expand Up @@ -498,6 +501,9 @@ Flags:
-o, --output="" Optional format in which to print each block's
information. Options are 'json', 'wide' or a
custom template.
--resolution=0s... ... Only blocks with these resolutions will be
listed. Defaults to all resolutions. Repeated
flag.
--selector.relabel-config=<content>
Alternative to 'selector.relabel-config-file'
flag (mutually exclusive). Content of YAML
Expand Down Expand Up @@ -550,6 +556,10 @@ Flags:
consumption.
-h, --help Show context-sensitive help (also try
--help-long and --help-man).
--id=ID ... ID (ULID) of the blocks to be marked for
inspection (repeated flag). If none are
specified, all matching blocks will be
inspected.
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
Expand Down Expand Up @@ -797,6 +807,89 @@ Flags:

```

### Bucket cleanup

`tools bucket cleanup` can be used to manually clean up blocks in the bucket which have been marked for deletion.

```$ mdox-exec="thanos tools bucket cleanup --help"
usage: thanos tools bucket cleanup [<flags>]

Cleans up all blocks marked for deletion.

Flags:
--auto-gomemlimit.ratio=0.9
The ratio of reserved GOMEMLIMIT memory to the
detected maximum container or system memory.
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.
--consistency-delay=30m Minimum age of fresh (non-compacted)
blocks before they are being processed.
Malformed blocks older than the maximum of
consistency-delay and 48h0m0s will be removed.
--delete-delay=48h Time before a block marked for deletion is
deleted from bucket.
--dry-run Prints the blocks to be cleaned that match any
provided filters, instead of cleaning them.
--enable-auto-gomemlimit Enable go runtime to automatically limit memory
consumption.
-h, --help Show context-sensitive help (also try
--help-long and --help-man).
--id=ID ... ID (ULID) of the blocks to be marked for cleanup
(repeated flag). If none are provided, no blocks
are filtered by ID.
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
--max-time=9999-12-31T23:59:59Z
End of time range limit to cleanup. Thanos
Tools will cleanup only blocks, which were
created earlier than this value. Option can be a
constant time in RFC3339 format or time duration
relative to current time, such as -1d or 2h45m.
Valid duration units are ms, s, m, h, d, w, y.
--min-time=0000-01-01T00:00:00Z
Start of time range limit to cleanup blocks.
Thanos Tools will cleanup blocks, which were
created later than this value. Option can be a
constant time in RFC3339 format or time duration
relative to current time, such as -1d or 2h45m.
Valid duration units are ms, s, m, h, d, w, y.
--objstore.config=<content>
Alternative to 'objstore.config-file'
flag (mutually exclusive). Content of
YAML file that contains object store
configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--objstore.config-file=<file-path>
Path to YAML file that contains object
store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--selector.relabel-config=<content>
Alternative to 'selector.relabel-config-file'
flag (mutually exclusive). Content of YAML
file with relabeling configuration that allows
selecting blocks to act on based on their
external labels. It follows thanos sharding
relabel-config syntax. For format details see:
https://thanos.io/tip/thanos/sharding.md/#relabelling
--selector.relabel-config-file=<file-path>
Path to YAML file with relabeling
configuration that allows selecting blocks
to act on based on their external labels.
It follows thanos sharding relabel-config
syntax. For format details see:
https://thanos.io/tip/thanos/sharding.md/#relabelling
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
with tracing configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tracing.config-file=<file-path>
Path to YAML file with tracing
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--version Show application version.

```

### Bucket mark

`tools bucket mark` can be used to manually mark block for deletion.
Expand Down
Loading
Loading