Skip to content

Commit 7a88c7e

Browse files
committed
auto-ttl support: Add galaxy-level TTL config
Directly support jittering the TTL/expiry to make it easy to do the right thing. (bound the expiration as necessary)
1 parent 6459d31 commit 7a88c7e

File tree

3 files changed

+258
-2
lines changed

3 files changed

+258
-2
lines changed

galaxycache.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"context"
2929
"errors"
3030
"fmt"
31+
"math/rand/v2"
3132
"strconv"
3233
"sync"
3334
"time"
@@ -231,6 +232,14 @@ func (universe *Universe) NewGalaxyWithBackendInfo(name string, cacheBytes int64
231232
for _, opt := range opts {
232233
opt.apply(&gOpts)
233234
}
235+
if gOpts.peekTTL.entryTTL == 0 {
236+
// Default the peek TTL to the get TTL if the peek TTL is unset
237+
// most of the time you don't want the peek TTL to be set if
238+
// the get TTL isn't, but, if the content of a galaxy
239+
// changes/gets more info, it can be useful to bound how long
240+
// data can bounce round without being re-hydrated anew.
241+
gOpts.peekTTL = gOpts.getTTL
242+
}
234243
g := &Galaxy{
235244
name: name,
236245
parent: universe,
@@ -440,6 +449,14 @@ type galaxyOpts struct {
440449
maxCandidates int
441450
clock clocks.Clock
442451
resetIdleStatsAge time.Duration
452+
// parameters for capping the TTL on gets and peeks
453+
getTTL ttlJitter
454+
// peeks may be set separately to trigger delayed reloading around version
455+
// upgrades (to prevent old versions of cache-values from persisting
456+
// indefinitely -- may be O(days) with substantial jitter)
457+
//
458+
// Defaults to matching getTTL
459+
peekTTL ttlJitter
443460

444461
peekPeer *PeekPeerCfg
445462
}
@@ -503,6 +520,41 @@ func WithPreviousPeerPeeking(cfg PeekPeerCfg) GalaxyOption {
503520
})
504521
}
505522

523+
// newTTLJitter builds a ttlJitter from a TTL and a signed jitter.
//
// A non-positive ttl yields the zero-value ttlJitter (entryTTL == 0), which
// capExpiry treats as "no TTL configured". Otherwise the magnitude of jitter
// is stored in entryTTLJitter and its sign is recorded in jitterTTLDown.
func newTTLJitter(ttl, jitter time.Duration) ttlJitter {
524+
if ttl <= 0 {
525+
return ttlJitter{}
526+
}
527+
return ttlJitter{
528+
entryTTL: ttl,
529+
// absolute value of jitter; the sign is carried by jitterTTLDown below
entryTTLJitter: max(jitter, -jitter),
530+
// a negative jitter requests downward-only jittering
jitterTTLDown: jitter < 0,
531+
}
532+
}
533+
534+
// WithGetTTL allows the client to specify a default TTL for the galaxy, with optional jitter.
535+
// jitter may be 0 to always set the TTL to exactly ttl in the future after calling the BackendGetter.
536+
//
537+
//   - Non-positive TTLs (ttl <= 0) are silently ignored: the zero-value (disabled) TTL config is stored.
538+
//   - Negative jitter values indicate that jittering should only be done
539+
//     downward, and never extend the lifetime beyond TTL.
540+
func WithGetTTL(ttl, jitter time.Duration) GalaxyOption {
541+
return newFuncGalaxyOption(func(g *galaxyOpts) {
542+
g.getTTL = newTTLJitter(ttl, jitter)
543+
})
544+
}
545+
546+
// WithPeekTTL allows the client to specify a TTL cap, with optional jitter, for values obtained by peeking at a peer.
547+
// jitter may be 0 to always cap the expiry at exactly ttl in the future. If the peek TTL is left unset it defaults to the WithGetTTL configuration.
548+
//
549+
//   - Non-positive TTLs (ttl <= 0) are silently ignored: the zero-value (disabled) TTL config is stored.
550+
//   - Negative jitter values indicate that jittering should only be done
551+
//     downward, and never extend the lifetime beyond TTL.
552+
func WithPeekTTL(ttl, jitter time.Duration) GalaxyOption {
553+
return newFuncGalaxyOption(func(g *galaxyOpts) {
554+
g.peekTTL = newTTLJitter(ttl, jitter)
555+
})
556+
}
557+
506558
// flightGroup is defined as an interface which flightgroup.Group
507559
// satisfies. We define this so that we may test with an alternate
508560
// implementation.
@@ -896,6 +948,38 @@ func (g *Galaxy) load(ctx context.Context, opts loadOpts, key string, dest Codec
896948
return
897949
}
898950

