Skip to content

Commit a4906bd

Browse files
authored
Add option to force uniform output (#196)
This adds the `WithRPCSystem` option, which forces the output to either use Connect or gRPC semantic conventions. When the option is not provided, which conventions are used depends on the actual wire protocol of the request being measured (same as before this change). This also removes the "grpc_web" value for the "rpc.system" attribute since that is not specified as a valid value anywhere in [OpenTelemetry RPC semantic conventions](https://opentelemetry.io/docs/specs/semconv/rpc/). The gRPC semantic conventions suggest that it should always be "grpc" -- "grpc-web" is just a transport-level dialect and should not change telemetry.
1 parent 07c1475 commit a4906bd

6 files changed

Lines changed: 348 additions & 157 deletions

File tree

attributes.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,11 @@ func addProcedureAttributes(attrs []attribute.KeyValue, procedure string) []attr
6262
return attrs
6363
}
6464

65-
func addRequestAttributes(attrs []attribute.KeyValue, spec connect.Spec, peer connect.Peer) []attribute.KeyValue {
65+
func addRequestAttributes(protocol string, attrs []attribute.KeyValue, spec connect.Spec, peer connect.Peer) []attribute.KeyValue {
6666
if addr := peer.Addr; addr != "" {
6767
attrs = addAddressAttributes(attrs, addr)
6868
}
6969
name := strings.TrimLeft(spec.Procedure, "/")
70-
protocol := protocolToSemConv(peer.Protocol)
7170
attrs = append(attrs, semconv.RPCSystemKey.String(protocol))
7271
attrs = addProcedureAttributes(attrs, name)
7372
return attrs
@@ -90,7 +89,7 @@ func statusCodeAttribute(protocol string, serverErr error) (attribute.KeyValue,
9089
// Following the respective specifications, use integers and "status_code" for
9190
// gRPC codes in contrast to strings and "error_code" for Connect codes.
9291
switch protocol {
93-
case grpcProtocol, grpcwebProtocol:
92+
case grpcProtocol:
9493
codeKey := attribute.Key("rpc." + protocol + ".status_code")
9594
if serverErr != nil {
9695
return codeKey.Int64(int64(connect.CodeOf(serverErr))), true

interceptor.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@ func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
9292
attributeFilter := i.config.filterAttribute.filter
9393
isClient := request.Spec().IsClient
9494
name := strings.TrimLeft(request.Spec().Procedure, "/")
95-
protocol := protocolToSemConv(request.Peer().Protocol)
95+
protocol := protocolToSemConv(request.Peer().Protocol, i.config.rpcSystem)
9696
attributes := make([]attribute.KeyValue, 0, 6+len(i.config.requestHeaderKeys)) // 5 max request attrs + status code attr + headers
97-
attributes = attributeFilter(request.Spec(), addRequestAttributes(attributes, request.Spec(), request.Peer())...)
97+
attributes = attributeFilter(request.Spec(), addRequestAttributes(protocol, attributes, request.Spec(), request.Peer())...)
9898
instrumentation := i.getInstruments(isClient)
9999
carrier := propagation.HeaderCarrier(request.Header())
100100
spanKind := trace.SpanKindClient
@@ -206,15 +206,16 @@ func (i *Interceptor) WrapStreamingClient(next connect.StreamingClientFunc) conn
206206
// inject the newly created span into the carrier
207207
carrier := propagation.HeaderCarrier(conn.RequestHeader())
208208
i.config.propagator.Inject(ctx, carrier)
209+
protocol := protocolToSemConv(conn.Peer().Protocol, i.config.rpcSystem)
209210
state := newStreamingState(
211+
protocol,
210212
spec,
211213
conn.Peer(),
212214
i.config.filterAttribute,
213215
i.config.omitTraceEvents,
214216
instrumentation.responseSize,
215217
instrumentation.requestSize,
216218
)
217-
protocol := protocolToSemConv(conn.Peer().Protocol)
218219
var requestOnce sync.Once
219220
setRequestAttributes := func() {
220221
if span.IsRecording() {
@@ -282,8 +283,9 @@ func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) co
282283
}
283284
}
284285
name := strings.TrimLeft(conn.Spec().Procedure, "/")
285-
protocol := protocolToSemConv(conn.Peer().Protocol)
286+
protocol := protocolToSemConv(conn.Peer().Protocol, i.config.rpcSystem)
286287
state := newStreamingState(
288+
protocol,
287289
conn.Spec(),
288290
conn.Peer(),
289291
i.config.filterAttribute,
@@ -358,11 +360,13 @@ func (i *Interceptor) getInstruments(isClient bool) *instruments {
358360
}
359361

360362
// protocolToSemConv converts the protocol string to the OpenTelemetry format.
361-
func protocolToSemConv(protocol string) string {
363+
func protocolToSemConv(protocol string, system RPCSystem) string {
364+
if system != nil {
365+
// If an explicit system was configured, that overrides the wire protocol.
366+
return system.protocol()
367+
}
362368
switch protocol {
363-
case grpcwebString:
364-
return grpcwebProtocol
365-
case grpcProtocol:
369+
case grpcwebString, grpcString:
366370
return grpcProtocol
367371
case connectString:
368372
return connectProtocol

0 commit comments

Comments
 (0)