Skip to content

Commit 475cae9

Browse files
committed
Add fetch call
1 parent 20dc7a0 commit 475cae9

File tree

5 files changed

+200
-6
lines changed

5 files changed

+200
-6
lines changed

background.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,20 @@ func (km *keyMutex) Lock(key string) bool {
3535
return true
3636
}
3737

38+
// Wait blocks until the given key is unlocked by a prior Lock call.
39+
// It does not acquire the lock; callers typically call Get after waiting
40+
// or attempt to Lock again to become the next owner.
41+
func (km *keyMutex) Wait(key string) {
42+
km.l.Lock()
43+
for {
44+
if _, ok := km.s[key]; !ok {
45+
km.l.Unlock()
46+
return
47+
}
48+
km.c.Wait()
49+
}
50+
}
51+
3852
func (m CacheMap) BackgroundUpdate(key string, updater func() (interface{}, error)) {
3953
// Lock the key from writes
4054
locked := backgroundMutex.Lock(key)

fetch.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package ttlmap
2+
3+
import (
4+
"errors"
5+
"time"
6+
)
7+
8+
// ErrTypeMismatch is returned when the cached value cannot be cast to the requested generic type.
9+
var ErrTypeMismatch = errors.New("ttlmap: cached value has different type")
10+
11+
// Fetch returns a strictly typed value from the cache, fetching from the provided source function when missing.
12+
func Fetch[T any](m CacheMap, key string, source func(string) (T, error)) (T, error) {
13+
var zero T
14+
var returnValue T
15+
var okCast bool
16+
17+
shard := m.GetShard(key)
18+
shard.RLock()
19+
itm, ok := shard.items[key]
20+
if ok {
21+
returnValue, okCast = itm.GetValue().(T)
22+
shard.RUnlock()
23+
if !okCast {
24+
return zero, ErrTypeMismatch
25+
}
26+
27+
if !itm.Expired() {
28+
return returnValue, nil
29+
}
30+
31+
if !itm.isUpdating && itm.updateMutex.TryLock() {
32+
itm.isUpdating = true
33+
value, err := source(key)
34+
if err == nil {
35+
m.Set(key, value, nil)
36+
}
37+
itm.updateMutex.Unlock()
38+
itm.isUpdating = false
39+
return value, nil
40+
}
41+
42+
// Item has expired, but another thread is updateMutex
43+
return returnValue, nil
44+
}
45+
shard.RUnlock()
46+
shard.Lock()
47+
defer shard.Unlock()
48+
49+
itm, ok = shard.items[key]
50+
if ok {
51+
// check the value was not already processed when waiting for the lock
52+
returnValue, okCast = itm.GetValue().(T)
53+
if !okCast {
54+
return zero, ErrTypeMismatch
55+
}
56+
return returnValue, nil
57+
}
58+
59+
value, err := source(key)
60+
if err == nil {
61+
itm = newItem(value, m.options.defaultCacheDuration, time.Now().Add(m.options.maxLifetime), nil)
62+
shard.items[key] = itm
63+
}
64+
return value, err
65+
}

fetch_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package ttlmap_test
2+
3+
import (
4+
"log"
5+
"sync"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
"github.com/packaged/ttlmap"
11+
)
12+
13+
func TestFetch_MissSingleflight(t *testing.T) {
14+
cache := ttlmap.New(ttlmap.WithDefaultTTL(100 * time.Millisecond))
15+
16+
var calls int32
17+
source := func(key string) (int, error) {
18+
time.Sleep(10 * time.Millisecond)
19+
atomic.AddInt32(&calls, 1)
20+
return 123, nil
21+
}
22+
23+
// Launch many concurrent callers for a missing key
24+
const n = 32
25+
var wg sync.WaitGroup
26+
wg.Add(n)
27+
results := make([]int, n)
28+
for i := 0; i < n; i++ {
29+
go func(ix int) {
30+
defer wg.Done()
31+
v, err := ttlmap.Fetch[int](cache, "k1", source)
32+
if err != nil {
33+
t.Errorf("unexpected error: %v", err)
34+
return
35+
}
36+
results[ix] = v
37+
}(i)
38+
}
39+
wg.Wait()
40+
41+
if atomic.LoadInt32(&calls) != 1 {
42+
t.Fatalf("expected single source call, got %d", calls)
43+
}
44+
for i := 0; i < n; i++ {
45+
if results[i] != 123 {
46+
t.Fatalf("unexpected result at %d: %d", i, results[i])
47+
}
48+
}
49+
}
50+
51+
func TestFetch_TypeMismatch(t *testing.T) {
52+
cache := ttlmap.New(ttlmap.WithDefaultTTL(100 * time.Millisecond))
53+
cache.Set("k", "hello", nil)
54+
55+
var called int32
56+
src := func(key string) (int, error) {
57+
atomic.AddInt32(&called, 1)
58+
return 42, nil
59+
}
60+
_, err := ttlmap.Fetch[int](cache, "k", src)
61+
if err == nil {
62+
t.Fatalf("expected ErrTypeMismatch, got nil")
63+
}
64+
if err != ttlmap.ErrTypeMismatch {
65+
t.Fatalf("expected ErrTypeMismatch, got %v", err)
66+
}
67+
if atomic.LoadInt32(&called) != 0 {
68+
t.Fatalf("source should not be called on type mismatch hit")
69+
}
70+
}
71+
72+
func TestFetch_StaleWhileRevalidate(t *testing.T) {
73+
// Short TTL to trigger expiry
74+
cache := ttlmap.New(ttlmap.WithDefaultTTL(time.Second), ttlmap.WithMaxLifetime(2*time.Second))
75+
ttl := 20 * time.Millisecond
76+
cache.Set("sk", 1, &ttl)
77+
78+
src := func(key string) (int, error) {
79+
time.Sleep(15 * time.Millisecond)
80+
return 2, nil
81+
}
82+
// First call should return stale value 1 and trigger background refresh
83+
v1, err := ttlmap.Fetch[int](cache, "sk", src)
84+
if err != nil {
85+
t.Fatalf("unexpected err: %v", err)
86+
}
87+
if v1 != 1 {
88+
t.Fatalf("expected stale value 1, got %d", v1)
89+
}
90+
91+
wg := sync.WaitGroup{}
92+
for i := 0; i < 100000; i++ {
93+
wg.Add(1)
94+
go func() {
95+
defer wg.Done()
96+
_, fetchErr := ttlmap.Fetch[int](cache, "sk", src)
97+
if fetchErr != nil {
98+
log.Print(fetchErr.Error())
99+
}
100+
}()
101+
}
102+
wg.Wait()
103+
104+
time.Sleep(10 * time.Millisecond)
105+
// Next call should observe refreshed value 2
106+
v2, err := ttlmap.Fetch[int](cache, "sk", src)
107+
if err != nil {
108+
t.Fatalf("unexpected err: %v", err)
109+
}
110+
if v2 != 2 {
111+
t.Fatalf("expected refreshed value 2, got %d", v2)
112+
}
113+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module github.com/packaged/ttlmap
22

3-
go 1.15
3+
go 1.20

item.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import (
88
// Item represents a record in the map
99
type Item struct {
1010
sync.RWMutex
11-
data interface{}
12-
deadline time.Time
13-
ttl time.Duration
14-
expires *time.Time
15-
onDelete func(*Item)
11+
updateMutex sync.RWMutex
12+
isUpdating bool
13+
data interface{}
14+
deadline time.Time
15+
ttl time.Duration
16+
expires *time.Time
17+
onDelete func(*Item)
1618
}
1719

1820
func newItem(value interface{}, duration time.Duration, deadline time.Time, onDelete func(*Item)) *Item {

0 commit comments

Comments
 (0)