951+
// ttlJitter holds the parameters capExpiry uses to cap an entry's expiry.
type ttlJitter struct {
952+
// If entryTTL > 0, the expiry is capped at now + entryTTL + d, where d is a pseudo-random offset:
953+
//   - d is in [-entryTTLJitter, entryTTLJitter) when jitterTTLDown is false
954+
//   - d is in [-entryTTLJitter, 0) when jitterTTLDown is true, so the cap never exceeds now + entryTTL
// (d is derived from rand.Int64N of math/rand/v2; no jitter is applied when entryTTLJitter is 0)
955+
entryTTL, entryTTLJitter time.Duration
956+
jitterTTLDown bool
957+
}
958+
959+
// capExpiry lowers bgInfo.Expiration (or sets it when it is zero) so that it
// is no further in the future than the configured TTL plus jitter, measured
// from clk.Now(). It is a no-op when no TTL is configured (entryTTL <= 0) or
// when the existing expiration is already within entryTTL (+ entryTTLJitter
// unless jittering is downward-only).
//
// NOTE(review): when entryTTLJitter exceeds entryTTL the computed interval can
// be zero or negative, i.e. the new expiration may be at or before now —
// confirm whether callers should be prevented from configuring that.
func (g ttlJitter) capExpiry(clk clocks.Clock, bgInfo *BackendGetInfo) {
960+
// initial common-case: no TTL set
961+
if g.entryTTL <= 0 {
962+
return
963+
}
964+
now := clk.Now()
965+
// maxJitter is the largest upward adjustment we may apply: the full jitter, or 0 when only jittering downward.
966+
maxJitter := g.entryTTLJitter
967+
if g.jitterTTLDown {
968+
maxJitter = 0
969+
}
970+
// We're done if the expiration is already closer than ttl + maxJitter
971+
if !bgInfo.Expiration.IsZero() && bgInfo.Expiration.Sub(now) <= (g.entryTTL+maxJitter) {
972+
return
973+
}
974+
// Either there's no expiration, or it's too far in the future so we need to adjust it downward.
975+
newCapInterval := g.entryTTL
976+
if g.entryTTLJitter > 0 {
977+
// draw from [0, maxJitter+entryTTLJitter) and shift down by entryTTLJitter, giving an offset in
// [-entryTTLJitter, maxJitter); the > 0 guard keeps the Int64N argument positive.
newCapInterval += time.Duration(rand.Int64N(int64(maxJitter+g.entryTTLJitter))) - g.entryTTLJitter
978+
}
979+
newCap := now.Add(newCapInterval)
980+
bgInfo.Expiration = newCap
981+
}
982+
899983
func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, BackendGetInfo, error) {
900984
startTime := time.Now()
901985
defer func() {
@@ -906,6 +990,7 @@ func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte
906990
return nil, BackendGetInfo{}, err
907991
}
908992
mar, marErr := dest.MarshalBinary()
993+
g.opts.getTTL.capExpiry(g.clock, &bgInfo)
909994
return mar, bgInfo, marErr
910995
}
911996

