Skip to content

Commit a2ce8e1

Browse files
authored
Minor doc update, near cache calc update (#100)
* Minor doc update, near cache calc update * Updates to resolver
1 parent a2705c6 commit a2ce8e1

File tree

13 files changed

+178
-32
lines changed

13 files changed

+178
-32
lines changed

Makefile

+2-2
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ generate-proto-v1: $(TOOLS_BIN)/protoc ## Generate Proto Files v1
244244
# ----------------------------------------------------------------------------------------------------------------------
245245
.PHONY: show-docs
246246
show-docs: ## Show the Documentation
247-
@echo "Serving documentation on http://localhost:6060/pkg/github.com/oracle/coherence-go-client/"
247+
@echo "Serving documentation on http://localhost:6060/pkg/github.com/oracle/coherence-go-client/v2"
248248
go install golang.org/x/tools/cmd/godoc@latest
249249
godoc -goroot $(GOROOT) -http=:6060
250250

@@ -421,7 +421,7 @@ getcopyright: ## Download copyright jar locally if necessary.
421421
$(TOOLS_BIN)/protoc:
422422
@mkdir -p $(TOOLS_BIN)
423423
./scripts/download-protoc.sh $(TOOLS_DIRECTORY)
424-
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.30.0
424+
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.33.0
425425
go install google.golang.org/grpc/cmd/[email protected]
426426

427427

coherence/common.go

+3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ const (
4444
// envResolverDebug enables resolver debug messages to be displayed.
4545
envResolverDebug = "COHERENCE_RESOLVER_DEBUG"
4646

47+
// envResolverDebug sets the number of retries when the resolver fails.
48+
envResolverRetries = "COHERENCE_RESOLVER_RETRIES"
49+
4750
// envResolverDebug enables randomization of addresses returned by resolver
4851
envResolverRandomize = "COHERENCE_RESOLVER_RANDOMIZE"
4952

coherence/event.go

+27-5
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ const (
6060
// MapEventType describes an event raised by a cache mutation.
6161
type MapEventType string
6262

63-
// MapLifecycleEventType describes an event that may be raised during the lifecycle
63+
// MapLifecycleEventType describes an event type that may be raised during the lifecycle
6464
// of a cache.
6565
type MapLifecycleEventType string
6666

@@ -159,8 +159,13 @@ func (se *sessionLifecycleEvent) String() string {
159159
return fmt.Sprintf("SessionLifecycleEvent{source=%v, format=%s}", se.Source(), se.Type())
160160
}
161161

162+
// MapLifecycleEvent describes an event that may be raised during the lifecycle
163+
// of a cache.
162164
type MapLifecycleEvent[K comparable, V any] interface {
165+
// Source returns the source of this MapLifecycleEvent.
163166
Source() NamedMap[K, V]
167+
168+
// Type returns the MapLifecycleEventType for this MapLifecycleEvent.
164169
Type() MapLifecycleEventType
165170
}
166171

@@ -186,22 +191,39 @@ func (l *mapLifecycleEvent[K, V]) Source() NamedMap[K, V] {
186191
return l.source
187192
}
188193

189-
// String returns a string representation of a MapLifecycleEvent.
194+
// String returns a string representation of a [MapLifecycleEvent].
190195
func (l *mapLifecycleEvent[K, V]) String() string {
191196
return fmt.Sprintf("MapLifecycleEvent{source=%v, type=%s}", l.Source().GetCacheName(), l.Type())
192197
}
193198

194-
// MapEvent an event which indicates that the content of the NamedMap or
195-
// NamedCache has changed (i.e., an entry has been added, updated, and/or
199+
// MapEvent an event which indicates that the content of the [NamedMap] or
200+
// [NamedCache] has changed (i.e., an entry has been added, updated, and/or
196201
// removed).
197202
type MapEvent[K comparable, V any] interface {
203+
// Source returns the source of this MapEvent.
198204
Source() NamedMap[K, V]
205+
206+
// Key returns the key of the entry for which this event was raised.
199207
Key() (*K, error)
208+
209+
// OldValue returns the old value, if any, of the entry for which this event
210+
// was raised.
200211
OldValue() (*V, error)
212+
213+
// NewValue returns the new value, if any, of the entry for which this event
214+
// was raised.
201215
NewValue() (*V, error)
216+
217+
// Type returns the MapEventType for this MapEvent.
202218
Type() MapEventType
219+
220+
// IsExpired returns true if the event was generated from an expiry event. Only valid for gRPC v1 connections.
203221
IsExpired() (bool, error)
222+
223+
// IsPriming returns true if the event is a priming event. Only valid for gRPC v1 connections.
204224
IsPriming() (bool, error)
225+
226+
// IsSynthetic returns true if the event is a synthetic event. Only valid for gRPC v1 connections.
205227
IsSynthetic() (bool, error)
206228
}
207229

@@ -1197,7 +1219,7 @@ func reRegisterListeners[K comparable, V any](ctx context.Context, namedMap *Nam
11971219
bc.filterListenersV1 = make(map[filters.Filter]*listenerGroupV1[K, V], 0)
11981220
bc.filterIDToGroupV1 = make(map[int64]*listenerGroupV1[K, V], 0)
11991221

1200-
// re-ensure all the caches as the connected has gone and so has the gRPC Proxy
1222+
// re-ensure all the caches as the connection has gone and so has the gRPC Proxy
12011223
for _, c := range cacheNames {
12021224
cacheID, err3 := bc.session.v1StreamManagerCache.ensureCache(context.Background(), c)
12031225
if err3 != nil {

coherence/localcache.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type localCache[K comparable, V any] interface {
4343
GetStats() CacheStats
4444
}
4545

46-
// CacheStats defines various statics for near caches.
46+
// CacheStats contains various statistics for near caches.
4747
type CacheStats interface {
4848
GetCacheHits() int64 // the number of entries served from the near cache
4949
GetCacheMisses() int64 // the number of entries that had to be retrieved from the cluster
@@ -479,7 +479,8 @@ func (l *localCacheImpl[K, V]) String() string {
479479
// updateEntrySize updates the cacheMemory size based upon a local entry. The sign indicates to either remove or add.
480480
func (l *localCacheImpl[K, V]) updateEntrySize(entry *localCacheEntry[K, V], sign int) {
481481
l.updateCacheMemory(int64(sign)*(int64(unsafe.Sizeof(entry.key))+int64(unsafe.Sizeof(entry.value))+
482-
(int64(unsafe.Sizeof(entry.ttl)))+(int64(unsafe.Sizeof(entry.insertTime)))) + (int64(unsafe.Sizeof(entry.lastAccess))))
482+
(int64(unsafe.Sizeof(entry.ttl)))+(int64(unsafe.Sizeof(entry.insertTime)))) +
483+
(int64(unsafe.Sizeof(entry.lastAccess))) + int64(unsafe.Sizeof(entry)))
483484
}
484485

485486
func formatMemory(bytesValue int64) string {

coherence/queue_events.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,16 @@ func (l *queueLifecycleEvent[V]) String() string {
5757
// QueueLifecycleListener allows registering callbacks to be notified when lifecycle events
5858
// (truncated, released or destroyed) occur against a [NamedQueue].
5959
type QueueLifecycleListener[V any] interface {
60+
// OnAny registers a callback that will be notified when any [NamedQueue] event occurs.
6061
OnAny(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]
62+
63+
// OnDestroyed registers a callback that will be notified when a [NamedQueue] is destroyed.
6164
OnDestroyed(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]
65+
66+
// OnTruncated registers a callback that will be notified when a [Queue] is truncated.
6267
OnTruncated(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]
68+
69+
// OnReleased registers a callback that will be notified when a [NamedQueue] is released.
6370
OnReleased(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]
6471
getEmitter() *eventEmitter[QueueLifecycleEventType, QueueLifecycleEvent[V]]
6572
}
@@ -89,7 +96,7 @@ func (q *queueLifecycleListener[V]) OnReleased(callback func(QueueLifecycleEvent
8996
return q.on(QueueReleased, callback)
9097
}
9198

92-
// OnTruncated registers a callback that will be notified when a [NamedMap] is truncated.
99+
// OnTruncated registers a callback that will be notified when a [Queue] is truncated.
93100
func (q *queueLifecycleListener[V]) OnTruncated(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] {
94101
return q.on(QueueTruncated, callback)
95102
}
@@ -98,7 +105,7 @@ func (q *queueLifecycleListener[V]) getEmitter() *eventEmitter[QueueLifecycleEve
98105
return q.emitter
99106
}
100107

101-
// OnAny registers a callback that will be notified when any [NamedMap] event occurs.
108+
// OnAny registers a callback that will be notified when any [NamedQueue] event occurs.
102109
func (q *queueLifecycleListener[V]) OnAny(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] {
103110
return q.OnTruncated(callback).OnDestroyed(callback).OnReleased(callback)
104111
}

coherence/resolver.go

+23-9
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@ import (
1212
"github.com/oracle/coherence-go-client/v2/coherence/discovery"
1313
"google.golang.org/grpc/resolver"
1414
"math/rand"
15+
"strconv"
1516
"strings"
1617
"sync"
1718
"time"
1819
)
1920

2021
const (
21-
nsLookupScheme = "coherence"
22+
nsLookupScheme = "coherence"
23+
defaultRetries = 20
24+
defaultResolverDelay = 1000 // ms
2225
)
2326

2427
var (
@@ -42,16 +45,27 @@ func (b *nsLookupResolverBuilder) Build(target resolver.Target, cc resolver.Clie
4245
}
4346
checkResolverDebug()
4447

48+
// set the number of resolver retried
49+
retries := getStringValueFromEnvVarOrDefault(envResolverRetries, "20")
50+
retriesValue, err := strconv.Atoi(retries)
51+
if err != nil {
52+
retriesValue = defaultRetries
53+
}
54+
55+
resolverDebug("resolver retries=%v", retriesValue)
56+
r.resolverRetries = retriesValue
57+
4558
r.start()
4659
return r, nil
4760
}
4861
func (*nsLookupResolverBuilder) Scheme() string { return nsLookupScheme }
4962

5063
type nsLookupResolver struct {
51-
target resolver.Target
52-
cc resolver.ClientConn
53-
mutex sync.Mutex
54-
addrStore map[string][]string
64+
target resolver.Target
65+
cc resolver.ClientConn
66+
mutex sync.Mutex
67+
addrStore map[string][]string
68+
resolverRetries int
5569
}
5670

5771
func (r *nsLookupResolver) resolve() {
@@ -60,10 +74,10 @@ func (r *nsLookupResolver) resolve() {
6074
defer r.mutex.Unlock()
6175

6276
if len(grpcEndpoints) == 0 {
63-
// try 8 times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over
64-
for i := 0; i < 8; i++ {
65-
resolverDebug("retrying NSLookup attempt", i)
66-
time.Sleep(time.Duration(250) * time.Millisecond)
77+
// try r.resolverRetries; times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over
78+
for i := 1; i <= r.resolverRetries; i++ {
79+
resolverDebug("retrying NSLookup attempt: %v", i)
80+
time.Sleep(time.Duration(defaultResolverDelay) * time.Millisecond)
6781
grpcEndpoints = generateNSAddresses(r.target.Endpoint())
6882
if len(grpcEndpoints) != 0 {
6983
break

coherence/serializers.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
2+
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
33
* Licensed under the Universal Permissive License v 1.0 as shown at
44
* https://oss.oracle.com/licenses/upl.
55
*/
@@ -28,8 +28,13 @@ type mathValue[T any] struct {
2828

2929
// Serializer defines how to serialize/ de-serialize objects.
3030
type Serializer[T any] interface {
31+
// Serialize serializes an object of type T and returns the []byte representation.
3132
Serialize(object T) ([]byte, error)
33+
34+
// Deserialize deserialized an object and returns the correct type of T.
3235
Deserialize(data []byte) (*T, error)
36+
37+
// Format returns the format used for the serializer.
3338
Format() string
3439
}
3540

@@ -111,6 +116,7 @@ func (s JSONSerializer[T]) Deserialize(data []byte) (*T, error) {
111116
return &zeroValue, fmt.Errorf("invalid serialization prefix %v", data[0])
112117
}
113118

119+
// Format returns the format used for the serializer.
114120
func (s JSONSerializer[T]) Format() string {
115121
return s.format
116122
}

coherence/session.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"google.golang.org/grpc/credentials"
2020
"google.golang.org/grpc/credentials/insecure"
2121
"google.golang.org/grpc/resolver"
22+
"google.golang.org/grpc/status"
2223
"log"
2324
"os"
2425
"reflect"
@@ -510,7 +511,14 @@ func (s *Session) ensureConnection() error {
510511
s.v1StreamManagerCache = manager
511512
apiMessage = fmt.Sprintf(" %v", manager)
512513
} else {
513-
s.debug("error connecting to session via v1, falling back to v0: %v", err1)
514+
// check if this is a gRPC status error
515+
if sts, ok := status.FromError(err1); ok {
516+
if sts.Message() == "Method not found: coherence.proxy.v1.ProxyService/subChannel" {
517+
s.debug("error connecting to session via v1, falling back to v0: %v", err1)
518+
} else {
519+
s.debug("received a different gRPC error: %v", err1)
520+
}
521+
}
514522
}
515523

516524
logMessage(INFO, "Session [%s] connected to [%s]%s", s.sessionID, s.sessOpts.Address, apiMessage)
@@ -547,14 +555,14 @@ func (s *Session) ensureConnection() error {
547555
return
548556
}
549557

550-
if newState == connectivity.Ready || newState == connectivity.Idle {
558+
if newState == connectivity.Ready {
551559
if !firstConnect && !connected {
552-
// Reconnect
560+
// Reconnected
553561
disconnectTime = 0
554562
session.closed = false
555563
connected = true
556564

557-
logMessage(INFO, "Session [%s] re-connected to address %s", session.sessionID, session.sessOpts.Address)
565+
logMessage(INFO, "Session [%s] re-connected to address %s (%v)", session.sessionID, session.sessOpts.Address, newState)
558566
session.dispatch(Reconnected, func() SessionLifecycleEvent {
559567
return newSessionLifecycleEvent(session, Reconnected)
560568
})

coherence/session_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
2+
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
33
* Licensed under the Universal Permissive License v 1.0 as shown at
44
* https://oss.oracle.com/licenses/upl.
55
*/
@@ -9,6 +9,7 @@ package coherence
99
import (
1010
"context"
1111
"github.com/onsi/gomega"
12+
"os"
1213
"strconv"
1314
"testing"
1415
"time"
@@ -21,6 +22,8 @@ func TestSessionValidation(t *testing.T) {
2122
ctx = context.Background()
2223
)
2324

25+
os.Setenv("COHERENCE_SESSION_DEBUG", "true")
26+
2427
_, err = NewSession(ctx, WithFormat("not-json"))
2528
g.Expect(err).To(gomega.Equal(ErrInvalidFormat))
2629

coherence/v1client.go

+3
Original file line numberDiff line numberDiff line change
@@ -843,19 +843,22 @@ func (m *streamManagerV1) putGenericRequest(ctx context.Context, reqType pb1.Nam
843843
return unwrapBytes(result)
844844
}
845845

846+
// BinaryKeyAndValue is an internal type exported only for serialization.
846847
type BinaryKeyAndValue struct {
847848
Key []byte
848849
Value []byte
849850
Err error
850851
Cookie []byte
851852
}
852853

854+
// BinaryKey is an internal type exported only for serialization.
853855
type BinaryKey struct {
854856
Key []byte
855857
Err error
856858
Cookie []byte
857859
}
858860

861+
// BinaryValue is an internal type exported only for serialization.
859862
type BinaryValue struct {
860863
Value []byte
861864
Err error

java/coherence-go-queues/pom.xml

+1-4
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@
5050
<profile>
5151
<id>javax</id>
5252
<activation>
53-
<!-- This is a work-around for the fact that activeByDefault does not do what you'd think it should -->
54-
<file>
55-
<exists>.</exists>
56-
</file>
53+
<activeByDefault>false</activeByDefault>
5754
</activation>
5855
<dependencies>
5956
<dependency>

scripts/run-checkin-test.sh

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#!/bin/bash
2+
3+
#
4+
# Copyright (c) 2022, 2024 Oracle and/or its affiliates.
5+
# Licensed under the Universal Permissive License v 1.0 as shown at
6+
# https://oss.oracle.com/licenses/upl.
7+
#
8+
9+
# This script runs some tests that should succeed to be sure we can push
10+
set -e
11+
12+
echo "Coherence CE 24.09 All Tests gRPC v1"
13+
COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone
14+
15+
echo "Coherence CE 24.09 with queues"
16+
COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax,queues COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone-queues
17+
18+
echo "Coherence CE 22.06.10"
19+
COHERENCE_VERSION=22.06.10 PROFILES=,-jakarta,javax make clean generate-proto build-test-images test-e2e-standalone

0 commit comments

Comments
 (0)