Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ type tsdbDiscoveryOpts struct {
discoveryMinBlockAge time.Duration

externalLabelMatchers matcherSlice
replicaLabels []string
}

func setupTSDBDiscovery(ctx context.Context, g *run.Group, log *slog.Logger, bkt objstore.Bucket, opts tsdbDiscoveryOpts) (*locate.TSDBDiscoverer, error) {
Expand All @@ -256,6 +257,7 @@ func setupTSDBDiscovery(ctx context.Context, g *run.Group, log *slog.Logger, bkt
locate.TSDBMetaConcurrency(opts.discoveryConcurrency),
locate.TSDBMinBlockAge(opts.discoveryMinBlockAge),
locate.TSDBMatchExternalLabels(opts.externalLabelMatchers...),
locate.TSDBReplicaLabels(opts.replicaLabels...),
locate.WithLogger(log),
)

Expand Down
6 changes: 5 additions & 1 deletion cmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (opts *tsdbDiscoveryOpts) registerConvertTSDBFlags(cmd *kingpin.CmdClause)
cmd.Flag("tsdb.discovery.concurrency", "concurrency for loading metadata").Default("1").IntVar(&opts.discoveryConcurrency)
cmd.Flag("tsdb.discovery.min-block-age", "blocks that have metrics that are younger than this won't be loaded").Default("0s").DurationVar(&opts.discoveryMinBlockAge)
MatchersVar(cmd.Flag("tsdb.discovery.select-external-labels", "only external labels matching this selector will be discovered").PlaceHolder("SELECTOR"), &opts.externalLabelMatchers)
cmd.Flag("tsdb.discovery.replica-labels", "label names to exclude from stream hash (e.g. receive_node_hostname). Use so compacted and uncompacted blocks write to the same path.").StringsVar(&opts.replicaLabels)
}

