feat(shard distributor): Persist Shard-Level Statistics for Load Balancing, and Add Cleanup Function #7354
Conversation
Force-pushed from d393051 to 6360f8a.
```go
}

func BuildShardKey(prefix string, namespace, shardID, keyType string) (string, error) {
	if keyType != ShardAssignedKey && keyType != ShardMetricsKey {
```
where/when is this used?
```go
	return parts[0], parts[1], nil
}

func BuildShardPrefix(prefix string, namespace string) string {
```
We need to have tests for BuildShardPrefix, BuildShardKey and ParseShardKey :)
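A table-driven round-trip test along these lines could cover all three helpers; the prefix/namespace values and the error expectations here are assumptions based on the snippets above, not the PR's actual tests:

```go
package etcdkeys

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestShardKeyRoundTrip(t *testing.T) {
	cases := []struct {
		name    string
		keyType string
		wantErr bool
	}{
		{name: "assigned key", keyType: ShardAssignedKey},
		{name: "metrics key", keyType: ShardMetricsKey},
		{name: "unknown key type", keyType: "bogus", wantErr: true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			key, err := BuildShardKey("/cadence", "ns1", "shard-1", tc.keyType)
			if tc.wantErr {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
			// Every built key should start with the namespace prefix and parse back.
			require.Contains(t, key, BuildShardPrefix("/cadence", "ns1"))
			shardID, keyType, err := ParseShardKey("/cadence", "ns1", key)
			require.NoError(t, err)
			require.Equal(t, "shard-1", shardID)
			require.Equal(t, tc.keyType, keyType)
		})
	}
}
```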
```go
shardID, shardKeyType, err := etcdkeys.ParseShardKey(s.prefix, namespace, string(kv.Key))
if err != nil {
	continue
}
```
If we don't want to abort metric emission, we should still log an error so that we have evidence that something is not working as expected.
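A minimal sketch of the suggested change, assuming the store carries a logger field and uses Cadence's common/log/tag helpers:

```go
shardID, shardKeyType, err := etcdkeys.ParseShardKey(s.prefix, namespace, string(kv.Key))
if err != nil {
	// Keep emitting metrics for the remaining keys, but leave evidence
	// that this key could not be parsed.
	s.logger.Error("failed to parse shard key during metric emission", tag.Error(err))
	continue
}
```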
```go
// Compute shard moves to update last_move_time metrics when ownership changes.
// Read current assignments for the namespace and compare with the new state.
// Concurrent changes will be caught by the revision comparisons later.
currentAssignments := make(map[string]string) // shardID -> executorID
```
Instead of building the whole executorsToShard mapping, can we rely on the shardCache *shardcache.ShardToExecutorCache?
```go
	}
}
now := time.Now().Unix()
// Collect metric updates now so we can apply them after committing the main transaction.
```
Can we move all the code for metric generation to a separate function? It will make the overall code more readable.
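For illustration, the staging could be pulled out roughly like this; the function name, receiver, and parameters are hypothetical (the real shardMetricsUpdate also tracks the shard's etcd key, per its doc comment):

```go
// stageShardStatisticsUpdates collects the per-shard statistics writes implied
// by the new assignments, so AssignShards can commit its main transaction first
// and apply these writes afterwards.
func (s *executorStoreImpl) stageShardStatisticsUpdates(
	currentAssignments map[string]string, // shardID -> previous executorID
	newAssignments map[string]string, // shardID -> new executorID
	now int64,
) []shardMetricsUpdate {
	updates := make([]shardMetricsUpdate, 0, len(newAssignments))
	for shardID, newOwner := range newAssignments {
		u := shardMetricsUpdate{defaultLastUpdate: now}
		if oldOwner, ok := currentAssignments[shardID]; !ok || oldOwner != newOwner {
			// Ownership changed: stamp the move so cooldown logic can see it.
			u.desiredLastMove = now
		}
		updates = append(updates, u)
	}
	return updates
}
```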
```go
metrics           store.ShardMetrics
modRevision       int64
desiredLastMove   int64 // intended LastMoveTime for this update
defaultLastUpdate int64
```
What is the defaultLastUpdate? Can we just call this LastUpdate?
```go
} else {
	update.metrics = store.ShardMetrics{
		SmoothedLoad:   0,
		LastUpdateTime: update.defaultLastUpdate,
```
Looking at the way defaultLastUpdate is used, I think we can simplify the code and just remove it; wouldn't it be equivalent to use desiredLastMove here?
```go
for i := range updates {
	update := &updates[i]

	for {
```
This is very difficult to read; I am not sure we will understand what it does in a few weeks. Can we remove this? :)
```go
		newAssignments[shardID] = executorID
	}
}
now := time.Now().Unix()
```
In general we use clock.TimeSource to handle time; it makes testing easier. I would suggest extending this using the same approach; you can check executorImpl for an example.
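A minimal sketch of that approach, assuming the store gains a timeSource field (the field name and wiring are assumptions):

```go
package etcd

import (
	"github.com/uber/cadence/common/clock"
)

type executorStoreImpl struct {
	timeSource clock.TimeSource // clock.NewRealTimeSource() in production
	// ... other fields ...
}

// nowUnix replaces direct time.Now().Unix() calls so tests can inject a
// mocked time source (e.g. clock.NewMockedTimeSource()) and advance it
// deterministically instead of sleeping.
func (s *executorStoreImpl) nowUnix() int64 {
	return s.timeSource.Now().Unix()
}
```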
```go
// shardMetricsUpdate tracks the etcd key, revision, and metrics used to update a shard
// after the main transaction in AssignShards for exec state.
// Retains metrics to safely merge concurrent updates before retrying.
type shardMetricsUpdate struct {
```
I would call this statistics; "metrics" has a pretty standard meaning.
Force-pushed from 1078343 to 6816b8e.
```go
}

func (p *namespaceProcessor) cleanupStaleShardStats(ctx context.Context) {
	namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
```
Consider querying GetState only once and passing the result as a parameter; even if we don't have the most up-to-date state, that is fine for cleaning the stats.
Good suggestion, added this in a recent commit: 3931fea.
Refactored runCleanupLoop to fetch state once and pass it to both cleanup functions. Had to update tests to pass state directly, instead of expecting GetState calls in cleanupStaleExecutors/cleanupStaleShardStats.
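Roughly the shape of that refactor (names follow the thread; the error handling is an assumption):

```go
func (p *namespaceProcessor) runCleanupLoop(ctx context.Context) {
	// Fetch the namespace state once per cleanup pass; a slightly stale
	// view is acceptable for both cleanup functions.
	namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
	if err != nil {
		p.logger.Error("failed to get namespace state for cleanup", tag.Error(err))
		return
	}
	p.cleanupStaleExecutors(ctx, namespaceState)
	p.cleanupStaleShardStats(ctx, namespaceState)
}
```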
```go
// 1. build set of active executors

// add all assigned shards from executors that are ACTIVE and not stale
```
What happens if the executor is in a draining state? Are we fine with losing the statistics for it? It is covered by the following case, where the shard is not in a DONE state, right?
The ShardStatus != DONE check in cleanupStaleShardStats keeps shard stats alive while a draining executor still reports them, and the TTL "grace period" only removes them after the shard has been marked DONE and stayed idle for that whole TTL window.
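A self-contained sketch of that retention rule; the type and names are stand-ins, not the PR's actual code:

```go
package cleanup

// shardStatus is a stand-in for the store's real shard status type.
type shardStatus int

const (
	shardStatusRunning shardStatus = iota
	shardStatusDone
)

// shouldDeleteShardStats returns true only when no active executor still
// reports the shard, the shard has reached DONE, and it has been idle
// longer than the TTL grace period.
func shouldDeleteShardStats(stillReported bool, status shardStatus, lastUpdateUnix, nowUnix, ttlSeconds int64) bool {
	if stillReported {
		return false // an ACTIVE (or draining) executor still reports this shard
	}
	if status != shardStatusDone {
		return false // not DONE yet: keep the statistics alive
	}
	return nowUnix-lastUpdateUnix > ttlSeconds // TTL grace period elapsed
}
```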
Commits (all signed off by Andreas Holt <[email protected]>):
- … is being reassigned in AssignShard
- …to not overload etcd's 128 max ops per txn
- …s txn and retry monotonically
- …ents
- …shard metrics, move out to staging to separate function
- … And more idiomatic naming of collection vs singular type
- …ook more like executor key tests
- …ey in BuildShardKey, as we don't use it
- …o "statistics"
- …ollow conventions
- …eartbeat TTL
Force-pushed from d5a13d9 to 2642080.
What changed?
- Introduce store.ShardMetrics (smoothed load + timestamps) and persist it under store/<namespace>/shards/<shardID>/metrics. SmoothedLoad will keep an EWMA of shard load, LastUpdateTime will be used for dynamically updating the alpha value used in the EWMA (see the sketch after this list), and LastMoveTime will be used to support cooldown logic that limits shard churn.
- Preserve LastMoveTime when reusing existing metrics, and apply per-shard metric updates after the main transaction to stay within etcd's 128-op transaction limit.
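For context, an EWMA update with a time-dependent alpha could look like the sketch below; the exponential-decay formula for alpha is one illustration of "dynamically updating alpha from LastUpdateTime", not necessarily what this PR will ship. The local ShardMetrics mirrors the fields described above.

```go
package loadstats

import "math"

// ShardMetrics mirrors the persisted per-shard statistics described above.
type ShardMetrics struct {
	SmoothedLoad   float64
	LastUpdateTime int64 // unix seconds
	LastMoveTime   int64 // unix seconds
}

// updateSmoothedLoad folds a new load sample into the EWMA. The weight of the
// new sample (alpha) grows with the time since the last update, so long-stale
// statistics converge toward fresh samples quickly.
func updateSmoothedLoad(m ShardMetrics, sample float64, nowUnix int64) ShardMetrics {
	const tau = 60.0 // illustrative time constant, in seconds
	elapsed := float64(nowUnix - m.LastUpdateTime)
	alpha := 1 - math.Exp(-elapsed/tau) // approaches 1 as the gap grows
	m.SmoothedLoad = alpha*sample + (1-alpha)*m.SmoothedLoad
	m.LastUpdateTime = nowUnix
	return m
}
```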
Why?
reported_shards is keyed by executor. That works for reporting the latest heartbeat, but it breaks down the moment a shard moves: the new owner can't see the old owner's smoothed load or timestamps, and the leader has to collect executor-specific parts just to reason about shard state. By giving each shard its own metrics key:
the smoothed load survives ownership changes, last_update_time feeds the smoothing factor (alpha) when applying the next sample, and last_move_time is what we'll use for cooldowns before moving a shard again.
A follow-up pull request will wire heartbeats to update these metrics on every report.
How did you test it?
Integration tests w/ etcd (added new test cases to ./service/sharddistributor/store/etcd/etcdstore_test.go), run with:
go test ./service/sharddistributor/store/etcd/executorstore
Also tested it by logging values while running the ephemeral service (which simulates executors and shards).
Potential risks
Added pressure on etcd and extra read operations when preparing metric updates.
Release notes
Shard distributor now persists shard metrics in etcd (smoothed load and timestamps) for future load balancing logic.
Documentation Changes