Skip to content

Commit a1f7a65

Browse files
authored
Update near cache to support synchronous listeners (#98)
* Update near cache to support syncrhonous listeners * Add configurable pruneFactor * Minor review changes * Address test failure
1 parent c910def commit a1f7a65

15 files changed

+304
-96
lines changed

coherence/coherence_test_helpers.go

+9
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,15 @@ func GetSessionQueueID(session *Session, queue string) *int32 {
122122
return session.getQueueID(queue)
123123
}
124124

125+
func GetNearCachePruneFactor[K comparable, V any](namedMap NamedMap[K, V]) float32 {
126+
ncOptions := namedMap.getBaseClient().cacheOpts.NearCacheOptions
127+
if ncOptions == nil {
128+
return 0.0
129+
}
130+
131+
return ncOptions.PruneFactor
132+
}
133+
125134
// revive:disable:unexported-return
126135
func GetKeyListenerGroupMap[K comparable, V any](namedMap NamedMap[K, V]) map[K]*listenerGroupV1[K, V] {
127136
return namedMap.getBaseClient().keyListenersV1

coherence/common.go

+27-15
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ type baseClient[K comparable, V any] struct {
8686
mutex *sync.RWMutex
8787
nearCache *localCacheImpl[K, V]
8888
nearCacheListener *namedCacheNearCacheListener[K, V]
89-
nearCacheLifecycleListener *namedCacheNearLifecyleListener[K, V]
89+
nearCacheLifecycleListener *namedCacheNearLifestyleListener[K, V]
9090

9191
// gRPC v1 listeners registered
9292
keyListenersV1 map[K]*listenerGroupV1[K, V]
@@ -103,15 +103,26 @@ type CacheOptions struct {
103103

104104
// NearCacheOptions defines options when creating a near cache.
105105
type NearCacheOptions struct {
106-
TTL time.Duration
107-
HighUnits int64
108-
HighUnitsMemory int64
106+
// TTL is the maximum time to keep the entry in the near cache. When this time has been reached it will be expired.
107+
TTL time.Duration
108+
109+
// HighUnits is the maximum number of cache entries to keep in the near cache.
110+
HighUnits int64
111+
112+
// HighUnitsMemory is the maximum amount of memory to use for entries in the near cache.
113+
HighUnitsMemory int64
114+
109115
InvalidationStrategy InvalidationStrategyType // currently only supports ListenAll
116+
117+
// PruneFactor indicates the percentage of the total number of units that will remain
118+
// after the cache manager prunes the near cache(i.e. this is the "low watermark" value)
119+
// this value is in the range 0.1 to 1.0 and the default is 0.8 or 80%.
120+
PruneFactor float32
110121
}
111122

112123
func (n NearCacheOptions) String() string {
113-
return fmt.Sprintf("NearCacheOptions{TTL=%v, HighUnits=%v, HighUnitsMemory=%v, invalidationStrategy=%v}",
114-
n.TTL, n.HighUnits, n.HighUnitsMemory, getInvalidationStrategyString(n.InvalidationStrategy))
124+
return fmt.Sprintf("NearCacheOptions{TTL=%v, HighUnits=%v, HighUnitsMemory=%v, PruneFactor=%.2f, invalidationStrategy=%v}",
125+
n.TTL, n.HighUnits, n.HighUnitsMemory, n.PruneFactor, getInvalidationStrategyString(n.InvalidationStrategy))
115126
}
116127

117128
// WithExpiry returns a function to set the default expiry for a [NamedCache]. This option is not valid on [NamedMap].
@@ -134,6 +145,7 @@ func executeClear[K comparable, V any](ctx context.Context, bc *baseClient[K, V]
134145
err = bc.ensureClientConnection()
135146
nearCache = bc.nearCache
136147
)
148+
137149
if err != nil {
138150
return err
139151
}
@@ -152,7 +164,7 @@ func executeClear[K comparable, V any](ctx context.Context, bc *baseClient[K, V]
152164
}
153165

154166
// clear the near cache
155-
if nearCache != nil {
167+
if bc.session.GetProtocolVersion() == 0 && nearCache != nil {
156168
nearCache.Clear()
157169
}
158170

@@ -272,7 +284,7 @@ func executeTruncate[K comparable, V any](ctx context.Context, bc *baseClient[K,
272284
_, err = bc.client.Truncate(newCtx, &request)
273285
}
274286

275-
// clear the near cache
287+
// clear the near cache as the lifecycle listeners are not synchronous
276288
if nearCache != nil {
277289
nearCache.Clear()
278290
}
@@ -1396,8 +1408,8 @@ func executePutAll[K comparable, V any](ctx context.Context, bc *baseClient[K, V
13961408
}
13971409
}
13981410

1399-
// if we have near cache and the entry exists then update
1400-
if nearCache != nil {
1411+
// if we have near cache and the entry exists then update (gRPC v0)
1412+
if bc.session.GetProtocolVersion() == 0 && nearCache != nil {
14011413
for k, v := range entries {
14021414
if oldVal := nearCache.Get(k); oldVal != nil {
14031415
nearCache.Put(k, v)
@@ -1510,8 +1522,8 @@ func executePutWithExpiry[K comparable, V any](ctx context.Context, bc *baseClie
15101522
}
15111523

15121524
// if we have near cache and the entry exists then update this as well because we do
1513-
// not use synchronous listener like we do on Java
1514-
if nearCache != nil {
1525+
// not use synchronous listener in gRPC v0
1526+
if bc.session.GetProtocolVersion() == 0 && nearCache != nil {
15151527
if oldValue := nearCache.Get(key); oldValue != nil {
15161528
nearCache.Put(key, value)
15171529
}
@@ -1580,7 +1592,7 @@ func executeRemove[K comparable, V any](ctx context.Context, bc *baseClient[K, V
15801592
}
15811593
}
15821594

1583-
if nearCache != nil {
1595+
if bc.session.GetProtocolVersion() == 0 && nearCache != nil {
15841596
nearCache.Remove(key)
15851597
}
15861598

@@ -1632,7 +1644,7 @@ func executeRemoveMapping[K comparable, V any](ctx context.Context, bc *baseClie
16321644
}
16331645
}
16341646

1635-
if result.Value && nearCache != nil {
1647+
if bc.session.GetProtocolVersion() == 0 && result.Value && nearCache != nil {
16361648
nearCache.Remove(key)
16371649
}
16381650

@@ -1738,7 +1750,7 @@ func executeReplaceMapping[K comparable, V any](ctx context.Context, bc *baseCli
17381750
}
17391751
}
17401752

1741-
if nearCache != nil && result.Value {
1753+
if bc.session.GetProtocolVersion() == 0 && nearCache != nil && result.Value {
17421754
if old := nearCache.Get(key); old != nil {
17431755
nearCache.Put(key, newValue)
17441756
}

coherence/doc.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -774,9 +774,13 @@ The following example shows how to get a named cache that will cache entries fro
774774
2. Creating a Near Cache specifying maximum number of entries to store
775775
776776
The following example shows how to get a named cache that will cache up to 100 entries from Get() or GetAll().
777-
When the threshold of HighUnits is reached, the near cache is pruned to 80% of its size and evicts least recently
777+
When the threshold of HighUnits is reached, the near cache is pruned to the default of 80% of its size and evicts least recently
778778
accessed and created entries.
779779
780+
Note: The default prune percentage is 0.8 (80%) which indicates the percentage of the total number of
781+
units that will remain after the cache manager prunes the near cache( i.e. this is the "low watermark" value).
782+
This can be changed by setting the PruneFactory to a value in the range 0.1 to 1.0 in [NearCacheOptions].
783+
780784
// specify HighUnits of 1000
781785
nearCacheOptions := coherence.NearCacheOptions{HighUnits: 1000}
782786
@@ -827,8 +831,8 @@ accessed and created entries.
827831
828832
// print the near cache stats via String()
829833
fmt.Println(namedMap.GetNearCacheStats())
830-
// localCache{name=customers options=localCacheOptions{ttl=0s, highUnits=0, highUnitsMemory=10.0KB, invalidation=ListenAll},
831-
// stats=CacheStats{puts=5000, gets=5000, hits=0, misses=5000, missesDuration=4.95257111s, hitRate=0, prunes=7, prunesDuration=196.498µs, size=398, memoryUsed=9.3KB}}
834+
// localCache{name=my-near-cache-high-units, options=localCacheOptions{ttl=0s, highUnits=1000, highUnitsMemory=0B, pruneFactor=0.80, invalidation=ListenAll}, stats=CacheStats{puts=1001, gets=1002, hits=1, misses=1001, missesDuration=4.628931138s,
835+
// hitRate=0.0998004, prunes=1, prunesDuration=181.533µs, expires=0, expiresDuration=0s, size=200, memoryUsed=53.2KB}}
832836
833837
[Coherence Documentation]: https://docs.oracle.com/en/middleware/standalone/coherence/14.1.1.2206/develop-applications/introduction-coherence-caches.html
834838
[examples]: https://github.com/oracle/coherence-go-client/tree/main/examples

coherence/event.go

+43-2
Original file line numberDiff line numberDiff line change
@@ -471,21 +471,42 @@ func (l *mapLifecycleListener[K, V]) OnAny(callback func(MapLifecycleEvent[K, V]
471471
// MapListener allows registering callbacks to be notified when mutations events
472472
// occur within a [NamedMap] or [NamedCache].
473473
type MapListener[K comparable, V any] interface {
474+
// OnInserted registers a callback that will be notified when an entry is inserted.
474475
OnInserted(callback func(MapEvent[K, V])) MapListener[K, V]
476+
477+
// OnUpdated registers a callback that will be notified when an entry is updated.
475478
OnUpdated(callback func(MapEvent[K, V])) MapListener[K, V]
479+
480+
// OnDeleted registers a callback that will be notified when an entry is deleted.
476481
OnDeleted(callback func(MapEvent[K, V])) MapListener[K, V]
482+
483+
// OnAny registers a callback that will be notified when any entry mutation has occurred.
477484
OnAny(callback func(MapEvent[K, V])) MapListener[K, V]
478485
dispatch(event MapEvent[K, V])
486+
487+
// SetSynchronous sets this [MapListener] as synchronous.
488+
SetSynchronous()
489+
490+
// SetPriming sets this [MapListener] as priming.
491+
SetPriming()
492+
493+
// IsSynchronous indicates this [MapListener] is synchronous.
494+
IsSynchronous() bool
495+
496+
// IsPriming indicates this [MapListener] is priming.
497+
IsPriming() bool
479498
}
480499

481500
// mapListener struct containing data members to satisfy the [MapListener] contract.
482501
type mapListener[K comparable, V any] struct {
483-
emitter *eventEmitter[MapEventType, MapEvent[K, V]]
502+
emitter *eventEmitter[MapEventType, MapEvent[K, V]]
503+
synchronous bool
504+
priming bool
484505
}
485506

486507
// NewMapListener creates and returns a pointer to a new [MapListener] instance.
487508
func NewMapListener[K comparable, V any]() MapListener[K, V] {
488-
return &mapListener[K, V]{newEventEmitter[MapEventType, MapEvent[K, V]]()}
509+
return &mapListener[K, V]{emitter: newEventEmitter[MapEventType, MapEvent[K, V]]()}
489510
}
490511

491512
// dispatch dispatches the specified event to the appropriate group of listeners.
@@ -499,6 +520,26 @@ func (l *mapListener[K, V]) on(event MapEventType, callback func(MapEvent[K, V])
499520
return l
500521
}
501522

523+
// SetSynchronous sets this listener to be synchronous.
524+
func (l *mapListener[K, V]) SetSynchronous() {
525+
l.synchronous = true
526+
}
527+
528+
// SetPriming sets this listener to be a priming listener.
529+
func (l *mapListener[K, V]) SetPriming() {
530+
l.priming = true
531+
}
532+
533+
// IsSynchronous indicates if this listener is synchronous.
534+
func (l *mapListener[K, V]) IsSynchronous() bool {
535+
return l.synchronous
536+
}
537+
538+
// IsPriming indicates if this listener is a priming listener.
539+
func (l *mapListener[K, V]) IsPriming() bool {
540+
return l.priming
541+
}
542+
502543
// OnInserted registers a callback that will be notified when an entry is inserted.
503544
func (l *mapListener[K, V]) OnInserted(callback func(MapEvent[K, V])) MapListener[K, V] {
504545
return l.on(EntryInserted, callback)

0 commit comments

Comments
 (0)