Conversation

@Theis-Mathiassen Theis-Mathiassen commented Nov 11, 2025

What changed?
We added functionality to record the load of each shard as a moving average, where the weight of a new data point depends on how recently the average was last updated.

It also now uses a cache to reduce the read load on etcd.

Why?
This smooths the load input for the shard distributor, which is desirable because the load can change sporadically.
It is also necessary to save the load of each shard in etcd, both to persist it (in case the handler crashes) and to make it available to every shard distributor instance.
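
As an illustration of the smoothing described above, here is a minimal sketch of a time-aware moving average. It is not the PR's implementation: the exponential weighting, the constant `tau`, and the helper `weightFor` are assumptions made for the example. Only the shape of the inputs (previous value, current value, last update time, current time) follows the signature quoted later in the review.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// tau is a hypothetical smoothing time constant (an assumption, not from the PR).
const tau = 30 * time.Second

// weightFor returns the weight given to a new sample after elapsed time dt.
// It is 0 for dt <= 0 and approaches 1 as dt grows, so a stale average is
// replaced almost entirely by the new sample.
func weightFor(dt time.Duration) float64 {
	if dt <= 0 {
		return 0
	}
	return 1 - math.Exp(-dt.Seconds()/tau.Seconds())
}

// smoothedLoad blends the previous average with the current sample.
func smoothedLoad(prev, current float64, lastUpdate, now time.Time) float64 {
	if lastUpdate.IsZero() {
		return current // no history yet: take the sample as-is
	}
	alpha := weightFor(now.Sub(lastUpdate))
	return prev + alpha*(current-prev)
}

func main() {
	start := time.Date(2025, 11, 11, 12, 0, 0, 0, time.UTC)
	// A recent update keeps the result close to the previous average.
	fmt.Printf("%.2f\n", smoothedLoad(10, 20, start, start.Add(5*time.Second)))
	// A stale average moves almost all the way to the new sample.
	fmt.Printf("%.2f\n", smoothedLoad(10, 20, start, start.Add(5*time.Minute)))
}
```

With this weighting, frequent heartbeats produce small adjustments, while a shard that has not reported for a long time is effectively re-seeded from its next report.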

How did you test it?
We have created some unit tests, and tried to run it with the canary service:
TestRecordHeartbeatUpdatesShardStatistics:
This test verifies that when an executor sends a heartbeat with ShardLoad information for a shard, the ShardStatistics for that shard are correctly updated in the store, specifically the SmoothedLoad and LastUpdateTime. It also ensures that LastMoveTime remains unchanged if not explicitly updated.

TestRecordHeartbeatSkipsShardStatisticsWithNilReport:
This test ensures that if an executor's heartbeat includes a nil ShardStatusReport for a particular shard, the existing ShardStatistics for that shard are not updated or created. It also verifies that valid shard reports are processed correctly.

Potential risks
None, since it is for the shard distributor, which is not utilized in production yet.

Release notes
Not needed, since it is for the shard distributor, which is not utilized in production yet.

Documentation Changes
No, but some documentation should probably be created later.

AndreasHolt and others added 27 commits October 20, 2025 14:05
… is being reassigned in AssignShard

Signed-off-by: Andreas Holt <[email protected]>
…to not overload etcd's 128 max ops per txn

Signed-off-by: Andreas Holt <[email protected]>
…s txn and retry monotonically

Signed-off-by: Andreas Holt <[email protected]>
…shard metrics, move out to staging to separate function

Signed-off-by: Andreas Holt <[email protected]>
… And more idiomatic naming of collection vs singular type

Signed-off-by: Andreas Holt <[email protected]>
…ook more like executor key tests

Signed-off-by: Andreas Holt <[email protected]>
…ey in BuildShardKey, as we don't use it

Signed-off-by: Andreas Holt <[email protected]>
Member

@dkrotx dkrotx left a comment

Good changes!
Let's proceed with making it clearer and moving algorithm-related code out of the store/etcd tree.

"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdtypes"
)

func CalculateSmoothedLoad(prev, current float64, lastUpdate, now time.Time) float64 {
Member

This is still not related to executorstore.
util is a good and expected place for something like string/list manipulation; here it is part of the algorithm, I believe.
I think it deserves its own place.

Author

I decided to move it to ./sharddistributor/statistics/stats.go
I experimented with moving more shard-statistics-related functionality over, but it also depended on etcd.
So even though /statistics/. is a little empty, I hope it is okay that we defer moving the remaining shard statistics functionality there to a future refactoring pull request.
I did the move in 7f3b9a5.

Comment on lines 146 to 148
err = s.applyShardStatisticsUpdates(ctx, namespace, statsUpdates)

return err
Member

Just return s.applyShardStatisticsUpdates(ctx, namespace, statsUpdates). It is expected to return an error.

Author

Yep, that makes sense, see 671661c.

return nil, nil
}

now := s.timeSource.Now().UTC()
Member

Try moving things closer to where they are used.
Then it forms a clear prepare-action pair. Otherwise one would expect now to be required for the very next action (like GetExecutorStatistics), which is not true.
If you want to optimise it and not call it for every shard's report, initialize it just before the for shardID, report := range reported { loop.

Author

I moved it down just above the for loop, in 20456d6.

statsUpdate.stats[shardID] = common.UpdateShardStatistic(shardID, report.ShardLoad, now, oldStats)
}

statsUpdates = append(statsUpdates, statsUpdate)
Member

doesn't it always append to the empty statsUpdates?

Author

It does, I refactored it here: 20456d6.

We still want to return it as a slice, since it is a helper function and its caller expects a slice. Is this the correct approach, or would it be better to have the helper function return just the single element and then wrap it in a slice where it is called?

tag.ShardExecutor(update.executorID),
tag.Error(err),
)
multiError = errors.Join(multiError, fmt.Errorf("failed to delete executor shard statistics: %w", err))
Member

The message should be fixed, it is not about "delete". Like the next ones.

Author

That was an accidental copy-paste error from a previous refactoring; it should be fixed here: 92fafe5


type executorData struct {
  assignedStates map[string]etcdtypes.AssignedState
  metadata map[string]map[string]string // executorID -> metadata key -> metadata value
Member

I strongly recommend aliasing metadata map[string]string. Then it doesn't look that scary and doesn't require a comment.

Author

That makes sense, fixed here: 83af565

Comment on lines 163 to 170
n.executorStatistics.lock.RLock()
stats, ok := n.executorStatistics.stats[executorID]
if ok {
  clonedStatistics := cloneStatisticsMap(stats)
  n.executorStatistics.lock.RUnlock()
  return clonedStatistics, nil
}
n.executorStatistics.lock.RUnlock()
Member

Maybe move this to an embedded function, then it will be clearer:

if stats, found := readStats(); found {
  return stats, nil
}

if err := n.refreshExecutorStatisticsCache(ctx, executorID); err != nil {
  return nil, fmt.Errorf("error from refresh: %w", err)
}

if stats, found := readStats(); found {
  return stats, nil
}

return nil, fmt.Errorf("could not get executor statistics, even after refresh")

I'd still recommend commenting on why refreshing helps here.

Author

That is a good idea, I have performed the refactor here: 441bdd2
The idea behind refreshing is that if we have a cache miss, we want to look at the source of truth in etcd.
I am not sure if this is necessary, given the way we update the cache on changes from etcd, but we thought to include it for safety. I hope the comment reflects this.
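
The lookup, refresh, lookup pattern suggested in this thread can be sketched in a self-contained form as follows. This is only an illustration under stated assumptions: `statsCache`, `loadFunc`, `lookup`, and `refresh` are hypothetical names, `ShardStatistics` is a simplified stand-in for `etcdtypes.ShardStatistics`, and the `context.Context` parameter used in the PR is omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"maps"
	"sync"
)

// ShardStatistics is a simplified stand-in for etcdtypes.ShardStatistics.
type ShardStatistics struct{ SmoothedLoad float64 }

// loadFunc fetches one executor's statistics from the backing store (etcd in
// the PR). It returns a nil map when the store has no entry.
type loadFunc func(executorID string) (map[string]ShardStatistics, error)

// statsCache is a hypothetical read-through cache keyed by executorID.
type statsCache struct {
	mu    sync.RWMutex
	stats map[string]map[string]ShardStatistics
	load  loadFunc
}

func newStatsCache(load loadFunc) *statsCache {
	return &statsCache{stats: make(map[string]map[string]ShardStatistics), load: load}
}

// lookup returns a copy of the cached statistics, if present.
func (c *statsCache) lookup(executorID string) (map[string]ShardStatistics, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	stats, ok := c.stats[executorID]
	return maps.Clone(stats), ok
}

// refresh reloads one executor's statistics from the source of truth.
func (c *statsCache) refresh(executorID string) error {
	loaded, err := c.load(executorID)
	if err != nil {
		return err
	}
	if loaded == nil {
		return nil // nothing stored for this executor
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	c.stats[executorID] = loaded
	return nil
}

// get tries the cache, refreshes once on a miss, then tries the cache again.
func (c *statsCache) get(executorID string) (map[string]ShardStatistics, error) {
	if stats, found := c.lookup(executorID); found {
		return stats, nil
	}
	// Cache miss: consult the source of truth before giving up.
	if err := c.refresh(executorID); err != nil {
		return nil, fmt.Errorf("error from refresh: %w", err)
	}
	if stats, found := c.lookup(executorID); found {
		return stats, nil
	}
	return nil, errors.New("could not get executor statistics, even after refresh")
}

func main() {
	cache := newStatsCache(func(id string) (map[string]ShardStatistics, error) {
		if id == "executor-1" {
			return map[string]ShardStatistics{"shard-a": {SmoothedLoad: 1.5}}, nil
		}
		return nil, nil
	})
	stats, err := cache.get("executor-1")
	fmt.Println(stats, err)
	_, err = cache.get("unknown")
	fmt.Println(err)
}
```

The backing store is queried without holding the lock, so a slow read does not block other readers of the cache.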

Comment on lines +199 to +209
resp, err := n.client.Get(ctx, statsKey)
if err != nil {
  return fmt.Errorf("get executor shard statistics: %w", err)
}

stats := make(map[string]etcdtypes.ShardStatistics)
if len(resp.Kvs) > 0 {
  if err := common.DecompressAndUnmarshal(resp.Kvs[0].Value, &stats); err != nil {
    return fmt.Errorf("parse executor shard statistics: %w", err)
  }
}
Member

@jakobht I strongly recommend adding a [DB-agnostic] interface which will allow us to write something like this:

unmarshalledResult, err := storage.GetShardStats(ctx, namespace, executorID, ...)

Otherwise we keep embedding etcd very deeply and mixing logic with the storage interface, with very explicit calls like BuildExecutorKey and a dependency on etcdtypes.

Member

I completely agree! I don't think it needs to be in this PR though. It's a refactor that will touch a lot of places, so better to keep it in its own PR.
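
To make the proposed refactor concrete, here is a minimal sketch of what a storage-agnostic interface could look like. Everything except the method name `GetShardStats` is an assumption: `ShardStatsStore`, `memoryStore`, `totalLoad`, and `demoTotal` are hypothetical names, and `ShardStatistics` is a simplified stand-in for `etcdtypes.ShardStatistics`.

```go
package main

import (
	"context"
	"fmt"
)

// ShardStatistics is a simplified stand-in for etcdtypes.ShardStatistics.
type ShardStatistics struct{ SmoothedLoad float64 }

// ShardStatsStore is a hypothetical storage-agnostic interface. An etcd-backed
// implementation would hide key building and decompression behind it.
type ShardStatsStore interface {
	GetShardStats(ctx context.Context, namespace, executorID string) (map[string]ShardStatistics, error)
}

// memoryStore is an in-memory implementation, useful in unit tests.
type memoryStore map[string]map[string]ShardStatistics

func (m memoryStore) GetShardStats(_ context.Context, namespace, executorID string) (map[string]ShardStatistics, error) {
	return m[namespace+"/"+executorID], nil
}

// totalLoad depends only on the interface, not on etcd.
func totalLoad(ctx context.Context, store ShardStatsStore, namespace, executorID string) (float64, error) {
	stats, err := store.GetShardStats(ctx, namespace, executorID)
	if err != nil {
		return 0, fmt.Errorf("get shard stats: %w", err)
	}
	var sum float64
	for _, s := range stats {
		sum += s.SmoothedLoad
	}
	return sum, nil
}

// demoTotal wires the in-memory store into totalLoad.
func demoTotal() (float64, error) {
	store := memoryStore{
		"ns/executor-1": {
			"shard-a": {SmoothedLoad: 1.5},
			"shard-b": {SmoothedLoad: 2.5},
		},
	}
	return totalLoad(context.Background(), store, "ns", "executor-1")
}

func main() {
	total, err := demoTotal()
	fmt.Println(total, err)
}
```

Callers such as `totalLoad` can then be tested without an etcd client, and the storage layer can change without touching them.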

for _, event := range watchResp.Events {
  executorID, keyType, keyErr := etcdkeys.ParseExecutorKey(n.etcdPrefix, n.namespace, string(event.Kv.Key))
  if keyErr != nil {
    continue
Member

Is it expected that we swallow keyErr without logging?

Author

This was the behavior before we changed anything, but I agree that it makes sense to log it: abcb6d0

Comment on lines 406 to 407
delete(n.executorStatistics.stats, executorID)
return
Member

That's quite unexpected: we remove stats if no stats are passed. To me it sounds like we're missing a deleteExecutorStatistics method.

Author

Yeah, that is confusing. Since these functions were only used in the function above and were not too complex, I decided to remove them and handle it in handleExecutorStatisticsEvent.
See f7c9d77.

Member

dkrotx commented Dec 15, 2025

I feel like the commit message should be updated as well: "Heartbeat shard statistics" points to just statistics of heartbeats, but as far as I understand you introduce shard statistics and their smoothing in general. Delivery by heartbeat is minor here, as we know all the executor <-> SD communication is via heartbeats.

@Theis-Mathiassen Theis-Mathiassen force-pushed the heartbeat-shard-statistics branch from 872c621 to 92fafe5 Compare December 16, 2025 09:49
@Theis-Mathiassen Theis-Mathiassen changed the title from "feat: Heartbeat shard statistics" to "feat(shard-distributor): record a smoothed per shard load in etcd" Dec 16, 2025
for _, kv := range resp.Kvs {
  executorID, keyType, err := etcdkeys.ParseExecutorKey(etcdPrefix, namespace, string(kv.Key))
  if err != nil {
    continue
Member

nit: while it's maybe fine to continue, the error should be highlighted, at least as a warning (if it's not critical).
Just swallowing it here looks dangerous: we can end up in a situation where we do not update stats, and we don't say why.

Author

I do not think it is critical, at least for our load balancing: it is not ideal, but it is okay to just use the next most recent data. But I agree we should add a warning: 38cf3d9

Comment on lines 56 to 59
assignedStates map[string]etcdtypes.AssignedState
metadata map[string]ExecutorMetadata // executorID -> metadata key -> metadata value
statistics map[string]map[string]etcdtypes.ShardStatistics
revisions map[string]int64
Member

Are all of these mapping executorID -> data?
If so, I think we should convert it to a single struct and then have a single map[string]this_struct here. Otherwise we need to artificially keep all of them in sync.

If that's not the case, I guess the struct should have a different name: if it is executorData, then having a map from executor -> something is unexpected, since it's clearly plural then.

Author

It is indeed. Should I keep a comment for
// metadata key -> metadata value
so as not to confuse it with the shard ID?
Here are the changes: 5d3b545
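
The single-struct layout discussed in this thread could look roughly like the sketch below. The names `executorRecord` and `executorRecords` are hypothetical, and `AssignedState` and `ShardStatistics` are simplified stand-ins for the `etcdtypes` types; only `ExecutorMetadata` appears in the PR excerpt.

```go
package main

import "fmt"

// Simplified stand-ins for the etcdtypes used in the PR.
type AssignedState struct{ Shards []string }
type ShardStatistics struct{ SmoothedLoad float64 }

// ExecutorMetadata maps metadata key -> metadata value for one executor.
type ExecutorMetadata map[string]string

// executorRecord (hypothetical name) groups everything known about one executor.
type executorRecord struct {
	assignedState AssignedState
	metadata      ExecutorMetadata
	statistics    map[string]ShardStatistics // shardID -> statistics
	revision      int64
}

// executorRecords maps executorID -> record, replacing four parallel maps.
type executorRecords map[string]executorRecord

func main() {
	records := executorRecords{
		"executor-1": {
			assignedState: AssignedState{Shards: []string{"shard-a"}},
			metadata:      ExecutorMetadata{"zone": "a"},
			statistics:    map[string]ShardStatistics{"shard-a": {SmoothedLoad: 1.5}},
			revision:      7,
		},
	}
	// One delete removes every facet of the executor, so nothing can drift out of sync.
	delete(records, "executor-1")
	fmt.Println(len(records)) // prints 0
}
```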

func (n *namespaceShardToExecutor) readStats(executorID string) (map[string]etcdtypes.ShardStatistics, bool) {
  n.executorStatistics.lock.RLock()
  defer n.executorStatistics.lock.RUnlock()
  stats, ok := n.executorStatistics.stats[executorID]
Member

nit nit: an empty line helps to separate [an expected] lock acquisition from the rest.

Comment on lines 387 to 392
if event == nil || event.Type == clientv3.EventTypeDelete || event.Kv == nil || len(event.Kv.Value) == 0 {
  n.executorStatistics.lock.Lock()
  defer n.executorStatistics.lock.Unlock()
  delete(n.executorStatistics.stats, executorID)
  return
}
Member

n.executorStatistics.lock.Lock()
defer n.executorStatistics.lock.Unlock()
delete(n.executorStatistics.stats, executorID)

This "stuttering" points to the fact that this is better as a method of n.executorStatistics.

Author

Yeah that makes sense, that makes it a lot cleaner: 260af7e

Comment on lines 405 to 407
n.executorStatistics.lock.Lock()
defer n.executorStatistics.lock.Unlock()
n.executorStatistics.stats[executorID] = cloneStatisticsMap(stats)
Member

same as above. I would implement delete and assign as special methods managing locks themselves.
Then calling them becomes easier.

Author

Same as above 260af7e

Comment on lines 814 to 815
clonedOldOwnerStats := make(map[string]etcdtypes.ShardStatistics, len(oldOwnerStats))
maps.Copy(clonedOldOwnerStats, oldOwnerStats)
Member

I see you also have cloneStatisticsMap. Is it possible to generalize this to something like utils.cloneMap and unify the usages?

Member

Actually, https://pkg.go.dev/maps#Clone should be a good fit.

Author

Yes, I switched to maps.Clone in multiple places: 6db1110
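
For reference, a small sketch of what `maps.Clone` (standard library, Go 1.21+) does in this situation. The helper `cloneStats` and the simplified `ShardStatistics` type are assumptions for the example. Two details may matter when swapping it in for `make` plus `maps.Copy`: the copy is shallow, and a nil input yields a nil result rather than an empty map.

```go
package main

import (
	"fmt"
	"maps"
)

// ShardStatistics is a simplified stand-in for etcdtypes.ShardStatistics.
type ShardStatistics struct{ SmoothedLoad float64 }

// cloneStats is a hypothetical thin wrapper around maps.Clone.
func cloneStats(m map[string]ShardStatistics) map[string]ShardStatistics {
	return maps.Clone(m)
}

func main() {
	old := map[string]ShardStatistics{"shard-a": {SmoothedLoad: 1}}
	cloned := cloneStats(old)

	// Values are plain structs, so changing the clone leaves the original untouched.
	cloned["shard-a"] = ShardStatistics{SmoothedLoad: 9}
	fmt.Println(old["shard-a"].SmoothedLoad, cloned["shard-a"].SmoothedLoad) // prints 1 9

	// A nil map stays nil, unlike make followed by maps.Copy.
	fmt.Println(cloneStats(nil) == nil) // prints true
}
```

For a nested map such as executorID -> shardID -> statistics, the inner maps would still be shared after a single `maps.Clone`, so each inner map needs its own clone.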

break
}
}
func (n *namespaceShardToExecutor) handlePotentialRefresh(watchResp clientv3.WatchResponse) error {
Member

I don't know if this is possible, but I would rather change this to:

if executorStateChanges(events) {
   refresh()
}

in the caller, since handlePotentialRefresh is a big catch-all name, which hides the simple logic above.

Author

It should hopefully be a lot clearer what is happening now: 537dab9
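
A minimal sketch of the predicate-plus-action split suggested above. The `event` type is a simplified stand-in for an etcd watch event, and the "/assigned_state" key suffix is a hypothetical convention chosen for the example, not the PR's actual key layout.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a simplified stand-in for an etcd watch event.
type event struct{ key string }

// executorStateChanges reports whether any event touches a key that requires
// a full refresh. The "/assigned_state" suffix is a hypothetical convention.
func executorStateChanges(events []event) bool {
	for _, e := range events {
		if strings.HasSuffix(e.key, "/assigned_state") {
			return true
		}
	}
	return false
}

func main() {
	events := []event{
		{key: "ns/executors/e1/statistics"},
		{key: "ns/executors/e1/assigned_state"},
	}
	// The caller reads as a plain condition followed by an action.
	if executorStateChanges(events) {
		fmt.Println("refresh") // prints refresh
	}
}
```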

// handleExecutorStatisticsEvent processes incoming watch events for executor shard statistics.
// It updates the in-memory statistics map directly from the event without triggering a full refresh.
func (n *namespaceShardToExecutor) handleExecutorStatisticsEvent(executorID string, event *clientv3.Event) {
if event == nil || event.Type == clientv3.EventTypeDelete || event.Kv == nil || len(event.Kv.Value) == 0 {
Member

I don't like this. I understand event.Type == clientv3.EventTypeDelete, but we don't need the others:

  1. event == nil - we shouldn't have this in internal handlers. Whatever dispatches events should already make sure there are no nils. Otherwise how can we trust it? Just pass a value, not a pointer.
  2. event.Kv == nil || len(event.Kv.Value) == 0 - it's not obvious why this should make us delete stats.
    If it makes sense, then I'd recommend having a bool variable like:
    invalidEvent := event.Kv == nil || len(event.Kv.Value) == 0
    Then if ... || invalidEvent { is self-explanatory. Still, there should be a strong reason why we treat invalid records like that and why they lead to deletion.

Author

I think we just wanted to be sure we did not accidentally cause a panic.
But I see how these values would be expected to be available.
Should we just return without deleting, or is this acceptable: 7e93d0d
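
One possible shape for the handler, following the two points raised above: the event is passed by value so no nil check is needed, and only a delete event removes cached statistics. This is a sketch, not the code in 7e93d0d. The `event` and `statsCache` types are simplified stand-ins, and ignoring an invalid put (rather than deleting) is just one of the options under discussion.

```go
package main

import "fmt"

type eventType int

const (
	eventPut eventType = iota
	eventDelete
)

// event is a simplified stand-in for an etcd watch event, passed by value.
type event struct {
	typ   eventType
	value []byte
}

// statsCache maps executorID -> decoded payload (simplified to a string here).
type statsCache struct {
	stats map[string]string
}

// handle applies one event to the cache.
func (c *statsCache) handle(executorID string, ev event) {
	if ev.typ == eventDelete {
		delete(c.stats, executorID)
		return
	}

	invalidEvent := len(ev.value) == 0
	if invalidEvent {
		// One possible policy: keep the last known statistics instead of deleting them.
		return
	}

	c.stats[executorID] = string(ev.value)
}

func main() {
	c := &statsCache{stats: map[string]string{}}
	c.handle("executor-1", event{typ: eventPut, value: []byte("load=1.5")})
	c.handle("executor-1", event{typ: eventPut}) // invalid put: ignored
	fmt.Println(c.stats["executor-1"])           // prints load=1.5
	c.handle("executor-1", event{typ: eventDelete})
	fmt.Println(len(c.stats)) // prints 0
}
```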

Member

@dkrotx dkrotx left a comment

I don't have strong objections anymore. Good job on addressing the comments!
Left some nit comments to address before landing (please squash the diff, or ask @jakobht to merge).

One thing to update (I don't know if it is possible at this stage): there are multiple functions accessing executorStatistics and therefore knowing a lot about how to update it (including whether a lock has to be taken). Try to make it abstract for outside users and just expose update() or get() functions. This way:

  • the caller doesn't need to be aware of what it has to lock (it won't be possible to miss)
  • you can easily change the structure without changing the calling code
  • this code block can be unit-tested separately, so when you build on top of it you already know you have something tested (and you can rely on it)
    -- btw, I'd recommend using TDD in this case: writing the test beforehand will define a clear interface and you will immediately see how the usage is going to look.

Building with simple abstraction levels and combining them is key to scalable development. Complex intertwined things are hard to make work reliably.
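
A sketch of the encapsulation described above, assuming a container that owns its lock and exposes only `get`, `update`, and `remove`. The method set and the `newExecutorStatistics` constructor are illustrative, and `ShardStatistics` is a simplified stand-in for `etcdtypes.ShardStatistics`.

```go
package main

import (
	"fmt"
	"maps"
	"sync"
)

// ShardStatistics is a simplified stand-in for etcdtypes.ShardStatistics.
type ShardStatistics struct{ SmoothedLoad float64 }

// executorStatistics owns its lock; callers never touch it directly.
type executorStatistics struct {
	lock  sync.RWMutex
	stats map[string]map[string]ShardStatistics // executorID -> shardID -> statistics
}

func newExecutorStatistics() *executorStatistics {
	return &executorStatistics{stats: make(map[string]map[string]ShardStatistics)}
}

// get returns a copy, so callers cannot modify the cached map.
func (e *executorStatistics) get(executorID string) (map[string]ShardStatistics, bool) {
	e.lock.RLock()
	defer e.lock.RUnlock()

	stats, ok := e.stats[executorID]
	return maps.Clone(stats), ok
}

// update stores a copy, so later changes by the caller do not leak in.
func (e *executorStatistics) update(executorID string, stats map[string]ShardStatistics) {
	e.lock.Lock()
	defer e.lock.Unlock()

	e.stats[executorID] = maps.Clone(stats)
}

// remove drops all statistics for one executor.
func (e *executorStatistics) remove(executorID string) {
	e.lock.Lock()
	defer e.lock.Unlock()

	delete(e.stats, executorID)
}

func main() {
	s := newExecutorStatistics()
	s.update("executor-1", map[string]ShardStatistics{"shard-a": {SmoothedLoad: 1.5}})
	stats, ok := s.get("executor-1")
	fmt.Println(stats, ok)
	s.remove("executor-1")
	_, ok = s.get("executor-1")
	fmt.Println(ok) // prints false
}
```

Because the type is self-contained, it can be unit-tested on its own before the watch and refresh logic is built on top of it.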

return nil, fmt.Errorf("could not get executor statistics, even after refresh")
}

func (n *namespaceShardToExecutor) readStats(executorID string) (map[string]etcdtypes.ShardStatistics, bool) {
Member

nit: better rename it to getStats() or statsForExecutor(); read is too much for a lookup.
There is often a better name than just a vague read/write/get/put.

Author

I performed all the changes in: 457997a

n.executorStatistics.lock.Lock()
defer n.executorStatistics.lock.Unlock()

n.applyParsedData(parsedData)
Member

Be consistent with naming. If one function is called parseExecutorData(), then this one should most probably be applyExecutorData() (I think it is assumed it's parsed), or maybe updateExecutorData().
Btw, when it's not easy to pick a name, that is sometimes a sign of an artificial split of functions.

Member

I'm looking at this series of 4 lock-unlock lines and I think it's better for it to be either a single function or a different split. In addition, it looks very unusual that this function requires everything pre-locked while others do the locking themselves.

Author

Yeah that makes sense, I moved the locks into the function, so the different functions at least behave similarly.
457997a

Comment on lines 112 to 114
executorStatistics: namespaceExecutorStatistics{
  stats: make(map[string]map[string]etcdtypes.ShardStatistics),
},
Member

nit: you can also call it newNamespaceExecutorStatistics() which will construct itself.
This way it will even look more like the previous line.

5 participants