receive: implement shuffle sharding #8238

Open · wants to merge 1 commit into base: main
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -15,6 +15,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#8238](https://github.com/thanos-io/thanos/pull/8238) Receive: add shuffle sharding support

### Changed

- [#8192](https://github.com/thanos-io/thanos/pull/8192) Sidecar: fix default get config timeout
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
@@ -593,7 +593,7 @@ func setupHashring(g *run.Group,
webHandler.Hashring(receive.SingleNodeHashring(conf.endpoint))
level.Info(logger).Log("msg", "Empty hashring config. Set up single node hashring.")
} else {
- h, err := receive.NewMultiHashring(algorithm, conf.replicationFactor, c)
+ h, err := receive.NewMultiHashring(algorithm, conf.replicationFactor, c, reg)
if err != nil {
return errors.Wrap(err, "unable to create new hashring from config")
}
42 changes: 42 additions & 0 deletions docs/components/receive.md
@@ -22,6 +22,48 @@ The Ketama algorithm is a consistent hashing scheme which enables stable scaling

If you are using the `hashmod` algorithm and wish to migrate to `ketama`, the simplest and safest way is to set up a new pool of receivers with `ketama` hashrings and start remote-writing to them. Provided you are on the latest Thanos version, the old receivers will flush their TSDBs after the configured retention period and upload blocks to object storage. Once you have verified that this is done, decommission the old receivers.

#### Shuffle sharding

Ketama also supports [shuffle sharding](https://aws.amazon.com/builders-library/workload-isolation-using-shuffle-sharding/). It allows you to provide a single-tenant experience in a multi-tenant system. With shuffle sharding, a tenant gets a subset of all nodes in a hashring. You can configure shuffle sharding for any Ketama hashring like so:

```json
[
{
"endpoints": [
{"address": "node-1:10901", "capnproto_address": "node-1:19391", "az": "foo"},
{"address": "node-2:10901", "capnproto_address": "node-2:19391", "az": "bar"},
{"address": "node-3:10901", "capnproto_address": "node-3:19391", "az": "qux"},
{"address": "node-4:10901", "capnproto_address": "node-4:19391", "az": "foo"},
{"address": "node-5:10901", "capnproto_address": "node-5:19391", "az": "bar"},
{"address": "node-6:10901", "capnproto_address": "node-6:19391", "az": "qux"}
],
"algorithm": "ketama",
"shuffle_sharding_config": {
"shard_size": 2,
"cache_size": 100,
"overrides": [
{
"shard_size": 3,
"tenants": ["prefix-tenant-*"],
"tenant_matcher_type": "glob"
}
]
}
}
]
```

This enables shuffle sharding with a default shard size of 2 and overrides it to 3 for every tenant whose name starts with `prefix-tenant-`.
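
For illustration, the following Go sketch shows how an effective shard size could be resolved for a tenant under such a configuration. The names (`Config`, `Override`, `shardSizeFor`) are hypothetical and `filepath.Match` stands in for the glob matcher; this is not the exact Thanos implementation.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// Override and Config loosely mirror shuffle_sharding_config above (illustrative only).
type Override struct {
	ShardSize         int
	Tenants           []string
	TenantMatcherType string
}

type Config struct {
	ShardSize int
	Overrides []Override
}

// shardSizeFor returns the shard size to use for a tenant: the first matching
// override wins, otherwise the default shard size applies.
func shardSizeFor(cfg Config, tenant string) int {
	for _, o := range cfg.Overrides {
		for _, pattern := range o.Tenants {
			matched := pattern == tenant
			if o.TenantMatcherType == "glob" {
				matched, _ = filepath.Match(pattern, tenant)
			}
			if matched {
				return o.ShardSize
			}
		}
	}
	return cfg.ShardSize
}

func main() {
	cfg := Config{
		ShardSize: 2,
		Overrides: []Override{{ShardSize: 3, Tenants: []string{"prefix-tenant-*"}, TenantMatcherType: "glob"}},
	}
	fmt.Println(shardSizeFor(cfg, "prefix-tenant-a"))   // 3
	fmt.Println(shardSizeFor(cfg, "some-other-tenant")) // 2
}
```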

`cache_size` sets the size of the in-memory LRU cache of computed subrings. Caching cannot be unbounded because an attacker could spam requests with random tenant names, and those subrings would otherwise stay in memory forever.
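
As a rough sketch of what such a bounded cache could look like, assuming `github.com/hashicorp/golang-lru` as an illustrative LRU implementation and a placeholder `Hashring` type (neither is necessarily what Thanos uses internally):

```go
package receive

import (
	lru "github.com/hashicorp/golang-lru"
)

// Hashring is a stand-in for the receive hashring interface (illustrative only).
type Hashring interface{}

type cachedShuffleShardRing struct {
	compute func(tenant string) Hashring
	cache   *lru.Cache // tenant -> precomputed subring, bounded by cache_size
}

func newCachedShuffleShardRing(compute func(string) Hashring, cacheSize int) (*cachedShuffleShardRing, error) {
	c, err := lru.New(cacheSize) // old entries are evicted, so random tenant names cannot grow memory forever
	if err != nil {
		return nil, err
	}
	return &cachedShuffleShardRing{compute: compute, cache: c}, nil
}

func (r *cachedShuffleShardRing) forTenant(tenant string) Hashring {
	if sub, ok := r.cache.Get(tenant); ok {
		return sub.(Hashring)
	}
	sub := r.compute(tenant)
	r.cache.Add(tenant, sub)
	return sub
}
```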

With this configuration, `shard_size/number_of_azs` nodes are chosen from each availability zone for each tenant. With the override above, a matching tenant therefore gets a unique and consistent set of 3 nodes, one from each availability zone.
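
One common way to derive such a stable, zone-spread subset is to seed a PRNG with a hash of the tenant name and take nodes from each AZ in turn. The sketch below illustrates the idea only; the actual selection logic in Thanos may differ.

```go
package receive

import (
	"hash/fnv"
	"math/rand"
	"sort"
)

// pickShuffleShard deterministically selects shardSize endpoints for a tenant,
// taking nodes from each availability zone in turn so the shard is spread
// across zones. Illustrative only; not the exact Thanos algorithm.
func pickShuffleShard(tenant string, byAZ map[string][]string, shardSize int) []string {
	h := fnv.New64a()
	h.Write([]byte(tenant))
	r := rand.New(rand.NewSource(int64(h.Sum64()))) // same tenant -> same shard on every call

	// Shuffle the node list of every AZ with the tenant-seeded PRNG.
	azs := make([]string, 0, len(byAZ))
	for az := range byAZ {
		azs = append(azs, az)
	}
	sort.Strings(azs) // stable AZ order keeps the result reproducible
	shuffled := make(map[string][]string, len(byAZ))
	for _, az := range azs {
		nodes := append([]string(nil), byAZ[az]...)
		r.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] })
		shuffled[az] = nodes
	}

	// Take one node per AZ in turn until the shard is full.
	var shard []string
	for i := 0; len(shard) < shardSize; i++ {
		progressed := false
		for _, az := range azs {
			if i < len(shuffled[az]) && len(shard) < shardSize {
				shard = append(shard, shuffled[az][i])
				progressed = true
			}
		}
		if !progressed {
			break // fewer nodes available than shardSize
		}
	}
	return shard
}
```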

You can use `zone_awareness_disabled` to disable zone awareness. This is useful when you have many separate AZs and it does not matter which ones are chosen. The shards then ignore AZs, but the Ketama algorithm will still prefer spreading the load across as many AZs as possible. For this reason, with zone awareness disabled it is recommended to set the shard size to `max(nodes_in_any_az, replication_factor)`.

Receive currently supports only stateless shuffle sharding, so it does not store shard assignments or check whether there have been any overlaps between shards.

### Hashmod (discouraged)

This algorithm uses a `hashmod` function over all labels to decide which receiver is responsible for a given timeseries. It is the default algorithm for historical reasons, but its use for new Receive installations is discouraged since adding new Receiver nodes leads to series churn and memory usage spikes.
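
In essence, hashmod placement is a modulo over a hash of the full label set, along the lines of the following illustrative sketch (not the exact hash function Thanos uses):

```go
package receive

import (
	"hash/fnv"

	"github.com/prometheus/prometheus/model/labels"
)

// hashmodNode picks the receiver responsible for a series by hashing its label set
// and taking the result modulo the number of nodes. Adding or removing a node changes
// the modulus, so most series map to a different node, which is why hashmod causes
// series churn and memory spikes on scale-out.
func hashmodNode(lset labels.Labels, nodes []string) string {
	h := fnv.New64a()
	h.Write([]byte(lset.String()))
	return nodes[h.Sum64()%uint64(len(nodes))]
}
```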
38 changes: 38 additions & 0 deletions pkg/receive/config.go
@@ -42,6 +42,27 @@ const (
DefaultCapNProtoPort string = "19391"
)

type endpoints []Endpoint

func (e endpoints) Len() int {
return len(e)
}

func (e endpoints) Less(i, j int) bool {
// Sort by Address first, then by CapNProtoAddress, then by AZ.
if e[i].Address == e[j].Address {
if e[i].CapNProtoAddress == e[j].CapNProtoAddress {
return e[i].AZ < e[j].AZ
}
return e[i].CapNProtoAddress < e[j].CapNProtoAddress
}
return e[i].Address < e[j].Address
}
func (e endpoints) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}

type Endpoint struct {
Address string `json:"address"`
CapNProtoAddress string `json:"capnproto_address"`
@@ -104,6 +125,23 @@ type HashringConfig struct {
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels labels.Labels `json:"external_labels,omitempty"`
// Shuffle sharding is enabled when the configured shard size is non-zero.
ShuffleShardingConfig ShuffleShardingConfig `json:"shuffle_sharding_config,omitempty"`
}

type ShuffleShardingOverrideConfig struct {
ShardSize int `json:"shard_size"`
Tenants []string `json:"tenants,omitempty"`
TenantMatcherType tenantMatcher `json:"tenant_matcher_type,omitempty"`
}

type ShuffleShardingConfig struct {
ShardSize int `json:"shard_size"`
CacheSize int `json:"cache_size"`
// ZoneAwarenessDisabled disables zone awareness. We still try to spread the load
// across the available zones, but we don't try to balance the shards across zones.
ZoneAwarenessDisabled bool `json:"zone_awareness_disabled"`
Overrides []ShuffleShardingOverrideConfig `json:"overrides,omitempty"`
}

type tenantMatcher string
2 changes: 2 additions & 0 deletions pkg/receive/handler.go
@@ -324,6 +324,8 @@ func (h *Handler) Hashring(hashring Hashring) {
level.Error(h.logger).Log("msg", "closing gRPC connection failed, we might have leaked a file descriptor", "addr", node, "err", err.Error())
}
}

h.hashring.Close()
}

h.hashring = hashring
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
@@ -281,7 +281,7 @@ func newTestHandlerHashring(
hashringAlgo = AlgorithmHashmod
}

- hashring, err := NewMultiHashring(hashringAlgo, replicationFactor, cfg)
+ hashring, err := NewMultiHashring(hashringAlgo, replicationFactor, cfg, prometheus.NewRegistry())
if err != nil {
return nil, nil, nil, err
}