Skip to content

Commit 4134777

Browse files
authored
Update shard controller getters to use atomic (#6864)
1 parent 8fe714f commit 4134777

File tree

3 files changed

+111
-13
lines changed

3 files changed

+111
-13
lines changed

service/history/shard/controller.go

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ type (
7575
ShardIDs() []int32
7676
}
7777

78+
shardIDSnapshot struct {
79+
shardIDs []int32
80+
numShards int
81+
}
82+
7883
controller struct {
7984
resource.Resource
8085

@@ -90,7 +95,8 @@ type (
9095
metricsScope metrics.Scope
9196

9297
sync.RWMutex
93-
historyShards map[int]*historyShardsItem
98+
historyShards map[int]*historyShardsItem
99+
shardIDSnapshot atomic.Pointer[shardIDSnapshot]
94100
}
95101

96102
historyShardsItemStatus int
@@ -223,22 +229,19 @@ func (c *controller) Status() int32 {
223229
}
224230

225231
func (c *controller) NumShards() int {
226-
nShards := 0
227-
c.RLock()
228-
nShards = len(c.historyShards)
229-
c.RUnlock()
230-
return nShards
232+
s := c.shardIDSnapshot.Load()
233+
if s == nil {
234+
return 0
235+
}
236+
return s.numShards
231237
}
232238

233239
func (c *controller) ShardIDs() []int32 {
234-
c.RLock()
235-
ids := []int32{}
236-
for id := range c.historyShards {
237-
id32 := int32(id)
238-
ids = append(ids, id32)
240+
s := c.shardIDSnapshot.Load()
241+
if s == nil {
242+
return []int32{}
239243
}
240-
c.RUnlock()
241-
return ids
244+
return s.shardIDs
242245
}
243246

244247
func (c *controller) removeEngineForShard(shardID int, shardItem *historyShardsItem) {
@@ -330,6 +333,7 @@ func (c *controller) getOrCreateHistoryShardItem(shardID int) (*historyShardsIte
330333
return nil, err
331334
}
332335
c.historyShards[shardID] = shardItem
336+
c.updateShardIDSnapshotLocked()
333337
c.metricsScope.IncCounter(metrics.ShardItemCreatedCounter)
334338

335339
shardItem.logger.Info("Shard item state changed", tag.LifeCycleStarted, tag.ComponentShardItem)
@@ -340,6 +344,18 @@ func (c *controller) getOrCreateHistoryShardItem(shardID int) (*historyShardsIte
340344
return nil, CreateShardOwnershipLostError(c.GetHostInfo(), info)
341345
}
342346

347+
func (c *controller) updateShardIDSnapshotLocked() {
348+
shardIDs := make([]int32, 0, len(c.historyShards))
349+
for shardID := range c.historyShards {
350+
shardIDs = append(shardIDs, int32(shardID))
351+
}
352+
snapshot := &shardIDSnapshot{
353+
shardIDs: shardIDs,
354+
numShards: len(shardIDs),
355+
}
356+
c.shardIDSnapshot.Store(snapshot)
357+
}
358+
343359
func (c *controller) removeHistoryShardItem(shardID int, shardItem *historyShardsItem) (*historyShardsItem, error) {
344360
c.Lock()
345361
defer c.Unlock()
@@ -355,6 +371,7 @@ func (c *controller) removeHistoryShardItem(shardID int, shardItem *historyShard
355371
}
356372

357373
delete(c.historyShards, shardID)
374+
c.updateShardIDSnapshotLocked()
358375
c.metricsScope.IncCounter(metrics.ShardItemRemovedCounter)
359376
currentShardItem.logger.Info("Shard item state changed", tag.LifeCycleStopped, tag.ComponentShardItem, tag.Number(int64(len(c.historyShards))))
360377
return currentShardItem, nil
@@ -450,6 +467,7 @@ func (c *controller) doShutdown() {
450467
item.stopEngine()
451468
}
452469
c.historyShards = nil
470+
c.updateShardIDSnapshotLocked()
453471
}
454472

455473
func (c *controller) isShuttingDown() bool {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package shard
24+
25+
import (
26+
"sync"
27+
"testing"
28+
)
29+
30+
// BenchmarkController_ShardIDs-96 52588224 293.9 ns/op 66 B/op 0 allocs/op
31+
// go test -bench=. --benchtime=10s --benchmem
32+
// goos: linux
33+
// goarch: amd64
34+
// pkg: github.com/uber/cadence/service/history/shard
35+
// cpu: AMD EPYC 7B13
36+
// With the old approach, the benchmark result is:
37+
// BenchmarkController_ShardIDs-96 39314 324629 ns/op 272333 B/op 19 allocs/op
38+
func BenchmarkController_ShardIDs(b *testing.B) {
39+
numShards := 16384
40+
historyShards := make(map[int]*historyShardsItem)
41+
for i := 0; i < numShards; i++ {
42+
historyShards[i] = &historyShardsItem{shardID: i}
43+
}
44+
shardController := &controller{
45+
historyShards: historyShards,
46+
}
47+
var wg sync.WaitGroup
48+
for i := 0; i < b.N; i++ {
49+
if i%1000 == 0 { // update is much much less frequent than read
50+
wg.Add(1)
51+
go func() {
52+
defer wg.Done()
53+
shardController.Lock()
54+
shardController.updateShardIDSnapshotLocked()
55+
shardController.Unlock()
56+
}()
57+
}
58+
shardController.ShardIDs()
59+
shardController.NumShards()
60+
}
61+
wg.Wait()
62+
}

service/history/shard/controller_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ func (s *controllerSuite) TestAcquireShardSuccess() {
175175
count++
176176
}
177177
s.Equal(3, count)
178+
s.Equal(3, s.shardController.NumShards())
179+
s.ElementsMatch([]int32{0, 4, 8}, s.shardController.ShardIDs())
178180
}
179181

180182
func (s *controllerSuite) TestAcquireShardsConcurrently() {
@@ -259,6 +261,8 @@ func (s *controllerSuite) TestAcquireShardsConcurrently() {
259261
count++
260262
}
261263
s.Equal(3, count)
264+
s.Equal(3, s.shardController.NumShards())
265+
s.ElementsMatch([]int32{0, 4, 8}, s.shardController.ShardIDs())
262266
}
263267

264268
func (s *controllerSuite) TestAcquireShardLookupFailure() {
@@ -273,6 +277,8 @@ func (s *controllerSuite) TestAcquireShardLookupFailure() {
273277
s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(membership.HostInfo{}, errors.New("ring failure")).Times(1)
274278
s.Nil(s.shardController.GetEngineForShard(shardID))
275279
}
280+
s.Equal(0, s.shardController.NumShards())
281+
s.Empty(s.shardController.ShardIDs())
276282
}
277283

278284
func (s *controllerSuite) TestAcquireShardRenewSuccess() {
@@ -341,11 +347,17 @@ func (s *controllerSuite) TestAcquireShardRenewSuccess() {
341347

342348
s.shardController.acquireShards()
343349

350+
s.Equal(2, s.shardController.NumShards())
351+
s.ElementsMatch([]int32{0, 1}, s.shardController.ShardIDs())
352+
344353
for shardID := 0; shardID < numShards; shardID++ {
345354
s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).Times(1)
346355
}
347356
s.shardController.acquireShards()
348357

358+
s.Equal(2, s.shardController.NumShards())
359+
s.ElementsMatch([]int32{0, 1}, s.shardController.ShardIDs())
360+
349361
for shardID := 0; shardID < numShards; shardID++ {
350362
s.NotNil(s.shardController.GetEngineForShard(shardID))
351363
}
@@ -510,6 +522,9 @@ func (s *controllerSuite) TestHistoryEngineClosed() {
510522
s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).AnyTimes()
511523
}
512524
s.shardController.Stop()
525+
526+
s.Equal(0, s.shardController.NumShards())
527+
s.Empty(s.shardController.ShardIDs())
513528
}
514529

515530
func (s *controllerSuite) TestShardControllerClosed() {
@@ -555,6 +570,9 @@ func (s *controllerSuite) TestShardControllerClosed() {
555570
}
556571
s.shardController.Stop()
557572
workerWG.Wait()
573+
574+
s.Equal(0, s.shardController.NumShards())
575+
s.Empty(s.shardController.ShardIDs())
558576
}
559577

560578
func (s *controllerSuite) TestGetOrCreateHistoryShardItem_InvalidShardID_Error() {

0 commit comments

Comments
 (0)