diff --git a/CHANGELOG.md b/CHANGELOG.md index 59c8029171..2960d8c6a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added +- [#8238](https://github.com/thanos-io/thanos/pull/8238) Receive: add shuffle sharding support + ### Changed - [#8192](https://github.com/thanos-io/thanos/pull/8192) Sidecar: fix default get config timeout diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index a5d8b46ef5..a6602bdd10 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -593,7 +593,7 @@ func setupHashring(g *run.Group, webHandler.Hashring(receive.SingleNodeHashring(conf.endpoint)) level.Info(logger).Log("msg", "Empty hashring config. Set up single node hashring.") } else { - h, err := receive.NewMultiHashring(algorithm, conf.replicationFactor, c) + h, err := receive.NewMultiHashring(algorithm, conf.replicationFactor, c, reg) if err != nil { return errors.Wrap(err, "unable to create new hashring from config") } diff --git a/docs/components/receive.md b/docs/components/receive.md index 110599762e..8448be3afc 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -22,6 +22,48 @@ The Ketama algorithm is a consistent hashing scheme which enables stable scaling If you are using the `hashmod` algorithm and wish to migrate to `ketama`, the simplest and safest way would be to set up a new pool receivers with `ketama` hashrings and start remote-writing to them. Provided you are on the latest Thanos version, old receivers will flush their TSDBs after the configured retention period and will upload blocks to object storage. Once you have verified that is done, decommission the old receivers. +#### Shuffle sharding + +Ketama also supports [shuffle sharding](https://aws.amazon.com/builders-library/workload-isolation-using-shuffle-sharding/). It allows you to provide a single-tenant experience in a multi-tenant system. With shuffle sharding, a tenant gets a subset of all nodes in a hashring. You can configure shuffle sharding for any Ketama hashring like so: + +```json +[ + { + "endpoints": [ + {"address": "node-1:10901", "capnproto_address": "node-1:19391", "az": "foo"}, + {"address": "node-2:10901", "capnproto_address": "node-2:19391", "az": "bar"}, + {"address": "node-3:10901", "capnproto_address": "node-3:19391", "az": "qux"}, + {"address": "node-4:10901", "capnproto_address": "node-4:19391", "az": "foo"}, + {"address": "node-5:10901", "capnproto_address": "node-5:19391", "az": "bar"}, + {"address": "node-6:10901", "capnproto_address": "node-6:19391", "az": "qux"} + ], + "algorithm": "ketama", + "shuffle_sharding_config": { + "shard_size": 2, + "cache_size": 100, + "overrides": [ + { + "shard_size": 3, + "tenants": ["prefix-tenant-*"], + "tenant_matcher_type": "glob" + } + ] + } + } +] +``` + +This will enable shuffle sharding with the default shard size of 2 and override it to 3 for every tenant that starts with `prefix-tenant-`. + +`cache_size` sets the size of the in-memory LRU cache of the computed subrings. It is not possible to cache everything because an attacker could possibly +spam requests with random tenants and those subrings would stay in-memory forever. + +With this config, `shard_size/number_of_azs` is chosen from each availability zone for each tenant. So, each tenant will get a unique and consistent set of 3 nodes. + +You can use `zone_awareness_disabled` to disable zone awareness. 
This is useful when you have many separate AZs and it does not matter which ones a tenant's shard lands in. Shard selection then ignores AZs, but the Ketama algorithm will still prefer to spread load across as many AZs as possible. That is why, with zone awareness disabled, it is recommended to set the shard size to `max(nodes_in_any_az, replication_factor)`. +
+Receive currently only supports stateless shuffle sharding, so it does not store shard assignments or check for overlaps between shards. +
### Hashmod (discouraged) This algorithm uses a `hashmod` function over all labels to decide which receiver is responsible for a given timeseries. This is the default algorithm due to historical reasons. However, its usage for new Receive installations is discouraged since adding new Receiver nodes leads to series churn and memory usage spikes.
diff --git a/pkg/receive/config.go b/pkg/receive/config.go index f453cb2263..5455ba2b37 100644 --- a/pkg/receive/config.go +++ b/pkg/receive/config.go @@ -42,6 +42,27 @@ const ( DefaultCapNProtoPort string = "19391" ) +type endpoints []Endpoint + +func (e endpoints) Len() int { + return len(e) +} + +func (e endpoints) Less(i, j int) bool { + // Sort by address first, then by CapNProtoAddress, then by AZ. + if e[i].Address == e[j].Address { + if e[i].CapNProtoAddress == e[j].CapNProtoAddress { + return e[i].AZ < e[j].AZ + } + return e[i].CapNProtoAddress < e[j].CapNProtoAddress + } + return e[i].Address < e[j].Address +} +func (e endpoints) Swap(i, j int) { + e[i], e[j] = e[j], e[i] +} + type Endpoint struct { Address string `json:"address"` CapNProtoAddress string `json:"capnproto_address"`
@@ -104,6 +125,23 @@ type HashringConfig struct { Endpoints []Endpoint `json:"endpoints"` Algorithm HashringAlgorithm `json:"algorithm,omitempty"` ExternalLabels labels.Labels `json:"external_labels,omitempty"` + // ShuffleShardingConfig enables shuffle sharding when its shard size is non-zero. + ShuffleShardingConfig ShuffleShardingConfig `json:"shuffle_sharding_config,omitempty"` +} + +type ShuffleShardingOverrideConfig struct { + ShardSize int `json:"shard_size"` + Tenants []string `json:"tenants,omitempty"` + TenantMatcherType tenantMatcher `json:"tenant_matcher_type,omitempty"` +} + +type ShuffleShardingConfig struct { + ShardSize int `json:"shard_size"` + CacheSize int `json:"cache_size"` + // ZoneAwarenessDisabled disables zone awareness: shards are selected without regard to zones, + // although the Ketama ring built from the shard still tries to spread load across the zones it contains.
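+ // When ZoneAwarenessDisabled is set, a tenant's shard is drawn from all nodes pooled together and contains exactly ShardSize nodes.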
+ ZoneAwarenessDisabled bool `json:"zone_awareness_disabled"` + Overrides []ShuffleShardingOverrideConfig `json:"overrides,omitempty"` } type tenantMatcher string diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index b217b3cb6c..c900f7169d 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -324,6 +324,8 @@ func (h *Handler) Hashring(hashring Hashring) { level.Error(h.logger).Log("msg", "closing gRPC connection failed, we might have leaked a file descriptor", "addr", node, "err", err.Error()) } } + + h.hashring.Close() } h.hashring = hashring diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index e49a3f28c2..d2d0a3ccad 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -281,7 +281,7 @@ func newTestHandlerHashring( hashringAlgo = AlgorithmHashmod } - hashring, err := NewMultiHashring(hashringAlgo, replicationFactor, cfg) + hashring, err := NewMultiHashring(hashringAlgo, replicationFactor, cfg, prometheus.NewRegistry()) if err != nil { return nil, nil, nil, err } diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 0199b0fe1a..257f72dae5 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -4,19 +4,26 @@ package receive import ( + "crypto/md5" + "encoding/binary" "fmt" "math" + "math/rand" "path/filepath" "slices" "sort" "strconv" "strings" "sync" + "unsafe" "github.com/cespare/xxhash/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" + lru "github.com/hashicorp/golang-lru/v2" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -56,11 +63,15 @@ type Hashring interface { // Nodes returns a sorted slice of nodes that are in this hashring. Addresses could be duplicated // if, for example, the same address is used for multiple tenants in the multi-hashring. Nodes() []Endpoint + + Close() } // SingleNodeHashring always returns the same node. type SingleNodeHashring string +func (s SingleNodeHashring) Close() {} + func (s SingleNodeHashring) Nodes() []Endpoint { return []Endpoint{{Address: string(s), CapNProtoAddress: string(s)}} } @@ -79,6 +90,8 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (Endp // simpleHashring represents a group of nodes handling write requests by hashmoding individual series. type simpleHashring []Endpoint +func (s simpleHashring) Close() {} + func newSimpleHashring(endpoints []Endpoint) (Hashring, error) { for i := range endpoints { if endpoints[i].AZ != "" { @@ -131,6 +144,8 @@ type ketamaHashring struct { numEndpoints uint64 } +func (s ketamaHashring) Close() {} + func newKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) { numSections := len(endpoints) * sectionsPerNode @@ -279,6 +294,12 @@ type multiHashring struct { nodes []Endpoint } +func (s *multiHashring) Close() { + for _, h := range s.hashrings { + h.Close() + } +} + // Get returns a target to handle the given tenant and time series. func (m *multiHashring) Get(tenant string, ts *prompb.TimeSeries) (Endpoint, error) { return m.GetN(tenant, ts, 0) @@ -328,11 +349,306 @@ func (m *multiHashring) Nodes() []Endpoint { return m.nodes } +// shuffleShardHashring wraps a hashring implementation and applies shuffle sharding logic +// to limit which nodes are used for each tenant. 
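+// Per-tenant subrings are Ketama hashrings built from the tenant's shard and memoized in an LRU cache keyed by tenant.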
+type shuffleShardHashring struct { + baseRing Hashring + + shuffleShardingConfig ShuffleShardingConfig + + replicationFactor uint64 + + nodes []Endpoint + + cache *lru.Cache[string, *ketamaHashring] + + metrics *shuffleShardCacheMetrics +} + +func (s *shuffleShardHashring) Close() { + s.metrics.close() +} + +func (s *shuffleShardCacheMetrics) close() { + s.reg.Unregister(s.requestsTotal) + s.reg.Unregister(s.hitsTotal) + s.reg.Unregister(s.numItems) + s.reg.Unregister(s.maxItems) + s.reg.Unregister(s.evicted) +} + +type shuffleShardCacheMetrics struct { + requestsTotal prometheus.Counter + hitsTotal prometheus.Counter + numItems prometheus.Gauge + maxItems prometheus.Gauge + evicted prometheus.Counter + + reg prometheus.Registerer +} + +func newShuffleShardCacheMetrics(reg prometheus.Registerer, hashringName string) *shuffleShardCacheMetrics { + reg = prometheus.WrapRegistererWith(prometheus.Labels{"hashring": hashringName}, reg) + + return &shuffleShardCacheMetrics{ + reg: reg, + requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_shuffle_shard_cache_requests_total", + Help: "Total number of cache requests for shuffle shard subrings", + }), + hitsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_shuffle_shard_cache_hits_total", + Help: "Total number of cache hits for shuffle shard subrings", + }), + numItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_shuffle_shard_cache_items", + Help: "Total number of cached items", + }), + maxItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_shuffle_shard_cache_max_items", + Help: "Maximum number of items that can be cached", + }), + evicted: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_shuffle_shard_cache_evicted_total", + Help: "Total number of items evicted from the cache", + }), + } +} + +// newShuffleShardHashring creates a new shuffle sharding hashring wrapper. +func newShuffleShardHashring(baseRing Hashring, shuffleShardingConfig ShuffleShardingConfig, replicationFactor uint64, reg prometheus.Registerer, name string) (*shuffleShardHashring, error) { + l := log.NewNopLogger() + + level.Info(l).Log( + "msg", "Creating shuffle sharding hashring", + "default_shard_size", shuffleShardingConfig.ShardSize, + "total_nodes", len(baseRing.Nodes()), + ) + + if len(shuffleShardingConfig.Overrides) > 0 { + for _, override := range shuffleShardingConfig.Overrides { + level.Info(l).Log( + "msg", "Tenant shard size override", + "tenants", override.Tenants, + "tenant_matcher_type", override.TenantMatcherType, + "shard_size", override.ShardSize, + ) + } + } + + const DefaultShuffleShardingCacheSize = 100 + + if shuffleShardingConfig.CacheSize <= 0 { + shuffleShardingConfig.CacheSize = DefaultShuffleShardingCacheSize + } + + metrics := newShuffleShardCacheMetrics(reg, name) + metrics.maxItems.Set(float64(shuffleShardingConfig.CacheSize)) + + cache, err := lru.NewWithEvict[string, *ketamaHashring](shuffleShardingConfig.CacheSize, func(key string, value *ketamaHashring) { + metrics.evicted.Inc() + metrics.numItems.Dec() + }) + if err != nil { + return nil, err + } + + ssh := &shuffleShardHashring{ + baseRing: baseRing, + shuffleShardingConfig: shuffleShardingConfig, + replicationFactor: replicationFactor, + cache: cache, + metrics: metrics, + } + + // Dedupe nodes as the base ring may have duplicates. We are only interested in unique nodes. 
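+ // The deduplicated list is also sorted so that, for a given tenant, shard selection is deterministic across restarts and instances.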
+ ssh.nodes = ssh.dedupedNodes() + + nodeCountByAZ := make(map[string]int) + for _, node := range ssh.nodes { + var az string = node.AZ + if shuffleShardingConfig.ZoneAwarenessDisabled { + az = "" + } + nodeCountByAZ[az]++ + } + + maxNodesInAZ := 0 + for _, count := range nodeCountByAZ { + maxNodesInAZ = max(maxNodesInAZ, count) + } + + if shuffleShardingConfig.ShardSize > maxNodesInAZ { + level.Warn(l).Log( + "msg", "Shard size is larger than the maximum number of nodes in any AZ; some tenants might get all not working nodes if that AZ goes down", + "shard_size", shuffleShardingConfig.ShardSize, + "max_nodes_in_az", maxNodesInAZ, + ) + } + + for _, override := range shuffleShardingConfig.Overrides { + if override.ShardSize < maxNodesInAZ { + continue + } + level.Warn(l).Log( + "msg", "Shard size is larger than the maximum number of nodes in any AZ; some tenants might get all not working nodes if that AZ goes down", + "max_nodes_in_az", maxNodesInAZ, + "shard_size", override.ShardSize, + "tenants", override.Tenants, + "tenant_matcher_type", override.TenantMatcherType, + ) + } + return ssh, nil +} + +func (s *shuffleShardHashring) Nodes() []Endpoint { + return s.nodes +} + +func (s *shuffleShardHashring) dedupedNodes() []Endpoint { + uniqueNodes := make(map[Endpoint]struct{}) + for _, node := range s.baseRing.Nodes() { + uniqueNodes[node] = struct{}{} + } + + // Convert the map back to a slice + nodes := make(endpoints, 0, len(uniqueNodes)) + for node := range uniqueNodes { + nodes = append(nodes, node) + } + + sort.Sort(nodes) + + return nodes +} + +// getShardSize returns the shard size for a specific tenant, taking into account any overrides. +func (s *shuffleShardHashring) getShardSize(tenant string) int { + for _, override := range s.shuffleShardingConfig.Overrides { + if override.TenantMatcherType == TenantMatcherTypeExact { + for _, t := range override.Tenants { + if t == tenant { + return override.ShardSize + } + } + } else if override.TenantMatcherType == TenantMatcherGlob { + for _, t := range override.Tenants { + matches, err := filepath.Match(t, tenant) + if err == nil && matches { + return override.ShardSize + } + } + } + } + + // Default shard size is used if no overrides match + return s.shuffleShardingConfig.ShardSize +} + +// ShuffleShardExpectedInstancesPerZone returns the expected number of instances per zone for a given shard size and number of zones. +// Copied from Cortex. Copyright Cortex Authors. +func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int { + return int(math.Ceil(float64(shardSize) / float64(numZones))) +} + +var ( + seedSeparator = []byte{0} +) + +// yoloBuf will return an unsafe pointer to a string, as the name yoloBuf implies. Use at your own risk. +func yoloBuf(s string) []byte { + return *((*[]byte)(unsafe.Pointer(&s))) +} + +// ShuffleShardSeed returns seed for random number generator, computed from provided identifier. +// Copied from Cortex. Copyright Cortex Authors. +func ShuffleShardSeed(identifier, zone string) int64 { + // Use the identifier to compute a hash we'll use to seed the random. + hasher := md5.New() + hasher.Write(yoloBuf(identifier)) // nolint:errcheck + if zone != "" { + hasher.Write(seedSeparator) // nolint:errcheck + hasher.Write(yoloBuf(zone)) // nolint:errcheck + } + checksum := hasher.Sum(nil) + + // Generate the seed based on the first 64 bits of the checksum. 
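+ // md5 is used here only to derive a stable, well-distributed seed; it serves no security purpose.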
+ return int64(binary.BigEndian.Uint64(checksum)) +} + +func (s *shuffleShardHashring) getTenantShardCached(tenant string) (*ketamaHashring, error) { + s.metrics.requestsTotal.Inc() + + cached, ok := s.cache.Get(tenant) + if ok { + s.metrics.hitsTotal.Inc() + return cached, nil + } + + h, err := s.getTenantShard(tenant) + if err != nil { + return nil, err + } + + s.metrics.numItems.Inc() + s.cache.Add(tenant, h) + + return h, nil +} + +// getTenantShard returns or creates a consistent subset of nodes for a tenant. +func (s *shuffleShardHashring) getTenantShard(tenant string) (*ketamaHashring, error) { + nodes := s.Nodes() + nodesByAZ := make(map[string][]Endpoint) + for _, node := range nodes { + var az = node.AZ + if s.shuffleShardingConfig.ZoneAwarenessDisabled { + az = "" + } + nodesByAZ[az] = append(nodesByAZ[az], node) + } + + ss := s.getShardSize(tenant) + var take int + if s.shuffleShardingConfig.ZoneAwarenessDisabled { + take = ss + } else { + take = ShuffleShardExpectedInstancesPerZone(ss, len(nodesByAZ)) + } + + var finalNodes = make([]Endpoint, 0, take*len(nodesByAZ)) + for az, azNodes := range nodesByAZ { + seed := ShuffleShardSeed(tenant, az) + r := rand.New(rand.NewSource(seed)) + r.Shuffle(len(azNodes), func(i, j int) { + azNodes[i], azNodes[j] = azNodes[j], azNodes[i] + }) + + if take > len(azNodes) { + return nil, fmt.Errorf("shard size %d is larger than number of nodes in AZ %s (%d)", ss, az, len(azNodes)) + } + + finalNodes = append(finalNodes, azNodes[:take]...) + } + + return newKetamaHashring(finalNodes, SectionsPerNode, s.replicationFactor) +} + +// GetN returns the nth endpoint for a tenant and time series, respecting the shuffle sharding. +func (s *shuffleShardHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (Endpoint, error) { + h, err := s.getTenantShardCached(tenant) + if err != nil { + return Endpoint{}, err + } + + return h.GetN(tenant, ts, n) +} + // newMultiHashring creates a multi-tenant hashring for a given slice of // groups. // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. 
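+// The Prometheus registerer is used to register the shuffle-shard subring cache metrics for each hashring.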
-func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) { +func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig, reg prometheus.Registerer) (Hashring, error) { m := &multiHashring{ cache: make(map[string]Hashring), } @@ -344,7 +660,7 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg if h.Algorithm != "" { activeAlgorithm = h.Algorithm } - hashring, err = newHashring(activeAlgorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants) + hashring, err = newHashring(activeAlgorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants, h.ShuffleShardingConfig, reg) if err != nil { return nil, err } @@ -365,17 +681,38 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg return m, nil } -func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) { +func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationFactor uint64, hashring string, tenants []string, shuffleShardingConfig ShuffleShardingConfig, reg prometheus.Registerer) (Hashring, error) { + switch algorithm { case AlgorithmHashmod: - return newSimpleHashring(endpoints) + ringImpl, err := newSimpleHashring(endpoints) + if err != nil { + return nil, err + } + if shuffleShardingConfig.ShardSize > 0 { + return nil, fmt.Errorf("hashmod algorithm does not support shuffle sharding. Either use Ketama or remove shuffle sharding configuration") + } + return ringImpl, nil case AlgorithmKetama: - return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor) + ringImpl, err := newKetamaHashring(endpoints, SectionsPerNode, replicationFactor) + if err != nil { + return nil, err + } + if shuffleShardingConfig.ShardSize > 0 { + if shuffleShardingConfig.ShardSize > len(endpoints) { + return nil, fmt.Errorf("shard size %d is larger than number of nodes in hashring %s (%d)", shuffleShardingConfig.ShardSize, hashring, len(endpoints)) + } + return newShuffleShardHashring(ringImpl, shuffleShardingConfig, replicationFactor, reg, hashring) + } + return ringImpl, nil default: l := log.NewNopLogger() level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.", "hashring", hashring, "tenants", tenants) + if shuffleShardingConfig.ShardSize > 0 { + return nil, fmt.Errorf("hashmod algorithm does not support shuffle sharding. 
Either use Ketama or remove shuffle sharding configuration") + } return newSimpleHashring(endpoints) } } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 3a77372aee..8f64f7871e 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -12,6 +12,7 @@ import ( "github.com/efficientgo/core/testutil" "github.com/stretchr/testify/require" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -198,7 +199,7 @@ func TestHashringGet(t *testing.T) { tenant: "t2", }, } { - hs, err := NewMultiHashring(AlgorithmHashmod, 3, tc.cfg) + hs, err := NewMultiHashring(AlgorithmHashmod, 3, tc.cfg, prometheus.NewRegistry()) require.NoError(t, err) h, err := hs.GetN(tc.tenant, ts, 0) @@ -661,16 +662,200 @@ func TestInvalidAZHashringCfg(t *testing.T) { { cfg: []HashringConfig{{Endpoints: []Endpoint{{Address: "a", AZ: "1"}, {Address: "b", AZ: "2"}}}}, replicas: 2, + algorithm: AlgorithmHashmod, expectedError: "Hashmod algorithm does not support AZ aware hashring configuration. Either use Ketama or remove AZ configuration.", }, } { t.Run("", func(t *testing.T) { - _, err := NewMultiHashring(tt.algorithm, tt.replicas, tt.cfg) + _, err := NewMultiHashring(tt.algorithm, tt.replicas, tt.cfg, prometheus.NewRegistry()) require.EqualError(t, err, tt.expectedError) }) } } +func TestShuffleShardHashring(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + endpoints []Endpoint + tenant string + shuffleShardCfg ShuffleShardingConfig + err string + usedNodes int + nodeAddrs map[string]struct{} + }{ + { + usedNodes: 3, + name: "ketama with shuffle sharding", + endpoints: []Endpoint{ + {Address: "node-1", AZ: "az-1"}, + {Address: "node-2", AZ: "az-1"}, + {Address: "node-3", AZ: "az-2"}, + {Address: "node-4", AZ: "az-2"}, + {Address: "node-5", AZ: "az-3"}, + {Address: "node-6", AZ: "az-3"}, + }, + tenant: "tenant-1", + shuffleShardCfg: ShuffleShardingConfig{ + ShardSize: 2, + Overrides: []ShuffleShardingOverrideConfig{ + { + Tenants: []string{"special-tenant"}, + ShardSize: 2, + }, + }, + }, + }, + { + usedNodes: 3, + name: "ketama with glob tenant override", + endpoints: []Endpoint{ + {Address: "node-1", AZ: "az-1"}, + {Address: "node-2", AZ: "az-1"}, + {Address: "node-3", AZ: "az-2"}, + {Address: "node-4", AZ: "az-2"}, + {Address: "node-5", AZ: "az-3"}, + {Address: "node-6", AZ: "az-3"}, + }, + tenant: "prefix-tenant", + shuffleShardCfg: ShuffleShardingConfig{ + ShardSize: 2, + Overrides: []ShuffleShardingOverrideConfig{ + { + Tenants: []string{"prefix*"}, + ShardSize: 3, + TenantMatcherType: TenantMatcherGlob, + }, + }, + }, + }, + { + name: "big shard size", + endpoints: []Endpoint{ + {Address: "node-1", AZ: "az-1"}, + {Address: "node-2", AZ: "az-1"}, + {Address: "node-3", AZ: "az-2"}, + {Address: "node-4", AZ: "az-2"}, + {Address: "node-5", AZ: "az-3"}, + {Address: "node-6", AZ: "az-3"}, + }, + tenant: "prefix-tenant", + err: `shard size 20 is larger than number of nodes in AZ`, + shuffleShardCfg: ShuffleShardingConfig{ + ShardSize: 2, + Overrides: []ShuffleShardingOverrideConfig{ + { + Tenants: []string{"prefix*"}, + ShardSize: 20, + TenantMatcherType: TenantMatcherGlob, + }, + }, + }, + }, + { + name: "zone awareness disabled", + endpoints: []Endpoint{ + {Address: "node-1", AZ: "az-1"}, + {Address: "node-2", AZ: "az-1"}, + {Address: "node-3", AZ: "az-2"}, + {Address: "node-4", AZ: "az-2"}, + {Address: "node-5", AZ: "az-2"}, + {Address: "node-6", AZ: 
"az-2"}, + {Address: "node-7", AZ: "az-3"}, + {Address: "node-8", AZ: "az-3"}, + }, + tenant: "prefix-tenant", + usedNodes: 3, + nodeAddrs: map[string]struct{}{ + "node-1": {}, + "node-2": {}, + "node-6": {}, + }, + shuffleShardCfg: ShuffleShardingConfig{ + ShardSize: 1, + ZoneAwarenessDisabled: true, + Overrides: []ShuffleShardingOverrideConfig{ + { + Tenants: []string{"prefix*"}, + ShardSize: 3, + TenantMatcherType: TenantMatcherGlob, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var baseRing Hashring + var err error + + baseRing, err = newKetamaHashring(tc.endpoints, SectionsPerNode, 2) + require.NoError(t, err) + + // Create the shuffle shard hashring + shardRing, err := newShuffleShardHashring(baseRing, tc.shuffleShardCfg, 2, prometheus.NewRegistry(), "test") + require.NoError(t, err) + + // Test that the shuffle sharding is consistent + usedNodes := make(map[string]struct{}) + + // We'll sample multiple times to ensure consistency + for i := 0; i < 100; i++ { + ts := &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + { + Name: "iteration", + Value: fmt.Sprintf("%d", i), + }, + }, + } + + h, err := shardRing.GetN(tc.tenant, ts, 0) + if tc.err != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.err) + return + } + require.NoError(t, err) + usedNodes[h.Address] = struct{}{} + } + + require.Len(t, usedNodes, tc.usedNodes) + if tc.nodeAddrs != nil { + require.Len(t, usedNodes, len(tc.nodeAddrs)) + require.Equal(t, tc.nodeAddrs, usedNodes) + } + + // Test consistency - same tenant should always get same nodes. + for trial := 0; trial < 50; trial++ { + trialNodes := make(map[string]struct{}) + + for i := 0; i < 10+trial; i++ { + ts := &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + { + Name: "iteration", + Value: fmt.Sprintf("%d", i), + }, + { + Name: "trial", + Value: fmt.Sprintf("%d", trial), + }, + }, + } + + h, err := shardRing.GetN(tc.tenant, ts, 0) + require.NoError(t, err) + trialNodes[h.Address] = struct{}{} + } + + // Same tenant should get same set of nodes in every trial + require.Equal(t, usedNodes, trialNodes, "Inconsistent node sharding between trials") + } + }) + } +} + func makeSeries() []prompb.TimeSeries { numSeries := 10000 series := make([]prompb.TimeSeries, numSeries)