diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 714612bba10..81dec58c670 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -574,6 +574,23 @@ func runReceive( }) } + // Start the disk probe — writes 1KB + fsync to the TSDB data directory once per 5 seconds + // to measure actual disk I/O latency. + { + diskProbe := receive.NewDiskProbe(log.With(logger, "component", "disk-probe"), receive.DiskProbeOptions{ + Dir: conf.dataDir, + Interval: 5 * time.Second, + WriteSize: 1024, + }, reg) + stop := make(chan struct{}) + g.Add(func() error { + diskProbe.Run(stop) + return nil + }, func(err error) { + close(stop) + }) + } + if receiveMode == receive.IngestorOnly { level.Debug(logger).Log("msg", "setting up periodic top metrics collection") topMetricNumSeries := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ @@ -1096,10 +1113,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("").StringVar(&rc.hashringsFileContent) - hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ") + hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmRendezvous)}, ", ") cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext+". Will be overwritten by the tenant-specific algorithm in the hashring config."). Default(string(receive.AlgorithmHashmod)). 
- EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmAlignedKetama)) + EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmRendezvous)) rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). Default("5m")) @@ -1263,6 +1280,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.max-pooled-decompressed-cap", "Maximum capacity (bytes) of a decompressed buffer that will be returned to the pool. Buffers larger than this are discarded to prevent pool ballooning."). Default(fmt.Sprintf("%d", receive.DefaultMaxPooledDecompressedCap)). IntVar(&rc.maxPooledDecompressedCap) + } // determineMode returns the ReceiverMode that this receiver is configured to run in. diff --git a/pkg/receive/aligned_hashring.go b/pkg/receive/aligned_hashring.go deleted file mode 100644 index 30203fbb90d..00000000000 --- a/pkg/receive/aligned_hashring.go +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package receive - -import ( - "fmt" - "sort" - "strconv" - - "github.com/cespare/xxhash" - "github.com/pkg/errors" - - "github.com/thanos-io/thanos/pkg/strutil" -) - -// groupByAZ groups endpoints by Availability Zone and sorts them by their inferred ordinal in a k8s statefulset. -// It returns a 2D slice where each inner slice represents an AZ (sorted alphabetically) -// and contains endpoints sorted by ordinal. All inner slices are truncated to the -// length of the largest common sequence of ordinals starting from 0 across all AZs. -// All endpoint addresses must be valid k8s DNS names with 0-index ordinals at the end of the pod name. 
-func groupByAZ(endpoints []Endpoint) ([][]Endpoint, error) { - if len(endpoints) == 0 { - return nil, errors.New("no endpoints provided") - } - - // Group endpoints by AZ and then by ordinal. - azEndpoints := make(map[string]map[int]Endpoint) - for _, ep := range endpoints { - ordinal, err := strutil.ExtractPodOrdinal(ep.Address) - if err != nil { - return nil, errors.Wrapf(err, "failed to extract ordinal from address %s", ep.Address) - } - if _, ok := azEndpoints[ep.AZ]; !ok { - azEndpoints[ep.AZ] = make(map[int]Endpoint) - } - if _, exists := azEndpoints[ep.AZ][ordinal]; exists { - return nil, fmt.Errorf("duplicate endpoint ordinal %d for address %s in AZ %s", ordinal, ep.Address, ep.AZ) - } - azEndpoints[ep.AZ][ordinal] = ep - } - - // Get sorted list of AZ names. - sortedAZs := make([]string, 0, len(azEndpoints)) - for az := range azEndpoints { - sortedAZs = append(sortedAZs, az) - } - sort.Strings(sortedAZs) - - // Determine the maximum common ordinal across all AZs. - maxCommonOrdinal := -1 - for i := 0; ; i++ { - presentInAllAZs := true - for _, az := range sortedAZs { - if _, ok := azEndpoints[az][i]; !ok { - presentInAllAZs = false - if i == 0 { - return nil, fmt.Errorf("AZ %q is missing endpoint with ordinal 0", az) - } - break - } - } - if !presentInAllAZs { - maxCommonOrdinal = i - 1 - break - } - } - if maxCommonOrdinal < 0 { - return nil, errors.New("no common endpoints with ordinal 0 found across all AZs") - } - numAZs := len(sortedAZs) - result := make([][]Endpoint, numAZs) - for i, az := range sortedAZs { - result[i] = make([]Endpoint, 0, maxCommonOrdinal+1) - for j := 0; j <= maxCommonOrdinal; j++ { - result[i] = append(result[i], azEndpoints[az][j]) - } - } - return result, nil -} - -// newAlignedKetamaHashring creates a Ketama hash ring where replicas are strictly aligned across Availability Zones. 
-// Each section on the hash ring corresponds to a primary endpoint (taken from the first AZ) and its -// aligned replicas in other AZs (endpoints with the same ordinal). The hash for a section is calculated -// based *only* on the primary endpoint's address. -func newAlignedKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) { - if replicationFactor == 0 { - return nil, errors.New("replication factor cannot be zero") - } - if sectionsPerNode <= 0 { - return nil, errors.New("sections per node must be positive") - } - groupedEndpoints, err := groupByAZ(endpoints) - if err != nil { - return nil, errors.Wrap(err, "failed to group endpoints by AZ") - } - numAZs := len(groupedEndpoints) - if numAZs == 0 { - return nil, errors.New("no endpoint groups found after grouping by AZ") - } - if uint64(numAZs) != replicationFactor { - return nil, fmt.Errorf("number of AZs (%d) must equal replication factor (%d)", numAZs, replicationFactor) - } - numEndpointsPerAZ := len(groupedEndpoints[0]) - if numEndpointsPerAZ == 0 { - return nil, errors.New("AZ groups are empty after grouping") - } - totalEndpoints := numAZs * numEndpointsPerAZ - flatEndpoints := make([]Endpoint, 0, totalEndpoints) - for azIndex := 0; azIndex < numAZs; azIndex++ { - flatEndpoints = append(flatEndpoints, groupedEndpoints[azIndex]...) - } - hasher := xxhash.New() - ringSections := make(sections, 0, numEndpointsPerAZ*sectionsPerNode) - - // Iterate through primary endpoints (those in the first AZ) to define sections. 
- for primaryOrdinalIndex := 0; primaryOrdinalIndex < numEndpointsPerAZ; primaryOrdinalIndex++ { - primaryEndpoint := groupedEndpoints[0][primaryOrdinalIndex] - for sectionIndex := 1; sectionIndex <= sectionsPerNode; sectionIndex++ { - hasher.Reset() - _, _ = hasher.Write([]byte(primaryEndpoint.Address + ":" + strconv.Itoa(sectionIndex))) - sectionHash := hasher.Sum64() - sec := §ion{ - hash: sectionHash, - az: primaryEndpoint.AZ, - endpointIndex: uint64(primaryOrdinalIndex), - replicas: make([]uint64, 0, replicationFactor), - } - - // Find indices of all replicas (including primary) in the flat list and verify alignment. - for azIndex := 0; azIndex < numAZs; azIndex++ { - replicaFlatIndex := azIndex*numEndpointsPerAZ + primaryOrdinalIndex - replicaEndpoint := flatEndpoints[replicaFlatIndex] - replicaOrdinal, err := strutil.ExtractPodOrdinal(replicaEndpoint.Address) - if err != nil { - return nil, errors.Wrapf(err, "failed to extract ordinal from replica endpoint %s in AZ %s", replicaEndpoint.Address, replicaEndpoint.AZ) - } - if replicaOrdinal != primaryOrdinalIndex { - return nil, fmt.Errorf("ordinal mismatch for primary endpoint %s (ordinal %d): replica %s in AZ %s has ordinal %d", - primaryEndpoint.Address, primaryOrdinalIndex, replicaEndpoint.Address, replicaEndpoint.AZ, replicaOrdinal) - } - sec.replicas = append(sec.replicas, uint64(replicaFlatIndex)) - } - ringSections = append(ringSections, sec) - } - } - sort.Sort(ringSections) - return &ketamaHashring{ - endpoints: flatEndpoints, - sections: ringSections, - numEndpoints: uint64(totalEndpoints), - }, nil -} diff --git a/pkg/receive/aligned_hashring_test.go b/pkg/receive/aligned_hashring_test.go deleted file mode 100644 index c5e9feb611e..00000000000 --- a/pkg/receive/aligned_hashring_test.go +++ /dev/null @@ -1,378 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. 
- -package receive - -import ( - "sort" - "strconv" - "strings" - "testing" - - "github.com/efficientgo/core/testutil" - "github.com/stretchr/testify/require" - "github.com/thanos-io/thanos/pkg/store/labelpb" - "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/strutil" -) - -// podDNS creates a DNS-like string for testing endpoint addresses. -func podDNS(name string, ordinal int) string { - return name + "-" + strconv.Itoa(ordinal) + ".test-svc.test-namespace.svc.cluster.local" -} - -func TestGroupByAZ(t *testing.T) { - // Test setup endpoints. - ep0a := Endpoint{Address: podDNS("pod", 0), AZ: "zone-a"} - ep1a := Endpoint{Address: podDNS("pod", 1), AZ: "zone-a"} - ep2a := Endpoint{Address: podDNS("pod", 2), AZ: "zone-a"} - ep0b := Endpoint{Address: podDNS("pod", 0), AZ: "zone-b"} - ep1b := Endpoint{Address: podDNS("pod", 1), AZ: "zone-b"} - ep0c := Endpoint{Address: podDNS("pod", 0), AZ: "zone-c"} - ep1c := Endpoint{Address: podDNS("pod", 1), AZ: "zone-c"} - invalidEp := Endpoint{Address: "invalid-address-format", AZ: "zone-a"} - duplicateEp0a := Endpoint{Address: podDNS("anotherpod", 0), AZ: "zone-a"} // Same ordinal (0) as ep0a in zone-a. 
- - testCases := map[string]struct { - inputEndpoints []Endpoint - expectedResult [][]Endpoint - expectError bool - errorContains string - }{ - "error on empty input": { - inputEndpoints: []Endpoint{}, - expectedResult: nil, - expectError: true, - errorContains: "no endpoints provided", - }, - "single AZ, multiple endpoints": { - inputEndpoints: []Endpoint{ep1a, ep0a, ep2a}, - expectedResult: [][]Endpoint{ - {ep0a, ep1a, ep2a}, - }, - expectError: false, - }, - "multiple AZs, balanced and ordered": { - inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b}, - expectedResult: [][]Endpoint{ - {ep0a, ep1a}, - {ep0b, ep1b}, - }, - expectError: false, - }, - "multiple AZs, different counts, stops at first missing ordinal > 0": { - inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep2a, ep0c}, - expectedResult: [][]Endpoint{ - {ep0a}, - {ep0b}, - {ep0c}, - }, - expectError: false, - }, - "error if ordinal 0 missing in any AZ": { - inputEndpoints: []Endpoint{ep1a, ep2a, ep1b}, - expectedResult: nil, - expectError: true, - errorContains: "missing endpoint with ordinal 0", - }, - "error if ordinal 0 missing in only one AZ": { - inputEndpoints: []Endpoint{ep0a, ep1a, ep1b}, - expectedResult: nil, - expectError: true, - errorContains: `AZ "zone-b" is missing endpoint with ordinal 0`, - }, - "error on invalid address format": { - inputEndpoints: []Endpoint{ep0a, invalidEp, ep0b}, - expectedResult: nil, - expectError: true, - errorContains: "failed to extract ordinal from address invalid-address-format", - }, - "error on duplicate ordinal within an AZ": { - inputEndpoints: []Endpoint{ep0a, ep1a, ep0b, duplicateEp0a}, - expectedResult: nil, - expectError: true, - errorContains: "duplicate endpoint ordinal 0 for address " + duplicateEp0a.Address + " in AZ zone-a", - }, - "AZ sorting check": { - inputEndpoints: []Endpoint{ep0b, ep0c, ep0a}, - expectedResult: [][]Endpoint{ - {ep0a}, - {ep0b}, - {ep0c}, - }, - expectError: false, - }, - "multiple AZs, stops correctly when next ordinal 
missing everywhere": { - inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, - expectedResult: [][]Endpoint{ - {ep0a, ep1a}, - {ep0b, ep1b}, - {ep0c, ep1c}, - }, - expectError: false, - }, - } - - for tcName, tc := range testCases { - t.Run(tcName, func(t *testing.T) { - result, err := groupByAZ(tc.inputEndpoints) - - if tc.expectError { - testutil.NotOk(t, err) - if tc.errorContains != "" { - testutil.Assert(t, strings.Contains(err.Error(), tc.errorContains), "Expected error message to contain '%s', but got: %v", tc.errorContains, err) - } - testutil.Assert(t, result == nil, "Expected nil result on error, got: %v", result) - } else { - testutil.Ok(t, err) - testutil.Equals(t, tc.expectedResult, result) - - // Verify outer slice (AZs) is sorted alphabetically. - if err == nil && len(result) > 1 { - azOrderCorrect := sort.SliceIsSorted(result, func(i, j int) bool { - return result[i][0].AZ < result[j][0].AZ - }) - testutil.Assert(t, azOrderCorrect, "Outer slice is not sorted by AZ") - } - } - }) - } -} - -func TestAlignedKetamaHashringGet(t *testing.T) { - t.Parallel() - - ep0a := Endpoint{Address: podDNS("pod", 0), AZ: "zone-a"} - ep1a := Endpoint{Address: podDNS("pod", 1), AZ: "zone-a"} - ep0b := Endpoint{Address: podDNS("pod", 0), AZ: "zone-b"} - ep1b := Endpoint{Address: podDNS("pod", 1), AZ: "zone-b"} - ep0c := Endpoint{Address: podDNS("pod", 0), AZ: "zone-c"} - ep1c := Endpoint{Address: podDNS("pod", 1), AZ: "zone-c"} - invalidEp := Endpoint{Address: "invalid-address", AZ: "zone-a"} - duplicateEp0a := Endpoint{Address: podDNS("anotherpod", 0), AZ: "zone-a"} - - tsForReplicaTest := &prompb.TimeSeries{ - Labels: []labelpb.ZLabel{{Name: "test", Value: "replica-routing"}}, - } - - testCases := map[string]struct { - inputEndpoints []Endpoint - replicationFactor uint64 - sectionsPerNode int - - tenant string - ts *prompb.TimeSeries - n uint64 - expectedEndpoint Endpoint - - expectConstructorError bool - constructorErrorContains string - expectGetNError 
bool - getNErrorContains string - }{ - "valid 2 AZs, RF=2, get replica 0": { - inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b}, - replicationFactor: 2, - sectionsPerNode: SectionsPerNode, - tenant: "tenant1", - ts: tsForReplicaTest, - n: 0, - expectedEndpoint: ep0a, - expectConstructorError: false, - expectGetNError: false, - }, - "valid 2 AZs, RF=2, get replica 1": { - inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b}, - replicationFactor: 2, - sectionsPerNode: SectionsPerNode, - tenant: "tenant1", - ts: tsForReplicaTest, - n: 1, - expectedEndpoint: ep0b, - expectConstructorError: false, - expectGetNError: false, - }, - "valid 3 AZs, RF=3, get replica 0": { - inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, - replicationFactor: 3, - sectionsPerNode: SectionsPerNode, - tenant: "tenant1", - ts: tsForReplicaTest, - n: 0, - expectedEndpoint: ep0a, - expectConstructorError: false, - expectGetNError: false, - }, - "valid 3 AZs, RF=3, get replica 1": { - inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, - replicationFactor: 3, - sectionsPerNode: SectionsPerNode, - tenant: "tenant1", - ts: tsForReplicaTest, - n: 1, - expectedEndpoint: ep0b, - expectConstructorError: false, - expectGetNError: false, - }, - "valid 3 AZs, RF=3, get replica 2": { - inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, - replicationFactor: 3, - sectionsPerNode: SectionsPerNode, - tenant: "tenant1", - ts: tsForReplicaTest, - n: 2, - expectedEndpoint: ep0c, - expectConstructorError: false, - expectGetNError: false, - }, - "error: empty input": { - inputEndpoints: []Endpoint{}, - replicationFactor: 1, - sectionsPerNode: SectionsPerNode, - expectConstructorError: true, - constructorErrorContains: "no endpoints provided", - }, - "error: invalid address": { - inputEndpoints: []Endpoint{ep0a, invalidEp, ep0b}, - replicationFactor: 2, - sectionsPerNode: SectionsPerNode, - expectConstructorError: true, - constructorErrorContains: "failed to extract ordinal 
from address invalid-address", - }, - "error: duplicate ordinal": { - inputEndpoints: []Endpoint{ep0a, ep1a, ep0b, duplicateEp0a}, - replicationFactor: 2, - sectionsPerNode: SectionsPerNode, - expectConstructorError: true, - constructorErrorContains: "duplicate endpoint", - }, - "error: missing ordinal 0": { - inputEndpoints: []Endpoint{ep1a, ep1b}, - replicationFactor: 2, - sectionsPerNode: SectionsPerNode, - expectConstructorError: true, - constructorErrorContains: "failed to group endpoints by AZ: AZ \"zone-a\" is missing endpoint with ordinal 0", - }, - "error: AZ count != RF (too few AZs)": { - inputEndpoints: []Endpoint{ep0a, ep1a}, - replicationFactor: 2, - sectionsPerNode: SectionsPerNode, - expectConstructorError: true, - constructorErrorContains: "number of AZs (1) must equal replication factor (2)", - }, - "error: AZ count != RF (too many AZs)": { - inputEndpoints: []Endpoint{ep0a, ep1a, ep0b, ep1b, ep0c, ep1c}, - replicationFactor: 2, - sectionsPerNode: SectionsPerNode, - expectConstructorError: true, - constructorErrorContains: "number of AZs (3) must equal replication factor (2)", - }, - "constructor success with unbalanced AZs (uses common subset)": { - inputEndpoints: []Endpoint{ep0a, ep1a, ep0b}, - replicationFactor: 2, - sectionsPerNode: SectionsPerNode, - expectConstructorError: false, - }, - "error: GetN index out of bounds (n >= numEndpoints)": { - inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b}, - replicationFactor: 2, - sectionsPerNode: SectionsPerNode, - tenant: "tenant1", - ts: tsForReplicaTest, - n: 4, - expectConstructorError: false, - expectGetNError: true, - getNErrorContains: "insufficient nodes; have 4, want 5", - }, - } - - for tcName, tc := range testCases { - t.Run(tcName, func(t *testing.T) { - hashRing, err := newAlignedKetamaHashring(tc.inputEndpoints, tc.sectionsPerNode, tc.replicationFactor) - - if tc.expectConstructorError { - require.Error(t, err, "Expected constructor error") - require.Nil(t, hashRing, "Hashring should 
be nil on constructor error") - if tc.constructorErrorContains != "" { - require.Contains(t, err.Error(), tc.constructorErrorContains, "Constructor error message mismatch") - } - return - } - - require.NoError(t, err, "Expected constructor to succeed") - require.NotNil(t, hashRing, "Hashring should not be nil on successful construction") - - if tc.ts == nil && !tc.expectGetNError { - return - } - if tc.ts == nil && tc.expectGetNError { - tc.ts = &prompb.TimeSeries{Labels: []labelpb.ZLabel{{Name: "dummy", Value: "dummy"}}} - } - - result, getNErr := hashRing.GetN(tc.tenant, tc.ts, tc.n) - if tc.expectGetNError { - require.Error(t, getNErr, "Expected GetN error") - if tc.getNErrorContains != "" { - require.Contains(t, getNErr.Error(), tc.getNErrorContains, "GetN error message mismatch") - } - } else { - require.NoError(t, getNErr, "Expected GetN to succeed") - testutil.Equals(t, tc.expectedEndpoint, result, "GetN returned unexpected endpoint") - } - }) - } -} - -func TestAlignedKetamaHashringReplicaOrdinals(t *testing.T) { - t.Parallel() - - var endpoints []Endpoint - for i := 0; i < 20; i++ { - endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-a"}) - } - for i := 0; i < 20; i++ { - endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-b"}) - } - for i := 0; i < 20; i++ { - endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-c"}) - } - replicationFactor := uint64(3) - sectionsPerNode := 10 - - hashRing, err := newAlignedKetamaHashring(endpoints, sectionsPerNode, replicationFactor) - require.NoError(t, err, "Aligned hashring constructor failed") - require.NotNil(t, hashRing, "Hashring should not be nil") - require.NotEmpty(t, hashRing.sections, "Hashring should contain sections") - - // Verify that all replicas within a section have the same ordinal. 
- for i, s := range hashRing.sections { - if len(s.replicas) == 0 { - continue - } - - expectedOrdinal := -1 - - for replicaNum, replicaIndex := range s.replicas { - require.Less(t, int(replicaIndex), len(hashRing.endpoints), - "Section %d (hash %d), Replica %d: index %d out of bounds for endpoints list (len %d)", - i, s.hash, replicaNum, replicaIndex, len(hashRing.endpoints)) - - endpoint := hashRing.endpoints[replicaIndex] - ordinal, err := strutil.ExtractPodOrdinal(endpoint.Address) - require.NoError(t, err, - "Section %d (hash %d), Replica %d: failed to extract ordinal from address %s", - i, s.hash, replicaNum, endpoint.Address) - - if expectedOrdinal == -1 { - expectedOrdinal = ordinal - } else { - require.Equal(t, expectedOrdinal, ordinal, - "Section %d (hash %d), Replica %d (%s): Mismatched ordinal. Expected %d, got %d. Replicas in section: %v", - i, s.hash, replicaNum, endpoint.Address, expectedOrdinal, ordinal, s.replicas) - } - } - if len(s.replicas) > 0 { - require.NotEqual(t, -1, expectedOrdinal, "Section %d (hash %d): Failed to determine expected ordinal for replicas %v", i, s.hash, s.replicas) - } - } -} diff --git a/pkg/receive/config.go b/pkg/receive/config.go index 75bda21e491..a81508f50f2 100644 --- a/pkg/receive/config.go +++ b/pkg/receive/config.go @@ -67,6 +67,7 @@ type Endpoint struct { Address string `json:"address"` CapNProtoAddress string `json:"capnproto_address"` AZ string `json:"az"` + Shard int `json:"shard"` } func (e *Endpoint) String() string { @@ -113,6 +114,7 @@ func (e *Endpoint) unmarshal(data []byte) error { e.Address = configEndpoint.Address e.AZ = configEndpoint.AZ e.CapNProtoAddress = configEndpoint.CapNProtoAddress + e.Shard = configEndpoint.Shard return nil } @@ -140,13 +142,8 @@ type ShuffleShardingConfig struct { CacheSize int `json:"cache_size"` // ZoneAwarenessDisabled disables zone awareness. We still try to spread the load // across the available zones, but we don't try to balance the shards across zones. 
- ZoneAwarenessDisabled bool `json:"zone_awareness_disabled"` - // AlignedOrdinalSharding enables aligned ordinal selection for shuffle sharding. - // When true and using aligned_ketama algorithm, the same ordinals are selected - // across all AZs, preserving strict replica alignment. ShardSize represents the - // number of ordinals to select (resulting in ShardSize * numAZs total endpoints). - AlignedOrdinalSharding bool `json:"aligned_ordinal_sharding"` - Overrides []ShuffleShardingOverrideConfig `json:"overrides,omitempty"` + ZoneAwarenessDisabled bool `json:"zone_awareness_disabled"` + Overrides []ShuffleShardingOverrideConfig `json:"overrides,omitempty"` } type tenantMatcher string diff --git a/pkg/receive/disk_probe.go b/pkg/receive/disk_probe.go new file mode 100644 index 00000000000..ef939e00f83 --- /dev/null +++ b/pkg/receive/disk_probe.go @@ -0,0 +1,207 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "os" + "path/filepath" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// DiskProbe performs synthetic write+fsync operations on the TSDB data directory +// to measure actual disk I/O latency. It exposes the results as Prometheus metrics. +// +// The probe performs the following operations: +// - Writes a fixed payload to a file in the data directory +// - Calls fsync to flush data to disk +// - Measures and records the duration +// - Tracks "stuck" writes that have not yet completed +type DiskProbe struct { + logger log.Logger + dir string + interval time.Duration + payload []byte + + duration prometheus.Histogram + stuckDur prometheus.Gauge + failTotal prometheus.Counter + + // mu protects writeStart for stuck-write detection. + mu sync.Mutex + writeStart time.Time + writing bool +} + +// DiskProbeOptions configures the disk probe. 
+type DiskProbeOptions struct { + // Dir is the directory to probe (should be on the same filesystem as the TSDB data). + Dir string + // Interval between probe writes. Default: 5s. + Interval time.Duration + // WriteSize is the number of bytes written per probe. Default: 1024. + WriteSize int +} + +func (o *DiskProbeOptions) defaults() { + if o.Interval == 0 { + o.Interval = 5 * time.Second + } + if o.WriteSize == 0 { + o.WriteSize = 1024 + } +} + +// NewDiskProbe creates a new DiskProbe. +func NewDiskProbe(logger log.Logger, opts DiskProbeOptions, reg prometheus.Registerer) *DiskProbe { + opts.defaults() + + payload := make([]byte, opts.WriteSize) + + return &DiskProbe{ + logger: logger, + dir: opts.Dir, + interval: opts.Interval, + payload: payload, + duration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "disk_probe_duration_seconds", + Help: "Duration of synthetic disk write+fsync probe operations on the TSDB data directory.", + Buckets: []float64{0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}, + }), + stuckDur: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "disk_probe_stuck_duration_seconds", + Help: "Duration in seconds that the current disk probe write has been in-flight. 0 if no write is stuck.", + }), + failTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "disk_probe_failures_total", + Help: "Total number of disk probe failures (open, write, or fsync errors).", + }), + } +} + +// Run starts the disk probe loop. It blocks until stop is closed. +// It runs two loops: +// - The probe loop, which performs a write+fsync at the configured interval. +// - The stuck-write monitor, which updates the stuck duration gauge every second. 
+func (p *DiskProbe) Run(stop <-chan struct{}) { + probePath := filepath.Join(p.dir, ".thanos_disk_probe") + + // Clean up probe file on exit. + defer os.Remove(probePath) + + // Start the stuck-write monitor in a separate goroutine. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + p.monitorStuckWrites(stop) + }() + + ticker := time.NewTicker(p.interval) + defer ticker.Stop() + + for { + select { + case <-stop: + wg.Wait() + return + case <-ticker.C: + p.mu.Lock() + stuck := p.writing + p.mu.Unlock() + if stuck { + level.Debug(p.logger).Log("msg", "skipping disk probe, previous write still in-flight") + continue + } + p.probe(probePath) + } + } +} + +// probe performs a single write+fsync and records the duration. +func (p *DiskProbe) probe(path string) { + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) + if err != nil { + level.Warn(p.logger).Log("msg", "disk probe open failed", "err", err) + p.failTotal.Inc() + return + } + + p.mu.Lock() + p.writeStart = time.Now() + p.writing = true + p.mu.Unlock() + + start := time.Now() + _, writeErr := f.Write(p.payload) + if writeErr != nil { + level.Warn(p.logger).Log("msg", "disk probe write failed", "err", writeErr) + p.failTotal.Inc() + f.Close() + p.markDone() + return + } + + syncErr := f.Sync() + duration := time.Since(start) + + f.Close() + p.markDone() + + if syncErr != nil { + level.Warn(p.logger).Log("msg", "disk probe fsync failed", "err", syncErr) + p.failTotal.Inc() + return + } + + p.duration.Observe(duration.Seconds()) + + if duration > 1*time.Second { + level.Warn(p.logger).Log("msg", "slow disk probe detected", "duration", duration) + } +} + +func (p *DiskProbe) markDone() { + p.mu.Lock() + p.writing = false + p.writeStart = time.Time{} + p.mu.Unlock() +} + +// monitorStuckWrites updates the stuck duration gauge every second. +// If a probe write is in-flight, it reports how long it's been stuck. 
+func (p *DiskProbe) monitorStuckWrites(stop <-chan struct{}) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-stop: + return + case <-ticker.C: + p.mu.Lock() + if p.writing { + stuckDuration := time.Since(p.writeStart) + p.stuckDur.Set(stuckDuration.Seconds()) + if stuckDuration > 5*time.Second { + level.Warn(p.logger).Log("msg", "disk probe write appears stuck", "stuck_duration", stuckDuration) + } + } else { + p.stuckDur.Set(0) + } + p.mu.Unlock() + } + } +} diff --git a/pkg/receive/disk_probe_test.go b/pkg/receive/disk_probe_test.go new file mode 100644 index 00000000000..0cbbb459663 --- /dev/null +++ b/pkg/receive/disk_probe_test.go @@ -0,0 +1,165 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" +) + +func TestDiskProbe_SingleProbe(t *testing.T) { + dir := t.TempDir() + reg := prometheus.NewRegistry() + + probe := NewDiskProbe(log.NewNopLogger(), DiskProbeOptions{ + Dir: dir, + Interval: 1 * time.Second, + WriteSize: 1024, + }, reg) + + probePath := filepath.Join(dir, ".thanos_disk_probe") + probe.probe(probePath) + + // Verify no failures occurred. + testutil.Equals(t, float64(0), promtest.ToFloat64(probe.failTotal)) +} + +func TestDiskProbe_FailureOnBadDir(t *testing.T) { + reg := prometheus.NewRegistry() + + probe := NewDiskProbe(log.NewNopLogger(), DiskProbeOptions{ + Dir: "/nonexistent/path/that/should/not/exist", + Interval: 1 * time.Second, + WriteSize: 1024, + }, reg) + + probePath := filepath.Join("/nonexistent/path/that/should/not/exist", ".thanos_disk_probe") + probe.probe(probePath) + + // Should have recorded a failure. 
+ testutil.Equals(t, float64(1), promtest.ToFloat64(probe.failTotal)) +} + +func TestDiskProbe_RunAndStop(t *testing.T) { + dir := t.TempDir() + reg := prometheus.NewRegistry() + + probe := NewDiskProbe(log.NewNopLogger(), DiskProbeOptions{ + Dir: dir, + Interval: 100 * time.Millisecond, + }, reg) + + stop := make(chan struct{}) + done := make(chan struct{}) + + go func() { + probe.Run(stop) + close(done) + }() + + // Let it run a few probes. + time.Sleep(350 * time.Millisecond) + + // Stop the probe. + close(stop) + + // Wait for Run to return. + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("Run did not return after stop was closed") + } + + // Verify no failures. + failCount := promtest.ToFloat64(probe.failTotal) + testutil.Equals(t, float64(0), failCount) + + // Probe file should be cleaned up. + probePath := filepath.Join(dir, ".thanos_disk_probe") + _, err := os.Stat(probePath) + testutil.Assert(t, os.IsNotExist(err), "probe file should be removed on shutdown") +} + +func TestDiskProbe_StuckWriteDetection(t *testing.T) { + dir := t.TempDir() + reg := prometheus.NewRegistry() + + probe := NewDiskProbe(log.NewNopLogger(), DiskProbeOptions{ + Dir: dir, + Interval: 1 * time.Hour, // Long interval so the probe loop doesn't interfere. + }, reg) + + // Simulate a stuck write by manually setting the write state. + probe.mu.Lock() + probe.writing = true + probe.writeStart = time.Now().Add(-3 * time.Second) // Pretend write started 3s ago. + probe.mu.Unlock() + + // Start and quickly check the stuck monitor. + stop := make(chan struct{}) + done := make(chan struct{}) + go func() { + probe.monitorStuckWrites(stop) + close(done) + }() + + // Wait for the monitor to tick. + time.Sleep(1500 * time.Millisecond) + + stuckDur := promtest.ToFloat64(probe.stuckDur) + // Should report approximately 4.5s (3s initial + ~1.5s from sleep). + testutil.Assert(t, stuckDur > 3.0, "stuck duration should be > 3s, got %f", stuckDur) + + // Clear the write state. 
+ probe.markDone() + + // Wait for monitor to update. + time.Sleep(1500 * time.Millisecond) + + stuckDur = promtest.ToFloat64(probe.stuckDur) + testutil.Equals(t, float64(0), stuckDur) + + close(stop) + <-done +} + +func TestDiskProbe_DefaultOptions(t *testing.T) { + opts := DiskProbeOptions{Dir: "/tmp/test"} + opts.defaults() + + testutil.Equals(t, 5*time.Second, opts.Interval) + testutil.Equals(t, 1024, opts.WriteSize) +} + +func TestDiskProbe_CustomWriteSize(t *testing.T) { + dir := t.TempDir() + reg := prometheus.NewRegistry() + + writeSize := 4096 + probe := NewDiskProbe(log.NewNopLogger(), DiskProbeOptions{ + Dir: dir, + Interval: 1 * time.Second, + WriteSize: writeSize, + }, reg) + + testutil.Equals(t, writeSize, len(probe.payload)) + + // Run a probe and verify the file was written with the correct size. + probePath := filepath.Join(dir, ".thanos_disk_probe") + probe.probe(probePath) + + info, err := os.Stat(probePath) + testutil.Ok(t, err) + testutil.Equals(t, int64(writeSize), info.Size()) + + // Clean up. 
+ os.Remove(probePath) +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 1363177c7fa..7343c926404 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1740,7 +1740,7 @@ func (p *peerGroup) closeUnlocked(endpoint Endpoint) error { p.connections[endpoint].wp.Close() delete(p.connections, endpoint) if err := c.client.Close(); err != nil { - return fmt.Errorf("closing connection for %s", endpoint) + return fmt.Errorf("closing connection for %v", endpoint) } return nil diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index e1f753d38b8..0190d082c52 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -267,7 +267,7 @@ func (g *fakePeersGroup) close(addr Endpoint) error { func (g *fakePeersGroup) getConnection(_ context.Context, endpoint Endpoint) (WriteableStoreAsyncClient, error) { c, ok := g.clients[endpoint] if !ok { - return nil, fmt.Errorf("client %s not found", endpoint) + return nil, fmt.Errorf("client %v not found", endpoint) } return c, nil } diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 0b63f9aea8b..dd26e3a50e0 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -27,8 +27,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/thanos-io/thanos/pkg/store/labelpb" - "github.com/thanos-io/thanos/pkg/strutil" - "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -36,9 +34,9 @@ import ( type HashringAlgorithm string const ( - AlgorithmHashmod HashringAlgorithm = "hashmod" - AlgorithmKetama HashringAlgorithm = "ketama" - AlgorithmAlignedKetama HashringAlgorithm = "aligned_ketama" + AlgorithmHashmod HashringAlgorithm = "hashmod" + AlgorithmKetama HashringAlgorithm = "ketama" + AlgorithmRendezvous HashringAlgorithm = "rendezvous" // SectionsPerNode is the number of sections in the ring assigned to each node // in the ketama hashring. 
A higher number yields a better series distribution, @@ -366,7 +364,7 @@ type shuffleShardHashring struct { nodes []Endpoint // cache stores tenant-specific subrings. The value is Hashring to support both - // *ketamaHashring (regular shuffle sharding) and aligned ketama subrings. + // *ketamaHashring (regular shuffle sharding) and *rendezvousHashring subrings. cache *lru.Cache[string, Hashring] metrics *shuffleShardCacheMetrics @@ -594,9 +592,10 @@ func (s *shuffleShardHashring) getTenantShardCached(tenant string) (Hashring, er var h Hashring var err error - if s.shuffleShardingConfig.AlignedOrdinalSharding { - h, err = s.getTenantShardAligned(tenant) - } else { + switch s.baseRing.(type) { + case *rendezvousHashring: + h, err = s.getTenantShardRendezvous(tenant) + default: h, err = s.getTenantShard(tenant) } if err != nil { @@ -710,248 +709,207 @@ func (s *shuffleShardHashring) getTenantShard(tenant string) (*ketamaHashring, e return newKetamaHashring(finalNodes, SectionsPerNode, s.replicationFactor) } -// ordinalSection represents a section in the ordinal ring for consistent hashing. -type ordinalSection struct { - ordinal int - hash uint64 -} +// groupByAZ groups endpoints by Availability Zone and sorts them by shard. +// It returns a 2D slice where each inner slice represents an AZ (sorted alphabetically) +// and contains endpoints sorted by shard. All inner slices are truncated to the +// length of the largest common sequence of shards starting from 0 across all AZs. +func groupByAZ(endpoints []Endpoint) ([][]Endpoint, error) { + if len(endpoints) == 0 { + return nil, errors.New("no endpoints provided") + } + + // Group endpoints by AZ and then by shard. 
+ azEndpoints := make(map[string]map[int]Endpoint) + for _, ep := range endpoints { + if _, ok := azEndpoints[ep.AZ]; !ok { + azEndpoints[ep.AZ] = make(map[int]Endpoint) + } + if _, exists := azEndpoints[ep.AZ][ep.Shard]; exists { + return nil, fmt.Errorf("duplicate endpoint shard %d for address %s in AZ %s", ep.Shard, ep.Address, ep.AZ) + } + azEndpoints[ep.AZ][ep.Shard] = ep + } -type ordinalSections []ordinalSection + // Get sorted list of AZ names. + sortedAZs := make([]string, 0, len(azEndpoints)) + for az := range azEndpoints { + sortedAZs = append(sortedAZs, az) + } + sort.Strings(sortedAZs) -func (o ordinalSections) Len() int { return len(o) } -func (o ordinalSections) Less(i, j int) bool { return o[i].hash < o[j].hash } -func (o ordinalSections) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + // Determine the maximum common shard across all AZs. + maxCommonShard := -1 + for i := 0; ; i++ { + presentInAllAZs := true + for _, az := range sortedAZs { + if _, ok := azEndpoints[az][i]; !ok { + presentInAllAZs = false + if i == 0 { + return nil, fmt.Errorf("AZ %q is missing endpoint with shard 0", az) + } + break + } + } + if !presentInAllAZs { + maxCommonShard = i - 1 + break + } + } + if maxCommonShard < 0 { + return nil, errors.New("no common endpoints with shard 0 found across all AZs") + } + numAZs := len(sortedAZs) + result := make([][]Endpoint, numAZs) + for i, az := range sortedAZs { + result[i] = make([]Endpoint, 0, maxCommonShard+1) + for j := 0; j <= maxCommonShard; j++ { + result[i] = append(result[i], azEndpoints[az][j]) + } + } + return result, nil +} -// extractOrdinalStructure extracts the ordinal-to-endpoint mapping per AZ -// and returns the set of ordinals common to all AZs. -func extractOrdinalStructure(endpoints []Endpoint) (map[string]map[int]Endpoint, []int, error) { +// extractShardStructure extracts the shard-to-endpoint mapping per AZ +// and returns the set of shards common to all AZs. 
+func extractShardStructure(endpoints []Endpoint) (map[string]map[int]Endpoint, []int, error) { if len(endpoints) == 0 { return nil, nil, errors.New("no endpoints provided") } - // Group endpoints by AZ and ordinal - azOrdinalMap := make(map[string]map[int]Endpoint) + // Group endpoints by AZ and shard + azShardMap := make(map[string]map[int]Endpoint) for _, ep := range endpoints { - ordinal, err := strutil.ExtractPodOrdinal(ep.Address) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to extract ordinal from address %s", ep.Address) - } - if _, ok := azOrdinalMap[ep.AZ]; !ok { - azOrdinalMap[ep.AZ] = make(map[int]Endpoint) + if _, ok := azShardMap[ep.AZ]; !ok { + azShardMap[ep.AZ] = make(map[int]Endpoint) } - azOrdinalMap[ep.AZ][ordinal] = ep + azShardMap[ep.AZ][ep.Shard] = ep } - if len(azOrdinalMap) == 0 { + if len(azShardMap) == 0 { return nil, nil, errors.New("no AZs found") } - // Find common ordinals across all AZs - var commonOrdinals []int + // Find common shards across all AZs + var commonShards []int var firstAZ string - for az := range azOrdinalMap { + for az := range azShardMap { firstAZ = az break } - for ordinal := range azOrdinalMap[firstAZ] { + for shard := range azShardMap[firstAZ] { presentInAll := true - for az, ordMap := range azOrdinalMap { + for az, shardMap := range azShardMap { if az == firstAZ { continue } - if _, ok := ordMap[ordinal]; !ok { + if _, ok := shardMap[shard]; !ok { presentInAll = false break } } if presentInAll { - commonOrdinals = append(commonOrdinals, ordinal) + commonShards = append(commonShards, shard) } } - if len(commonOrdinals) == 0 { - return nil, nil, errors.New("no common ordinals found across all AZs") + if len(commonShards) == 0 { + return nil, nil, errors.New("no common shards found across all AZs") } - sort.Ints(commonOrdinals) - return azOrdinalMap, commonOrdinals, nil + sort.Ints(commonShards) + return azShardMap, commonShards, nil } -// buildOrdinalRing creates a consistent hash ring of 
ordinals. -// Each ordinal gets multiple sections for better distribution. -func buildOrdinalRing(ordinals []int, sectionsPerOrdinal int) ordinalSections { - ring := make(ordinalSections, 0, len(ordinals)*sectionsPerOrdinal) - hasher := xxhash.New() - - for _, ordinal := range ordinals { - for i := 1; i <= sectionsPerOrdinal; i++ { - hasher.Reset() - _, _ = hasher.Write([]byte(fmt.Sprintf("ordinal-%d:%d", ordinal, i))) - ring = append(ring, ordinalSection{ - ordinal: ordinal, - hash: hasher.Sum64(), - }) - } - } - - sort.Sort(ring) - return ring -} - -// selectOrdinalsConsistent selects ordinals using consistent hashing. -// This provides stability: adding ordinal N only affects tenants that would hash near N. -func selectOrdinalsConsistent(ring ordinalSections, tenant string, count int) []int { - if count >= len(ring) { - // Return all unique ordinals if count exceeds ring size - seen := make(map[int]struct{}) - for _, sec := range ring { - seen[sec.ordinal] = struct{}{} - } - result := make([]int, 0, len(seen)) - for ord := range seen { - result = append(result, ord) - } - sort.Ints(result) +// selectShardsRendezvous selects shards using rendezvous (highest random weight) hashing. +// For each shard, it computes hash(tenant, shard) and picks the top-K shards with the highest values. 
+func selectShardsRendezvous(commonShards []int, tenant string, count int) []int { + if count >= len(commonShards) { + result := make([]int, len(commonShards)) + copy(result, commonShards) return result } - seed := ShuffleShardSeed(tenant, "") // No AZ suffix for alignment - r := rand.New(rand.NewSource(seed)) + hasher := xxhash.New() - selected := make(map[int]struct{}) - result := make([]int, 0, count) + type shardScore struct { + shard int + score uint64 + } + scores := make([]shardScore, len(commonShards)) + for i, shard := range commonShards { + hasher.Reset() + _, _ = hasher.Write([]byte(tenant)) + _, _ = hasher.Write([]byte(strconv.Itoa(shard))) + scores[i] = shardScore{shard: shard, score: hasher.Sum64()} + } - for len(result) < count { - pos := r.Uint64() - idx := sort.Search(len(ring), func(i int) bool { - return ring[i].hash >= pos - }) - if idx == len(ring) { - idx = 0 - } + // Partial sort: find top-K by score (descending). + sort.Slice(scores, func(i, j int) bool { + return scores[i].score > scores[j].score + }) - // Walk ring to find unselected ordinal - for j := 0; j < len(ring); j++ { - checkIdx := (idx + j) % len(ring) - ord := ring[checkIdx].ordinal - if _, ok := selected[ord]; !ok { - selected[ord] = struct{}{} - result = append(result, ord) - break - } - } + result := make([]int, count) + for i := 0; i < count; i++ { + result[i] = scores[i].shard } - sort.Ints(result) return result } -// getTenantShardAligned returns a tenant shard with aligned ordinals across all AZs. -// Unlike getTenantShard which selects nodes independently per AZ, this selects -// ordinals first, then takes the same ordinal from each AZ. 
-func (s *shuffleShardHashring) getTenantShardAligned(tenant string) (Hashring, error) { - // Extract ordinal structure from all nodes - azOrdinalMap, commonOrdinals, err := extractOrdinalStructure(s.nodes) +// getTenantShardRendezvous returns a tenant shard with aligned shards across all AZs +// using rendezvous hashing for shard selection. Unlike getTenantShard which selects nodes +// independently per AZ, this selects shards first, then takes the same shard from each AZ. +func (s *shuffleShardHashring) getTenantShardRendezvous(tenant string) (Hashring, error) { + // Extract shard structure from all nodes + azShardMap, commonShards, err := extractShardStructure(s.nodes) if err != nil { - return nil, errors.Wrap(err, "failed to extract ordinal structure") + return nil, errors.Wrap(err, "failed to extract shard structure") } - // Determine shard size (number of ordinals to select) - shardSize := s.getShardSize(tenant) - if shardSize > len(commonOrdinals) { - return nil, fmt.Errorf("shard size %d exceeds available common ordinals (%d)", shardSize, len(commonOrdinals)) + // shard_size is the total shard size; divide by numAZs to get per-AZ count. 
+ totalShardSize := s.getShardSize(tenant) + numAZs := len(azShardMap) + perAZShards := totalShardSize / numAZs // floor + if perAZShards == 0 { + return nil, fmt.Errorf("shard size %d too small for %d AZs", totalShardSize, numAZs) + } + if perAZShards > len(commonShards) { + return nil, fmt.Errorf("per-AZ shard count %d (from total %d / %d AZs) exceeds available common shards (%d)", perAZShards, totalShardSize, numAZs, len(commonShards)) } - // Build ordinal ring for consistent hashing - // Use fewer sections per ordinal since we have fewer ordinals than nodes - sectionsPerOrdinal := SectionsPerNode - ordinalRing := buildOrdinalRing(commonOrdinals, sectionsPerOrdinal) - - // Select ordinals using consistent hashing - selectedOrdinals := selectOrdinalsConsistent(ordinalRing, tenant, shardSize) + // Select shards using rendezvous hashing + selectedShards := selectShardsRendezvous(commonShards, tenant, perAZShards) - // Build endpoint list with same ordinals from each AZ - // Sorted AZ order for deterministic endpoint ordering - sortedAZs := make([]string, 0, len(azOrdinalMap)) - for az := range azOrdinalMap { + // Build endpoint list with same shards from each AZ + sortedAZs := make([]string, 0, len(azShardMap)) + for az := range azShardMap { sortedAZs = append(sortedAZs, az) } sort.Strings(sortedAZs) - // Create aligned ketama subring manually to preserve ordinal alignment - // without requiring sequential ordinals starting from 0 - return newAlignedSubring(azOrdinalMap, sortedAZs, selectedOrdinals, SectionsPerNode, s.replicationFactor) -} - -// newAlignedSubring creates a ketama hashring with aligned replicas from a subset of ordinals. -// Unlike newAlignedKetamaHashring, this doesn't require ordinals to be sequential from 0. -// The alignment property: for any section, all replicas have the same ordinal across different AZs. 
-func newAlignedSubring( - azOrdinalMap map[string]map[int]Endpoint, - sortedAZs []string, - selectedOrdinals []int, - sectionsPerNode int, - replicationFactor uint64, -) (*ketamaHashring, error) { - numAZs := len(sortedAZs) - numOrdinals := len(selectedOrdinals) - - if uint64(numAZs) != replicationFactor { - return nil, fmt.Errorf("number of AZs (%d) must equal replication factor (%d)", numAZs, replicationFactor) - } - - // Build flat endpoint list: [AZ0-ord0, AZ0-ord1, ..., AZ1-ord0, AZ1-ord1, ...] - // where ordN refers to selectedOrdinals[N], not the actual ordinal value - totalEndpoints := numAZs * numOrdinals - flatEndpoints := make([]Endpoint, 0, totalEndpoints) - for _, az := range sortedAZs { - for _, ordinal := range selectedOrdinals { - ep, ok := azOrdinalMap[az][ordinal] + // Build azEndpoints directly (non-contiguous shards are OK for sub-rings). + numShards := len(selectedShards) + azEndpoints := make([][]Endpoint, len(sortedAZs)) + var flatEndpoints []Endpoint + for i, az := range sortedAZs { + azEndpoints[i] = make([]Endpoint, 0, numShards) + for _, shard := range selectedShards { + ep, ok := azShardMap[az][shard] if !ok { - return nil, fmt.Errorf("ordinal %d not found in AZ %s", ordinal, az) + return nil, fmt.Errorf("shard %d not found in AZ %s", shard, az) } - flatEndpoints = append(flatEndpoints, ep) + azEndpoints[i] = append(azEndpoints[i], ep) } + flatEndpoints = append(flatEndpoints, azEndpoints[i]...) 
} - // Create sections with aligned replicas - // For aligned subring, we create sections based on the first AZ's endpoints (primary) - // Each section's replicas point to the same "position" (ordinal index) in each AZ - hasher := xxhash.New() - ringSections := make(sections, 0, numOrdinals*sectionsPerNode) - - for ordinalIdx := 0; ordinalIdx < numOrdinals; ordinalIdx++ { - // Primary endpoint is from the first AZ - primaryEndpoint := flatEndpoints[ordinalIdx] // AZ0 endpoints are at indices 0..numOrdinals-1 - - for sectionIdx := 1; sectionIdx <= sectionsPerNode; sectionIdx++ { - hasher.Reset() - _, _ = hasher.Write([]byte(primaryEndpoint.Address + ":" + strconv.Itoa(sectionIdx))) - - sec := &section{ - hash: hasher.Sum64(), - az: primaryEndpoint.AZ, - endpointIndex: uint64(ordinalIdx), // Index within first AZ - replicas: make([]uint64, 0, replicationFactor), - } - - // Add replicas: same ordinal index from each AZ - for azIdx := 0; azIdx < numAZs; azIdx++ { - replicaFlatIndex := azIdx*numOrdinals + ordinalIdx - sec.replicas = append(sec.replicas, uint64(replicaFlatIndex)) - } - - ringSections = append(ringSections, sec) - } - } - - sort.Sort(ringSections) - - return &ketamaHashring{ - endpoints: flatEndpoints, - sections: ringSections, - numEndpoints: uint64(totalEndpoints), + return &rendezvousHashring{ + azEndpoints: azEndpoints, + sortedAZs: sortedAZs, + numShards: numShards, + replicationFactor: s.replicationFactor, + flatEndpoints: flatEndpoints, + }, nil } @@ -979,6 +937,11 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg cache: make(map[string]Hashring), } + numShardsGauge := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "thanos_receive_hashring_shards", + Help: "Number of shards per hashring after groupByAZ alignment.", + }, []string{"hashring"}) + for _, h := range cfg { var hashring Hashring var err error @@ -986,7 +949,7 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg if
h.Algorithm != "" { activeAlgorithm = h.Algorithm } - hashring, err = newHashring(activeAlgorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants, h.ShuffleShardingConfig, reg) + hashring, err = newHashring(activeAlgorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants, h.ShuffleShardingConfig, reg, numShardsGauge) if err != nil { return nil, err } @@ -1007,7 +970,7 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg return m, nil } -func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationFactor uint64, hashring string, tenants []string, shuffleShardingConfig ShuffleShardingConfig, reg prometheus.Registerer) (Hashring, error) { +func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationFactor uint64, hashring string, tenants []string, shuffleShardingConfig ShuffleShardingConfig, reg prometheus.Registerer, numShardsGauge *prometheus.GaugeVec) (Hashring, error) { switch algorithm { case AlgorithmHashmod: @@ -1031,11 +994,12 @@ func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationF return newShuffleShardHashring(ringImpl, shuffleShardingConfig, replicationFactor, reg, hashring) } return ringImpl, nil - case AlgorithmAlignedKetama: - ringImpl, err := newAlignedKetamaHashring(endpoints, SectionsPerNode, replicationFactor) + case AlgorithmRendezvous: + ringImpl, err := newRendezvousHashring(endpoints, replicationFactor) if err != nil { return nil, err } + numShardsGauge.WithLabelValues(hashring).Set(float64(ringImpl.numShards)) if shuffleShardingConfig.ShardSize > 0 { if shuffleShardingConfig.ShardSize > len(endpoints) { return nil, fmt.Errorf("shard size %d is larger than number of nodes in hashring %s (%d)", shuffleShardingConfig.ShardSize, hashring, len(endpoints)) diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 379861420e8..145c3fc7a2e 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -6,6 
+6,8 @@ package receive import ( "fmt" "math" + "sort" + "strconv" "strings" "testing" @@ -1047,409 +1049,118 @@ func compareNodeSets(before, after map[string]struct{}) (added, removed []string return } -// makeK8sEndpoint creates an endpoint with K8s-style DNS name that has an extractable ordinal. -func makeK8sEndpoint(podName string, ordinal int, az string) Endpoint { - return Endpoint{ - Address: fmt.Sprintf("%s-%d.svc.test.svc.cluster.local:10901", podName, ordinal), - AZ: az, - } -} - -func TestAlignedOrdinalShardingBasic(t *testing.T) { - t.Parallel() - - // Create 3 AZs with 5 ordinals each (15 total endpoints) - endpoints := make([]Endpoint, 0, 15) - azs := []string{"az-a", "az-b", "az-c"} - for _, az := range azs { - for ord := 0; ord < 5; ord++ { - endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) - } - } - - // Create aligned ketama base ring with RF=3 (one per AZ) - baseRing, err := newAlignedKetamaHashring(endpoints, SectionsPerNode, 3) - require.NoError(t, err) - - // Create shuffle shard hashring with aligned ordinal sharding enabled - cfg := ShuffleShardingConfig{ - ShardSize: 2, // Select 2 ordinals -> 6 endpoints (2 * 3 AZs) - AlignedOrdinalSharding: true, - } - shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-aligned") - require.NoError(t, err) - - // Get the tenant shard - tenant := "test-tenant" - shard, err := shardRing.getTenantShardAligned(tenant) - require.NoError(t, err) - - // Verify we got the right number of nodes (2 ordinals * 3 AZs = 6) - nodes := shard.Nodes() - require.Len(t, nodes, 6, "expected 6 endpoints (2 ordinals * 3 AZs)") - - // Extract ordinals from each AZ and verify they're the same - ordinalsByAZ := make(map[string][]int) - for _, node := range nodes { - ordinalsByAZ[node.AZ] = append(ordinalsByAZ[node.AZ], extractOrdinalFromAddress(t, node.Address)) - } - - // Verify each AZ has exactly 2 ordinals - require.Len(t, ordinalsByAZ, 3, "expected 3 AZs") - for az, 
ordinals := range ordinalsByAZ { - require.Len(t, ordinals, 2, "AZ %s should have 2 ordinals", az) - } - - // Verify all AZs have the SAME ordinals (the key invariant) - var referenceOrdinals []int - for _, ordinals := range ordinalsByAZ { - if referenceOrdinals == nil { - referenceOrdinals = ordinals - } else { - require.ElementsMatch(t, referenceOrdinals, ordinals, - "all AZs should have the same ordinals for aligned ordinal sharding") - } - } - - t.Logf("Selected ordinals: %v", referenceOrdinals) -} - -func TestAlignedOrdinalShardingConsistency(t *testing.T) { - t.Parallel() - - // Create 3 AZs with 5 ordinals each - endpoints := make([]Endpoint, 0, 15) - azs := []string{"az-a", "az-b", "az-c"} - for _, az := range azs { - for ord := 0; ord < 5; ord++ { - endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) - } - } - - baseRing, err := newAlignedKetamaHashring(endpoints, SectionsPerNode, 3) - require.NoError(t, err) - - cfg := ShuffleShardingConfig{ - ShardSize: 2, - AlignedOrdinalSharding: true, - } - shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-consistency") - require.NoError(t, err) - - // Verify same tenant always gets same ordinals across multiple calls - tenant := "consistent-tenant" - var firstOrdinals []int - - for trial := 0; trial < 10; trial++ { - shard, err := shardRing.getTenantShardAligned(tenant) - require.NoError(t, err) - - currentOrdinals := extractOrdinalsFromShard(t, shard) - if firstOrdinals == nil { - firstOrdinals = currentOrdinals - } else { - require.Equal(t, firstOrdinals, currentOrdinals, - "same tenant should always get same ordinals") - } - } +// podDNS creates a DNS-like string for testing endpoint addresses. 
+func podDNS(name string, shard int) string { + return name + "-" + strconv.Itoa(shard) + ".test-svc.test-namespace.svc.cluster.local" } -func TestAlignedOrdinalShardingDifferentTenants(t *testing.T) { - t.Parallel() - - // Create 3 AZs with 10 ordinals each to have enough spread - endpoints := make([]Endpoint, 0, 30) - azs := []string{"az-a", "az-b", "az-c"} - for _, az := range azs { - for ord := 0; ord < 10; ord++ { - endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) - } - } - - baseRing, err := newAlignedKetamaHashring(endpoints, SectionsPerNode, 3) - require.NoError(t, err) - - cfg := ShuffleShardingConfig{ - ShardSize: 3, // Select 3 ordinals - AlignedOrdinalSharding: true, - } - shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-diff-tenants") - require.NoError(t, err) - - // Different tenants should (likely) get different ordinals - tenantOrdinals := make(map[string][]int) - numTenants := 20 - - for i := 0; i < numTenants; i++ { - tenant := fmt.Sprintf("tenant-%d", i) - shard, err := shardRing.getTenantShardAligned(tenant) - require.NoError(t, err) - tenantOrdinals[tenant] = extractOrdinalsFromShard(t, shard) - } - - // Count unique ordinal sets - uniqueSets := make(map[string]int) - for _, ordinals := range tenantOrdinals { - key := fmt.Sprintf("%v", ordinals) - uniqueSets[key]++ - } - - // With 10 ordinals choosing 3, there are C(10,3)=120 possible combinations - // We expect multiple unique sets across 20 tenants - t.Logf("Unique ordinal sets: %d out of %d tenants", len(uniqueSets), numTenants) - require.Greater(t, len(uniqueSets), 1, "different tenants should get different ordinal sets") -} - -func TestAlignedOrdinalShardingPreservesAlignment(t *testing.T) { - t.Parallel() - - // Create 3 AZs with 5 ordinals each - endpoints := make([]Endpoint, 0, 15) - azs := []string{"az-a", "az-b", "az-c"} - for _, az := range azs { - for ord := 0; ord < 5; ord++ { - endpoints = append(endpoints, 
makeK8sEndpoint("pod-"+az, ord, az)) - } - } - - baseRing, err := newAlignedKetamaHashring(endpoints, SectionsPerNode, 3) - require.NoError(t, err) - - cfg := ShuffleShardingConfig{ - ShardSize: 2, - AlignedOrdinalSharding: true, - } - shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-preserves") - require.NoError(t, err) - - tenant := "alignment-test-tenant" - - // Use GetN to get replicas and verify they're aligned (same ordinal across AZs) - for i := 0; i < 100; i++ { - ts := &prompb.TimeSeries{ - Labels: []labelpb.ZLabel{ - {Name: "series", Value: fmt.Sprintf("series-%d", i)}, +func TestGroupByAZ(t *testing.T) { + // Test setup endpoints. + ep0a := Endpoint{Address: podDNS("pod", 0), AZ: "zone-a", Shard: 0} + ep1a := Endpoint{Address: podDNS("pod", 1), AZ: "zone-a", Shard: 1} + ep2a := Endpoint{Address: podDNS("pod", 2), AZ: "zone-a", Shard: 2} + ep0b := Endpoint{Address: podDNS("pod", 0), AZ: "zone-b", Shard: 0} + ep1b := Endpoint{Address: podDNS("pod", 1), AZ: "zone-b", Shard: 1} + ep0c := Endpoint{Address: podDNS("pod", 0), AZ: "zone-c", Shard: 0} + ep1c := Endpoint{Address: podDNS("pod", 1), AZ: "zone-c", Shard: 1} + duplicateEp0a := Endpoint{Address: podDNS("anotherpod", 0), AZ: "zone-a", Shard: 0} // Same shard (0) as ep0a in zone-a. 
+ + testCases := map[string]struct { + inputEndpoints []Endpoint + expectedResult [][]Endpoint + expectError bool + errorContains string + }{ + "error on empty input": { + inputEndpoints: []Endpoint{}, + expectedResult: nil, + expectError: true, + errorContains: "no endpoints provided", + }, + "single AZ, multiple endpoints": { + inputEndpoints: []Endpoint{ep1a, ep0a, ep2a}, + expectedResult: [][]Endpoint{ + {ep0a, ep1a, ep2a}, }, - } - - // Get all 3 replicas (RF=3) - var replicas []Endpoint - for n := uint64(0); n < 3; n++ { - ep, err := shardRing.GetN(tenant, ts, n) - require.NoError(t, err) - replicas = append(replicas, ep) - } - - // Verify all 3 replicas have the same ordinal but different AZs - ordinals := make(map[int]struct{}) - azsSeen := make(map[string]struct{}) - for _, ep := range replicas { - ordinals[extractOrdinalFromAddress(t, ep.Address)] = struct{}{} - azsSeen[ep.AZ] = struct{}{} - } - - require.Len(t, ordinals, 1, "all replicas should have the same ordinal for series %d", i) - require.Len(t, azsSeen, 3, "replicas should span all 3 AZs for series %d", i) - } -} - -// TestAlignedOrdinalShardingDataDistribution verifies the key behavior: -// - With shard_size=2, tenant gets 2 ordinals (e.g., ordinals 1 and 4) -// - Series are distributed across both ordinals -// - a-1, b-1, c-1 always receive the same series (aligned replicas for ordinal 1) -// - a-4, b-4, c-4 always receive the same series (aligned replicas for ordinal 4) -// - Series assigned to ordinal 1 are different from series assigned to ordinal 4. 
-func TestAlignedOrdinalShardingDataDistribution(t *testing.T) { - t.Parallel() - - // Create 3 AZs with 5 ordinals each - endpoints := make([]Endpoint, 0, 15) - azs := []string{"az-a", "az-b", "az-c"} - for _, az := range azs { - for ord := 0; ord < 5; ord++ { - endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) - } - } - - baseRing, err := newAlignedKetamaHashring(endpoints, SectionsPerNode, 3) - require.NoError(t, err) - - cfg := ShuffleShardingConfig{ - ShardSize: 2, // Select 2 ordinals - AlignedOrdinalSharding: true, - } - shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-distribution") - require.NoError(t, err) - - tenant := "distribution-test-tenant" - - // First, get the tenant's selected ordinals - shard, err := shardRing.getTenantShardAligned(tenant) - require.NoError(t, err) - selectedOrdinals := extractOrdinalsFromShard(t, shard) - require.Len(t, selectedOrdinals, 2, "tenant should have exactly 2 ordinals") - t.Logf("Tenant's selected ordinals: %v", selectedOrdinals) - - // Track which series go to which ordinal - // Key: ordinal, Value: set of series indices - seriesByOrdinal := make(map[int]map[int]struct{}) - for _, ord := range selectedOrdinals { - seriesByOrdinal[ord] = make(map[int]struct{}) - } - - // Track which endpoints receive which series - // Key: endpoint address, Value: set of series indices - seriesByEndpoint := make(map[string]map[int]struct{}) - - // Generate many series and track their distribution - numSeries := 1000 - for i := 0; i < numSeries; i++ { - ts := &prompb.TimeSeries{ - Labels: []labelpb.ZLabel{ - {Name: "series", Value: fmt.Sprintf("series-%d", i)}, - {Name: "__name__", Value: "test_metric"}, + expectError: false, + }, + "multiple AZs, balanced and ordered": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b}, + expectedResult: [][]Endpoint{ + {ep0a, ep1a}, + {ep0b, ep1b}, }, - } - - // Get all 3 replicas - var replicas []Endpoint - for n := uint64(0); n < 3; 
n++ { - ep, err := shardRing.GetN(tenant, ts, n) - require.NoError(t, err) - replicas = append(replicas, ep) - - // Track series per endpoint - if seriesByEndpoint[ep.Address] == nil { - seriesByEndpoint[ep.Address] = make(map[int]struct{}) - } - seriesByEndpoint[ep.Address][i] = struct{}{} - } - - // All replicas should have the same ordinal - ordinal := extractOrdinalFromAddress(t, replicas[0].Address) - for _, ep := range replicas[1:] { - epOrdinal := extractOrdinalFromAddress(t, ep.Address) - require.Equal(t, ordinal, epOrdinal, "all replicas for series %d should have same ordinal", i) - } - - // Track which ordinal this series went to - seriesByOrdinal[ordinal][i] = struct{}{} - } - - // Verify 1: Series are distributed across BOTH ordinals (not just one) - for ord, series := range seriesByOrdinal { - t.Logf("Ordinal %d received %d series", ord, len(series)) - require.Greater(t, len(series), 0, "ordinal %d should receive some series", ord) - } - - // Verify 2: Same ordinal across different AZs receives the SAME series - // Group endpoints by ordinal - endpointsByOrdinal := make(map[int][]string) - for addr := range seriesByEndpoint { - ord := extractOrdinalFromAddress(t, addr) - endpointsByOrdinal[ord] = append(endpointsByOrdinal[ord], addr) - } - - for ord, addrs := range endpointsByOrdinal { - if len(addrs) < 2 { - continue - } - // All endpoints with the same ordinal should have received the exact same series - referenceSeries := seriesByEndpoint[addrs[0]] - for _, addr := range addrs[1:] { - otherSeries := seriesByEndpoint[addr] - require.Equal(t, len(referenceSeries), len(otherSeries), - "endpoints with ordinal %d should have same number of series", ord) - for seriesIdx := range referenceSeries { - _, ok := otherSeries[seriesIdx] - require.True(t, ok, - "series %d should be on all endpoints with ordinal %d", seriesIdx, ord) - } - } - t.Logf("Verified: All %d endpoints with ordinal %d have identical %d series", - len(addrs), ord, len(referenceSeries)) - } 
- - // Verify 3: Different ordinals receive DIFFERENT series (no overlap) - ordinalList := make([]int, 0, len(seriesByOrdinal)) - for ord := range seriesByOrdinal { - ordinalList = append(ordinalList, ord) - } - if len(ordinalList) >= 2 { - series1 := seriesByOrdinal[ordinalList[0]] - series2 := seriesByOrdinal[ordinalList[1]] - for seriesIdx := range series1 { - _, overlap := series2[seriesIdx] - require.False(t, overlap, - "series %d should not be on both ordinal %d and ordinal %d", - seriesIdx, ordinalList[0], ordinalList[1]) - } - t.Logf("Verified: Ordinals %d and %d have no overlapping series", ordinalList[0], ordinalList[1]) - } -} - -func TestAlignedOrdinalShardingValidation(t *testing.T) { - t.Parallel() - - endpoints := make([]Endpoint, 0, 15) - azs := []string{"az-a", "az-b", "az-c"} - for _, az := range azs { - for ord := 0; ord < 5; ord++ { - endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) - } - } - - baseRing, err := newAlignedKetamaHashring(endpoints, SectionsPerNode, 3) - require.NoError(t, err) - - // Test shard size exceeding available ordinals - cfg := ShuffleShardingConfig{ - ShardSize: 10, // Only 5 ordinals available - AlignedOrdinalSharding: true, + expectError: false, + }, + "multiple AZs, different counts, stops at first missing shard > 0": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep2a, ep0c}, + expectedResult: [][]Endpoint{ + {ep0a}, + {ep0b}, + {ep0c}, + }, + expectError: false, + }, + "error if shard 0 missing in any AZ": { + inputEndpoints: []Endpoint{ep1a, ep2a, ep1b}, + expectedResult: nil, + expectError: true, + errorContains: "missing endpoint with shard 0", + }, + "error if shard 0 missing in only one AZ": { + inputEndpoints: []Endpoint{ep0a, ep1a, ep1b}, + expectedResult: nil, + expectError: true, + errorContains: `AZ "zone-b" is missing endpoint with shard 0`, + }, + "error on duplicate shard within an AZ": { + inputEndpoints: []Endpoint{ep0a, ep1a, ep0b, duplicateEp0a}, + expectedResult: nil, + 
expectError: true, + errorContains: "duplicate endpoint shard 0 for address " + duplicateEp0a.Address + " in AZ zone-a", + }, + "AZ sorting check": { + inputEndpoints: []Endpoint{ep0b, ep0c, ep0a}, + expectedResult: [][]Endpoint{ + {ep0a}, + {ep0b}, + {ep0c}, + }, + expectError: false, + }, + "multiple AZs, stops correctly when next shard missing everywhere": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, + expectedResult: [][]Endpoint{ + {ep0a, ep1a}, + {ep0b, ep1b}, + {ep0c, ep1c}, + }, + expectError: false, + }, } - shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-validation") - require.NoError(t, err) - _, err = shardRing.getTenantShardAligned("test-tenant") - require.Error(t, err) - require.Contains(t, err.Error(), "exceeds available common ordinals") -} + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + result, err := groupByAZ(tc.inputEndpoints) -// Helper function to extract ordinal from K8s-style address. -func extractOrdinalFromAddress(t *testing.T, address string) int { - t.Helper() - // Address format: pod-az-N.svc.test.svc.cluster.local:10901 - parts := strings.Split(address, ".") - require.Greater(t, len(parts), 0) - podPart := parts[0] // pod-az-N - lastDash := strings.LastIndex(podPart, "-") - require.Greater(t, lastDash, 0) - ordinalStr := podPart[lastDash+1:] - var ordinal int - _, err := fmt.Sscanf(ordinalStr, "%d", &ordinal) - require.NoError(t, err) - return ordinal -} - -// Helper function to extract unique ordinals from a shard. 
-func extractOrdinalsFromShard(t *testing.T, shard Hashring) []int { - t.Helper() - nodes := shard.Nodes() - ordinalSet := make(map[int]struct{}) - for _, node := range nodes { - ordinalSet[extractOrdinalFromAddress(t, node.Address)] = struct{}{} - } - ordinals := make([]int, 0, len(ordinalSet)) - for ord := range ordinalSet { - ordinals = append(ordinals, ord) - } - // Sort for consistent comparison - for i := 0; i < len(ordinals); i++ { - for j := i + 1; j < len(ordinals); j++ { - if ordinals[i] > ordinals[j] { - ordinals[i], ordinals[j] = ordinals[j], ordinals[i] + if tc.expectError { + testutil.NotOk(t, err) + if tc.errorContains != "" { + testutil.Assert(t, strings.Contains(err.Error(), tc.errorContains), "Expected error message to contain '%s', but got: %v", tc.errorContains, err) + } + testutil.Assert(t, result == nil, "Expected nil result on error, got: %v", result) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tc.expectedResult, result) + + // Verify outer slice (AZs) is sorted alphabetically. + if err == nil && len(result) > 1 { + azOrderCorrect := sort.SliceIsSorted(result, func(i, j int) bool { + return result[i][0].AZ < result[j][0].AZ + }) + testutil.Assert(t, azOrderCorrect, "Outer slice is not sorted by AZ") + } } - } + }) } - return ordinals } diff --git a/pkg/receive/rendezvous_hashring.go b/pkg/receive/rendezvous_hashring.go new file mode 100644 index 00000000000..220ad26f864 --- /dev/null +++ b/pkg/receive/rendezvous_hashring.go @@ -0,0 +1,108 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "encoding/binary" + "fmt" + + "github.com/cespare/xxhash" + "github.com/pkg/errors" + + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" +) + +// rendezvousHashring distributes series across shards using rendezvous (highest random weight) hashing. 
+// For each series, it computes hash(seriesKey, shard) for every shard and picks the shard with the +// highest hash value. This avoids virtual nodes entirely while providing good distribution. +type rendezvousHashring struct { + azEndpoints [][]Endpoint // [azIndex][shardIndex] → Endpoint + sortedAZs []string + numShards int + replicationFactor uint64 + flatEndpoints []Endpoint +} + +func newRendezvousHashring(endpoints []Endpoint, replicationFactor uint64) (*rendezvousHashring, error) { + if replicationFactor == 0 { + return nil, errors.New("replication factor cannot be zero") + } + + groupedEndpoints, err := groupByAZ(endpoints) + if err != nil { + return nil, errors.Wrap(err, "failed to group endpoints by AZ") + } + + numAZs := len(groupedEndpoints) + if numAZs == 0 { + return nil, errors.New("no endpoint groups found after grouping by AZ") + } + if uint64(numAZs) != replicationFactor { + return nil, fmt.Errorf("number of AZs (%d) must equal replication factor (%d)", numAZs, replicationFactor) + } + + numShards := len(groupedEndpoints[0]) + if numShards == 0 { + return nil, errors.New("AZ groups are empty after grouping") + } + + // Build sorted AZ names from groupedEndpoints (already sorted by groupByAZ). + sortedAZs := make([]string, numAZs) + for i := range groupedEndpoints { + sortedAZs[i] = groupedEndpoints[i][0].AZ + } + + // Build flat endpoint list: all AZ0 endpoints, then AZ1, etc. + flatEndpoints := make([]Endpoint, 0, numAZs*numShards) + for azIdx := 0; azIdx < numAZs; azIdx++ { + flatEndpoints = append(flatEndpoints, groupedEndpoints[azIdx]...) 
+ } + + return &rendezvousHashring{ + azEndpoints: groupedEndpoints, + sortedAZs: sortedAZs, + numShards: numShards, + replicationFactor: replicationFactor, + flatEndpoints: flatEndpoints, + }, nil +} + +func (r *rendezvousHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (Endpoint, error) { + if n >= r.replicationFactor { + return Endpoint{}, &insufficientNodesError{have: r.replicationFactor, want: n + 1} + } + + seriesKey := labelpb.HashWithPrefix(tenant, ts.Labels) + + // Rendezvous hashing: for each shard, compute hash(seriesKey, shard), pick the max. + // buf is a fixed-size stack array: 8 bytes seriesKey + 2 bytes shard (little-endian). + bestShard := 0 + bestHash := uint64(0) + + var buf [10]byte + binary.LittleEndian.PutUint64(buf[:8], seriesKey) + + for shard := 0; shard < r.numShards; shard++ { + binary.LittleEndian.PutUint16(buf[8:], uint16(shard)) + h := xxhash.Sum64(buf[:]) + if shard == 0 || h > bestHash { + bestHash = h + bestShard = shard + } + } + + // n selects the AZ (replica index). + return r.azEndpoints[n][bestShard], nil +} + +func (r *rendezvousHashring) Get(tenant string, ts *prompb.TimeSeries) (Endpoint, error) { + return r.GetN(tenant, ts, 0) +} + +func (r *rendezvousHashring) Nodes() []Endpoint { + return r.flatEndpoints +} + +func (r *rendezvousHashring) Close() {} diff --git a/pkg/receive/rendezvous_hashring_test.go b/pkg/receive/rendezvous_hashring_test.go new file mode 100644 index 00000000000..84f6e37b0df --- /dev/null +++ b/pkg/receive/rendezvous_hashring_test.go @@ -0,0 +1,593 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "fmt" + "math" + "sort" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" +) + +// makeK8sEndpoint creates an endpoint with K8s-style DNS name. 
+func makeK8sEndpoint(podName string, shard int, az string) Endpoint { + return Endpoint{ + Address: fmt.Sprintf("%s-%d.svc.test.svc.cluster.local:10901", podName, shard), + AZ: az, + Shard: shard, + } +} + +// extractShardFromAddress extracts shard number from K8s-style address. +func extractShardFromAddress(t *testing.T, address string) int { + t.Helper() + // Address format: pod-az-N.svc.test.svc.cluster.local:10901 + parts := strings.Split(address, ".") + require.Greater(t, len(parts), 0) + podPart := parts[0] // pod-az-N + lastDash := strings.LastIndex(podPart, "-") + require.Greater(t, lastDash, 0) + shardStr := podPart[lastDash+1:] + var shard int + _, err := fmt.Sscanf(shardStr, "%d", &shard) + require.NoError(t, err) + return shard +} + +// extractShardsFromSubring extracts unique shards from a subring. +func extractShardsFromSubring(t *testing.T, subring Hashring) []int { + t.Helper() + nodes := subring.Nodes() + shardSet := make(map[int]struct{}) + for _, node := range nodes { + shardSet[extractShardFromAddress(t, node.Address)] = struct{}{} + } + shards := make([]int, 0, len(shardSet)) + for s := range shardSet { + shards = append(shards, s) + } + sort.Ints(shards) + return shards +} + +func TestRendezvousHashringConstructor(t *testing.T) { + t.Parallel() + + ep0a := Endpoint{Address: podDNS("pod", 0), AZ: "zone-a", Shard: 0} + ep1a := Endpoint{Address: podDNS("pod", 1), AZ: "zone-a", Shard: 1} + ep0b := Endpoint{Address: podDNS("pod", 0), AZ: "zone-b", Shard: 0} + ep1b := Endpoint{Address: podDNS("pod", 1), AZ: "zone-b", Shard: 1} + ep0c := Endpoint{Address: podDNS("pod", 0), AZ: "zone-c", Shard: 0} + ep1c := Endpoint{Address: podDNS("pod", 1), AZ: "zone-c", Shard: 1} + duplicateEp0a := Endpoint{Address: podDNS("anotherpod", 0), AZ: "zone-a", Shard: 0} + + testCases := map[string]struct { + endpoints []Endpoint + replicationFactor uint64 + expectError bool + errorContains string + }{ + "valid 2 AZs, RF=2": { + endpoints: []Endpoint{ep1a, ep0b, ep0a, 
ep1b}, + replicationFactor: 2, + }, + "valid 3 AZs, RF=3": { + endpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, + replicationFactor: 3, + }, + "error: empty input": { + endpoints: []Endpoint{}, + replicationFactor: 1, + expectError: true, + errorContains: "no endpoints provided", + }, + "error: RF=0": { + endpoints: []Endpoint{ep0a, ep0b}, + replicationFactor: 0, + expectError: true, + errorContains: "replication factor cannot be zero", + }, + "error: duplicate shard": { + endpoints: []Endpoint{ep0a, ep1a, ep0b, duplicateEp0a}, + replicationFactor: 2, + expectError: true, + errorContains: "duplicate endpoint", + }, + "error: missing shard 0": { + endpoints: []Endpoint{ep1a, ep1b}, + replicationFactor: 2, + expectError: true, + errorContains: "missing endpoint with shard 0", + }, + "error: AZ count != RF (too few AZs)": { + endpoints: []Endpoint{ep0a, ep1a}, + replicationFactor: 2, + expectError: true, + errorContains: "number of AZs (1) must equal replication factor (2)", + }, + "error: AZ count != RF (too many AZs)": { + endpoints: []Endpoint{ep0a, ep1a, ep0b, ep1b, ep0c, ep1c}, + replicationFactor: 2, + expectError: true, + errorContains: "number of AZs (3) must equal replication factor (2)", + }, + "unbalanced AZs uses common subset": { + endpoints: []Endpoint{ep0a, ep1a, ep0b}, + replicationFactor: 2, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + ring, err := newRendezvousHashring(tc.endpoints, tc.replicationFactor) + if tc.expectError { + require.Error(t, err) + require.Nil(t, ring) + if tc.errorContains != "" { + require.Contains(t, err.Error(), tc.errorContains) + } + } else { + require.NoError(t, err) + require.NotNil(t, ring) + } + }) + } +} + +func TestRendezvousHashringGetN(t *testing.T) { + t.Parallel() + + ep0a := Endpoint{Address: podDNS("pod", 0), AZ: "zone-a", Shard: 0} + ep1a := Endpoint{Address: podDNS("pod", 1), AZ: "zone-a", Shard: 1} + ep0b := Endpoint{Address: podDNS("pod", 0), AZ: "zone-b", 
Shard: 0} + ep1b := Endpoint{Address: podDNS("pod", 1), AZ: "zone-b", Shard: 1} + ep0c := Endpoint{Address: podDNS("pod", 0), AZ: "zone-c", Shard: 0} + ep1c := Endpoint{Address: podDNS("pod", 1), AZ: "zone-c", Shard: 1} + + ts := &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{{Name: "test", Value: "replica-routing"}}, + } + + t.Run("2 AZs RF=2, replicas have same shard", func(t *testing.T) { + ring, err := newRendezvousHashring([]Endpoint{ep1a, ep0b, ep0a, ep1b}, 2) + require.NoError(t, err) + + r0, err := ring.GetN("tenant1", ts, 0) + require.NoError(t, err) + r1, err := ring.GetN("tenant1", ts, 1) + require.NoError(t, err) + + require.Equal(t, r0.Shard, r1.Shard, "replicas should have same shard") + require.NotEqual(t, r0.AZ, r1.AZ, "replicas should be in different AZs") + }) + + t.Run("3 AZs RF=3, replicas have same shard", func(t *testing.T) { + ring, err := newRendezvousHashring([]Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, 3) + require.NoError(t, err) + + r0, err := ring.GetN("tenant1", ts, 0) + require.NoError(t, err) + r1, err := ring.GetN("tenant1", ts, 1) + require.NoError(t, err) + r2, err := ring.GetN("tenant1", ts, 2) + require.NoError(t, err) + + require.Equal(t, r0.Shard, r1.Shard) + require.Equal(t, r1.Shard, r2.Shard) + + azs := map[string]struct{}{r0.AZ: {}, r1.AZ: {}, r2.AZ: {}} + require.Len(t, azs, 3, "replicas should span all 3 AZs") + }) + + t.Run("error: n >= replicationFactor", func(t *testing.T) { + ring, err := newRendezvousHashring([]Endpoint{ep1a, ep0b, ep0a, ep1b}, 2) + require.NoError(t, err) + + _, err = ring.GetN("tenant1", ts, 2) + require.Error(t, err) + require.Contains(t, err.Error(), "insufficient nodes") + }) +} + +func TestRendezvousHashringSameShardAcrossAZs(t *testing.T) { + t.Parallel() + + // Create 3 AZs with 20 shards each. 
+ var endpoints []Endpoint + for i := 0; i < 20; i++ { + endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-a", Shard: i}) + } + for i := 0; i < 20; i++ { + endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-b", Shard: i}) + } + for i := 0; i < 20; i++ { + endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-c", Shard: i}) + } + + ring, err := newRendezvousHashring(endpoints, 3) + require.NoError(t, err) + + // For many series, verify all replicas go to the same shard. + for i := 0; i < 500; i++ { + ts := &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + {Name: "__name__", Value: "test_metric"}, + {Name: "series", Value: fmt.Sprintf("series-%d", i)}, + }, + } + + r0, err := ring.GetN("tenant1", ts, 0) + require.NoError(t, err) + r1, err := ring.GetN("tenant1", ts, 1) + require.NoError(t, err) + r2, err := ring.GetN("tenant1", ts, 2) + require.NoError(t, err) + + require.Equal(t, r0.Shard, r1.Shard, "series %d: shard mismatch between AZ0 and AZ1", i) + require.Equal(t, r1.Shard, r2.Shard, "series %d: shard mismatch between AZ1 and AZ2", i) + + azs := map[string]struct{}{r0.AZ: {}, r1.AZ: {}, r2.AZ: {}} + require.Len(t, azs, 3, "series %d: replicas should span all 3 AZs", i) + } +} + +func TestRendezvousHashringDistribution(t *testing.T) { + t.Parallel() + + numShards := 10 + var endpoints []Endpoint + for i := 0; i < numShards; i++ { + endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-a", Shard: i}) + endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-b", Shard: i}) + endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-c", Shard: i}) + } + + ring, err := newRendezvousHashring(endpoints, 3) + require.NoError(t, err) + + shardCounts := make(map[int]int) + numSeries := 10000 + for i := 0; i < numSeries; i++ { + ts := &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + {Name: "__name__", Value: "test_metric"}, + {Name: 
"instance", Value: fmt.Sprintf("instance-%d", i)}, + }, + } + + ep, err := ring.Get("tenant1", ts) + require.NoError(t, err) + shardCounts[ep.Shard]++ + } + + // Expect all shards to receive some series. + require.Len(t, shardCounts, numShards, "all shards should receive some series") + + // Check uniformity: each shard should get roughly numSeries/numShards. + expected := float64(numSeries) / float64(numShards) + for shard, count := range shardCounts { + deviation := math.Abs(float64(count)-expected) / expected + require.Less(t, deviation, 0.30, + "shard %d has %d series (expected ~%.0f), deviation %.1f%%", + shard, count, expected, deviation*100) + } +} + +func TestRendezvousShuffleShardingBasic(t *testing.T) { + t.Parallel() + + // Create 3 AZs with 5 shards each. + endpoints := make([]Endpoint, 0, 15) + azs := []string{"az-a", "az-b", "az-c"} + for _, az := range azs { + for ord := 0; ord < 5; ord++ { + endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) + } + } + + baseRing, err := newRendezvousHashring(endpoints, 3) + require.NoError(t, err) + + cfg := ShuffleShardingConfig{ + ShardSize: 6, + } + shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-rendezvous") + require.NoError(t, err) + + tenant := "test-tenant" + shard, err := shardRing.getTenantShardRendezvous(tenant) + require.NoError(t, err) + + // Verify we got the right number of nodes (6 total / 3 AZs = 2 shards per AZ → 6 endpoints). + nodes := shard.Nodes() + require.Len(t, nodes, 6, "expected 6 endpoints (2 shards * 3 AZs)") + + // Extract shards from each AZ and verify they're the same. 
+ shardsByAZ := make(map[string][]int) + for _, node := range nodes { + shardsByAZ[node.AZ] = append(shardsByAZ[node.AZ], extractShardFromAddress(t, node.Address)) + } + + require.Len(t, shardsByAZ, 3, "expected 3 AZs") + for az, shards := range shardsByAZ { + require.Len(t, shards, 2, "AZ %s should have 2 shards", az) + } + + // All AZs should have the same shards. + var referenceShards []int + for _, shards := range shardsByAZ { + if referenceShards == nil { + referenceShards = shards + } else { + require.ElementsMatch(t, referenceShards, shards, + "all AZs should have the same shards") + } + } +} + +func TestRendezvousShuffleShardingConsistency(t *testing.T) { + t.Parallel() + + endpoints := make([]Endpoint, 0, 15) + azs := []string{"az-a", "az-b", "az-c"} + for _, az := range azs { + for ord := 0; ord < 5; ord++ { + endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) + } + } + + baseRing, err := newRendezvousHashring(endpoints, 3) + require.NoError(t, err) + + cfg := ShuffleShardingConfig{ + ShardSize: 6, + } + shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-consistency") + require.NoError(t, err) + + tenant := "consistent-tenant" + var firstShards []int + + for trial := 0; trial < 10; trial++ { + shard, err := shardRing.getTenantShardRendezvous(tenant) + require.NoError(t, err) + currentShards := extractShardsFromSubring(t, shard) + if firstShards == nil { + firstShards = currentShards + } else { + require.Equal(t, firstShards, currentShards, + "same tenant should always get same shards") + } + } +} + +func TestRendezvousShuffleShardingDifferentTenants(t *testing.T) { + t.Parallel() + + endpoints := make([]Endpoint, 0, 30) + azs := []string{"az-a", "az-b", "az-c"} + for _, az := range azs { + for ord := 0; ord < 10; ord++ { + endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) + } + } + + baseRing, err := newRendezvousHashring(endpoints, 3) + require.NoError(t, err) + + cfg := 
ShuffleShardingConfig{ + ShardSize: 9, + } + shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-diff-tenants") + require.NoError(t, err) + + tenantShards := make(map[string][]int) + numTenants := 20 + + for i := 0; i < numTenants; i++ { + tenant := fmt.Sprintf("tenant-%d", i) + shard, err := shardRing.getTenantShardRendezvous(tenant) + require.NoError(t, err) + tenantShards[tenant] = extractShardsFromSubring(t, shard) + } + + uniqueSets := make(map[string]int) + for _, shards := range tenantShards { + key := fmt.Sprintf("%v", shards) + uniqueSets[key]++ + } + + require.Greater(t, len(uniqueSets), 1, "different tenants should get different shard sets") +} + +func TestRendezvousShuffleShardingPreservesAlignment(t *testing.T) { + t.Parallel() + + endpoints := make([]Endpoint, 0, 15) + azs := []string{"az-a", "az-b", "az-c"} + for _, az := range azs { + for ord := 0; ord < 5; ord++ { + endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) + } + } + + baseRing, err := newRendezvousHashring(endpoints, 3) + require.NoError(t, err) + + cfg := ShuffleShardingConfig{ + ShardSize: 6, + } + shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-preserves") + require.NoError(t, err) + + tenant := "alignment-test-tenant" + + for i := 0; i < 100; i++ { + ts := &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + {Name: "series", Value: fmt.Sprintf("series-%d", i)}, + }, + } + + var replicas []Endpoint + for n := uint64(0); n < 3; n++ { + ep, err := shardRing.GetN(tenant, ts, n) + require.NoError(t, err) + replicas = append(replicas, ep) + } + + // All replicas should have the same shard but different AZs. 
+ shardsSeen := make(map[int]struct{}) + azsSeen := make(map[string]struct{}) + for _, ep := range replicas { + shardsSeen[extractShardFromAddress(t, ep.Address)] = struct{}{} + azsSeen[ep.AZ] = struct{}{} + } + + require.Len(t, shardsSeen, 1, "all replicas should have the same shard for series %d", i) + require.Len(t, azsSeen, 3, "replicas should span all 3 AZs for series %d", i) + } +} + +func TestRendezvousShuffleShardingDataDistribution(t *testing.T) { + t.Parallel() + + endpoints := make([]Endpoint, 0, 15) + azs := []string{"az-a", "az-b", "az-c"} + for _, az := range azs { + for ord := 0; ord < 5; ord++ { + endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) + } + } + + baseRing, err := newRendezvousHashring(endpoints, 3) + require.NoError(t, err) + + cfg := ShuffleShardingConfig{ + ShardSize: 6, + } + shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-distribution") + require.NoError(t, err) + + tenant := "distribution-test-tenant" + + shard, err := shardRing.getTenantShardRendezvous(tenant) + require.NoError(t, err) + selectedShards := extractShardsFromSubring(t, shard) + require.Len(t, selectedShards, 2, "tenant should have exactly 2 shards") + + seriesByShard := make(map[int]map[int]struct{}) + for _, ord := range selectedShards { + seriesByShard[ord] = make(map[int]struct{}) + } + + seriesByEndpoint := make(map[string]map[int]struct{}) + + numSeries := 1000 + for i := 0; i < numSeries; i++ { + ts := &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + {Name: "series", Value: fmt.Sprintf("series-%d", i)}, + {Name: "__name__", Value: "test_metric"}, + }, + } + + var replicas []Endpoint + for n := uint64(0); n < 3; n++ { + ep, err := shardRing.GetN(tenant, ts, n) + require.NoError(t, err) + replicas = append(replicas, ep) + + if seriesByEndpoint[ep.Address] == nil { + seriesByEndpoint[ep.Address] = make(map[int]struct{}) + } + seriesByEndpoint[ep.Address][i] = struct{}{} + } + + // All replicas should 
have the same shard. + primaryShard := extractShardFromAddress(t, replicas[0].Address) + for _, ep := range replicas[1:] { + epShard := extractShardFromAddress(t, ep.Address) + require.Equal(t, primaryShard, epShard, "all replicas for series %d should have same shard", i) + } + + seriesByShard[primaryShard][i] = struct{}{} + } + + // Both shards should receive series. + for ord, series := range seriesByShard { + require.Greater(t, len(series), 0, "shard %d should receive some series", ord) + } + + // Same shard across different AZs should receive the same series. + endpointsByShard := make(map[int][]string) + for addr := range seriesByEndpoint { + ord := extractShardFromAddress(t, addr) + endpointsByShard[ord] = append(endpointsByShard[ord], addr) + } + + for ord, addrs := range endpointsByShard { + if len(addrs) < 2 { + continue + } + referenceSeries := seriesByEndpoint[addrs[0]] + for _, addr := range addrs[1:] { + otherSeries := seriesByEndpoint[addr] + require.Equal(t, len(referenceSeries), len(otherSeries), + "endpoints with shard %d should have same number of series", ord) + for seriesIdx := range referenceSeries { + _, ok := otherSeries[seriesIdx] + require.True(t, ok, "series %d should be on all endpoints with shard %d", seriesIdx, ord) + } + } + } + + // Different shards should receive different series. 
+ shardList := make([]int, 0, len(seriesByShard)) + for ord := range seriesByShard { + shardList = append(shardList, ord) + } + if len(shardList) >= 2 { + series1 := seriesByShard[shardList[0]] + series2 := seriesByShard[shardList[1]] + for seriesIdx := range series1 { + _, overlap := series2[seriesIdx] + require.False(t, overlap, + "series %d should not be on both shard %d and shard %d", + seriesIdx, shardList[0], shardList[1]) + } + } +} + +func TestRendezvousShuffleShardingValidation(t *testing.T) { + t.Parallel() + + endpoints := make([]Endpoint, 0, 15) + azs := []string{"az-a", "az-b", "az-c"} + for _, az := range azs { + for ord := 0; ord < 5; ord++ { + endpoints = append(endpoints, makeK8sEndpoint("pod-"+az, ord, az)) + } + } + + baseRing, err := newRendezvousHashring(endpoints, 3) + require.NoError(t, err) + + cfg := ShuffleShardingConfig{ + ShardSize: 30, // 30 / 3 AZs = 10 per-AZ, but only 5 shards available + } + shardRing, err := newShuffleShardHashring(baseRing, cfg, 3, prometheus.NewRegistry(), "test-validation") + require.NoError(t, err) + + _, err = shardRing.getTenantShardRendezvous("test-tenant") + require.Error(t, err) + require.Contains(t, err.Error(), "exceeds available common shards") +} diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 90192d0fac8..fcaeb81e954 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -12,6 +12,7 @@ import ( "io" "sort" "strings" + "sync" "unsafe" "github.com/cespare/xxhash/v2" @@ -350,10 +351,21 @@ func DeepCopy(lbls []ZLabel) []ZLabel { return ret } +var hashBufPool = sync.Pool{ + New: func() any { + b := make([]byte, 0, 1024) + return &b + }, +} + +const maxHashBufPoolCap = 4096 + // HashWithPrefix returns a hash for the given prefix and labels. func HashWithPrefix(prefix string, lbls []ZLabel) uint64 { // Use xxhash.Sum64(b) for fast path as it's faster. - b := make([]byte, 0, 1024) + bp := hashBufPool.Get().(*[]byte) + b := (*bp)[:0] + b = append(b, prefix...) 
b = append(b, sep[0]) @@ -368,6 +380,10 @@ func HashWithPrefix(prefix string, lbls []ZLabel) uint64 { _, _ = h.WriteString(v.Value) _, _ = h.Write(sep) } + if cap(b) <= maxHashBufPoolCap { + *bp = b + hashBufPool.Put(bp) + } return h.Sum64() } b = append(b, v.Name...) @@ -375,7 +391,12 @@ func HashWithPrefix(prefix string, lbls []ZLabel) uint64 { b = append(b, v.Value...) b = append(b, sep[0]) } - return xxhash.Sum64(b) + result := xxhash.Sum64(b) + if cap(b) <= maxHashBufPoolCap { + *bp = b + hashBufPool.Put(bp) + } + return result } // ValidateLabels validates label names and values (checks for empty