Skip to content

Remove the deprecated StreamServerInterceptor function from otelgrpc #7107

@MrAlias

Description

@MrAlias

The StreamServerInterceptor function was deprecated within the last year and should now be removed.

  • Remove the deprecated StreamServerInterceptor function:
    // StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
    // for use in a grpc.NewServer call.
    //
    // Deprecated: Use [NewServerHandler] instead.
    func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
    cfg := newConfig(opts, "server")
    tracer := cfg.TracerProvider.Tracer(
    ScopeName,
    trace.WithInstrumentationVersion(Version()),
    )
    return func(
    srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
    ) error {
    ctx := ss.Context()
    i := &InterceptorInfo{
    StreamServerInfo: info,
    Type: StreamServer,
    }
    if cfg.InterceptorFilter != nil && !cfg.InterceptorFilter(i) {
    return handler(srv, wrapServerStream(ctx, ss, cfg))
    }
    ctx = extract(ctx, cfg.Propagators)
    name, attr, _ := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
    startOpts := append([]trace.SpanStartOption{
    trace.WithSpanKind(trace.SpanKindServer),
    trace.WithAttributes(attr...),
    },
    cfg.SpanStartOptions...,
    )
    ctx, span := tracer.Start(
    trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
    name,
    startOpts...,
    )
    defer span.End()
    err := handler(srv, wrapServerStream(ctx, ss, cfg))
    if err != nil {
    s, _ := status.FromError(err)
    statusCode, msg := serverStatus(s)
    span.SetStatus(statusCode, msg)
    span.SetAttributes(statusCodeAttr(s.Code()))
    } else {
    span.SetAttributes(statusCodeAttr(grpc_codes.OK))
    }
    return err
    }
    }
  • Remove the benchmark test:
    func BenchmarkStreamServerInterceptor(b *testing.B) {
    benchmark(b, nil, []grpc.ServerOption{
    grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(
    otelgrpc.WithTracerProvider(tracerProvider),
    )),
    })
    }
  • Remove tests:
    • // TestStreamServerInterceptor tests the server interceptor for streaming RPCs.
      func TestStreamServerInterceptor(t *testing.T) {
      for _, check := range serverChecks {
      name := check.grpcCode.String()
      t.Run(name, func(t *testing.T) {
      sr := tracetest.NewSpanRecorder()
      tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
      //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
      usi := otelgrpc.StreamServerInterceptor(
      otelgrpc.WithTracerProvider(tp),
      )
      // call the stream interceptor
      grpcErr := status.Error(check.grpcCode, check.grpcCode.String())
      handler := func(_ interface{}, _ grpc.ServerStream) error {
      return grpcErr
      }
      err := usi(&grpc_testing.SimpleRequest{}, &mockServerStream{}, &grpc.StreamServerInfo{FullMethod: name}, handler)
      assert.Equal(t, grpcErr, err)
      // validate span
      span, ok := getSpanFromRecorder(sr, name)
      require.True(t, ok, "missing span %s", name)
      assertServerSpan(t, check.wantSpanCode, check.wantSpanStatusDescription, check.grpcCode, span)
      })
      }
      }
      func TestStreamServerInterceptorEvents(t *testing.T) {
      testCases := []struct {
      Name string
      Events []otelgrpc.Event
      }{
      {Name: "With events", Events: []otelgrpc.Event{otelgrpc.ReceivedEvents, otelgrpc.SentEvents}},
      {Name: "With only sent events", Events: []otelgrpc.Event{otelgrpc.SentEvents}},
      {Name: "With only received events", Events: []otelgrpc.Event{otelgrpc.ReceivedEvents}},
      {Name: "No events", Events: []otelgrpc.Event{}},
      }
      for _, testCase := range testCases {
      t.Run(testCase.Name, func(t *testing.T) {
      sr := tracetest.NewSpanRecorder()
      tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
      opts := []otelgrpc.Option{
      otelgrpc.WithTracerProvider(tp),
      }
      if len(testCase.Events) > 0 {
      opts = append(opts, otelgrpc.WithMessageEvents(testCase.Events...))
      }
      //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
      usi := otelgrpc.StreamServerInterceptor(opts...)
      stream := &mockServerStream{}
      grpcCode := grpc_codes.OK
      name := grpcCode.String()
      // call the stream interceptor
      grpcErr := status.Error(grpcCode, name)
      handler := func(_ interface{}, handlerStream grpc.ServerStream) error {
      var msg grpc_testing.SimpleRequest
      err := handlerStream.RecvMsg(&msg)
      require.NoError(t, err)
      err = handlerStream.SendMsg(&msg)
      require.NoError(t, err)
      return grpcErr
      }
      err := usi(&grpc_testing.SimpleRequest{}, stream, &grpc.StreamServerInfo{FullMethod: name}, handler)
      require.Equal(t, grpcErr, err)
      // validate span
      span, ok := getSpanFromRecorder(sr, name)
      require.True(t, ok, "missing span %s", name)
      if len(testCase.Events) == 0 {
      assert.Empty(t, span.Events())
      } else {
      var eventsAttr []map[attribute.Key]attribute.Value
      for _, event := range testCase.Events {
      switch event {
      case otelgrpc.SentEvents:
      eventsAttr = append(eventsAttr, map[attribute.Key]attribute.Value{
      otelgrpc.RPCMessageTypeKey: attribute.StringValue("SENT"),
      otelgrpc.RPCMessageIDKey: attribute.IntValue(1),
      })
      case otelgrpc.ReceivedEvents:
      eventsAttr = append(eventsAttr, map[attribute.Key]attribute.Value{
      otelgrpc.RPCMessageTypeKey: attribute.StringValue("RECEIVED"),
      otelgrpc.RPCMessageIDKey: attribute.IntValue(1),
      })
      }
      }
      assert.Len(t, span.Events(), len(eventsAttr))
      assert.Equal(t, eventsAttr, eventAttrMap(span.Events()))
      }
      })
      }
      }
      func assertServerMetrics(t *testing.T, reader metric.Reader, serviceName, name string, code grpc_codes.Code) {
      want := metricdata.ScopeMetrics{
      Scope: wantInstrumentationScope,
      Metrics: []metricdata.Metrics{
      {
      Name: "rpc.server.duration",
      Description: "Measures the duration of inbound RPC.",
      Unit: "ms",
      Data: metricdata.Histogram[float64]{
      Temporality: metricdata.CumulativeTemporality,
      DataPoints: []metricdata.HistogramDataPoint[float64]{
      {
      Attributes: attribute.NewSet(
      semconv.RPCMethod(name),
      semconv.RPCService(serviceName),
      otelgrpc.RPCSystemGRPC,
      otelgrpc.GRPCStatusCodeKey.Int64(int64(code)),
      ),
      },
      },
      },
      },
      },
      }
      rm := metricdata.ResourceMetrics{}
      err := reader.Collect(context.Background(), &rm)
      assert.NoError(t, err)
      require.Len(t, rm.ScopeMetrics, 1)
      metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
      }
    • //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
      grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(
      otelgrpc.WithTracerProvider(serverStreamTP),
      otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
      )),
    • t.Run("StreamServerSpans", func(t *testing.T) {
      checkStreamServerSpans(t, serverStreamSR.Ended())
      })
    • func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
      require.Len(t, spans, 3)
      streamInput := spans[0]
      assert.False(t, streamInput.EndTime().IsZero())
      assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name())
      // sizes from reqSizes in "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/test".
      assertEvents(t, []trace.Event{
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(2),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(3),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(4),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      }, streamInput.Events())
      port, ok := findAttribute(streamInput.Attributes(), semconv.NetSockPeerPortKey)
      assert.True(t, ok)
      assert.ElementsMatch(t, []attribute.KeyValue{
      semconv.RPCMethod("StreamingInputCall"),
      semconv.RPCService("grpc.testing.TestService"),
      otelgrpc.RPCSystemGRPC,
      otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
      semconv.NetSockPeerAddr("127.0.0.1"),
      port,
      }, streamInput.Attributes())
      streamOutput := spans[1]
      assert.False(t, streamOutput.EndTime().IsZero())
      assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name())
      // sizes from respSizes in "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/test".
      assertEvents(t, []trace.Event{
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(2),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(3),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(4),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      }, streamOutput.Events())
      port, ok = findAttribute(streamOutput.Attributes(), semconv.NetSockPeerPortKey)
      assert.True(t, ok)
      assert.ElementsMatch(t, []attribute.KeyValue{
      semconv.RPCMethod("StreamingOutputCall"),
      semconv.RPCService("grpc.testing.TestService"),
      otelgrpc.RPCSystemGRPC,
      otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
      semconv.NetSockPeerAddr("127.0.0.1"),
      port,
      }, streamOutput.Attributes())
      pingPong := spans[2]
      assert.False(t, pingPong.EndTime().IsZero())
      assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name())
      assertEvents(t, []trace.Event{
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(1),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(2),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(2),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(3),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(3),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(4),
      otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
      },
      },
      {
      Name: "message",
      Attributes: []attribute.KeyValue{
      otelgrpc.RPCMessageIDKey.Int(4),
      otelgrpc.RPCMessageTypeKey.String("SENT"),
      },
      },
      }, pingPong.Events())
      port, ok = findAttribute(pingPong.Attributes(), semconv.NetSockPeerPortKey)
      assert.True(t, ok)
      assert.ElementsMatch(t, []attribute.KeyValue{
      semconv.RPCMethod("FullDuplexCall"),
      semconv.RPCService("grpc.testing.TestService"),
      otelgrpc.RPCSystemGRPC,
      otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
      semconv.NetSockPeerAddr("127.0.0.1"),
      port,
      }, pingPong.Attributes())
      }
  • Add an entry for this removal to the `### Removed` section of the unreleased changelog.

Metadata

Metadata

Assignees

Type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions