
Commit 36ad534

receive: implement shuffle sharding

See the documentation for details. Closes #3821.

Signed-off-by: Giedrius Statkevičius <[email protected]>

1 parent 2a5a856 commit 36ad534

File tree

8 files changed: +615 -9 lines changed


CHANGELOG.md (+2)

```diff
@@ -15,6 +15,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 
 ### Added
 
+- [#8238](https://github.com/thanos-io/thanos/pull/8238) Receive: add shuffle sharding support
+
 ### Changed
 
 - [#8192](https://github.com/thanos-io/thanos/pull/8192) Sidecar: fix default get config timeout
```

cmd/thanos/receive.go (+1 -1)

```diff
@@ -593,7 +593,7 @@ func setupHashring(g *run.Group,
 			webHandler.Hashring(receive.SingleNodeHashring(conf.endpoint))
 			level.Info(logger).Log("msg", "Empty hashring config. Set up single node hashring.")
 		} else {
-			h, err := receive.NewMultiHashring(algorithm, conf.replicationFactor, c)
+			h, err := receive.NewMultiHashring(algorithm, conf.replicationFactor, c, reg)
 			if err != nil {
 				return errors.Wrap(err, "unable to create new hashring from config")
 			}
```
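The only change here is threading a metrics registerer into the hashring constructor. As a rough caller sketch, not code from this commit: the concrete `HashringConfig` values are made up, and the purpose of the registerer (letting shuffle-sharding internals export metrics) is an assumption based on the test change further down.

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/receive"
)

func main() {
	// Hypothetical hashring configuration, shaped like pkg/receive/config.go.
	cfg := []receive.HashringConfig{{
		Endpoints: []receive.Endpoint{
			{Address: "node-1:10901", CapNProtoAddress: "node-1:19391"},
		},
		Algorithm: receive.AlgorithmHashmod,
	}}

	// The new trailing argument is a Prometheus registerer; presumably the
	// shuffle-sharding machinery registers metrics (e.g. for its subring
	// cache) against it. The exact metrics are not visible in this diff.
	reg := prometheus.NewRegistry()
	hashring, err := receive.NewMultiHashring(receive.AlgorithmHashmod, 1, cfg, reg)
	if err != nil {
		panic(err)
	}
	_ = hashring
}
```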

docs/components/receive.md (+42)

```diff
@@ -22,6 +22,48 @@ The Ketama algorithm is a consistent hashing scheme which enables stable scaling
 
 If you are using the `hashmod` algorithm and wish to migrate to `ketama`, the simplest and safest way would be to set up a new pool of receivers with `ketama` hashrings and start remote-writing to them. Provided you are on the latest Thanos version, old receivers will flush their TSDBs after the configured retention period and will upload blocks to object storage. Once you have verified that is done, decommission the old receivers.
 
+#### Shuffle sharding
+
+Ketama also supports [shuffle sharding](https://aws.amazon.com/builders-library/workload-isolation-using-shuffle-sharding/). It allows you to provide a single-tenant experience in a multi-tenant system. With shuffle sharding, a tenant gets a subset of all nodes in a hashring. You can configure shuffle sharding for any Ketama hashring like so:
+
+```json
+[
+    {
+        "endpoints": [
+            {"address": "node-1:10901", "capnproto_address": "node-1:19391", "az": "foo"},
+            {"address": "node-2:10901", "capnproto_address": "node-2:19391", "az": "bar"},
+            {"address": "node-3:10901", "capnproto_address": "node-3:19391", "az": "qux"},
+            {"address": "node-4:10901", "capnproto_address": "node-4:19391", "az": "foo"},
+            {"address": "node-5:10901", "capnproto_address": "node-5:19391", "az": "bar"},
+            {"address": "node-6:10901", "capnproto_address": "node-6:19391", "az": "qux"}
+        ],
+        "algorithm": "ketama",
+        "shuffle_sharding_config": {
+            "shard_size": 2,
+            "cache_size": 100,
+            "overrides": [
+                {
+                    "shard_size": 3,
+                    "tenants": ["prefix-tenant-*"],
+                    "tenant_matcher_type": "glob"
+                }
+            ]
+        }
+    }
+]
+```
+
+This will enable shuffle sharding with the default shard size of 2 and override it to 3 for every tenant that starts with `prefix-tenant-`.
+
+`cache_size` sets the size of the in-memory LRU cache of the computed subrings. It is not possible to cache everything, because an attacker could spam requests with random tenants and those subrings would then stay in memory forever.
+
+With this config, `shard_size/number_of_azs` nodes are chosen from each availability zone for each tenant. So, each tenant will get a unique and consistent set of 3 nodes.
+
+You can use `zone_awareness_disabled` to disable zone awareness. This is useful when you have many separate AZs and it doesn't matter which ones are chosen. The shards will ignore AZs, but the Ketama algorithm will still prefer spreading the load through as many AZs as possible. That's why, with zone awareness disabled, it is recommended to set the shard size to `max(nodes_in_any_az, replication_factor)`.
+
+Receive currently supports only stateless shuffle sharding, so it does not store and check whether there have been any overlaps between shards.
+
 ### Hashmod (discouraged)
 
 This algorithm uses a `hashmod` function over all labels to decide which receiver is responsible for a given timeseries. This is the default algorithm due to historical reasons. However, its usage for new Receive installations is discouraged since adding new Receiver nodes leads to series churn and memory usage spikes.
```
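The node-selection rule the new docs describe ("`shard_size/number_of_azs` nodes are chosen from each availability zone for each tenant") can be made concrete with a small standalone sketch. This is not the code from this commit; in particular, seeding the shuffle with an FNV hash of the tenant name and rounding the per-AZ count up are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
	"sort"
)

// pickSubring sketches the documented behaviour: for a given tenant,
// deterministically select roughly shard_size/number_of_azs nodes from
// each availability zone.
func pickSubring(tenant string, byAZ map[string][]string, shardSize int) []string {
	perAZ := (shardSize + len(byAZ) - 1) / len(byAZ) // assumption: round up

	// Seed the shuffle from the tenant so the same tenant always gets the
	// same subring (the hash choice is illustrative, not Thanos' own).
	h := fnv.New64a()
	h.Write([]byte(tenant))
	rng := rand.New(rand.NewSource(int64(h.Sum64())))

	// Iterate AZs in a fixed order; Go map iteration order is randomized.
	azs := make([]string, 0, len(byAZ))
	for az := range byAZ {
		azs = append(azs, az)
	}
	sort.Strings(azs)

	var subring []string
	for _, az := range azs {
		nodes := append([]string(nil), byAZ[az]...)
		rng.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] })
		if perAZ < len(nodes) {
			nodes = nodes[:perAZ]
		}
		subring = append(subring, nodes...)
	}
	return subring
}

func main() {
	byAZ := map[string][]string{
		"foo": {"node-1:10901", "node-4:10901"},
		"bar": {"node-2:10901", "node-5:10901"},
		"qux": {"node-3:10901", "node-6:10901"},
	}
	// With shard_size=2 and three AZs, ceil(2/3)=1 node per AZ,
	// i.e. the consistent 3-node subring the documentation describes.
	fmt.Println(pickSubring("some-tenant", byAZ, 2))
}
```

The `cache_size` knob from the config then simply bounds how many of these per-tenant results are kept memoized in the LRU cache.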

pkg/receive/config.go (+38)

```diff
@@ -42,6 +42,27 @@ const (
 	DefaultCapNProtoPort string = "19391"
 )
 
+type endpoints []Endpoint
+
+func (e endpoints) Len() int {
+	return len(e)
+}
+
+func (e endpoints) Less(i, j int) bool {
+	// Sort by Address first, then by CapNProtoAddress, then by AZ.
+	if e[i].Address == e[j].Address {
+		if e[i].CapNProtoAddress == e[j].CapNProtoAddress {
+			return e[i].AZ < e[j].AZ
+		}
+		return e[i].CapNProtoAddress < e[j].CapNProtoAddress
+	}
+	return e[i].Address < e[j].Address
+}
+
+func (e endpoints) Swap(i, j int) {
+	e[i], e[j] = e[j], e[i]
+}
+
 type Endpoint struct {
 	Address          string `json:"address"`
 	CapNProtoAddress string `json:"capnproto_address"`
@@ -104,6 +125,23 @@ type HashringConfig struct {
 	Endpoints      []Endpoint        `json:"endpoints"`
 	Algorithm      HashringAlgorithm `json:"algorithm,omitempty"`
 	ExternalLabels labels.Labels     `json:"external_labels,omitempty"`
+	// If non-zero then shuffle sharding is enabled.
+	ShuffleShardingConfig ShuffleShardingConfig `json:"shuffle_sharding_config,omitempty"`
+}
+
+type ShuffleShardingOverrideConfig struct {
+	ShardSize         int           `json:"shard_size"`
+	Tenants           []string      `json:"tenants,omitempty"`
+	TenantMatcherType tenantMatcher `json:"tenant_matcher_type,omitempty"`
+}
+
+type ShuffleShardingConfig struct {
+	ShardSize int `json:"shard_size"`
+	CacheSize int `json:"cache_size"`
+	// ZoneAwarenessDisabled disables zone awareness. We still try to spread the load
+	// across the available zones, but we don't try to balance the shards across zones.
+	ZoneAwarenessDisabled bool                            `json:"zone_awareness_disabled"`
+	Overrides             []ShuffleShardingOverrideConfig `json:"overrides,omitempty"`
 }
 
 type tenantMatcher string
```
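Since these are plain JSON-tagged structs, the hashring file from the documentation round-trips straightforwardly, and the new `endpoints` sort.Interface gives a deterministic node order. A small standalone sketch follows, with local copies of the types; using `path.Match` as the glob evaluator for `tenant_matcher_type: "glob"` is an assumption, not necessarily what `tenantMatcher` does internally.

```go
package main

import (
	"encoding/json"
	"fmt"
	"path"
	"sort"
)

// Local copies of the relevant shapes, for illustration only; the real
// definitions live in pkg/receive/config.go.
type Endpoint struct {
	Address          string `json:"address"`
	CapNProtoAddress string `json:"capnproto_address"`
	AZ               string `json:"az"`
}

type endpoints []Endpoint

func (e endpoints) Len() int      { return len(e) }
func (e endpoints) Swap(i, j int) { e[i], e[j] = e[j], e[i] }

// Less orders by Address, then CapNProtoAddress, then AZ, as in the diff.
func (e endpoints) Less(i, j int) bool {
	if e[i].Address == e[j].Address {
		if e[i].CapNProtoAddress == e[j].CapNProtoAddress {
			return e[i].AZ < e[j].AZ
		}
		return e[i].CapNProtoAddress < e[j].CapNProtoAddress
	}
	return e[i].Address < e[j].Address
}

func main() {
	raw := `[
		{"address": "node-2:10901", "capnproto_address": "node-2:19391", "az": "bar"},
		{"address": "node-1:10901", "capnproto_address": "node-1:19391", "az": "foo"}
	]`
	var eps endpoints
	if err := json.Unmarshal([]byte(raw), &eps); err != nil {
		panic(err)
	}
	sort.Sort(eps)
	fmt.Println(eps[0].Address) // node-1:10901 now sorts first

	// A "glob" override such as ["prefix-tenant-*"] could be evaluated
	// with path.Match (assumption made for this sketch).
	ok, _ := path.Match("prefix-tenant-*", "prefix-tenant-42")
	fmt.Println(ok) // true
}
```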

pkg/receive/handler.go (+2)

```diff
@@ -324,6 +324,8 @@ func (h *Handler) Hashring(hashring Hashring) {
 				level.Error(h.logger).Log("msg", "closing gRPC connection failed, we might have leaked a file descriptor", "addr", node, "err", err.Error())
 			}
 		}
+
+		h.hashring.Close()
 	}
 
 	h.hashring = hashring
```

pkg/receive/handler_test.go (+1 -1)

```diff
@@ -281,7 +281,7 @@ func newTestHandlerHashring(
 		hashringAlgo = AlgorithmHashmod
 	}
 
-	hashring, err := NewMultiHashring(hashringAlgo, replicationFactor, cfg)
+	hashring, err := NewMultiHashring(hashringAlgo, replicationFactor, cfg, prometheus.NewRegistry())
 	if err != nil {
 		return nil, nil, nil, err
 	}
```
