26 changes: 23 additions & 3 deletions balancer/rls/balancer.go
@@ -79,14 +79,14 @@ var (
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}

cacheEntriesMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
cacheEntriesMetric = estats.RegisterInt64AsyncGauge(estats.MetricDescriptor{
Name: "grpc.lb.rls.cache_entries",
Description: "EXPERIMENTAL. Number of entries in the RLS cache.",
Unit: "{entry}",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
Default: false,
})
cacheSizeMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
cacheSizeMetric = estats.RegisterInt64AsyncGauge(estats.MetricDescriptor{
Name: "grpc.lb.rls.cache_size",
Description: "EXPERIMENTAL. The current size of the RLS cache.",
Unit: "By",
@@ -140,7 +140,10 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
updateCh: buffer.NewUnbounded(),
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger, cc.MetricsRecorder(), opts.Target.String())
lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.Target.String())
if metricsRecorder := cc.MetricsRecorder(); metricsRecorder != nil {
Contributor: We can remove the nil check here. cc.MetricsRecorder() must always return a non-nil metrics recorder. The ClientConn implementation provided by gRPC ensures this, and we enforce that LB policies embed a valid ClientConn when wrapping. A nil MetricsRecorder would indicate a bug in a custom LB policy that is explicitly returning nil.

lb.metricHandler = metricsRecorder.RegisterAsyncReporter(lb, cacheEntriesMetric, cacheSizeMetric)
}
lb.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: opts,
@@ -162,6 +165,9 @@ type rlsBalancer struct {
dataCachePurgeHook func()
logger *internalgrpclog.PrefixLogger

// metricHandler is the function to deregister the async metric reporter.
metricHandler func()
Contributor: nit: Can we include unregister in the name to emphasize that it's a cleanup function? metricHandler seems a little generic.


// If both cacheMu and stateMu need to be acquired, the former must be
// acquired first to prevent a deadlock. This order restriction is due to the
// fact that in places where we need to acquire both the locks, we always
Expand Down Expand Up @@ -488,6 +494,9 @@ func (b *rlsBalancer) Close() {
if b.ctrlCh != nil {
b.ctrlCh.close()
}
if b.metricHandler != nil {
Contributor: We should omit the nil check. The constructor guarantees metricHandler is non-nil. If it is nil, the balancer is in an invalid state, and we should let it crash to surface the bug.

b.metricHandler()
}
b.bg.Close()
b.stateMu.Unlock()

@@ -702,3 +711,14 @@ func (b *rlsBalancer) releaseChildPolicyReferences(targets []string) {
}
b.stateMu.Unlock()
}

// Report reports the metrics data to the provided recorder.
func (b *rlsBalancer) Report(r estats.AsyncMetricsRecorder) error {
b.cacheMu.Lock()
defer b.cacheMu.Unlock()
Comment on lines +717 to +718
Contributor: In my opinion, we should avoid reporting metrics while holding the mutex. If the metrics collector is slow or hangs (due to slow I/O, for instance), it could block RLS from functioning. We can fetch the required data while holding the lock, but we should release it before calling the metrics reporter.
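A minimal sketch of that suggestion, reusing the fields and metric handles from this diff; the snapshot-then-record split is the reviewer's proposal, not code from this PR:

	// Report snapshots the cache state under cacheMu, then records the
	// metrics after releasing the lock, so a slow collector cannot stall
	// RLS cache operations.
	func (b *rlsBalancer) Report(r estats.AsyncMetricsRecorder) error {
		// Fetch the required data while holding the lock...
		b.cacheMu.Lock()
		size := b.dataCache.currentSize
		entries := int64(len(b.dataCache.entries))
		grpcTarget := b.dataCache.grpcTarget
		rlsServerTarget := b.dataCache.rlsServerTarget
		id := b.dataCache.uuid
		b.cacheMu.Unlock()

		// ...and record outside the critical section.
		cacheSizeMetric.Record(r, size, grpcTarget, rlsServerTarget, id)
		cacheEntriesMetric.Record(r, entries, grpcTarget, rlsServerTarget, id)
		return nil
	}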


if b.dataCache == nil {
Contributor: Same here, we should omit the nil check.

return nil
}
return b.dataCache.reportMetrics(r)
}
40 changes: 23 additions & 17 deletions balancer/rls/cache.go
Original file line number Diff line number Diff line change
@@ -174,21 +174,19 @@ type dataCache struct {
rlsServerTarget string

// Read only after initialization.
grpcTarget string
uuid string
metricsRecorder estats.MetricsRecorder
grpcTarget string
uuid string
}

func newDataCache(size int64, logger *internalgrpclog.PrefixLogger, metricsRecorder estats.MetricsRecorder, grpcTarget string) *dataCache {
func newDataCache(size int64, logger *internalgrpclog.PrefixLogger, grpcTarget string) *dataCache {
return &dataCache{
maxSize: size,
keys: newLRU(),
entries: make(map[cacheKey]*cacheEntry),
logger: logger,
shutdown: grpcsync.NewEvent(),
grpcTarget: grpcTarget,
uuid: uuid.New().String(),
metricsRecorder: metricsRecorder,
maxSize: size,
keys: newLRU(),
entries: make(map[cacheKey]*cacheEntry),
logger: logger,
shutdown: grpcsync.NewEvent(),
grpcTarget: grpcTarget,
uuid: uuid.New().String(),
}
}

@@ -327,8 +325,7 @@ func (dc *dataCache) addEntry(key cacheKey, entry *cacheEntry) (backoffCancelled
if dc.currentSize > dc.maxSize {
backoffCancelled = dc.resize(dc.maxSize)
}
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid)

return backoffCancelled, true
}

@@ -338,7 +335,7 @@ func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) {
dc.currentSize -= entry.size
entry.size = newSize
dc.currentSize += entry.size
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)

}

func (dc *dataCache) getEntry(key cacheKey) *cacheEntry {
@@ -371,8 +368,17 @@ func (dc *dataCache) deleteAndCleanup(key cacheKey, entry *cacheEntry) {
delete(dc.entries, key)
dc.currentSize -= entry.size
dc.keys.removeEntry(key)
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid)

}

func (dc *dataCache) reportMetrics(r estats.AsyncMetricsRecorder) error {
if dc.shutdown.HasFired() {
return nil
}
// The caller (rlsBalancer) holds the cache mutex.
cacheSizeMetric.Record(r, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
cacheEntriesMetric.Record(r, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
return nil
}

func (dc *dataCache) stop() {
68 changes: 35 additions & 33 deletions balancer/rls/cache_test.go
@@ -24,8 +24,8 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/testutils/stats"
)

var (
@@ -120,7 +120,7 @@ func (s) TestLRU_BasicOperations(t *testing.T) {

func (s) TestDataCache_BasicOperations(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
@@ -134,7 +134,7 @@ func (s) TestDataCache_BasicOperations(t *testing.T) {

func (s) TestDataCache_AddForcesResize(t *testing.T) {
initCacheEntries()
dc := newDataCache(1, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(1, nil, "")

// The first entry in cacheEntries has a minimum expiry time in the future.
// This entry would stop the resize operation since we do not evict entries
@@ -163,7 +163,7 @@

func (s) TestDataCache_Resize(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
@@ -194,7 +194,7 @@

func (s) TestDataCache_EvictExpiredEntries(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
@@ -221,7 +221,7 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
}

initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
@@ -243,6 +243,17 @@
}
}

type testAsyncMetricsRecorder struct {
data map[string]int64
}

func (r *testAsyncMetricsRecorder) RecordInt64AsyncGauge(h *estats.Int64AsyncGaugeHandle, v int64, _ ...string) {
if r.data == nil {
r.data = make(map[string]int64)
}
Comment on lines +251 to +253
Contributor: We should ensure the object is initialized with a non-nil map during creation (via a constructor or otherwise). This makes it easier to reason about the state and avoids potential nil pointer dereferences.
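A minimal sketch of that suggestion; the constructor name is hypothetical:

	// newTestAsyncMetricsRecorder allocates the map up front, so
	// RecordInt64AsyncGauge needs no lazy-initialization nil check.
	func newTestAsyncMetricsRecorder() *testAsyncMetricsRecorder {
		return &testAsyncMetricsRecorder{data: make(map[string]int64)}
	}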

r.data[h.Descriptor().Name] = v
}
Comment on lines +250 to +255
Contributor: Can this functionality be added to the existing NewTestMetricsRecorder so it can be reused by other tests in the future?


func (s) TestDataCache_Metrics(t *testing.T) {
cacheEntriesMetricsTests := []*cacheEntry{
{size: 1},
@@ -251,8 +262,8 @@ func (s) TestDataCache_Metrics(t *testing.T) {
{size: 4},
{size: 5},
}
tmr := stats.NewTestMetricsRecorder()
dc := newDataCache(50, nil, tmr, "")
tmr := &testAsyncMetricsRecorder{}
dc := newDataCache(50, nil, "")

dc.updateRLSServerTarget("rls-server-target")
for i, k := range cacheKeys {
@@ -261,42 +272,33 @@

const cacheEntriesKey = "grpc.lb.rls.cache_entries"
const cacheSizeKey = "grpc.lb.rls.cache_size"
// 5 total entries which add up to 15 size, so should record that.
if got, _ := tmr.Metric(cacheEntriesKey); got != 5 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 5)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 15 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 15)
verifyMetrics := func(wantEntries, wantSize int64) {
Contributor: This closure seems like an assertion helper, which is discouraged by the style guide: https://google.github.io/styleguide/go/best-practices#leave-testing-to-the-test-function

It does have useful error messages, though. I would still recommend inlining the logic since it's just a few lines anyway.
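For illustration, the first verification step would read like this when inlined, reusing the exact code from this test; this is a sketch of the reviewer's suggestion, not part of the PR:

	tmr.data = nil
	dc.reportMetrics(tmr)
	// 5 total entries which add up to 15 size.
	if got := tmr.data[cacheEntriesKey]; got != 5 {
		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 5)
	}
	if got := tmr.data[cacheSizeKey]; got != 15 {
		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 15)
	}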

t.Helper()
tmr.data = nil
dc.reportMetrics(tmr)
if got := tmr.data[cacheEntriesKey]; got != wantEntries {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, wantEntries)
}
if got := tmr.data[cacheSizeKey]; got != wantSize {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, wantSize)
}
}

// 5 total entries which add up to 15 size.
verifyMetrics(5, 15)

// Resize down the cache to 2 entries (deterministic, based on LRU order).
dc.resize(9)
if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 9 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 9)
}
verifyMetrics(2, 9)

// Update an entry to have size 6. This should be reflected in the size
// metric, which increases by 1 to 10, while the number of cache entries
// stays the same. This write is deterministic and writes to the last entry.
dc.updateEntrySize(cacheEntriesMetricsTests[4], 6)

if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 10 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 10)
}
verifyMetrics(2, 10)

// Delete this scaled-up cache key. This should scale down the cache to 1
// entry, and remove 6 from the size, so the cache size should be 4.
dc.deleteAndCleanup(cacheKeys[4], cacheEntriesMetricsTests[4])
if got, _ := tmr.Metric(cacheEntriesKey); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 1)
}
if got, _ := tmr.Metric(cacheSizeKey); got != 4 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 4)
}
verifyMetrics(1, 4)
}
18 changes: 1 addition & 17 deletions internal/leakcheck/leakcheck_test.go
@@ -134,28 +134,21 @@ func TestTrackTimers(t *testing.T) {
}

func TestLeakChecker_DetectsLeak(t *testing.T) {
// 1. Setup the tracker (swaps the delegate in internal).
TrackAsyncReporters()

// Safety defer: ensure we restore the default delegate even if the test crashes
// before CheckAsyncReporters is called.
defer func() {
internal.AsyncReporterCleanupDelegate = func(f func()) func() { return f }
}()

// 2. Simulate a registration using the swapped delegate.
// We utilize the internal delegate directly to simulate stats.RegisterAsyncReporter behavior.
noOpCleanup := func() {}
wrappedCleanup := internal.AsyncReporterCleanupDelegate(noOpCleanup)

// 3. Create a leak: We discard 'wrappedCleanup' without calling it.
// Create a leak: We discard 'wrappedCleanup' without calling it.
_ = wrappedCleanup

// 4. Check for leaks.
tl := &testLogger{}
CheckAsyncReporters(tl)

// 5. Assertions.
if tl.errorCount == 0 {
t.Error("Expected leak checker to report a leak, but it succeeded silently.")
}
@@ -165,24 +158,15 @@
}

func TestLeakChecker_PassesOnCleanup(t *testing.T) {
// 1. Setup.
TrackAsyncReporters()
defer func() {
internal.AsyncReporterCleanupDelegate = func(f func()) func() { return f }
}()

// 2. Simulate registration.
noOpCleanup := func() {}
wrappedCleanup := internal.AsyncReporterCleanupDelegate(noOpCleanup)

// 3. Behave correctly: Call the cleanup.
wrappedCleanup()

// 4. Check for leaks.
tl := &testLogger{}
CheckAsyncReporters(tl)

// 5. Assertions.
if tl.errorCount > 0 {
t.Errorf("Expected no leaks, but got errors: %v", tl.errors)
}
6 changes: 0 additions & 6 deletions internal/testutils/stats/test_metrics_recorder.go
@@ -294,9 +294,3 @@ func (r *TestMetricsRecorder) TagConn(ctx context.Context, _ *stats.ConnTagInfo)

// HandleConn is TestMetricsRecorder's implementation of HandleConn.
func (r *TestMetricsRecorder) HandleConn(context.Context, stats.ConnStats) {}

// NoopMetricsRecorder is a noop MetricsRecorder to be used in tests to prevent
// nil panics.
type NoopMetricsRecorder struct {
estats.UnimplementedMetricsRecorder
}