Skip to content

Commit c910def

Browse files
authored
Initial queues gRPCv1 support (#97)
* Initial queues gRPCv1 support * Add doc and Java queues test * Minor examples update for queues * Minor README update * Additional tests * Add queue events * Add additional test with go routines
1 parent 75b86a8 commit c910def

27 files changed

+1935
-1103
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
# ----------------------------------------------------------------------------------------------------------------------
99

1010
# This is the version of the coherence-go-client
11-
VERSION ?=1.3.0
11+
VERSION ?=2.0.0
1212
CURRDIR := $(shell pwd)
1313
USER_ID := $(shell echo "`id -u`:`id -g`")
1414

README.md

+6-4
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,24 @@ to a Coherence Cluster using gRPC for the network transport.
2525
* session lifecycle events such as connected, disconnected, reconnected and closed
2626
* Support for storing Go structs as JSON as well as the ability to serialize to Java objects on the server for access from other Coherence language API's
2727
* Near cache support to cache frequently accessed data in the Go client to avoid sending requests across the network
28-
* Support for Queues in Coherence Community Edition 24.03+
28+
* Support for simple and double-ended queues in Coherence Community Edition 24.09+ and commercial version 14.1.2.0+
2929
* Full support for Go generics in all Coherence API's
3030

3131
#### Requirements
3232

33-
* Coherence CE 22.06.4+, 24.03+ or Coherence 14.1.1.2206.4+ Commercial edition with a configured [gRPCProxy](https://docs.oracle.com/en/middleware/standalone/coherence/14.1.1.2206/develop-remote-clients/using-coherence-grpc-server.html).
33+
* Coherence CE 22.06.4+, 24.09+ or Coherence 14.1.1.2206.4+ Commercial edition with a configured [gRPCProxy](https://docs.oracle.com/en/middleware/standalone/coherence/14.1.1.2206/develop-remote-clients/using-coherence-grpc-server.html).
3434
* Go 1.19.+
3535

36+
> Note: If you wish to use the queues API in the latest release, you must use CE 24.09 or commercial version 14.1.2.0.x.
37+
3638
#### <a name="start"></a> Starting a gRPC enabled Coherence cluster
3739

3840
Before testing the Go client, you must ensure a Coherence cluster is available.
3941
For local development, we recommend using the Coherence CE Docker image; it contains
4042
everything necessary for the client to operate correctly.
4143

4244
```bash
43-
docker run -d -p 1408:1408 -p 30000:30000 ghcr.io/oracle/coherence-ce:24.03
45+
docker run -d -p 1408:1408 -p 30000:30000 ghcr.io/oracle/coherence-ce:24.09
4446
```
4547

4648
## Installation
@@ -150,7 +152,7 @@ Please consult the [security guide](./SECURITY.md) for our responsible security
150152
151153
## License
152154
153-
Copyright (c) 2023 Oracle and/or its affiliates.
155+
Copyright (c) 2024 Oracle and/or its affiliates.
154156
155157
Released under the Universal Permissive License v1.0 as shown at
156158
<https://oss.oracle.com/licenses/upl/>.

coherence/cache.go

+4
Original file line numberDiff line numberDiff line change
@@ -263,3 +263,7 @@ type EventSubmitter interface {
263263
generateMapLifecycleEvent(client interface{}, eventType MapLifecycleEventType)
264264
generateMapEvent(client interface{}, mapEvent *pb1.MapEventMessage)
265265
}
266+
267+
type QueueEventSubmitter interface {
268+
generateQueueLifecycleEvent(client interface{}, eventType QueueLifecycleEventType)
269+
}

coherence/coherence_test_helpers.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515

1616
// test helpers
1717

18-
//func SubmitRequest(session *Session, req *pb1.ProxyRequest) (namedCacheRequest, error) {
18+
//func SubmitRequest(session *Session, req *pb1.ProxyRequest) (proxyRequestChannel, error) {
1919
// return session.v1StreamManagerCache.submitRequest(req)
2020
//}
2121

@@ -117,6 +117,11 @@ func GetSessionCacheID(session *Session, cache string) *int32 {
117117
return session.getCacheID(cache)
118118
}
119119

120+
// GetSessionQueueID returns the queue id for a cache name
121+
func GetSessionQueueID(session *Session, queue string) *int32 {
122+
return session.getQueueID(queue)
123+
}
124+
120125
// revive:disable:unexported-return
121126
func GetKeyListenerGroupMap[K comparable, V any](namedMap NamedMap[K, V]) map[K]*listenerGroupV1[K, V] {
122127
return namedMap.getBaseClient().keyListenersV1

coherence/doc.go

+113-31
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ The Coherence Go client provides the following features:
2626
and session lifecycle events such as connected, disconnected, reconnected and closed
2727
- Support for storing Go structs as JSON as well as the ability to serialize to Java objects on the server for access from other Coherence language API's
2828
- Near cache support to cache frequently accessed data in the Go client to avoid sending requests across the network
29-
- Support for Queues in Coherence Community Edition 24.03+
29+
- Support for simple and double-ended queues in Coherence Community Edition 24.09+ and commercial version 14.1.2.0+
3030
- Full support for Go generics in all Coherence API's
3131
3232
For more information on Coherence caches, please see the [Coherence Documentation].
@@ -482,30 +482,35 @@ in your main code, create a new [Session] and register the listener
482482
483483
# Working with Queues
484484
485-
When connecting to a Coherence CE cluster versions 24.03 or above you have the ability to create [NamedQueue] or [NamedBlockingQueue].
486-
Queues in general have the following methods.
485+
When connecting to a Coherence CE cluster versions 24.09 or above or commercial 14.1.2.0.+, you have the ability to create two main types of queues, a [NamedQueue] or [NamedDequeue].
487486
488-
- Peek() - retrieve but not remove the value at the head of the queue
487+
A [NamedQueue] is a simple FIFO queue which can be one of two types: either [Queue] - a simple queue which stores data in a single
488+
partition and is limited to approx 2GB of storage, or [PagedQueue] which distributes data over the cluster and is only limited
489+
by the cluster capacity.
489490
490-
- Offer() - inserts the specified value to the end of the queue if it is possible to do so
491+
A [NamedDequeue] is a simple double-ended queue that stores data in a single partition.
491492
492-
- Poll() - retrieves and removes the head of this queue
493+
Queues in general have the following methods. See [NamedQueue] for the full list.
493494
494-
The [NamedBlockingQueue] changes the Peek() and Poll() operations to be blocking by passing a timeout. A specific error
495-
is returned to indicate the blocking operation did no complete within the specified timeout.
495+
- PeekHead(ctx context.Context) (*V, error) - retrieve but not remove the value at the head of this queue
496+
497+
- PollHead(ctx context.Context) (*V, error - retrieves and removes the head of this queue
498+
499+
- OfferTail(ctx context.Context, value V) error - inserts the specified value to the end of this queue if it is possible to do so
496500
497501
Consider the example below where we want to create a standard queue and add 10 entries, and then retrieve 10 entries.
502+
We have specified [coherence.Queue] as the type but this could also be [coherence.PagedQueue].
498503
499-
namedQueue, err := coherence.GetNamedQueue[string](ctx, session, "my-queue")
504+
namedQueue, err := coherence.GetNamedQueue[string](ctx, session, "my-queue", coherence.Queue)
500505
if err != nil {
501506
panic(err)
502507
}
503508
504-
// add an entry to the head of the queue
509+
// add an entry to the tail of the queue
505510
for i := 1; i <= iterations; i++ {
506511
v := fmt.Sprintf("value-%v", i)
507-
log.Printf("Offer() %s to the queue\n", v)
508-
err = namedQueue.Offer(v)
512+
log.Printf("OfferTail() %s to the queue\n", v)
513+
err = namedQueue.OfferTail(ctx, v)
509514
if err != nil {
510515
panic(err)
511516
}
@@ -515,9 +520,9 @@ Consider the example below where we want to create a standard queue and add 10 e
515520
// ...
516521
// Offer() value-10 to the queue
517522
518-
// Poll() 10 entries from the queue
523+
// Poll 10 entries from the head of the queue
519524
for i := 1; i <= iterations; i++ {
520-
value, err = namedQueue.Poll()
525+
value, err = namedQueue.PollHead(ctx)
521526
if err != nil {
522527
panic(err)
523528
}
@@ -530,44 +535,121 @@ Consider the example below where we want to create a standard queue and add 10 e
530535
// Poll() returned: value-10
531536
532537
// try to read again should get nil as nothing left on the queue
533-
value, err = namedQueue.Poll()
538+
value, err = namedQueue.PollHead(ctx)
534539
if err != nil {
535540
panic(err)
536541
}
537542
log.Println("last value is", value)
538543
// output: last value is nil
539544
540-
In the following example, we are using a [NamedBlockingQueue] and trying to read a value from this queue
541-
with a timeout of 10 seconds. In this example we will just display a message if we are not able to retrieve
542-
a value and then try again.
545+
The [NamedDequeue] is a double-ended queue and has the following additional functions:
546+
547+
- OfferHead(ctx context.Context, value V) error - inserts the specific value at the head of this queue
543548
544-
Internally, while the Poll() is blocking for up to the timeout value, if and entry is
545-
added to the queue, the Poll() will get immediately notified via a coherence [MapEvent]
546-
and the Poll() will return. If multiple go routines or processes are waiting for dequeues,
547-
only one of the processes will retrieve the newly inserted value.
549+
- PollTail(ctx context.Context) (*V, error) - retrieves and removes the tail of this queue
548550
549-
blockingQueue, err := coherence.GetBlockingNamedQueue[Order](ctx, session, "blocking-queue"")
551+
- PeekTail(ctx context.Context) (*V, error) - retrieves, but does not remove, the tail of this queue
552+
553+
In the following example, we are using a [NamedDequeue] or double-ended queue, where we have the ability to
554+
add or offer data to the head of the queue as well as the end of the queue, and also poll and peek the the end of the queue.
555+
556+
namedQueue, err := coherence.GetNamedDeQueue[string](ctx, session, "double-ended-queue")
550557
if err != nil {
551558
panic(err)
552559
}
553560
554-
log.Println("Waiting to receive messages...")
555-
for {
556-
order, err = blockingQueue.Poll(time.Duration(10) * time.Second)
557-
if err == coherence.ErrQueueTimedOut {
558-
log.Println("Timeout waiting for Poll()")
559-
continue
561+
// add 10 entries to the end (tail) of the queue
562+
for i := 1; i <= iterations; i++ {
563+
v := fmt.Sprintf("value-%v", i)
564+
log.Printf("OfferTail() %s to the queue\n", v)
565+
err = namedQueue.OfferTail(ctx, v)
566+
if err != nil {
567+
panic(err)
560568
}
569+
}
570+
571+
// output:
572+
// 2024/11/27 11:05:37 OfferTail() value-1 to the queue
573+
// ..
574+
// 2024/11/27 11:05:37 OfferTail() value-10 to the queue
561575
576+
// Offer a value to the head
577+
err = namedQueue.OfferHead(ctx, "value-head")
578+
if err != nil {
579+
panic(err)
580+
}
581+
582+
// peek the tail of the queue
583+
value, err = namedQueue.PeekTail(ctx)
584+
if err != nil {
585+
panic(err)
586+
}
587+
log.Printf("PeekTail() returned: %s\n", *value)
588+
589+
// output:
590+
// 2024/11/27 11:05:37 PeekTail() returned: value-10
591+
592+
// poll for iterations +1 because we added another entry to the head
593+
for i := 1; i <= iterations+1; i++ {
594+
value, err = namedQueue.PollHead(ctx)
562595
if err != nil {
563596
panic(err)
564597
}
598+
log.Printf("PollHead() returned: %s\n", *value)
599+
}
565600
566-
// do some processing, then continue waiting for messages
601+
// output:
602+
// 2024/11/27 11:05:37 PollHead() returned: value-head (the value we added to the head)
603+
// 2024/11/27 11:05:37 PollHead() returned: value-1
604+
// ..
605+
// 2024/11/27 11:05:37 PollHead() returned: value-10
606+
607+
// try to read again should get nil
608+
value, err = namedQueue.PollHead(ctx)
609+
if err != nil {
610+
panic(err)
567611
}
612+
log.Println("last value is", value)
613+
614+
// output:
615+
// 2024/11/27 11:05:37 last value is <nil>
568616
569617
See the [Queues] documentation for more information on using queues on the Coherence Server.
570618
619+
# Responding to queue lifecycle events
620+
621+
The Coherence Go client provides the ability to add a [QueueLifecycleListener] that will receive events (truncated, released and destroyed)
622+
that occur against a [NamedQueue].
623+
624+
// consider the example below where we want to listen for all 'QueueReleased' events for a NamedQueue.
625+
// in your main code, create a new NamedQueue and register the listener.
626+
// Note: this is a contrived example, but you can listen for QueueDestroyed and QueueTruncated events
627+
// in a similar way.
628+
629+
namedQueue, err := coherence.GetNamedQueue[string](session, "queue", coherence.Queue)
630+
if err != nil {
631+
log.Fatal(err)
632+
}
633+
634+
// Create a listener to monitor
635+
listener := coherence.NewQueueLifecycleListener[string]().
636+
OnTruncated(func(e coherence.QueueLifecycleEvent[string]) {
637+
fmt.Printf("**EVENT=%s: source=%v\n", e.Type(), e.Source())
638+
})
639+
640+
_ = namedQueue.AddLifecycleListener(listener)
641+
defer namedQueue.RemoveLifecycleListener(listener)
642+
643+
namedQueue.Release()
644+
645+
// sleep to ensure we receive the event before we close
646+
time.Sleep(5 * time.Second)
647+
648+
// output:
649+
// 2024/11/28 11:40:58 INFO: Session [b1435a16-f210-4289-97e4-e1654947acd5] connected to [localhost:1408] Coherence version: 24.09, serverProtocolVersion: 1, proxyMemberId: 1
650+
// **EVENT=queue_released: source=NamedQueue{name=queue-events, type=Queue, queueID=1198559040}
651+
// 2024/11/28 11:41:03 INFO: Session [b1435a16-f210-4289-97e4-e1654947acd5] closed
652+
571653
# Serializing to Java objects on the server
572654
573655
By default, the Coherence Go client serializes any keys and values to JSON and then stores them as JsonObjects in Coherence.
@@ -752,6 +834,6 @@ accessed and created entries.
752834
[examples]: https://github.com/oracle/coherence-go-client/tree/main/examples
753835
[gRPC Proxy documentation]: https://docs.oracle.com/en/middleware/standalone/coherence/14.1.1.2206/develop-remote-clients/using-coherence-grpc-server.html
754836
[gRPC Naming]: https://github.com/grpc/grpc/blob/master/doc/naming.md
755-
[Queues]: https://coherence.community/latest/24.03/docs/#/docs/core/09_queues
837+
[Queues]: https://coherence.community/latest/24.09/docs/#/docs/core/09_queues
756838
*/
757839
package coherence

coherence/event.go

+2-10
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ const (
6161
type MapEventType string
6262

6363
// MapLifecycleEventType describes an event that may be raised during the lifecycle
64-
// of cache.
64+
// of a cache.
6565
type MapLifecycleEventType string
6666

6767
// SessionLifecycleEventType describes an event that may be raised during the lifecycle
@@ -1139,11 +1139,7 @@ func reRegisterListeners[K comparable, V any](ctx context.Context, namedMap *Nam
11391139
defer bc.session.mapMutex.Unlock()
11401140

11411141
// save the cache names
1142-
cacheNames := make([]string, 0)
1143-
for c := range bc.session.cacheIDMap {
1144-
cacheNames = append(cacheNames, c)
1145-
}
1146-
1142+
cacheNames := bc.session.cacheIDMap.Keys()
11471143
err2 := bc.session.ensureConnection()
11481144
if err2 != nil {
11491145
return err2
@@ -1154,7 +1150,6 @@ func reRegisterListeners[K comparable, V any](ctx context.Context, namedMap *Nam
11541150
if err1 == nil {
11551151
// save the stream manager for a successful V1 client connection
11561152
bc.session.v1StreamManagerCache = manager
1157-
bc.session.cacheIDMapMutex.Lock()
11581153

11591154
// reset the filters for V1
11601155
bc.keyListenersV1 = make(map[K]*listenerGroupV1[K, V], 0)
@@ -1167,7 +1162,6 @@ func reRegisterListeners[K comparable, V any](ctx context.Context, namedMap *Nam
11671162
if err3 != nil {
11681163
// unrecoverable
11691164
bc.session.Close()
1170-
bc.session.cacheIDMapMutex.Unlock()
11711165
return err3
11721166
}
11731167
bc.session.debugConnection("re-ensureCache cacheId=%v for cache=%v", cacheID, c)
@@ -1177,8 +1171,6 @@ func reRegisterListeners[K comparable, V any](ctx context.Context, namedMap *Nam
11771171
}
11781172
}
11791173

1180-
bc.session.cacheIDMapMutex.Unlock()
1181-
11821174
// re-register key listeners
11831175
for k, save := range keyListeners {
11841176
debug("re-registering listener %v for key: %v", save.listener, k)

coherence/named_cache_client.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,7 @@ func (nc *NamedCacheClient[K, V]) Release() {
178178

179179
// remove the cacheID mapping
180180
if s.GetProtocolVersion() > 0 {
181-
s.cacheIDMapMutex.Lock()
182-
delete(s.cacheIDMap, nc.Name())
183-
s.cacheIDMapMutex.Unlock()
181+
s.cacheIDMap.Remove(nc.name)
184182
}
185183

186184
if nc.namedCacheReconnectListener.listener != nil {

coherence/named_map_client.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -476,9 +476,7 @@ func (nm *NamedMapClient[K, V]) Release() {
476476

477477
// remove the cacheID mapping
478478
if s.GetProtocolVersion() > 0 {
479-
s.cacheIDMapMutex.Lock()
480-
delete(s.cacheIDMap, nm.Name())
481-
s.cacheIDMapMutex.Unlock()
479+
s.cacheIDMap.Remove(nm.Name())
482480
}
483481

484482
if nm.namedMapReconnectListener.listener != nil {
@@ -1073,6 +1071,10 @@ func getExistingError(cacheType, name string) error {
10731071
return fmt.Errorf(mapOrCacheExists, cacheType, name)
10741072
}
10751073

1074+
func getDifferentQueueTypeError(name string, queueType NamedQueueType) error {
1075+
return fmt.Errorf("the queue %s already exists but is not %v", name, queueType)
1076+
}
1077+
10761078
// getExistingError returns an error indicating a [NamedMap] or [NamedCache] exists with different near cache options.
10771079
func getExistingNearCacheError(cacheType, name string) error {
10781080
return fmt.Errorf(mapOrCacheExistsNearCache, cacheType, name)

0 commit comments

Comments
 (0)