func (opts *apiOpts) registerConvertFlags(cmd *kingpin.CmdClause) {
Expand Down Expand Up @@ -194,7 +195,7 @@ func registerConvertApp(app *kingpin.Application) (*kingpin.CmdClause, func(cont
return fmt.Errorf("unable to clean up buffer directory: %w", err)
}

return advanceConversion(iterCtx, log, tsdbBkt, parquetBkt, tsdbDiscoverer, parquetDiscoverer, &planner, opts.conversion.downloadConcurrency, convOpts, blkDir)
return advanceConversion(iterCtx, log, tsdbBkt, parquetBkt, tsdbDiscoverer, parquetDiscoverer, &planner, opts.conversion.downloadConcurrency, convOpts, blkDir, opts.tsdbDiscover.replicaLabels)
}); err != nil {
log.Warn("Error during conversion", slog.Any("err", err))
return nil
Expand All @@ -220,6 +221,7 @@ func advanceConversion(
downloadConcurrency int,
convOpts []convert.ConvertOption,
blkDir string,
replicaLabels []string,
) error {
log.Info("Cleaning up previous state", "block_directory", blkDir)
if err := cleanupDirectory(blkDir); err != nil {
Expand Down Expand Up @@ -251,6 +253,8 @@ func advanceConversion(
)
// Process each step (day) one by one, keeping blocks shared between steps on disk.
for _, step := range plan.Steps {
// Normalize external labels so path hash is stable (e.g. drop replica labels).
step.ExternalLabels = step.ExternalLabels.Without(replicaLabels)
// Close blocks that were open in the previous step but are no longer needed.
prevBlocks = closeUnused(log, step.Sources, prevBlocks)

Expand Down
16 changes: 14 additions & 2 deletions locate/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ type tsdbDiscoveryConfig struct {

externalLabelMatchers []*labels.Matcher
minBlockAge time.Duration
replicaLabels []string
l *slog.Logger
}

Expand Down Expand Up @@ -412,6 +413,14 @@ func TSDBMinBlockAge(d time.Duration) TSDBDiscoveryOption {
}
}

// TSDBReplicaLabels sets label names to exclude from the stream hash (e.g. receive_node_hostname).
// Blocks that differ only by these labels are merged into one stream so compacted and uncompacted
// blocks write to the same path.
//
// The parameter is deliberately not named "labels" so it does not shadow the
// prometheus model "labels" package used elsewhere in this file.
func TSDBReplicaLabels(names ...string) TSDBDiscoveryOption {
	return func(cfg *tsdbDiscoveryConfig) {
		cfg.replicaLabels = names
	}
}

type TSDBDiscoverer struct {
bkt objstore.Bucket

Expand All @@ -420,6 +429,7 @@ type TSDBDiscoverer struct {

externalLabelMatchers []*labels.Matcher
minBlockAge time.Duration
replicaLabels []string

concurrency int
l *slog.Logger
Expand All @@ -439,6 +449,7 @@ func NewTSDBDiscoverer(bkt objstore.Bucket, opts ...TSDBDiscoveryOption) *TSDBDi
concurrency: cfg.concurrency,
externalLabelMatchers: cfg.externalLabelMatchers,
minBlockAge: cfg.minBlockAge,
replicaLabels: cfg.replicaLabels,
l: cfg.l,
}
}
Expand All @@ -454,13 +465,14 @@ func (s *TSDBDiscoverer) Streams() map[schema.ExternalLabelsHash]schema.TSDBBloc

for _, m := range s.metas {
extLbls := schema.ExternalLabels(m.Thanos.Labels)
eh := extLbls.Hash()
streamLbls := extLbls.Without(s.replicaLabels)
eh := streamLbls.Hash()

bs, ok := out[eh]
if !ok {
bs = schema.TSDBBlocksStream{
StreamDescriptor: schema.StreamDescriptor{
ExternalLabels: m.Thanos.Labels,
ExternalLabels: streamLbls,
},
DiscoveredDays: make(map[util.Date]struct{}),
}
Expand Down
91 changes: 91 additions & 0 deletions locate/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,97 @@ func TestTSDBDiscoverer(t *testing.T) {
}

})
	// Two block metas are identical except for the receive_node_hostname value;
	// with TSDBReplicaLabels configured they must collapse into a single stream
	// keyed by the replica-stripped label set.
	t.Run("Discoverer merges blocks that differ only by replica label", func(tt *testing.T) {
		ctx := tt.Context()
		bkt, err := filesystem.NewBucket(tt.TempDir())
		require.NoError(tt, err)

		// Upload meta.json for each synthetic block into the test bucket.
		for _, m := range []metadata.Meta{
			{
				BlockMeta: tsdb.BlockMeta{
					ULID:  ulid.MustParse("01JS0DPYGA1HPW5RBZ1KBXCNXK"),
					Stats: tsdb.BlockStats{NumChunks: 1},
				},
				Thanos: metadata.Thanos{
					Labels: map[string]string{
						"cluster":               "eu",
						"receive_node_hostname": "node-a",
					},
				},
			},
			{
				BlockMeta: tsdb.BlockMeta{
					ULID:  ulid.MustParse("01JT0DPYGA1HPW5RBZ1KBXCNXK"),
					Stats: tsdb.BlockStats{NumChunks: 1},
				},
				Thanos: metadata.Thanos{
					Labels: map[string]string{
						"cluster":               "eu",
						"receive_node_hostname": "node-b",
					},
				},
			},
		} {
			buf := bytes.NewBuffer(nil)
			require.NoError(tt, json.NewEncoder(buf).Encode(m))
			require.NoError(tt, bkt.Upload(ctx, filepath.Join(m.ULID.String(), metadata.MetaFilename), buf))
		}

		discoverer := NewTSDBDiscoverer(bkt, TSDBReplicaLabels("receive_node_hostname"))
		require.NoError(tt, discoverer.Discover(ctx))

		streams := discoverer.Streams()
		require.Len(tt, streams, 1, "blocks that differ only by replica label should merge into one stream")

		// The stream key is the hash of the label set WITHOUT the replica label,
		// and the stored stream labels must likewise be replica-free.
		streamLbls := schema.ExternalLabels{"cluster": "eu"}
		eh := streamLbls.Hash()
		stream, ok := streams[eh]
		require.True(tt, ok)
		require.True(tt, maps.Equal(streamLbls, stream.ExternalLabels), "stream external labels should not contain replica label")
		require.Len(tt, stream.Metas, 2, "stream should contain both blocks")
	})
	// Control case: the same two block metas as above, but the discoverer is
	// built WITHOUT TSDBReplicaLabels, so the differing replica label values
	// keep the blocks in two distinct streams.
	t.Run("Discoverer without replica labels keeps blocks in separate streams", func(tt *testing.T) {
		ctx := tt.Context()
		bkt, err := filesystem.NewBucket(tt.TempDir())
		require.NoError(tt, err)

		// Upload meta.json for each synthetic block into the test bucket.
		for _, m := range []metadata.Meta{
			{
				BlockMeta: tsdb.BlockMeta{
					ULID:  ulid.MustParse("01JS0DPYGA1HPW5RBZ1KBXCNXK"),
					Stats: tsdb.BlockStats{NumChunks: 1},
				},
				Thanos: metadata.Thanos{
					Labels: map[string]string{
						"cluster":               "eu",
						"receive_node_hostname": "node-a",
					},
				},
			},
			{
				BlockMeta: tsdb.BlockMeta{
					ULID:  ulid.MustParse("01JT0DPYGA1HPW5RBZ1KBXCNXK"),
					Stats: tsdb.BlockStats{NumChunks: 1},
				},
				Thanos: metadata.Thanos{
					Labels: map[string]string{
						"cluster":               "eu",
						"receive_node_hostname": "node-b",
					},
				},
			},
		} {
			buf := bytes.NewBuffer(nil)
			require.NoError(tt, json.NewEncoder(buf).Encode(m))
			require.NoError(tt, bkt.Upload(ctx, filepath.Join(m.ULID.String(), metadata.MetaFilename), buf))
		}

		discoverer := NewTSDBDiscoverer(bkt)
		require.NoError(tt, discoverer.Discover(ctx))

		streams := discoverer.Streams()
		require.Len(tt, streams, 2, "without replica labels, blocks with different replica label values are separate streams")
	})
}

func createBlockForDate(ctx context.Context, t *testing.T, bkt objstore.Bucket, d util.Date, extLabels schema.ExternalLabels) {
Expand Down
22 changes: 22 additions & 0 deletions schema/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package schema

import (
"fmt"
"maps"
"path"
"regexp"
"sort"
Expand Down Expand Up @@ -66,6 +67,27 @@ func (h ExternalLabels) Hash() ExternalLabelsHash {
return ExternalLabelsHash(xxh.Sum64())
}

// Without returns a copy of h with every key listed in exclude removed.
// It is used to strip replica labels so the path hash stays stable across
// compacted blocks (no replica label) and uncompacted blocks (replica label
// present in the meta). The result is always a freshly allocated map; the
// receiver is never mutated.
func (h ExternalLabels) Without(exclude []string) ExternalLabels {
	result := make(ExternalLabels, len(h))

	// Fast path: nothing to drop, return a plain copy.
	if len(exclude) == 0 {
		maps.Copy(result, h)
		return result
	}

	// Build a set of keys to drop, then copy everything else.
	drop := make(map[string]struct{}, len(exclude))
	for _, name := range exclude {
		drop[name] = struct{}{}
	}
	for name, value := range h {
		if _, skip := drop[name]; skip {
			continue
		}
		result[name] = value
	}
	return result
}

// StreamDescriptor describes a Parquet block stream. Typical structure:
// <ext_labels_hash>/stream.pb
// <ext_labels_hash>/<date>/0.chunks.parquet
Expand Down
43 changes: 43 additions & 0 deletions schema/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package schema

import (
"maps"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos-parquet-gateway/internal/util"
)

Expand All @@ -29,3 +31,44 @@ func TestBlockNameForDay(t *testing.T) {
}
})
}

// TestExternalLabelsWithout verifies that Without returns an independent copy,
// removes exactly the requested keys, tolerates missing keys, and produces a
// stable hash once replica labels are stripped.
func TestExternalLabelsWithout(t *testing.T) {
	t.Run("empty exclude returns copy", func(t *testing.T) {
		base := ExternalLabels{"a": "1", "b": "2"}
		got := base.Without(nil)
		require.True(t, maps.Equal(base, got), "expected equal maps")
		// Mutating the copy must not leak back into the source map.
		got["c"] = "3"
		require.Len(t, base, 2, "original unchanged")
		got = base.Without([]string{})
		require.True(t, maps.Equal(base, got))
	})
	t.Run("exclude one label removes it", func(t *testing.T) {
		base := ExternalLabels{"cluster": "eu", "receive_node_hostname": "node-a"}
		got := base.Without([]string{"receive_node_hostname"})
		require.Equal(t, ExternalLabels{"cluster": "eu"}, got)
		require.Equal(t, ExternalLabels{"cluster": "eu", "receive_node_hostname": "node-a"}, base, "original unchanged")
	})
	t.Run("exclude multiple labels", func(t *testing.T) {
		base := ExternalLabels{"a": "1", "b": "2", "c": "3"}
		require.Equal(t, ExternalLabels{"a": "1", "c": "3"}, base.Without([]string{"b"}))
		require.Equal(t, ExternalLabels{"b": "2"}, base.Without([]string{"a", "c"}))
	})
	t.Run("exclude non-existent key is fine", func(t *testing.T) {
		base := ExternalLabels{"a": "1"}
		require.True(t, maps.Equal(base, base.Without([]string{"missing"})))
	})
	t.Run("hash of Without(replica) is stable for same non-replica labels", func(t *testing.T) {
		replicaA := ExternalLabels{"cluster": "eu", "receive_node_hostname": "node-a"}
		replicaB := ExternalLabels{"cluster": "eu", "receive_node_hostname": "node-b"}
		want := ExternalLabels{"cluster": "eu"}
		gotA := replicaA.Without([]string{"receive_node_hostname"})
		gotB := replicaB.Without([]string{"receive_node_hostname"})
		require.True(t, maps.Equal(gotA, want))
		require.True(t, maps.Equal(gotB, want))
		// Stripping the replica label makes both replicas hash to one stream.
		require.Equal(t, gotA.Hash(), gotB.Hash(), "same stream hash for different replicas")
		require.NotEqual(t, replicaA.Hash(), replicaB.Hash(), "raw labels differ by replica")
	})
}
Loading