Skip to content

Commit f00decb

Browse files
authored
Add index for dynamic config constraints (temporalio#8766)
## What changed? For dynamic config settings with many constrained values, linear search can be slow. This adds an on-demand index when the number of constrained values crosses a threshold. Also fix convertCache so that the cleanup closures don't reference the whole Collection, which would prevent it from getting colllected. ## Why? Optimization. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [x] covered by existing tests - [x] added new unit test(s) - [ ] added new functional test(s) ## Potential risks Increased memory usage for the indexes.
1 parent 33faf7e commit f00decb

File tree

4 files changed

+134
-12
lines changed

4 files changed

+134
-12
lines changed

common/dynamicconfig/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ type (
8888
Namespace string
8989
NamespaceID string
9090
TaskQueueName string
91+
Destination string
9192
TaskQueueType enumspb.TaskQueueType
9293
ShardID int32
9394
TaskType enumsspb.TaskType
94-
Destination string
9595
}
9696
)

common/dynamicconfig/collection.go

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"math"
78
"reflect"
89
"runtime"
910
"strconv"
@@ -43,8 +44,12 @@ type (
4344
poller goro.Group
4445

4546
// cache converted values. use weak pointers to avoid holding on to values in the cache
46-
// that are no longer in use.
47-
convertCache sync.Map // map[weak.Pointer[ConstrainedValue]]any
47+
// that are no longer in use. this must be a pointer since the cleanup closures need to
48+
// reference this without referencing Collection.
49+
convertCache *sync.Map // map[weak.Pointer[ConstrainedValue]]any
50+
51+
// index by constraints
52+
indexCache *sync.Map // map[weak.Pointer[ConstrainedValue]]map[Constraints]int32
4853
}
4954

