
Commit 3fb036d

Further near cache expiry optimizations (#103)
* Further near cache expiry optimizations
* minor comment
* Correct version incompatibility
* Add cacheEntriesExpired and cacheEntriesPruned
* Minor corrections
1 parent 90a89ce commit 3fb036d

File tree

6 files changed: +221 −42 lines changed


coherence/doc.go

+3
@@ -738,6 +738,9 @@ To manage the amount of memory used by the near cache, the following options are
 Note: You can specify either High-Units or Memory and in either case, optionally, a TTL.

+Note: The minimum expiry time for a near cache entry is 1/4 second. This is to ensure that expiry of elements is as efficient
+as possible. You will receive an error if you try to set the TTL to a lower value.
+
 The above can be specified by passing [NearCacheOptions] within [WithNearCache] when creating a [NamedMap] or [NamedCache].
 See below for various ways of creating near caches.
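The note above introduces a hard lower bound on the near cache TTL. Below is a minimal standalone sketch of that rule; the validateNearCacheTTL helper and its error text are hypothetical illustrations, not part of the coherence package API.

package main

import (
    "errors"
    "fmt"
    "time"
)

// minNearCacheTTL mirrors the documented minimum of 1/4 second.
const minNearCacheTTL = 250 * time.Millisecond

// validateNearCacheTTL is a hypothetical helper illustrating the documented rule:
// a TTL of zero (no expiry) is allowed, anything between zero and 250ms is rejected.
func validateNearCacheTTL(ttl time.Duration) error {
    if ttl > 0 && ttl < minNearCacheTTL {
        return errors.New("near cache TTL must be at least 250ms")
    }
    return nil
}

func main() {
    fmt.Println(validateNearCacheTTL(100 * time.Millisecond)) // rejected: below the minimum
    fmt.Println(validateNearCacheTTL(10 * time.Second))       // <nil>
}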

coherence/localcache.go

+151 −31
@@ -53,8 +53,10 @@ type CacheStats interface {
     GetTotalGets() int64 // the number of gets against the near cache
     GetCachePrunes() int64 // the number of times the near cache was pruned
     GetCachePrunesDuration() time.Duration // the duration of all prunes
-    GetCacheExpires() int64 // the number of times the near cache had expiry event
+    GetCacheEntriesPruned() int64 // the actual number of cache entries that were pruned
+    GetCacheExpires() int64 // the number of times the near cache expired entries
     GetCacheExpiresDuration() time.Duration // the duration of all expires
+    GetCacheEntriesExpired() int64 // the actual number of cache entries that were expired
     Size() int // the number of entries in the near cache
     SizeBytes() int64 // the number of bytes used by the entries (keys and values) in the near cache
     ResetStats() // reset the stats for the near cache, not including Size() or SizeBytes()
@@ -64,16 +66,20 @@ type localCacheImpl[K comparable, V any] struct {
     Name string
     options *localCacheOptions
     sync.Mutex
-    data map[K]*localCacheEntry[K, V]
-    cacheHits int64
-    cacheMisses int64
-    cacheMissesNannos int64
-    cachePuts int64
-    cachePrunes int64
-    cachePrunesNannos int64
-    cacheExpires int64
-    cacheExpiresNannos int64
-    cacheMemory int64
+    data map[K]*localCacheEntry[K, V]
+    expiryMap map[int64]*[]K
+    nextExpiry time.Time
+    cacheHits int64
+    cacheMisses int64
+    cacheMissesNannos int64
+    cachePuts int64
+    cacheEntriesPruned int64
+    cachePrunes int64
+    cachePrunesNannos int64
+    cacheEntriesExpired int64
+    cacheExpires int64
+    cacheExpiresNannos int64
+    cacheMemory int64
 }

 type localCacheEntry[K comparable, V any] struct {
@@ -82,6 +88,7 @@ type localCacheEntry[K comparable, V any] struct {
     ttl time.Duration
     insertTime time.Time
     lastAccess time.Time
+    expiresAt time.Time
 }

 type pair[K comparable] struct {
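The new expiresAt field is what feeds the expiryMap introduced above: an entry's expiry time, in unix milliseconds, becomes a map key, and all cache keys sharing that instant are kept in one slice. Here is a tiny self-contained sketch of that shape, an illustration only, using string keys rather than the generic K.

package main

import (
    "fmt"
    "time"
)

func main() {
    // same shape as the near cache's expiryMap: expiry time in unix millis -> keys
    expiryMap := make(map[int64]*[]string)

    register := func(key string, expiresAt time.Time) {
        millis := expiresAt.UnixMilli()
        if existing, ok := expiryMap[millis]; ok {
            *existing = append(*existing, key) // group with keys expiring at the same instant
            return
        }
        keys := []string{key}
        expiryMap[millis] = &keys
    }

    expiresAt := time.Now().Add(time.Second)
    register("a", expiresAt)
    register("b", expiresAt)                           // lands in the same bucket as "a"
    register("c", expiresAt.Add(500*time.Millisecond)) // separate bucket

    for millis, keys := range expiryMap {
        fmt.Println(millis, *keys)
    }
}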
@@ -123,6 +130,7 @@ func (l *localCacheImpl[K, V]) PutWithExpiry(key K, value V, ttl time.Duration)
     newEntry := newLocalCacheEntry[K, V](key, value, ttl)

     l.updateEntrySize(newEntry, 1)
+    l.registerExpiry(newEntry)

     prev, ok := l.data[key]
@@ -185,6 +193,7 @@ func (l *localCacheImpl[K, V]) Remove(key K) *V {
     v, ok := l.data[key]

     if ok {
+        l.removeExpiry(key)
         delete(l.data, key)
         l.updateEntrySize(v, -1)
         return &v.value
@@ -219,6 +228,7 @@ func (l *localCacheImpl[K, V]) Clear() {
     defer l.Unlock()

     l.data = make(map[K]*localCacheEntry[K, V], 0)
+    l.expiryMap = make(map[int64]*[]K, 0)
     l.updateCacheMemory(0)
 }
@@ -232,26 +242,56 @@ func (l *localCacheImpl[K, V]) GetStats() CacheStats {
 }

 // expireEntries goes through the map to see if any entries have expired due to ttl.
+// this is done in buckets of 1/4 second so as to be more efficient. this means the
+// min expiry duration is 1/4 of a second.
 func (l *localCacheImpl[K, V]) expireEntries() {
+    if len(l.expiryMap) == 0 {
+        return
+    }
+
     var (
-        keysToDelete = make([]K, 0)
-        start = time.Now()
+        bucketsToRemove = make([]int64, 0)
+        expiryKeys = make([]int64, len(l.expiryMap))
+        start = time.Now()
+        startUnixMillis = start.UnixMilli()
+        index = 0
     )

-    // check for cache expiry
-    for k, v := range l.data {
-        if v.ttl > 0 && start.Sub(v.insertTime) > v.ttl {
-            keysToDelete = append(keysToDelete, k)
-        }
+    if start.Before(l.nextExpiry) {
+        return
     }

-    // delete all the keys that were flagged from the expiry, this may be enough to free up space
-    for _, k := range keysToDelete {
-        l.updateEntrySize(l.data[k], -1)
-        delete(l.data, k)
+    // get the keys from the map and sort them, so we are seeing the earliest first
+    for key := range l.expiryMap {
+        expiryKeys[index] = key
+        index++
     }

-    if len(keysToDelete) > 0 {
+    sort.Slice(expiryKeys, func(p, q int) bool {
+        return p < q
+    })
+
+    for _, expireTime := range expiryKeys {
+        if expireTime < startUnixMillis {
+            // need to expire all entries for the expiry key, retrieve the entry
+            if v, ok := l.expiryMap[expireTime]; ok {
+                bucketsToRemove = append(bucketsToRemove, expireTime)
+                for _, k := range *v {
+                    l.updateEntrySize(l.data[k], -1)
+                    atomic.AddInt64(&l.cacheEntriesExpired, 1)
+                    delete(l.data, k)
+                }
+            }
+        }
+    }
+
+    if len(bucketsToRemove) > 0 {
+        l.nextExpiry = time.Now().Add(time.Duration(256) * time.Millisecond)
+
+        for _, b := range bucketsToRemove {
+            delete(l.expiryMap, b)
+        }
+
         l.registerExpireNanos(time.Since(start).Nanoseconds())
     }
 }
@@ -300,24 +340,35 @@ func (l *localCacheImpl[K, V]) pruneEntries() {
                 break
             }
             l.updateEntrySize(l.data[v.key], -1)
+            atomic.AddInt64(&l.cacheEntriesPruned, 1)
+            l.removeExpiry(v.key)
             delete(l.data, v.key)
         }
     }
 }

 func newLocalCacheEntry[K comparable, V any](key K, value V, ttl time.Duration) *localCacheEntry[K, V] {
-    return &localCacheEntry[K, V]{
+    now := time.Now()
+    entry := &localCacheEntry[K, V]{
         key: key,
         value: value,
         ttl: ttl,
-        insertTime: time.Now(),
+        insertTime: now,
     }
+    if ttl > 0 {
+        // granularity of expiry is minimum of 250ms
+        entry.expiresAt = now.Add(getMillisBucket(ttl))
+    }
+
+    return entry
 }

 func newLocalCache[K comparable, V any](name string, options ...func(localCache *localCacheOptions)) *localCacheImpl[K, V] {
     cache := &localCacheImpl[K, V]{
-        Name: name,
-        data: make(map[K]*localCacheEntry[K, V], 0),
+        Name: name,
+        data: make(map[K]*localCacheEntry[K, V], 0),
+        expiryMap: make(map[int64]*[]K, 0),
+        nextExpiry: time.Now().Add(time.Duration(256) * time.Millisecond),
         options: &localCacheOptions{
             TTL: 0,
             HighUnits: 0,
@@ -432,6 +483,14 @@ func (l *localCacheImpl[K, V]) GetCachePuts() int64 {
     return l.cachePuts
 }

+func (l *localCacheImpl[K, V]) GetCacheEntriesExpired() int64 {
+    return l.cacheEntriesExpired
+}
+
+func (l *localCacheImpl[K, V]) GetCacheEntriesPruned() int64 {
+    return l.cacheEntriesPruned
+}
+
 func (l *localCacheImpl[K, V]) GetCachePrunes() int64 {
     return l.cachePrunes
 }
@@ -462,26 +521,32 @@ func (l *localCacheImpl[K, V]) GetHitRate() float32 {
 func (l *localCacheImpl[K, V]) ResetStats() {
     atomic.StoreInt64(&l.cachePrunesNannos, 0)
+    atomic.StoreInt64(&l.cacheExpiresNannos, 0)
     atomic.StoreInt64(&l.cacheMissesNannos, 0)
     atomic.StoreInt64(&l.cachePrunes, 0)
     atomic.StoreInt64(&l.cacheHits, 0)
     atomic.StoreInt64(&l.cacheMisses, 0)
     atomic.StoreInt64(&l.cachePuts, 0)
+    atomic.StoreInt64(&l.cacheEntriesExpired, 0)
+    atomic.StoreInt64(&l.cacheEntriesPruned, 0)
 }

 func (l *localCacheImpl[K, V]) String() string {
     return fmt.Sprintf("localCache{name=%s, options=%v, stats=CacheStats{puts=%v, gets=%v, hits=%v, misses=%v, "+
-        "missesDuration=%v, hitRate=%v%%, prunes=%v, prunesDuration=%v, expires=%v, expiresDuration=%v, size=%v, memoryUsed=%v}}",
+        "missesDuration=%v, hitRate=%v%%, prunes=%v, prunesDuration=%v, entriesPruned=%v, expires=%v, expiresDuration=%v, entriesExpired=%v, size=%v, memoryUsed=%v}}",
         l.Name, l.options, l.GetCachePuts(), l.GetTotalGets(), l.GetCacheHits(), l.GetCacheMisses(),
-        l.GetCacheMissesDuration(), l.GetHitRate()*100, l.GetCachePrunes(), l.GetCachePrunesDuration(),
-        l.GetCacheExpires(), l.GetCacheExpiresDuration(), l.Size(), formatMemory(l.cacheMemory))
+        l.GetCacheMissesDuration(), l.GetHitRate()*100,
+        l.GetCachePrunes(), l.GetCachePrunesDuration(), l.GetCacheEntriesPruned(),
+        l.GetCacheExpires(), l.GetCacheExpiresDuration(), l.GetCacheEntriesExpired(),
+        l.Size(), formatMemory(l.cacheMemory))
 }

 // updateEntrySize updates the cacheMemory size based upon a local entry. The sign indicates to either remove or add.
 func (l *localCacheImpl[K, V]) updateEntrySize(entry *localCacheEntry[K, V], sign int) {
     var size = int64(unsafe.Sizeof(entry.key)) + int64(unsafe.Sizeof(entry.value)) +
         int64(unsafe.Sizeof(entry.lastAccess)) + int64(unsafe.Sizeof(entry.ttl)) +
-        int64(unsafe.Sizeof(entry.insertTime)) + int64(unsafe.Sizeof(entry))
+        int64(unsafe.Sizeof(entry.insertTime)) + int64(unsafe.Sizeof(entry.expiresAt)) +
+        int64(unsafe.Sizeof(entry))
     l.updateCacheMemory(int64(sign) * size)
 }
@@ -498,3 +563,58 @@ func formatMemory(bytesValue int64) string {
     }
     return printer.Sprintf("%-.1fGB", float64(bytesValue)/1024/1024/1024)
 }
+
+func (l *localCacheImpl[K, V]) registerExpiry(entry *localCacheEntry[K, V]) {
+    if entry.ttl > 0 {
+        // get the expires millis in unix millis and key on this
+        expiresAtMillis := entry.expiresAt.UnixMilli()
+
+        // see if we can find an entry for the expires time as millis
+        v, ok := l.expiryMap[expiresAtMillis]
+        if !ok {
+            // create a new map entry
+            newSlice := []K{entry.key}
+            l.expiryMap[expiresAtMillis] = &newSlice
+        } else {
+            // append to the existing one
+            *v = append(*v, entry.key)
+        }
+    }
+}
+
+func (l *localCacheImpl[K, V]) removeExpiry(k K) {
+    // find the entry for the key and process if it exists
+    if entry, ok1 := l.data[k]; ok1 {
+        if entry.ttl > 0 {
+            expiresAtMillis := entry.expiresAt.UnixMilli()
+
+            // see if we can find an entry for the expires time as millis
+            v, ok := l.expiryMap[expiresAtMillis]
+            if ok {
+                // entry exists for expiry, so remove the entry from the slice
+                existingKeys := *v
+
+                if len(existingKeys) == 1 {
+                    // delete the TTL map entry as no keys left in slice
+                    delete(l.expiryMap, expiresAtMillis)
+                    return
+                }
+
+                newSlice := existingKeys[:0]
+
+                for _, key := range existingKeys {
+                    if key != entry.key {
+                        newSlice = append(newSlice, key)
+                    }
+                }
+
+                *v = newSlice
+            }
+        }
+    }
+}
+
+// getMillisBucket returns the ttl in buckets of 256ms for expiry.
+func getMillisBucket(ttl time.Duration) time.Duration {
+    return time.Duration(ttl.Milliseconds() & ^0xFF) * time.Millisecond
+}
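getMillisBucket clears the low 8 bits of the TTL's millisecond value, which rounds the TTL down to a multiple of 256ms (the roughly 1/4-second granularity mentioned earlier). A quick standalone check of that arithmetic, reusing the same expression:

package main

import (
    "fmt"
    "time"
)

// getMillisBucket rounds a TTL down to a 256ms boundary by clearing the
// low 8 bits of its millisecond value (same expression as in the diff above).
func getMillisBucket(ttl time.Duration) time.Duration {
    return time.Duration(ttl.Milliseconds() & ^0xFF) * time.Millisecond
}

func main() {
    for _, ttl := range []time.Duration{
        250 * time.Millisecond, // below one full bucket: rounds down to 0
        300 * time.Millisecond, // rounds down to 256ms
        time.Second,            // 1000ms rounds down to 768ms
        2 * time.Second,        // 2000ms rounds down to 1792ms
    } {
        fmt.Printf("%v -> %v\n", ttl, getMillisBucket(ttl))
    }
}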