@@ -946,6 +1031,7 @@ func (g *Galaxy) peekPeer(ctx context.Context, key string) (valWithStat, Backend
9461031
// inserted into the main cache.
9471032

9481033
value := g.newValWithStat(peekVal, nil)
1034+
g.opts.peekTTL.capExpiry(g.clock, &bgInfo)
9491035
g.populateCache(ctx, key, value, &g.mainCache, bgInfo)
9501036

9511037
return value, bgInfo, nil
@@ -978,6 +1064,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcherWithInfo, ke
9781064
if g.opts.promoter.ShouldPromote(key, value.data, stats) {
9791065
g.populateCache(ctx, key, value, &g.hotCache, bgInfo)
9801066
}
1067+
g.opts.getTTL.capExpiry(g.clock, &bgInfo)
9811068
return value, bgInfo, nil
9821069
}
9831070

galaxycache_peek_test.go

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ func TestGalaxycacheGetWithPeek(t *testing.T) {
6565
includeSelf bool
6666
setClock time.Time
6767

68+
getTTL, peekTTL time.Duration
69+
getTTLJitter, peekTTLJitter time.Duration
70+
6871
expiry time.Time
6972

7073
// getter for locally fetched values (may call t.Error/Errorf,
@@ -194,6 +197,132 @@ func TestGalaxycacheGetWithPeek(t *testing.T) {
194197
peer1Addr: 1,
195198
peer2Addr: 1,
196199
},
200+
}, {
201+
name: "peeks_hit_each_peer_with_expiry_capped_getTTL",
202+
includePeers: []int{0, 1, 2},
203+
includeSelf: true,
204+
setClock: baseTime,
205+
expiry: baseTime.Add(time.Hour * 84),
206+
getTTL: time.Hour * 30,
207+
localGetter: func(t testing.TB, key string) (string, BackendGetInfo, error) {
208+
t.Errorf("unexpected local fetch (peek should succeed on peer)")
209+
return "", BackendGetInfo{}, fmt.Errorf("unexpected call")
210+
},
211+
peekModes: map[string]testPeekMode{
212+
peer0Addr: testPeekModeHit,
213+
peer1Addr: testPeekModeHit,
214+
peer2Addr: testPeekModeHit,
215+
},
216+
checkSteps: []checkStep{
217+
{
218+
key: chtest.FallthroughKey(self, peer0),
219+
expVal: peer0Addr + ": peek got: " + chtest.FallthroughKey(self, peer0),
220+
expExpiry: baseTime.Add(time.Hour * 30),
221+
},
222+
{
223+
// second fetch, since the key should now be in the main cache
224+
key: chtest.FallthroughKey(self, peer0),
225+
expVal: peer0Addr + ": peek got: " + chtest.FallthroughKey(self, peer0),
226+
getOpts: GetOptions{
227+
FetchMode: FetchModePeek,
228+
},
229+
expExpiry: baseTime.Add(time.Hour * 30),
230+
},
231+
{
232+
key: chtest.FallthroughKey(self, peer1),
233+
expVal: peer1Addr + ": peek got: " + chtest.FallthroughKey(self, peer1),
234+
expExpiry: baseTime.Add(time.Hour * 30),
235+
},
236+
{
237+
key: chtest.FallthroughKey(self, peer1),
238+
expVal: peer1Addr + ": peek got: " + chtest.FallthroughKey(self, peer1),
239+
getOpts: GetOptions{
240+
FetchMode: FetchModePeek,
241+
},
242+
expExpiry: baseTime.Add(time.Hour * 30),
243+
},
244+
{
245+
key: chtest.FallthroughKey(self, peer2),
246+
expVal: peer2Addr + ": peek got: " + chtest.FallthroughKey(self, peer2),
247+
expExpiry: baseTime.Add(time.Hour * 30),
248+
},
249+
{
250+
key: chtest.FallthroughKey(self, peer2),
251+
expVal: peer2Addr + ": peek got: " + chtest.FallthroughKey(self, peer2),
252+
getOpts: GetOptions{
253+
FetchMode: FetchModePeek,
254+
},
255+
expExpiry: baseTime.Add(time.Hour * 30),
256+
},
257+
},
258+
expPeeks: map[string]int{
259+
peer0Addr: 1,
260+
peer1Addr: 1,
261+
peer2Addr: 1,
262+
},
263+
}, {
264+
name: "peeks_hit_each_peer_with_expiry_capped_peekTTL",
265+
includePeers: []int{0, 1, 2},
266+
includeSelf: true,
267+
setClock: baseTime,
268+
expiry: baseTime.Add(time.Hour * 84),
269+
peekTTL: time.Hour * 30,
270+
localGetter: func(t testing.TB, key string) (string, BackendGetInfo, error) {
271+
t.Errorf("unexpected local fetch (peek should succeed on peer)")
272+
return "", BackendGetInfo{}, fmt.Errorf("unexpected call")
273+
},
274+
peekModes: map[string]testPeekMode{
275+
peer0Addr: testPeekModeHit,
276+
peer1Addr: testPeekModeHit,
277+
peer2Addr: testPeekModeHit,
278+
},
279+
checkSteps: []checkStep{
280+
{
281+
key: chtest.FallthroughKey(self, peer0),
282+
expVal: peer0Addr + ": peek got: " + chtest.FallthroughKey(self, peer0),
283+
expExpiry: baseTime.Add(time.Hour * 30),
284+
},
285+
{
286+
// second fetch, since the key should now be in the main cache
287+
key: chtest.FallthroughKey(self, peer0),
288+
expVal: peer0Addr + ": peek got: " + chtest.FallthroughKey(self, peer0),
289+
getOpts: GetOptions{
290+
FetchMode: FetchModePeek,
291+
},
292+
expExpiry: baseTime.Add(time.Hour * 30),
293+
},
294+
{
295+
key: chtest.FallthroughKey(self, peer1),
296+
expVal: peer1Addr + ": peek got: " + chtest.FallthroughKey(self, peer1),
297+
expExpiry: baseTime.Add(time.Hour * 30),
298+
},
299+
{
300+
key: chtest.FallthroughKey(self, peer1),
301+
expVal: peer1Addr + ": peek got: " + chtest.FallthroughKey(self, peer1),
302+
getOpts: GetOptions{
303+
FetchMode: FetchModePeek,
304+
},
305+
expExpiry: baseTime.Add(time.Hour * 30),
306+
},
307+
{
308+
key: chtest.FallthroughKey(self, peer2),
309+
expVal: peer2Addr + ": peek got: " + chtest.FallthroughKey(self, peer2),
310+
expExpiry: baseTime.Add(time.Hour * 30),
311+
},
312+
{
313+
key: chtest.FallthroughKey(self, peer2),
314+
expVal: peer2Addr + ": peek got: " + chtest.FallthroughKey(self, peer2),
315+
getOpts: GetOptions{
316+
FetchMode: FetchModePeek,
317+
},
318+
expExpiry: baseTime.Add(time.Hour * 30),
319+
},
320+
},
321+
expPeeks: map[string]int{
322+
peer0Addr: 1,
323+
peer1Addr: 1,
324+
peer2Addr: 1,
325+
},
197326
}, {
198327
name: "peeks_timeout_each_peer",
199328
includePeers: []int{0, 1, 2},
@@ -653,7 +782,7 @@ func TestGalaxycacheGetWithPeek(t *testing.T) {
653782
g := u.NewGalaxyWithBackendInfo("easy come; easy go", 256, getter, WithPreviousPeerPeeking(PeekPeerCfg{
654783
WarmTime: warmPeriod,
655784
PeekTimeout: peekTimeout},
656-
))
785+
), WithGetTTL(tbl.getTTL, tbl.getTTLJitter), WithPeekTTL(tbl.peekTTL, tbl.peekTTLJitter))
657786

