Skip to content

Commit 2237b4f

Browse files
support background entry removal for workflow cache (#7902)
## What changed?
This adds support to the LRU cache for actively expiring entries older than the TTL, by spawning a background goroutine that periodically deletes old entries. A dynamic config, off by default, is added that can enable this feature for the workflow cache.

## Why?
This can reduce memory usage, and the associated Go GC resource usage, for workflow entries that won't be used again because they are past their TTL.

## How did you test it?
- [x] built
- [x] run locally and tested manually
- [x] covered by existing tests
- [x] added new unit test(s)
- [ ] added new functional test(s)

Also running this in test setups to verify the expected memory reduction.

## Potential risks
1 parent 7e01f6c commit 2237b4f

8 files changed

Lines changed: 276 additions & 53 deletions

File tree

common/cache/cache.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"time"
55

66
"go.temporal.io/server/common/clock"
7+
"go.temporal.io/server/common/dynamicconfig"
78
)
89

910
// A Cache is a generalized interface to a cache. See cache.LRU for a specific
@@ -34,6 +35,13 @@ type Cache interface {
3435
Size() int
3536
}
3637

38+
// StoppableCache is a Cache that may run background processing and therefore
// exposes a Stop method for its owner to call once the cache is no longer needed.
type StoppableCache interface {
39+
Cache
40+
41+
// Stop halts any background processing, and should be called when the cache will no longer be used.
42+
Stop()
43+
}
44+
3745
// Options control the behavior of the cache.
3846
type Options struct {
3947
// TTL controls the time-to-live for a given cache entry. Cache entries that
@@ -49,6 +57,9 @@ type Options struct {
4957
OnPut func(val any)
5058

5159
OnEvict func(val any)
60+
61+
// BackgroundEvict configures background scanning for expired entries.
62+
BackgroundEvict func() dynamicconfig.CacheBackgroundEvictSettings
5263
}
5364

5465
// SimpleOptions provides options that can be used to configure SimpleCache.

common/cache/lru.go

Lines changed: 98 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package cache
22

33
import (
44
"container/list"
5+
"context"
56
"sync"
67
"time"
78

89
enumspb "go.temporal.io/api/enums/v1"
910
"go.temporal.io/api/serviceerror"
1011
"go.temporal.io/server/common/clock"
12+
"go.temporal.io/server/common/dynamicconfig"
13+
"go.temporal.io/server/common/goro"
1114
"go.temporal.io/server/common/metrics"
1215
)
1316

@@ -27,18 +30,20 @@ const emptyEntrySize = 0
2730
// lru is a concurrent fixed size cache that evicts elements in lru order
2831
type (
2932
lru struct {
30-
mut sync.Mutex
31-
byAccess *list.List
32-
byKey map[interface{}]*list.Element
33-
maxSize int
34-
currSize int
35-
pinnedSize int
36-
onPut func(val any)
37-
onEvict func(val any)
38-
ttl time.Duration
39-
pin bool
40-
timeSource clock.TimeSource
41-
metricsHandler metrics.Handler
33+
mut sync.Mutex
34+
byAccess *list.List
35+
byKey map[interface{}]*list.Element
36+
maxSize int
37+
currSize int
38+
pinnedSize int
39+
onPut func(val any)
40+
onEvict func(val any)
41+
ttl time.Duration
42+
pin bool
43+
timeSource clock.TimeSource
44+
metricsHandler metrics.Handler
45+
backgroundEvict dynamicconfig.TypedPropertyFn[dynamicconfig.CacheBackgroundEvictSettings]
46+
loops goro.Group
4247
}
4348

4449
iteratorImpl struct {
@@ -129,40 +134,53 @@ func (entry *entryImpl) CreateTime() time.Time {
129134
}
130135

131136
// New creates a new cache with the given options
132-
func New(maxSize int, opts *Options) Cache {
137+
func New(maxSize int, opts *Options) StoppableCache {
133138
return NewWithMetrics(maxSize, opts, metrics.NoopMetricsHandler)
134139
}
135140

136141
// NewWithMetrics creates a new cache that will emit capacity and ttl metrics.
137142
// handler should be tagged with metrics.CacheTypeTag.
138-
func NewWithMetrics(maxSize int, opts *Options, handler metrics.Handler) Cache {
143+
func NewWithMetrics(maxSize int, opts *Options, handler metrics.Handler) StoppableCache {
139144
if opts == nil {
140145
opts = &Options{}
141146
}
147+
if opts.BackgroundEvict == nil {
148+
opts.BackgroundEvict = func() dynamicconfig.CacheBackgroundEvictSettings {
149+
return dynamicconfig.CacheBackgroundEvictSettings{
150+
Enabled: false,
151+
}
152+
}
153+
}
154+
142155
timeSource := opts.TimeSource
143156
if timeSource == nil {
144157
timeSource = clock.NewRealTimeSource()
145158
}
146159

147160
metrics.CacheSize.With(handler).Record(float64(maxSize))
148161
metrics.CacheTtl.With(handler).Record(opts.TTL)
149-
return &lru{
150-
byAccess: list.New(),
151-
byKey: make(map[interface{}]*list.Element),
152-
ttl: opts.TTL,
153-
maxSize: maxSize,
154-
currSize: 0,
155-
pin: opts.Pin,
156-
onPut: opts.OnPut,
157-
onEvict: opts.OnEvict,
158-
timeSource: timeSource,
159-
metricsHandler: handler,
160-
}
162+
c := &lru{
163+
byAccess: list.New(),
164+
byKey: make(map[interface{}]*list.Element),
165+
ttl: opts.TTL,
166+
maxSize: maxSize,
167+
currSize: 0,
168+
pin: opts.Pin,
169+
onPut: opts.OnPut,
170+
onEvict: opts.OnEvict,
171+
timeSource: timeSource,
172+
metricsHandler: handler,
173+
backgroundEvict: opts.BackgroundEvict,
174+
}
175+
if c.backgroundEvict().Enabled {
176+
c.loops.Go(c.bgEvictLoop)
177+
}
178+
return c
161179
}
162180

163181
// NewLRU creates a new LRU cache of the given size, setting initial capacity
164182
// to the max size
165-
func NewLRU(maxSize int, handler metrics.Handler) Cache {
183+
func NewLRU(maxSize int, handler metrics.Handler) StoppableCache {
166184
return New(maxSize, nil)
167185
}
168186

@@ -181,14 +199,14 @@ func (c *lru) Get(key interface{}) interface{} {
181199

182200
entry := element.Value.(*entryImpl)
183201

184-
metrics.CacheEntryAgeOnGet.With(c.metricsHandler).Record(c.timeSource.Now().UTC().Sub(entry.createTime))
185-
186202
if c.isEntryExpired(entry, c.timeSource.Now().UTC()) {
187203
// Entry has expired
188204
c.deleteInternal(element)
189205
return nil
190206
}
191207

208+
metrics.CacheEntryAgeOnGet.With(c.metricsHandler).Record(c.timeSource.Now().UTC().Sub(entry.createTime))
209+
192210
c.updateEntryRefCount(entry)
193211
c.byAccess.MoveToFront(element)
194212
return entry.value
@@ -424,3 +442,54 @@ func (c *lru) updateEntryRefCount(entry *entryImpl) {
424442
}
425443
}
426444
}
445+
446+
func (c *lru) Stop() {
447+
c.loops.Cancel()
448+
}
449+
450+
func (c *lru) bgEvictLoop(ctx context.Context) error {
451+
ch, t := c.timeSource.NewTimer(c.backgroundEvict().LoopInterval)
452+
for {
453+
select {
454+
case <-ch:
455+
settings := c.backgroundEvict()
456+
if settings.Enabled {
457+
c.bgEvict(settings)
458+
}
459+
t.Reset(settings.LoopInterval)
460+
case <-ctx.Done():
461+
return ctx.Err()
462+
}
463+
}
464+
}
465+
466+
func (c *lru) bgEvict(settings dynamicconfig.CacheBackgroundEvictSettings) {
467+
now := c.timeSource.Now().UTC()
468+
469+
// Limit each iteration to scanning MaxEntryPerCall entries, to avoid holding the cache lock for too long.
470+
evictToMax := func() (again bool) {
471+
c.mut.Lock()
472+
defer c.mut.Unlock()
473+
474+
element := c.byAccess.Back()
475+
if settings.MaxEntryPerCall <= 0 {
476+
return false
477+
}
478+
for n := 0; n < settings.MaxEntryPerCall; n++ {
479+
if element == nil {
480+
return false
481+
}
482+
elementPrev := element.Prev()
483+
entry := element.Value.(*entryImpl) // nolint:revive
484+
if !c.isEntryExpired(entry, now) {
485+
return false
486+
}
487+
c.deleteInternal(element)
488+
element = elementPrev
489+
}
490+
return element != nil
491+
}
492+
493+
for evictToMax() {
494+
}
495+
}

common/cache/lru_test.go

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
1212
"go.temporal.io/server/common/clock"
13+
"go.temporal.io/server/common/dynamicconfig"
1314
"go.temporal.io/server/common/metrics"
1415
"go.temporal.io/server/common/metrics/metricstest"
1516
)
@@ -138,8 +139,7 @@ func TestLRUWithTTL(t *testing.T) {
138139
assert.Equal(t, 2, len(snapshot[metrics.CacheUsage.Name()]))
139140
assert.Equal(t, float64(0), snapshot[metrics.CacheUsage.Name()][1].Value)
140141
assert.Equal(t, 0, cache.Size())
141-
assert.Equal(t, 2, len(snapshot[metrics.CacheEntryAgeOnGet.Name()]))
142-
assert.Equal(t, time.Millisecond*300, snapshot[metrics.CacheEntryAgeOnGet.Name()][1].Value)
142+
assert.Equal(t, 1, len(snapshot[metrics.CacheEntryAgeOnGet.Name()]))
143143
assert.Equal(t, time.Millisecond*300, snapshot[metrics.CacheEntryAgeOnEviction.Name()][0].Value)
144144
}
145145

@@ -735,3 +735,117 @@ func TestCache_InvokeLifecycleCallbacks(t *testing.T) {
735735
assert.Nil(t, cache.Get("key"))
736736
require.Equal(t, 2, onEvict, "expected OnEvict callback to be invoked")
737737
}
738+
739+
func TestCache_UnusedExpiry(t *testing.T) {
740+
t.Parallel()
741+
r := require.New(t)
742+
743+
ttl := 10 * time.Minute
744+
loopInterval := 1 * time.Minute
745+
timeSource := clock.NewEventTimeSource()
746+
747+
cache := New(5,
748+
&Options{
749+
TTL: ttl,
750+
TimeSource: timeSource,
751+
BackgroundEvict: func() dynamicconfig.CacheBackgroundEvictSettings {
752+
return dynamicconfig.CacheBackgroundEvictSettings{
753+
Enabled: true,
754+
LoopInterval: loopInterval,
755+
MaxEntryPerCall: 1,
756+
}
757+
},
758+
},
759+
)
760+
761+
cache.Put(1, 1)
762+
r.Equal(1, cache.Size())
763+
764+
r.Eventually(func() bool {
765+
timeSource.Advance(loopInterval)
766+
return cache.Size() == 0
767+
}, 2*time.Second, 100*time.Millisecond)
768+
769+
cache.Put(2, 2)
770+
timeSource.Advance(ttl / 2)
771+
cache.Put(3, 3)
772+
r.Equal(2, cache.Size())
773+
774+
r.Eventually(func() bool {
775+
timeSource.Advance(loopInterval)
776+
return cache.Size() == 1 && cache.Get(2) == nil && cache.Get(3) == 3
777+
}, 2*time.Second, 100*time.Millisecond)
778+
779+
r.Eventually(func() bool {
780+
timeSource.Advance(loopInterval)
781+
return cache.Size() == 0 && cache.Get(2) == nil && cache.Get(3) == nil
782+
}, 2*time.Second, 100*time.Millisecond)
783+
784+
// Stop the background goroutine, confirm no active expiration.
785+
cache.Put(4, 4)
786+
cache.Stop()
787+
l, ok := cache.(*lru)
788+
r.True(ok)
789+
c := make(chan struct{})
790+
go func() {
791+
l.loops.Wait()
792+
close(c)
793+
}()
794+
r.Eventually(func() bool {
795+
select {
796+
case <-c:
797+
return true
798+
default:
799+
return false
800+
}
801+
}, 2*time.Second, 100*time.Millisecond)
802+
timeSource.Advance(ttl + 1*time.Second)
803+
// The cache should still have entry 4,
804+
r.Equal(1, cache.Size())
805+
// but this Get call will check the (hard) ttl & expire it.
806+
r.Equal(nil, cache.Get(4))
807+
}
808+
809+
func TestCache_UnusedExpiryPin(t *testing.T) {
810+
t.Parallel()
811+
r := require.New(t)
812+
813+
ttl := 10 * time.Minute
814+
loopInterval := 1 * time.Minute
815+
timeSource := clock.NewEventTimeSource()
816+
817+
cache := New(5,
818+
&Options{
819+
TTL: ttl,
820+
Pin: true,
821+
TimeSource: timeSource,
822+
BackgroundEvict: func() dynamicconfig.CacheBackgroundEvictSettings {
823+
return dynamicconfig.CacheBackgroundEvictSettings{
824+
Enabled: true,
825+
LoopInterval: loopInterval,
826+
MaxEntryPerCall: 1,
827+
}
828+
},
829+
},
830+
)
831+
832+
_, err := cache.PutIfNotExist(1, 1)
833+
r.NoError(err)
834+
timeSource.Advance(ttl / 2)
835+
cache.Release(1)
836+
_, err = cache.PutIfNotExist(2, 2)
837+
r.NoError(err)
838+
r.Equal(2, cache.Size())
839+
840+
r.Eventually(func() bool {
841+
timeSource.Advance(loopInterval)
842+
return cache.Size() == 1 && cache.Get(1) == nil
843+
}, 1*time.Second, 100*time.Millisecond)
844+
845+
cache.Release(2)
846+
847+
r.Eventually(func() bool {
848+
timeSource.Advance(loopInterval)
849+
return cache.Size() == 0
850+
}, 1*time.Second, 100*time.Millisecond)
851+
}

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,6 +1400,11 @@ will wait on workflow lock acquisition. Requires service restart to take effect.
14001400
`HistoryCacheHostLevelMaxSizeBytes is the maximum size of the host level history cache. This is only used if
14011401
HistoryCacheSizeBasedLimit is set to true.`,
14021402
)
1403+
HistoryCacheBackgroundEvict = NewGlobalTypedSetting(
1404+
"history.cacheBackgroundEvict",
1405+
DefaultHistoryCacheBackgroundEvictSettings,
1406+
`HistoryCacheBackgroundEvict configures background processing to purge expired entries from the history cache.`,
1407+
)
14031408
EnableWorkflowExecutionTimeoutTimer = NewGlobalBoolSetting(
14041409
"history.enableWorkflowExecutionTimeoutTimer",
14051410
true,

common/dynamicconfig/shared_constants.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,20 @@ type CircuitBreakerSettings struct {
100100
// Timeout: Period of open state before changing to half-open state (default 60s).`
101101
Timeout time.Duration
102102
}
103+
104+
// CacheBackgroundEvictSettings configures the background goroutine that purges
// expired entries from a cache.
type CacheBackgroundEvictSettings struct {
105+
// Enabled controls whether background purging of expired entries is active. To enable,
106+
// this must be set to true at process start, but it can be dynamically set to false to
107+
// stop scanning entries.
108+
Enabled bool
109+
// LoopInterval is the interval at which a background goroutine scans for expired entries.
110+
LoopInterval time.Duration
111+
// MaxEntryPerCall is the max number of entries that are scanned while the cache is locked.
112+
MaxEntryPerCall int
113+
}
114+
115+
// DefaultHistoryCacheBackgroundEvictSettings is the default value of the
// history.cacheBackgroundEvict dynamic config setting: background eviction is
// disabled, with a one-minute loop interval and up to 1024 entries scanned per
// locked call should it be enabled.
var DefaultHistoryCacheBackgroundEvictSettings = CacheBackgroundEvictSettings{
116+
Enabled: false,
117+
LoopInterval: 1 * time.Minute,
118+
MaxEntryPerCall: 1024,
119+
}

0 commit comments

Comments
 (0)