Skip to content

Commit 2d51986

Browse files
authored
stats/otel: A79 scaffolding to register an async gauge metric and API to record it - part 2 (#8755)
A79: This change introduces the API surface required to support asynchronous metrics (e.g., OpenTelemetry Observable Gauges) in gRPC-Go. This change updates the internal MetricsRecorder interface to support registering asynchronous metric reporters. This is the second of three PRs. It establishes the contracts and wiring without adding the OpenTelemetry implementation logic. This functionality is required to support OpenTelemetry Observable Gauges, which allow components like RLS and xDS to report stateful metrics (e.g., current active requests) via callbacks. RELEASE NOTES: * stats/otel: MetricsRecorder interface updated to include a new method RegisterAsyncReporter that registers an AsyncMetricReporter. * stats/otel: AsyncMetricReporter is added, which is an interface for types that record metrics asynchronously.
1 parent 6ed8acb commit 2d51986

File tree

6 files changed

+192
-3
lines changed

6 files changed

+192
-3
lines changed

experimental/stats/metricregistry_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,15 @@ func (r *fakeMetricsRecorder) RecordInt64UpDownCount(handle *Int64UpDownCountHan
309309
r.intValues[handle.Descriptor()] += incr
310310
}
311311

312-
func (r *fakeMetricsRecorder) RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandle, incr int64, labels ...string) {
312+
func (r *fakeMetricsRecorder) RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandle, val int64, labels ...string) {
313313
verifyLabels(r.t, handle.Descriptor().Labels, handle.Descriptor().OptionalLabels, labels)
314-
r.intValues[handle.Descriptor()] += incr
314+
// Async gauges in OTel are "Observer" instruments; they report
315+
// the current state of the world every cycle, they do not accumulate deltas.
316+
r.intValues[handle.Descriptor()] = val
317+
}
318+
319+
// RegisterAsyncReporter is a noop implementation; this might be changed at a
320+
// later stage.
321+
func (r *fakeMetricsRecorder) RegisterAsyncReporter(AsyncMetricReporter, ...AsyncMetric) func() {
322+
return func() {}
315323
}

experimental/stats/metrics.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,34 @@ type MetricsRecorder interface {
4141
// RecordInt64UpDownCount records the measurement alongside labels on the int
4242
// count associated with the provided handle.
4343
RecordInt64UpDownCount(handle *Int64UpDownCountHandle, incr int64, labels ...string)
44+
// RegisterAsyncReporter registers a reporter to produce metric values for
45+
// only the listed descriptors. The returned function must be called when
46+
// the metrics are no longer needed, which will remove the reporter. The
47+
// returned function must be idempotent and safe for concurrent use.
48+
RegisterAsyncReporter(reporter AsyncMetricReporter, descriptors ...AsyncMetric) func()
49+
}
50+
51+
// AsyncMetricReporter is an interface for types that record metrics asynchronously
52+
// for the set of descriptors they are registered with. The AsyncMetricsRecorder
53+
// parameter is used to record values for these metrics.
54+
//
55+
// Implementations must make unique recordings across all registered
56+
// AsyncMetricReporters; that is, they must not report values for a metric with
57+
// the same attributes that another AsyncMetricReporter reports.
58+
//
59+
// Implementations must be concurrent-safe.
60+
type AsyncMetricReporter interface {
61+
// Report records metric values using the provided recorder.
62+
Report(AsyncMetricsRecorder) error
63+
}
64+
65+
// AsyncMetricReporterFunc is an adapter to allow the use of ordinary functions as
66+
// AsyncMetricReporters.
67+
type AsyncMetricReporterFunc func(AsyncMetricsRecorder) error
68+
69+
// Report calls f(r).
70+
func (f AsyncMetricReporterFunc) Report(r AsyncMetricsRecorder) error {
71+
return f(r)
4472
}
4573

4674
// AsyncMetricsRecorder records on asynchronous metrics derived from metric registry.

internal/stats/metrics_recorder_list.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,53 @@ func (l *MetricsRecorderList) RecordInt64Gauge(handle *estats.Int64GaugeHandle,
113113
metricRecorder.RecordInt64Gauge(handle, incr, labels...)
114114
}
115115
}
116+
117+
// RegisterAsyncReporter forwards the registration to all underlying metrics
118+
// recorders.
119+
//
120+
// It returns a cleanup function that, when called, invokes the cleanup function
121+
// returned by each underlying recorder, ensuring the reporter is unregistered
122+
// from all of them.
123+
func (l *MetricsRecorderList) RegisterAsyncReporter(reporter estats.AsyncMetricReporter, metrics ...estats.AsyncMetric) func() {
124+
descriptorsMap := make(map[*estats.MetricDescriptor]bool, len(metrics))
125+
for _, m := range metrics {
126+
descriptorsMap[m.Descriptor()] = true
127+
}
128+
unregisterFns := make([]func(), 0, len(l.metricsRecorders))
129+
for _, mr := range l.metricsRecorders {
130+
// Wrap the AsyncMetricsRecorder to intercept calls to RecordInt64AsyncGauge
131+
// and validate the labels.
132+
wrappedCallback := func(recorder estats.AsyncMetricsRecorder) error {
133+
wrappedRecorder := &asyncRecorderWrapper{
134+
delegate: recorder,
135+
descriptors: descriptorsMap,
136+
}
137+
return reporter.Report(wrappedRecorder)
138+
}
139+
unregisterFns = append(unregisterFns, mr.RegisterAsyncReporter(estats.AsyncMetricReporterFunc(wrappedCallback), metrics...))
140+
}
141+
return func() {
142+
for _, unregister := range unregisterFns {
143+
unregister()
144+
}
145+
}
146+
}
147+
148+
type asyncRecorderWrapper struct {
149+
delegate estats.AsyncMetricsRecorder
150+
descriptors map[*estats.MetricDescriptor]bool
151+
}
152+
153+
// RecordInt64AsyncGauge records the measurement alongside labels on the int
154+
// gauge associated with the provided handle.
155+
func (w *asyncRecorderWrapper) RecordInt64AsyncGauge(handle *estats.Int64AsyncGaugeHandle, value int64, labels ...string) {
156+
// Ensure only metrics for descriptors passed during callback registration
157+
// are emitted.
158+
d := handle.Descriptor()
159+
if _, ok := w.descriptors[d]; !ok {
160+
return
161+
}
162+
// Validate labels and delegate.
163+
verifyLabels(d, labels...)
164+
w.delegate.RecordInt64AsyncGauge(handle, value, labels...)
165+
}

internal/stats/metrics_recorder_list_test.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"google.golang.org/grpc/resolver"
4242
"google.golang.org/grpc/resolver/manual"
4343
"google.golang.org/grpc/serviceconfig"
44+
gstats "google.golang.org/grpc/stats"
4445
)
4546

4647
var defaultTestTimeout = 5 * time.Second
@@ -114,7 +115,6 @@ func (recordingLoadBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer
114115
intHistoHandle.Record(cc.MetricsRecorder(), 3, "int histo label val", "int histo optional label val")
115116
floatHistoHandle.Record(cc.MetricsRecorder(), 4, "float histo label val", "float histo optional label val")
116117
intGaugeHandle.Record(cc.MetricsRecorder(), 5, "int gauge label val", "int gauge optional label val")
117-
118118
return &recordingLoadBalancer{
119119
Balancer: balancer.Get(pickfirst.Name).Build(cc, bOpts),
120120
}
@@ -255,3 +255,82 @@ func (s) TestMetricRecorderListPanic(t *testing.T) {
255255

256256
intCountHandle.Record(mrl, 1, "only one label")
257257
}
258+
259+
// TestMetricsRecorderList_RegisterAsyncReporter verifies that the list implementation
260+
// correctly fans out registration calls to all underlying recorders and
261+
// aggregates the cleanup calls.
262+
func TestMetricsRecorderList_RegisterAsyncReporter(t *testing.T) {
263+
spy1 := &spyMetricsRecorder{name: "spy1"}
264+
spy2 := &spyMetricsRecorder{name: "spy2"}
265+
spy3 := &spyMetricsRecorder{name: "spy3"}
266+
267+
list := istats.NewMetricsRecorderList([]gstats.Handler{spy1, spy2, spy3})
268+
269+
desc := &estats.MetricDescriptor{Name: "test_metric", Description: "test"}
270+
mockMetric := &mockAsyncMetric{d: desc}
271+
272+
dummyReporter := estats.AsyncMetricReporterFunc(func(estats.AsyncMetricsRecorder) error {
273+
return nil
274+
})
275+
cleanup := list.RegisterAsyncReporter(dummyReporter, mockMetric)
276+
277+
// Check that RegisterAsyncReporter was called exactly once on ALL spies
278+
if spy1.registerCalledCount != 1 {
279+
t.Errorf("spy1 register called %d times, want 1", spy1.registerCalledCount)
280+
}
281+
if spy2.registerCalledCount != 1 {
282+
t.Errorf("spy2 register called %d times, want 1", spy2.registerCalledCount)
283+
}
284+
if spy3.registerCalledCount != 1 {
285+
t.Errorf("spy3 register called %d times, want 1", spy3.registerCalledCount)
286+
}
287+
288+
// Verify that cleanup has NOT been called yet
289+
if spy1.cleanupCalledCount != 0 {
290+
t.Error("spy1 cleanup called prematurely")
291+
}
292+
293+
cleanup()
294+
295+
// Check that the cleanup function returned by the list actually triggers
296+
// the cleanup logic on ALL underlying spies.
297+
if spy1.cleanupCalledCount != 1 {
298+
t.Errorf("spy1 cleanup called %d times, want 1", spy1.cleanupCalledCount)
299+
}
300+
if spy2.cleanupCalledCount != 1 {
301+
t.Errorf("spy2 cleanup called %d times, want 1", spy2.cleanupCalledCount)
302+
}
303+
if spy3.cleanupCalledCount != 1 {
304+
t.Errorf("spy3 cleanup called %d times, want 1", spy3.cleanupCalledCount)
305+
}
306+
}
307+
308+
// --- Helpers & Spies ---
309+
310+
// mockAsyncMetric implements estats.AsyncMetric
311+
type mockAsyncMetric struct {
312+
estats.AsyncMetric
313+
d *estats.MetricDescriptor
314+
}
315+
316+
func (m *mockAsyncMetric) Descriptor() *estats.MetricDescriptor {
317+
return m.d
318+
}
319+
320+
// spyMetricsRecorder implements estats.MetricsRecorder
321+
type spyMetricsRecorder struct {
322+
stats.TestMetricsRecorder
323+
name string
324+
registerCalledCount int
325+
cleanupCalledCount int
326+
}
327+
328+
// RegisterAsyncReporter implements the interface and tracks calls.
329+
func (s *spyMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
330+
s.registerCalledCount++
331+
332+
// Return a cleanup function that tracks if it was called
333+
return func() {
334+
s.cleanupCalledCount++
335+
}
336+
}

internal/testutils/stats/test_metrics_recorder.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,12 @@ func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle,
276276
r.data[handle.Name] = float64(incr)
277277
}
278278

279+
// RegisterAsyncReporter is a noop implementation; async gauge test recorders
280+
// should provide their own implementation.
281+
func (r *TestMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
282+
return func() {}
283+
}
284+
279285
// To implement a stats.Handler, which allows it to be set as a dial option:
280286

281287
// TagRPC is TestMetricsRecorder's implementation of TagRPC.
@@ -316,3 +322,8 @@ func (r *NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64,
316322
// RecordInt64UpDownCount is a noop implementation of RecordInt64UpDownCount.
317323
func (r *NoopMetricsRecorder) RecordInt64UpDownCount(*estats.Int64UpDownCountHandle, int64, ...string) {
318324
}
325+
326+
// RegisterAsyncReporter is a noop implementation of RegisterAsyncReporter.
327+
func (r *NoopMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
328+
return func() {}
329+
}

stats/opentelemetry/opentelemetry.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,19 @@ func (rm *registryMetrics) RecordInt64Gauge(handle *estats.Int64GaugeHandle, inc
453453
}
454454
}
455455

456+
// RegisterAsyncReporter will register a callback with the underlying OpenTelemetry
457+
// Meter for the provided descriptors.
458+
//
459+
// It will map the provided descriptors to their corresponding OTel Observable
460+
// instruments. If no instruments match the descriptors, registration is
461+
// skipped.
462+
//
463+
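// The returned cleanup function will unregister the callback from the Meter.
// NOTE: not yet implemented; currently nothing is registered and the returned
// cleanup function is a no-op.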
464+
func (rm *registryMetrics) RegisterAsyncReporter(_ estats.AsyncMetricReporter, _ ...estats.AsyncMetric) func() {
465+
// TODO(@mbissa) - add implementation
466+
return func() {}
467+
}
468+
456469
// Users of this component should use these bucket boundaries as part of their
457470
// SDK MeterProvider passed in. This component sends this as "advice" to the
458471
// API, which works, however this stability is not guaranteed, so for safety the

0 commit comments

Comments
 (0)