658787
fc.SetClock(tbl.setClock)
659788
ctx, cancel := context.WithCancel(context.Background())
@@ -687,7 +816,7 @@ func TestGalaxycacheGetWithPeek(t *testing.T) {
687816
t.Errorf("unexpected error fetching step %d; key %q: %s", i, step.key, getErr)
688817
continue
689818
} else if !getInfo.Expiry.Equal(step.expExpiry) {
690-
t.Errorf("unexpected expiry: %s; expected %s", getInfo.Expiry, step.expExpiry)
819+
t.Errorf("step %d: unexpected expiry: %s; expected %s", i, getInfo.Expiry, step.expExpiry)
691820
}
692821
if string(c) != step.expVal {
693822
t.Errorf("unexpected value for key %q in step %d\nwant: %q\n got %q", step.key, i, step.expVal, c)

galaxycache_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,19 @@ func setupStringGalaxyTest(cacheFills *AtomicInt, clk clocks.Clock, ttl time.Dur
6969
return stringGalaxy, ctx, stringc
7070
}
7171

72+
// setupSimpleStringGalaxyTestWithOpts creates a string galaxy whose getter
// echoes the key back as "ECHO:<key>" and bumps cacheFills on every backend
// fetch. The key fromChan is special: the getter first blocks on the returned
// channel and uses the received string as the key. otherOpts are forwarded
// unchanged to NewGalaxy.
func setupSimpleStringGalaxyTestWithOpts(cacheFills *AtomicInt, otherOpts ...GalaxyOption) (*Galaxy, context.Context, chan string) {
73+
universe, ctx, stringc := initSetup()
74+
stringGalaxy := universe.NewGalaxy(stringGalaxyName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Codec) error {
75+
if key == fromChan {
76+
key = <-stringc
77+
}
78+
cacheFills.Add(1)
79+
str := "ECHO:" + key
80+
return dest.UnmarshalBinary([]byte(str))
81+
}), otherOpts...)
82+
return stringGalaxy, ctx, stringc
83+
}
84+
7285
// tests that a BackendGetter's Get method is only called once with two
7386
// outstanding callers
7487
func TestGetDupSuppress(t *testing.T) {
@@ -162,6 +175,33 @@ func TestCachingWithExpiry(t *testing.T) {
162175
}
163176
}
164177

