Skip to content

Commit 248b6be

Browse files
committed
Add custom label for per-RPC metrics
TODO: Add tests
1 parent 55ae1d0 commit 248b6be

File tree

6 files changed

+60
-31
lines changed

6 files changed

+60
-31
lines changed

api/src/main/java/io/grpc/Grpc.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ private Grpc() {
5656
public static final Attributes.Key<SSLSession> TRANSPORT_ATTR_SSL_SESSION =
5757
Attributes.Key.create("io.grpc.Grpc.TRANSPORT_ATTR_SSL_SESSION");
5858

59+
/**
60+
* The value for the custom label of per-RPC metrics. Defaults to empty string when unset. Must
61+
* not be set to {@code null}.
62+
*/
63+
public static final CallOptions.Key<String> CALL_OPTION_CUSTOM_LABEL =
64+
CallOptions.Key.createWithDefault("io.grpc.Grpc.CALL_OPTION_CUSTOM_LABEL", "");
65+
5966
/**
6067
* Annotation for transport attributes. It follows the annotation semantics defined
6168
* by {@link Attributes}.

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.common.base.Preconditions.checkNotNull;
2020
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
2121
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
22+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.CUSTOM_LABEL_KEY;
2223
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
2324
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
2425
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
@@ -39,6 +40,7 @@
3940
import io.grpc.Deadline;
4041
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
4142
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
43+
import io.grpc.Grpc;
4244
import io.grpc.Metadata;
4345
import io.grpc.MethodDescriptor;
4446
import io.grpc.ServerStreamTracer;
@@ -94,6 +96,7 @@ final class OpenTelemetryMetricsModule {
9496
private final Supplier<Stopwatch> stopwatchSupplier;
9597
private final boolean localityEnabled;
9698
private final boolean backendServiceEnabled;
99+
private final boolean customLabelEnabled;
97100
private final ImmutableList<OpenTelemetryPlugin> plugins;
98101

99102
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
@@ -103,6 +106,7 @@ final class OpenTelemetryMetricsModule {
103106
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
104107
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
105108
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
109+
this.customLabelEnabled = optionalLabels.contains(CUSTOM_LABEL_KEY.getKey());
106110
this.plugins = ImmutableList.copyOf(plugins);
107111
}
108112

@@ -249,7 +253,7 @@ public void streamClosed(Status status) {
249253
statusCode = Code.DEADLINE_EXCEEDED;
250254
}
251255
}
252-
attemptsState.attemptEnded();
256+
attemptsState.attemptEnded(info.getCallOptions());
253257
recordFinishedAttempt();
254258
}
255259

@@ -273,6 +277,10 @@ void recordFinishedAttempt() {
273277
}
274278
builder.put(BACKEND_SERVICE_KEY, savedBackendService);
275279
}
280+
if (module.customLabelEnabled) {
281+
builder.put(
282+
CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
283+
}
276284
for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
277285
plugin.addLabels(builder);
278286
}
@@ -383,7 +391,7 @@ private ClientTracer newClientTracer(StreamInfo info) {
383391
}
384392

385393
// Called whenever each attempt is ended.
386-
void attemptEnded() {
394+
void attemptEnded(CallOptions callOptions) {
387395
boolean shouldRecordFinishedCall = false;
388396
synchronized (lock) {
389397
if (--activeStreams == 0) {
@@ -395,11 +403,11 @@ void attemptEnded() {
395403
}
396404
}
397405
if (shouldRecordFinishedCall) {
398-
recordFinishedCall();
406+
recordFinishedCall(callOptions);
399407
}
400408
}
401409

402-
void callEnded(Status status) {
410+
void callEnded(Status status, CallOptions callOptions) {
403411
callStopWatch.stop();
404412
this.status = status;
405413
boolean shouldRecordFinishedCall = false;
@@ -415,11 +423,11 @@ void callEnded(Status status) {
415423
}
416424
}
417425
if (shouldRecordFinishedCall) {
418-
recordFinishedCall();
426+
recordFinishedCall(callOptions);
419427
}
420428
}
421429

422-
void recordFinishedCall() {
430+
void recordFinishedCall(CallOptions callOptions) {
423431
Context otelContext = otelContextWithBaggage();
424432
if (attemptsPerCall.get() == 0) {
425433
ClientTracer tracer = newClientTracer(null);
@@ -430,11 +438,13 @@ void recordFinishedCall() {
430438
callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
431439

432440
// Base attributes
433-
io.opentelemetry.api.common.Attributes baseAttributes =
434-
io.opentelemetry.api.common.Attributes.of(
435-
METHOD_KEY, fullMethodName,
436-
TARGET_KEY, target
437-
);
441+
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
442+
.put(METHOD_KEY, fullMethodName)
443+
.put(TARGET_KEY, target);
444+
if (module.customLabelEnabled) {
445+
builder.put(CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
446+
}
447+
io.opentelemetry.api.common.Attributes baseAttributes = builder.build();
438448

439449
// Duration
440450
if (module.resource.clientCallDurationCounter() != null) {
@@ -660,6 +670,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
660670
callOptions = plugin.filterCallOptions(callOptions);
661671
}
662672
}
673+
final CallOptions finalCallOptions = callOptions;
663674
// Only record method name as an attribute if isSampledToLocalTracing is set to true,
664675
// which is true for all generated methods. Otherwise, programatically
665676
// created methods result in high cardinality metrics.
@@ -679,7 +690,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
679690
new SimpleForwardingClientCallListener<RespT>(responseListener) {
680691
@Override
681692
public void onClose(Status status, Metadata trailers) {
682-
tracerFactory.callEnded(status);
693+
tracerFactory.callEnded(status, finalCallOptions);
683694
super.onClose(status, trailers);
684695
}
685696
},

opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public final class OpenTelemetryConstants {
3838
public static final AttributeKey<String> BACKEND_SERVICE_KEY =
3939
AttributeKey.stringKey("grpc.lb.backend_service");
4040

41+
public static final AttributeKey<String> CUSTOM_LABEL_KEY =
42+
AttributeKey.stringKey("grpc.client.call.custom");
43+
4144
public static final AttributeKey<String> DISCONNECT_ERROR_KEY =
4245
AttributeKey.stringKey("grpc.disconnect_error");
4346

opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ public void clientBasicMetrics() {
322322
tracer.inboundMessage(1);
323323
tracer.inboundWireSize(154);
324324
tracer.streamClosed(Status.OK);
325-
callAttemptsTracerFactory.callEnded(Status.OK);
325+
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);
326326

327327
io.opentelemetry.api.common.Attributes clientAttributes
328328
= io.opentelemetry.api.common.Attributes.of(
@@ -453,7 +453,7 @@ public void clientBasicMetrics_withRetryMetricsEnabled_shouldRecordZeroOrBeAbsen
453453
fakeClock.forwardTime(100, TimeUnit.MILLISECONDS);
454454
tracer.outboundMessage(0);
455455
tracer.streamClosed(Status.OK);
456-
callAttemptsTracerFactory.callEnded(Status.OK);
456+
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);
457457

458458
io.opentelemetry.api.common.Attributes finalAttributes
459459
= io.opentelemetry.api.common.Attributes.of(
@@ -827,7 +827,7 @@ public void recordAttemptMetrics() {
827827
fakeClock.forwardTime(24, MILLISECONDS);
828828
// RPC succeeded
829829
tracer.streamClosed(Status.OK);
830-
callAttemptsTracerFactory.callEnded(Status.OK);
830+
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);
831831

832832
io.opentelemetry.api.common.Attributes clientAttributes2
833833
= io.opentelemetry.api.common.Attributes.of(
@@ -995,7 +995,7 @@ public void recordAttemptMetrics_withRetryMetricsEnabled() {
995995
tracer.streamClosed(Status.OK); // RPC succeeded
996996

997997
// --- The overall call ends ---
998-
callAttemptsTracerFactory.callEnded(Status.OK);
998+
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);
999999

10001000
// Define attributes for assertions
10011001
io.opentelemetry.api.common.Attributes finalAttributes
@@ -1087,7 +1087,7 @@ public void recordAttemptMetrics_withHedgedCalls() {
10871087
hedgeTracer2.streamClosed(Status.OK); // Second hedge succeeds
10881088

10891089
// --- The overall call ends ---
1090-
callAttemptsTracerFactory.callEnded(Status.OK);
1090+
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);
10911091

10921092
// Define attributes for assertions
10931093
io.opentelemetry.api.common.Attributes finalAttributes
@@ -1141,7 +1141,7 @@ public void clientStreamNeverCreatedStillRecordMetrics() {
11411141
method.getFullMethodName(), emptyList());
11421142
fakeClock.forwardTime(3000, MILLISECONDS);
11431143
Status status = Status.DEADLINE_EXCEEDED.withDescription("5 seconds");
1144-
callAttemptsTracerFactory.callEnded(status);
1144+
callAttemptsTracerFactory.callEnded(status, CallOptions.DEFAULT);
11451145

11461146
io.opentelemetry.api.common.Attributes attemptStartedAttributes
11471147
= io.opentelemetry.api.common.Attributes.of(
@@ -1255,7 +1255,7 @@ public void clientLocalityMetrics_present() {
12551255
tracer.addOptionalLabel("grpc.lb.locality", "the-moon");
12561256
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
12571257
tracer.streamClosed(Status.OK);
1258-
callAttemptsTracerFactory.callEnded(Status.OK);
1258+
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);
12591259

12601260
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
12611261
TARGET_KEY, target,
@@ -1319,7 +1319,7 @@ public void clientLocalityMetrics_missing() {
13191319
ClientStreamTracer tracer =
13201320
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
13211321
tracer.streamClosed(Status.OK);
1322-
callAttemptsTracerFactory.callEnded(Status.OK);
1322+
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);
13231323

13241324
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
13251325
TARGET_KEY, target,
@@ -1388,7 +1388,7 @@ public void clientBackendServiceMetrics_present() {
13881388
tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon");
13891389
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
13901390
tracer.streamClosed(Status.OK);
1391-
callAttemptsTracerFactory.callEnded(Status.OK);
1391+
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);
13921392

13931393
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
13941394
TARGET_KEY, target,
@@ -1453,7 +1453,7 @@ public void clientBackendServiceMetrics_missing() {
14531453
ClientStreamTracer tracer =
14541454
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
14551455
tracer.streamClosed(Status.OK);
1456-
callAttemptsTracerFactory.callEnded(Status.OK);
1456+
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);
14571457

14581458
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
14591459
TARGET_KEY, target,

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.grpc.ChannelLogger;
3434
import io.grpc.ChannelLogger.ChannelLogLevel;
3535
import io.grpc.ConnectivityState;
36+
import io.grpc.Grpc;
3637
import io.grpc.LoadBalancer.Helper;
3738
import io.grpc.LoadBalancer.PickResult;
3839
import io.grpc.LoadBalancer.PickSubchannelArgs;
@@ -141,20 +142,22 @@ final class CachingRlsLbClient {
141142
"grpc.lb.rls.default_target_picks",
142143
"EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}",
143144
Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
144-
"grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(),
145+
"grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"),
146+
Arrays.asList("grpc.client.call.custom"),
145147
false);
146148
TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
147149
"EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
148150
+ "target is also returned by the RLS server, RPCs sent to that target from the cache "
149151
+ "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}",
150152
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target",
151-
"grpc.lb.pick_result"), Collections.emptyList(),
153+
"grpc.lb.pick_result"),
154+
Arrays.asList("grpc.client.call.custom"),
152155
false);
153156
FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
154157
"EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
155158
+ "RLS channel being throttled", "{pick}",
156159
Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
157-
Collections.emptyList(), false);
160+
Arrays.asList("grpc.client.call.custom"), false);
158161
CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
159162
"EXPERIMENTAL. Number of entries in the RLS cache", "{entry}",
160163
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
@@ -1033,15 +1036,16 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
10331036
helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
10341037
Arrays.asList(helper.getChannelTarget(), lookupService,
10351038
childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1036-
Collections.emptyList());
1039+
Arrays.asList(determineCustomLabel(args)));
10371040
}
10381041
return pickResult;
10391042
} else if (response.hasError()) {
10401043
if (hasFallback) {
10411044
return useFallback(args);
10421045
}
10431046
helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1044-
Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
1047+
Arrays.asList(helper.getChannelTarget(), lookupService),
1048+
Arrays.asList(determineCustomLabel(args)));
10451049
return PickResult.withError(
10461050
convertRlsServerStatus(response.getStatus(),
10471051
lbPolicyConfig.getRouteLookupConfig().lookupService()));
@@ -1061,7 +1065,7 @@ private PickResult useFallback(PickSubchannelArgs args) {
10611065
helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
10621066
Arrays.asList(helper.getChannelTarget(), lookupService,
10631067
fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1064-
Collections.emptyList());
1068+
Arrays.asList(determineCustomLabel(args)));
10651069
}
10661070
return pickResult;
10671071
}
@@ -1076,6 +1080,10 @@ private String determineMetricsPickResult(PickResult pickResult) {
10761080
}
10771081
}
10781082

1083+
private String determineCustomLabel(PickSubchannelArgs args) {
1084+
return args.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL);
1085+
}
1086+
10791087
// GuardedBy CachingRlsLbClient.lock
10801088
void close() {
10811089
synchronized (lock) { // Lock is already held, but ErrorProne can't tell

rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ public void metricsWithRealChannel() throws Exception {
382382
eq(1L),
383383
eq(Arrays.asList("directaddress:///fake-bigtable.googleapis.com", "localhost:8972",
384384
"defaultTarget", "complete")),
385-
eq(Arrays.asList()));
385+
eq(Arrays.asList("")));
386386
}
387387

388388
@Test
@@ -687,7 +687,7 @@ private void verifyLongCounterAdd(String name, int times, long value,
687687
verify(mockMetricRecorder, times(times)).addLongCounter(
688688
eqMetricInstrumentName(name), eq(value),
689689
eq(Lists.newArrayList(channelTarget, "localhost:8972", dataPlaneTargetLabel, pickResult)),
690-
eq(Lists.newArrayList()));
690+
eq(Lists.newArrayList("")));
691691
}
692692

693693
// This one is for verifying the failed_pick metric specifically.
@@ -696,7 +696,7 @@ private void verifyFailedPicksCounterAdd(int times, long value) {
696696
verify(mockMetricRecorder, times(times)).addLongCounter(
697697
eqMetricInstrumentName("grpc.lb.rls.failed_picks"), eq(value),
698698
eq(Lists.newArrayList(channelTarget, "localhost:8972")),
699-
eq(Lists.newArrayList()));
699+
eq(Lists.newArrayList("")));
700700
}
701701

702702
@SuppressWarnings("TypeParameterUnusedInFormals")

0 commit comments

Comments
 (0)