Skip to content

Commit daa2f56

Browse files
authored
fix(shard-distributor): add error handling in namespace refresh loop (cadence-workflow#7519)
<!-- Describe what has changed in this PR --> **What changed?** * Error handling was added to `namespaceRefreshLoop` <!-- Tell your future self why have you made these changes --> **Why?** etcd `Watch` method may close `WatchChannel` in case of non-recoverable errors like `ErrCompaction`, according to [the doc](https://github.com/etcd-io/etcd/blob/main/client/v3/watch.go#L65-L67). `namespaceRefreshLoop` is used to update internal cache. However, the code didn't catch errors in case of a closed channel or `WatchResponse` containing errors. It led to a busy loop, because the loop is never broken in case of these cases, and may cause a high CPU problem observed in the staging environment. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** * Unit test * Local run with canary * Integration test <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** * There is an interval of 150ms between retries that may cause a cache invalidation for 150ms. <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent 6b038e9 commit daa2f56

6 files changed

Lines changed: 217 additions & 46 deletions

File tree

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type ExecutorStoreParams struct {
5757

5858
// NewStore creates a new etcd-backed store and provides it to the fx application.
5959
func NewStore(p ExecutorStoreParams) (store.Store, error) {
60-
shardCache := shardcache.NewShardToExecutorCache(p.Cfg.Prefix, p.Client, p.Logger)
60+
shardCache := shardcache.NewShardToExecutorCache(p.Cfg.Prefix, p.Client, p.Logger, p.TimeSource)
6161

6262
timeSource := p.TimeSource
6363
if timeSource == nil {

service/sharddistributor/store/etcd/executorstore/etcdstore_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.uber.org/fx/fxtest"
1313
"gopkg.in/yaml.v2"
1414

15+
"github.com/uber/cadence/common/clock"
1516
"github.com/uber/cadence/common/config"
1617
"github.com/uber/cadence/common/log/testlogger"
1718
"github.com/uber/cadence/common/types"
@@ -713,10 +714,11 @@ func createStore(t *testing.T, tc *testhelper.StoreTestCluster) store.Store {
713714
require.NoError(t, err)
714715

715716
store, err := NewStore(ExecutorStoreParams{
716-
Client: tc.Client,
717-
Cfg: etcdConfig,
718-
Lifecycle: fxtest.NewLifecycle(t),
719-
Logger: testlogger.New(t),
717+
Client: tc.Client,
718+
Cfg: etcdConfig,
719+
Lifecycle: fxtest.NewLifecycle(t),
720+
Logger: testlogger.New(t),
721+
TimeSource: clock.NewRealTimeSource(),
720722
})
721723
require.NoError(t, err)
722724
return store

service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go

Lines changed: 74 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import (
55
"fmt"
66
"strings"
77
"sync"
8+
"time"
89

910
clientv3 "go.etcd.io/etcd/client/v3"
1011

12+
"github.com/uber/cadence/common/backoff"
13+
"github.com/uber/cadence/common/clock"
1114
"github.com/uber/cadence/common/log"
1215
"github.com/uber/cadence/common/log/tag"
1316
"github.com/uber/cadence/service/sharddistributor/store"
@@ -17,47 +20,49 @@ import (
1720
"github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/common"
1821
)
1922

23+
const (
24+
// RetryInterval for watch failures is between 50ms to 150ms
25+
namespaceRefreshLoopWatchJitterCoeff = 0.5
26+
namespaceRefreshLoopWatchRetryInterval = 100 * time.Millisecond
27+
)
28+
2029
type namespaceShardToExecutor struct {
2130
sync.RWMutex
2231

23-
shardToExecutor map[string]*store.ShardOwner // shardID -> shardOwner
24-
shardOwners map[string]*store.ShardOwner // executorID -> shardOwner
25-
executorState map[*store.ShardOwner][]string // executor -> shardIDs
26-
executorRevision map[string]int64
27-
namespace string
28-
etcdPrefix string
29-
changeUpdateChannel clientv3.WatchChan
30-
stopCh chan struct{}
31-
logger log.Logger
32-
client etcdclient.Client
33-
pubSub *executorStatePubSub
32+
shardToExecutor map[string]*store.ShardOwner // shardID -> shardOwner
33+
shardOwners map[string]*store.ShardOwner // executorID -> shardOwner
34+
executorState map[*store.ShardOwner][]string // executor -> shardIDs
35+
executorRevision map[string]int64
36+
namespace string
37+
etcdPrefix string
38+
stopCh chan struct{}
39+
logger log.Logger
40+
client etcdclient.Client
41+
timeSource clock.TimeSource
42+
pubSub *executorStatePubSub
3443
}
3544

36-
func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient.Client, stopCh chan struct{}, logger log.Logger) (*namespaceShardToExecutor, error) {
37-
// Start listening
38-
watchPrefix := etcdkeys.BuildExecutorsPrefix(etcdPrefix, namespace)
39-
watchChan := client.Watch(context.Background(), watchPrefix, clientv3.WithPrefix(), clientv3.WithPrevKV())
40-
45+
func newNamespaceShardToExecutor(etcdPrefix, namespace string, client etcdclient.Client, stopCh chan struct{}, logger log.Logger, timeSource clock.TimeSource) (*namespaceShardToExecutor, error) {
4146
return &namespaceShardToExecutor{
42-
shardToExecutor: make(map[string]*store.ShardOwner),
43-
executorState: make(map[*store.ShardOwner][]string),
44-
executorRevision: make(map[string]int64),
45-
shardOwners: make(map[string]*store.ShardOwner),
46-
namespace: namespace,
47-
etcdPrefix: etcdPrefix,
48-
changeUpdateChannel: watchChan,
49-
stopCh: stopCh,
50-
logger: logger,
51-
client: client,
52-
pubSub: newExecutorStatePubSub(logger, namespace),
47+
shardToExecutor: make(map[string]*store.ShardOwner),
48+
executorState: make(map[*store.ShardOwner][]string),
49+
executorRevision: make(map[string]int64),
50+
shardOwners: make(map[string]*store.ShardOwner),
51+
namespace: namespace,
52+
etcdPrefix: etcdPrefix,
53+
stopCh: stopCh,
54+
logger: logger.WithTags(tag.ShardNamespace(namespace)),
55+
client: client,
56+
timeSource: timeSource,
57+
pubSub: newExecutorStatePubSub(logger, namespace),
5358
}, nil
5459
}
5560

5661
func (n *namespaceShardToExecutor) Start(wg *sync.WaitGroup) {
5762
wg.Add(1)
5863
go func() {
5964
defer wg.Done()
60-
n.nameSpaceRefreashLoop()
65+
n.namespaceRefreshLoop()
6166
}()
6267
}
6368

@@ -106,22 +111,56 @@ func (n *namespaceShardToExecutor) Subscribe(ctx context.Context) (<-chan map[*s
106111

107112
select {
108113
case <-ctx.Done():
109-
n.logger.Warn("context finnished before initial state was sent", tag.ShardNamespace(n.namespace))
114+
n.logger.Warn("context finished before initial state was sent")
110115
case subCh <- initialState:
111-
n.logger.Info("initial state sent to subscriber", tag.ShardNamespace(n.namespace), tag.Value(initialState))
116+
n.logger.Info("initial state sent to subscriber", tag.Value(initialState))
112117
}
113118

114119
}()
115120

116121
return subCh, unSub
117122
}
118123

119-
func (n *namespaceShardToExecutor) nameSpaceRefreashLoop() {
124+
func (n *namespaceShardToExecutor) namespaceRefreshLoop() {
125+
for {
126+
if err := n.watch(); err != nil {
127+
n.logger.Error("error watching in namespaceRefreshLoop, retrying...", tag.Error(err))
128+
n.timeSource.Sleep(backoff.JitDuration(
129+
namespaceRefreshLoopWatchRetryInterval,
130+
namespaceRefreshLoopWatchJitterCoeff,
131+
))
132+
continue
133+
}
134+
135+
n.logger.Info("namespaceRefreshLoop is exiting")
136+
return
137+
}
138+
}
139+
140+
func (n *namespaceShardToExecutor) watch() error {
141+
ctx, cancel := context.WithCancel(context.Background())
142+
defer cancel()
143+
144+
watchChan := n.client.Watch(
145+
// WithRequireLeader ensures that the etcd cluster has a leader
146+
clientv3.WithRequireLeader(ctx),
147+
etcdkeys.BuildExecutorsPrefix(n.etcdPrefix, n.namespace),
148+
clientv3.WithPrefix(), clientv3.WithPrevKV(),
149+
)
150+
120151
for {
121152
select {
122153
case <-n.stopCh:
123-
return
124-
case watchResp := <-n.changeUpdateChannel:
154+
return nil
155+
156+
case watchResp, ok := <-watchChan:
157+
if err := watchResp.Err(); err != nil {
158+
return fmt.Errorf("watch response: %w", err)
159+
}
160+
if !ok {
161+
return fmt.Errorf("watch channel closed")
162+
}
163+
125164
shouldRefresh := false
126165
for _, event := range watchResp.Events {
127166
_, keyType, keyErr := etcdkeys.ParseExecutorKey(n.etcdPrefix, n.namespace, string(event.Kv.Key))
@@ -134,13 +173,13 @@ func (n *namespaceShardToExecutor) nameSpaceRefreashLoop() {
134173
break
135174
}
136175
}
176+
137177
if shouldRefresh {
138178
err := n.refresh(context.Background())
139179
if err != nil {
140-
n.logger.Error("failed to refresh namespace shard to executor", tag.ShardNamespace(n.namespace), tag.Error(err))
180+
n.logger.Error("failed to refresh namespace shard to executor", tag.Error(err))
141181
}
142182
}
143-
144183
}
145184
}
146185
}

service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go

Lines changed: 128 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,21 @@ import (
44
"context"
55
"encoding/json"
66
"sync"
7+
"sync/atomic"
78
"testing"
89
"time"
910

1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
1213
clientv3 "go.etcd.io/etcd/client/v3"
14+
"go.uber.org/goleak"
15+
"go.uber.org/mock/gomock"
1316

17+
"github.com/uber/cadence/common/clock"
1418
"github.com/uber/cadence/common/log/testlogger"
1519
"github.com/uber/cadence/common/types"
1620
"github.com/uber/cadence/service/sharddistributor/store"
21+
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient"
1722
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys"
1823
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdtypes"
1924
"github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper"
@@ -32,7 +37,7 @@ func TestNamespaceShardToExecutor_Lifecycle(t *testing.T) {
3237
})
3338

3439
// Start the cache
35-
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger)
40+
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource())
3641
assert.NoError(t, err)
3742
namespaceShardToExecutor.Start(&sync.WaitGroup{})
3843
time.Sleep(50 * time.Millisecond)
@@ -84,12 +89,13 @@ func TestNamespaceShardToExecutor_Subscribe(t *testing.T) {
8489
})
8590

8691
// Start the cache
87-
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger)
92+
namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, testCluster.Namespace, testCluster.Client, stopCh, logger, clock.NewRealTimeSource())
8893
assert.NoError(t, err)
8994
namespaceShardToExecutor.Start(&sync.WaitGroup{})
9095

9196
// Refresh the cache to get the initial state
92-
namespaceShardToExecutor.refresh(context.Background())
97+
err = namespaceShardToExecutor.refresh(context.Background())
98+
require.NoError(t, err)
9399

94100
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
95101
defer cancel()
@@ -133,6 +139,125 @@ func TestNamespaceShardToExecutor_Subscribe(t *testing.T) {
133139
wg.Wait()
134140
}
135141

142+
func TestNamespaceShardToExecutor_watch_watchChanErrors(t *testing.T) {
143+
ctrl := gomock.NewController(t)
144+
defer ctrl.Finish()
145+
146+
logger := testlogger.New(t)
147+
mockClient := etcdclient.NewMockClient(ctrl)
148+
stopCh := make(chan struct{})
149+
testPrefix := "/test-prefix"
150+
testNamespace := "test-namespace"
151+
152+
// Mock the Watch call to return our watch channel
153+
watchChan := make(chan clientv3.WatchResponse)
154+
mockClient.EXPECT().
155+
Watch(gomock.Any(), gomock.Any(), gomock.Any()).
156+
Return(watchChan).
157+
AnyTimes()
158+
159+
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource())
160+
require.NoError(t, err)
161+
162+
// Test Case #1
163+
// Test received compact revision error from watch channel
164+
{
165+
go func() {
166+
watchChan <- clientv3.WatchResponse{
167+
CompactRevision: 100,
168+
}
169+
}()
170+
171+
err = e.watch()
172+
require.Error(t, err)
173+
assert.ErrorContains(t, err, "etcdserver: mvcc: required revision has been compacted")
174+
}
175+
176+
// Test Case #2
177+
// Test closed watch channel
178+
{
179+
close(watchChan)
180+
err = e.watch()
181+
require.Error(t, err)
182+
assert.ErrorContains(t, err, "watch channel closed")
183+
}
184+
}
185+
186+
func TestNamespaceShardToExecutor_namespaceRefreshLoop_watchError(t *testing.T) {
187+
defer goleak.VerifyNone(t)
188+
189+
ctrl := gomock.NewController(t)
190+
defer ctrl.Finish()
191+
192+
logger := testlogger.New(t)
193+
mockClient := etcdclient.NewMockClient(ctrl)
194+
timeSource := clock.NewMockedTimeSource()
195+
stopCh := make(chan struct{})
196+
testPrefix := "/test-prefix"
197+
testNamespace := "test-namespace"
198+
199+
// mock for first watch call that receives error
200+
watchChanRcvErr := make(chan clientv3.WatchResponse)
201+
mockClient.EXPECT().
202+
Watch(gomock.Any(), gomock.Any(), gomock.Any()).
203+
Return(watchChanRcvErr)
204+
205+
// mock for second watch call that receives closed channel
206+
watchChanClosed := make(chan clientv3.WatchResponse)
207+
mockClient.EXPECT().
208+
Watch(gomock.Any(), gomock.Any(), gomock.Any()).
209+
Return(watchChanClosed)
210+
211+
// mock for third watch call that will be used when stopCh is closed
212+
mockClient.EXPECT().
213+
Watch(gomock.Any(), gomock.Any(), gomock.Any()).
214+
Return(make(chan clientv3.WatchResponse))
215+
216+
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, timeSource)
217+
require.NoError(t, err)
218+
219+
wg := sync.WaitGroup{}
220+
wg.Add(1)
221+
finished := atomic.Bool{}
222+
223+
go func() {
224+
defer wg.Done()
225+
e.namespaceRefreshLoop()
226+
finished.Store(true)
227+
}()
228+
229+
// Test Case #1: watchChan receives error
230+
{
231+
// Sends a response containing compact revision to simulate error
232+
watchChanRcvErr <- clientv3.WatchResponse{
233+
CompactRevision: 100,
234+
}
235+
236+
timeSource.BlockUntil(1)
237+
require.False(t, finished.Load(), "namespaceRefreshLoop should not exit on watch error")
238+
}
239+
240+
// Test Case #2: watchChan is closed
241+
{
242+
timeSource.Advance(2 * namespaceRefreshLoopWatchRetryInterval)
243+
244+
// Sends a response containing compact revision to simulate error
245+
close(watchChanClosed)
246+
247+
timeSource.BlockUntil(1)
248+
require.False(t, finished.Load(), "namespaceRefreshLoop should not exit on watch error")
249+
}
250+
251+
// Test Case #3: stopCh is closed
252+
{
253+
timeSource.Advance(2 * namespaceRefreshLoopWatchRetryInterval)
254+
255+
close(stopCh)
256+
wg.Wait()
257+
require.True(t, finished.Load(), "namespaceRefreshLoop should exit on watch error")
258+
}
259+
}
260+
136261
// setupExecutorWithShards creates an executor in etcd with assigned shards and metadata
137262
func setupExecutorWithShards(t *testing.T, testCluster *testhelper.StoreTestCluster, executorID string, shards []string, metadata map[string]string) {
138263
// Create assigned state

0 commit comments

Comments
 (0)