178+
// TestCachingWithAutoExpiryNoJitter checks that a galaxy configured with
// WithGetTTL(time.Minute, 0) reports expiries exactly one minute after a
// fetch time, never returns an already-expired item, and refetches from the
// backend once the entry has expired.
func TestCachingWithAutoExpiryNoJitter(t *testing.T) {
179+
t.Parallel()
180+
var cacheFills AtomicInt
181+
fc := fake.NewClock(time.Now())
182+
// we'll set the ttl 1 minute in the future and advance by 31s per-iteration, so we get 5 fetches
183+
stringGalaxy, ctx, _ := setupSimpleStringGalaxyTestWithOpts(&cacheFills, WithClock(fc), WithGetTTL(time.Minute, 0))
184+
t.Logf("galaxy: %+v", stringGalaxy)
185+
// the value returned may have been filled on this or any earlier iteration, so
// accumulate every "now + 1 minute" seen so far as an acceptable expiry.
possibleExpiries := map[time.Time]struct{}{}
186+
fills := countFills(func() {
187+
for range 10 {
188+
possibleExpiries[fc.Now().Add(time.Minute)] = struct{}{}
189+
var s StringCodec
190+
if info, err := stringGalaxy.GetWithOptions(ctx, GetOptions{}, "TestCaching-key", &s); err != nil {
191+
t.Fatal(err)
192+
} else if _, availExp := possibleExpiries[info.Expiry]; !availExp {
193+
t.Errorf("unexpected expiry: %s; expected one of: %v", info.Expiry, slices.Collect(maps.Keys(possibleExpiries)))
194+
} else if info.Expiry.Sub(fc.Now()) < 0 {
195+
t.Errorf("received expired item: current time %s; got %s", fc.Now(), info.Expiry)
196+
}
197+
fc.Advance(time.Second * 31)
198+
}
199+
}, &cacheFills)
200+
// 10 iterations 31s apart with a 60s TTL: every second iteration misses, so 5 fills.
if fills != 5 {
201+
t.Errorf("expected 5 cache fill; got %d", fills)
202+
}
203+
}
204+
165205
func TestCacheEviction(t *testing.T) {
166206
var cacheFills AtomicInt
167207
stringGalaxy, ctx, _ := setupStringGalaxyTest(&cacheFills, clocks.DefaultClock(), 0)

0 commit comments

Comments
 (0)