5055
subscription[T any] struct {
@@ -87,6 +92,9 @@ type (
8792

8893
const (
8994
errCountLogThreshold = 1000
95+
// After this many constraints, switch to a cached lookup. This value was determined
96+
// empirically on my machine using BenchmarkCollectionIndexed.
97+
constraintsCacheThreshold = 32
9098
)
9199

92100
var (
@@ -112,6 +120,8 @@ func NewCollection(client Client, logger log.Logger) *Collection {
112120
logger: logger,
113121
errCount: -1,
114122
subscriptions: make(map[Key]map[int]any),
123+
convertCache: new(sync.Map),
124+
indexCache: new(sync.Map),
115125
}
116126
}
117127

@@ -216,10 +226,17 @@ func (c *Collection) throttleLog() bool {
216226
return errCount < errCountLogThreshold || errCount%errCountLogThreshold == 0
217227
}
218228

219-
func findMatch(cvs []ConstrainedValue, precedence []Constraints) (*ConstrainedValue, error) {
229+
func findMatch(
230+
cache *sync.Map,
231+
cvs []ConstrainedValue,
232+
precedence []Constraints,
233+
) (*ConstrainedValue, error) {
220234
if len(cvs) == 0 {
221235
return nil, errKeyNotPresent
236+
} else if len(cvs) > constraintsCacheThreshold && len(cvs) <= math.MaxInt32 {
237+
return findMatchWithCache(cache, cvs, precedence)
222238
}
239+
223240
for _, m := range precedence {
224241
for idx, cv := range cvs {
225242
if m == cv.Constraints {
@@ -235,6 +252,43 @@ func findMatch(cvs []ConstrainedValue, precedence []Constraints) (*ConstrainedVa
235252
return nil, errNoMatchingConstraint
236253
}
237254

255+
func findMatchWithCache(
256+
cache *sync.Map,
257+
cvs []ConstrainedValue,
258+
precedence []Constraints,
259+
) (*ConstrainedValue, error) {
260+
var cached map[Constraints]int32
261+
weakcvp := weak.Make(&cvs[0])
262+
if v, ok := cache.Load(weakcvp); ok {
263+
cached = v.(map[Constraints]int32) // nolint:revive // unchecked-type-assertion
264+
} else {
265+
cached = make(map[Constraints]int32, len(cvs))
266+
for i := range cvs {
267+
// pick first one to match behavior if multiple match
268+
if _, ok := cached[cvs[i].Constraints]; !ok {
269+
cached[cvs[i].Constraints] = int32(i)
270+
}
271+
}
272+
if _, loaded := cache.LoadOrStore(weakcvp, cached); !loaded {
273+
runtime.AddCleanup(&cvs[0], func(w weak.Pointer[ConstrainedValue]) {
274+
cache.Delete(w)
275+
}, weakcvp)
276+
}
277+
}
278+
279+
for _, m := range precedence {
280+
if i, ok := cached[m]; ok {
281+
// Note: cvs here is the slice returned by Client.GetValue. We want to return a
282+
// pointer into that slice so that the converted value is cached as long as the
283+
// Client keeps the []ConstrainedValue alive. See the comment on
284+
// Client.GetValue.
285+
return &cvs[i], nil
286+
}
287+
}
288+
// key is present but no constraint section matches
289+
return nil, errNoMatchingConstraint
290+
}
291+
238292
// matchAndConvert can't be a method of Collection because methods can't be generic, but we can
239293
// take a *Collection as an argument.
240294
func matchAndConvert[T any](
@@ -257,7 +311,7 @@ func matchAndConvertCvs[T any](
257311
precedence []Constraints,
258312
cvs []ConstrainedValue,
259313
) (T, any) {
260-
cvp, err := findMatch(cvs, precedence)
314+
cvp, err := findMatch(c.indexCache, cvs, precedence)
261315
if err != nil {
262316
// couldn't find a constrained match, use default
263317
return def, usingDefaultValue
@@ -449,7 +503,7 @@ func dispatchUpdate[T any](
449503
cvs []ConstrainedValue,
450504
) {
451505
var raw any
452-
cvp, err := findMatch(cvs, sub.prec)
506+
cvp, err := findMatch(c.indexCache, cvs, sub.prec)
453507
if err != nil {
454508
raw = usingDefaultValue
455509
} else {
@@ -529,11 +583,12 @@ func convertWithCache[T any](c *Collection, key Key, convert func(any) (T, error
529583
return zero, err
530584
}
531585

532-
c.convertCache.Store(weakcvp, t)
533-
534-
runtime.AddCleanup(cvp, func(w weak.Pointer[ConstrainedValue]) {
535-
c.convertCache.Delete(w)
536-
}, weakcvp)
586+
if _, loaded := c.convertCache.LoadOrStore(weakcvp, t); !loaded {
587+
cc := c.convertCache // capture only this pointer, not the whole Collection
588+
runtime.AddCleanup(cvp, func(w weak.Pointer[ConstrainedValue]) {
589+
cc.Delete(w)
590+
}, weakcvp)
591+
}
537592

538593
return t, nil
539594
}

common/dynamicconfig/collection_bench_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dynamicconfig_test
22

33
import (
4+
"fmt"
45
"testing"
56

67
"go.temporal.io/server/common/dynamicconfig"
@@ -122,3 +123,40 @@ func BenchmarkCollection(b *testing.B) {
122123
}
123124
})
124125
}
126+
127+
func BenchmarkCollectionIndexed(b *testing.B) {
128+
// You might want to set constraintsCacheThreshold to a high value before running this to
129+
// measure the performance of linear search.
130+
131+
var nums []int
132+
for v := 1.0; v < 1000; v *= 1.5 {
133+
nums = append(nums, int(v+0.999))
134+
}
135+
for _, numNs := range nums {
136+
// query for the middle one to measure the average
137+
queryNs := numNs / 2
138+
139+
b.Run(fmt.Sprintf("num%d", numNs), func(b *testing.B) {
140+
cvs := make([]dynamicconfig.ConstrainedValue, numNs)
141+
for i := range cvs {
142+
cvs[i] = dynamicconfig.ConstrainedValue{
143+
Constraints: dynamicconfig.Constraints{
144+
Namespace: fmt.Sprintf("namespace%d", i),
145+
},
146+
Value: 1000 + i,
147+
}
148+
}
149+
150+
cli := dynamicconfig.StaticClient{
151+
dynamicconfig.FrontendGlobalNamespaceRPS.Key(): cvs,
152+
}
153+
cln := dynamicconfig.NewCollection(cli, log.NewNoopLogger())
154+
get := dynamicconfig.FrontendGlobalNamespaceRPS.Get(cln)
155+
query := fmt.Sprintf("namespace%d", queryNs)
156+
157+
for b.Loop() {
158+
get(query)
159+
}
160+
})
161+
}
162+
}

common/dynamicconfig/find_match_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package dynamicconfig
22

33
import (
4+
"fmt"
5+
"sync"
46
"testing"
57

68
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
710
)
811

912
// These two tests are in a separate file in the 'dynamicconfig' package to access the private
@@ -54,11 +57,37 @@ func TestFindMatch(t *testing.T) {
5457
}
5558

5659
for _, tc := range testCases {
57-
_, err := findMatch(tc.v, tc.filters)
60+
var cache sync.Map
61+
_, err := findMatch(&cache, tc.v, tc.filters)
5862
assert.Equal(t, tc.matched, err == nil)
5963
}
6064
}
6165

66+
func TestFindMatchIndexed(t *testing.T) {
67+
var cvs []ConstrainedValue
68+
for i := range 100 {
69+
cvs = append(cvs, ConstrainedValue{
70+
Constraints: Constraints{
71+
Namespace: fmt.Sprintf("namespace%d", i),
72+
},
73+
Value: 1000 + i,
74+
})
75+
}
76+
77+
have := []Constraints{{Namespace: "namespace75"}}
78+
notHave := []Constraints{{Namespace: "othernamespace"}}
79+
80+
var cache sync.Map
81+
v, err := findMatch(&cache, cvs, have)
82+
require.NoError(t, err)
83+
require.NotNil(t, v)
84+
assert.EqualValues(t, 1075, v.Value)
85+
assert.Equal(t, &cvs[75], v)
86+
87+
_, err = findMatch(&cache, cvs, notHave)
88+
assert.Error(t, err)
89+
}
90+
6291
func TestFindMatchWithTyped(t *testing.T) {
6392
testCases := []struct {
6493
val []ConstrainedValue

0 commit comments

Comments
 (0)