Skip to content

Commit 87ba874

Browse files
committed
grpc: add streaming event coverage
1 parent baf77df commit 87ba874

File tree

2 files changed

+199
-6
lines changed

2 files changed

+199
-6
lines changed

pkg/grpc/proto_trace_attributes_extractor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ func (cs *attributeExtractingServerStream) SendMsg(m interface{}) error {
486486
return err
487487
}
488488

489-
func (me *methodTraceAttributesExtractor) applyAttributes(span trace.Span, attributes []attribute.KeyValue) {
489+
func (methodTraceAttributesExtractor) applyAttributes(span trace.Span, attributes []attribute.KeyValue) {
490490
if span == nil || !span.IsRecording() || len(attributes) == 0 {
491491
return
492492
}

pkg/grpc/proto_trace_attributes_extractor_test.go

Lines changed: 198 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package grpc_test
33
import (
44
"context"
55
"encoding/base64"
6+
"reflect"
67
"testing"
78

89
remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
@@ -24,6 +25,28 @@ import (
2425
"go.uber.org/mock/gomock"
2526
)
2627

28+
func hasAttribute(attributes []attribute.KeyValue, want attribute.KeyValue) bool {
29+
for _, attribute := range attributes {
30+
if reflect.DeepEqual(attribute, want) {
31+
return true
32+
}
33+
}
34+
return false
35+
}
36+
37+
func findEventAttributes(t *testing.T, events []trace.EventConfig, direction string, index int64) []attribute.KeyValue {
38+
t.Helper()
39+
for _, event := range events {
40+
attributes := event.Attributes()
41+
if hasAttribute(attributes, attribute.String("grpc.message.direction", direction)) &&
42+
hasAttribute(attributes, attribute.Int64("grpc.message.index", index)) {
43+
return attributes
44+
}
45+
}
46+
t.Fatalf("event not found for direction=%q index=%d", direction, index)
47+
return nil
48+
}
49+
2750
func TestProtoTraceAttributesExtractor(t *testing.T) {
2851
ctrl, ctx := gomock.WithContext(context.Background(), t)
2952

@@ -56,7 +79,7 @@ func TestProtoTraceAttributesExtractor(t *testing.T) {
5679
}, mock.NewMockErrorLogger(ctrl))
5780

5881
handler := mock.NewMockUnaryHandler(ctrl)
59-
span.EXPECT().IsRecording().Return(true)
82+
span.EXPECT().IsRecording().Return(true).AnyTimes()
6083
handler.EXPECT().Call(ctxWithSpan, request).Return(response, nil)
6184
span.EXPECT().SetAttributes([]attribute.KeyValue{
6285
attribute.String("response.bloom_filter", base64.StdEncoding.EncodeToString([]byte{0x01, 0x02, 0x03})),
@@ -69,7 +92,6 @@ func TestProtoTraceAttributesExtractor(t *testing.T) {
6992
require.NoError(t, err)
7093
testutil.RequireEqualProto(t, response, observedResponse.(proto.Message))
7194
})
72-
7395
exampleUnaryFullMethod := "/build.bazel.remote.execution.v2.Capabilities/GetCapabilities"
7496
exampleUnaryRequest := &remoteexecution.GetCapabilitiesRequest{
7597
InstanceName: "default-scheduler",
@@ -156,6 +178,8 @@ func TestProtoTraceAttributesExtractor(t *testing.T) {
156178
// If the span is not recording, we shouldn't
157179
// spend any effort inspecting the request and
158180
// response.
181+
span := mock.NewMockSpan(ctrl)
182+
ctxWithSpan := trace.ContextWithSpan(ctx, span)
159183
span.EXPECT().IsRecording().Return(false)
160184
var observedResponse remoteexecution.ServerCapabilities
161185
invoker.EXPECT().Call(ctxWithSpan, exampleUnaryFullMethod, exampleUnaryRequest, &observedResponse, nil).
@@ -173,7 +197,9 @@ func TestProtoTraceAttributesExtractor(t *testing.T) {
173197
// should use protoreflect to create extractors
174198
// for each of the attributes. This should cause
175199
// any configuration errors to be reported.
176-
span.EXPECT().IsRecording().Return(true)
200+
span := mock.NewMockSpan(ctrl)
201+
ctxWithSpan := trace.ContextWithSpan(ctx, span)
202+
span.EXPECT().IsRecording().Return(true).AnyTimes()
177203
errorLogger.EXPECT().Log(testutil.EqStatus(t, status.Error(codes.InvalidArgument, "Failed to create extractor for attribute \"request\": Attribute name does not contain any fields")))
178204
errorLogger.EXPECT().Log(testutil.EqStatus(t, status.Error(codes.InvalidArgument, "Failed to create extractor for attribute \"response.cache_capabilities\": Field \"cache_capabilities\" does not have a boolean, enumeration, floating point, integer, bytes or string type")))
179205
errorLogger.EXPECT().Log(testutil.EqStatus(t, status.Error(codes.InvalidArgument, "Failed to create extractor for attribute \"response.nonexistent\": Field \"nonexistent\" does not exist")))
@@ -218,6 +244,8 @@ func TestProtoTraceAttributesExtractor(t *testing.T) {
218244
// Methods for which we've not specified a
219245
// configuration should have their calls
220246
// forwarded in literal form.
247+
span := mock.NewMockSpan(ctrl)
248+
ctxWithSpan := trace.ContextWithSpan(ctx, span)
221249
request := &remoteexecution.UpdateActionResultRequest{
222250
InstanceName: "default-scheduler",
223251
}
@@ -235,6 +263,8 @@ func TestProtoTraceAttributesExtractor(t *testing.T) {
235263
// If the span is not recording, we shouldn't
236264
// spend any effort inspecting the request and
237265
// response.
266+
span := mock.NewMockSpan(ctrl)
267+
ctxWithSpan := trace.ContextWithSpan(ctx, span)
238268
span.EXPECT().IsRecording().Return(false)
239269
handler.EXPECT().Call(ctxWithSpan, exampleUnaryRequest).Return(exampleUnaryResponse, nil)
240270

@@ -248,7 +278,9 @@ func TestProtoTraceAttributesExtractor(t *testing.T) {
248278
t.Run("RecordingSpan", func(t *testing.T) {
249279
// As the span is recording, we should see calls
250280
// to SetAttributes().
251-
span.EXPECT().IsRecording().Return(true)
281+
span := mock.NewMockSpan(ctrl)
282+
ctxWithSpan := trace.ContextWithSpan(ctx, span)
283+
span.EXPECT().IsRecording().Return(true).AnyTimes()
252284
span.EXPECT().SetAttributes([]attribute.KeyValue{
253285
attribute.String("request.instance_name", "default-scheduler"),
254286
})
@@ -267,6 +299,167 @@ func TestProtoTraceAttributesExtractor(t *testing.T) {
267299
testutil.RequireEqualProto(t, exampleUnaryResponse, observedResponse.(proto.Message))
268300
})
269301
})
302+
}
303+
304+
func TestProtoTraceAttributesExtractorStreaming(t *testing.T) {
305+
ctrl, ctx := gomock.WithContext(context.Background(), t)
306+
307+
t.Run("InterceptStreamClient", func(t *testing.T) {
308+
span := mock.NewMockSpan(ctrl)
309+
ctxWithSpan := trace.ContextWithSpan(ctx, span)
310+
streamMethod := "/build.bazel.remote.execution.v2.Capabilities/StreamCapabilities"
311+
streamDesc := grpc.StreamDesc{StreamName: "StreamCapabilities", ClientStreams: true, ServerStreams: true}
312+
streamer := mock.NewMockStreamer(ctrl)
313+
clientStream := mock.NewMockClientStream(ctrl)
314+
315+
extractor := bb_grpc.NewProtoTraceAttributesExtractor(map[string]*configuration.TracingMethodConfiguration{
316+
streamMethod: {
317+
AttributesFromFirstRequestMessage: []string{
318+
"instance_name",
319+
},
320+
AttributesFromFirstResponseMessage: []string{
321+
"execution_capabilities.exec_enabled",
322+
},
323+
},
324+
}, mock.NewMockErrorLogger(ctrl))
325+
326+
request := &remoteexecution.GetCapabilitiesRequest{
327+
InstanceName: "default-scheduler",
328+
}
329+
response := &remoteexecution.ServerCapabilities{
330+
ExecutionCapabilities: &remoteexecution.ExecutionCapabilities{
331+
ExecEnabled: true,
332+
},
333+
}
334+
335+
span.EXPECT().IsRecording().Return(true).AnyTimes()
336+
streamer.EXPECT().Call(ctxWithSpan, &streamDesc, nil, streamMethod).Return(clientStream, nil)
337+
clientStream.EXPECT().SendMsg(request).Return(nil)
338+
clientStream.EXPECT().SendMsg(request).Return(nil)
339+
clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(
340+
func(m interface{}) error {
341+
proto.Merge(m.(proto.Message), response)
342+
return nil
343+
}).Times(2)
344+
345+
span.EXPECT().SetAttributes([]attribute.KeyValue{
346+
attribute.String("request.instance_name", "default-scheduler"),
347+
})
348+
span.EXPECT().SetAttributes([]attribute.KeyValue{
349+
attribute.Bool("response.execution_capabilities.exec_enabled", true),
350+
})
351+
352+
var events []trace.EventConfig
353+
span.EXPECT().AddEvent("grpc.message", gomock.Any()).AnyTimes().Do(
354+
func(_ string, options ...trace.EventOption) {
355+
events = append(events, trace.NewEventConfig(options...))
356+
})
357+
358+
wrappedStream, err := extractor.InterceptStreamClient(ctxWithSpan, &streamDesc, nil, streamMethod, streamer.Call)
359+
require.NoError(t, err)
360+
361+
require.NoError(t, wrappedStream.SendMsg(request))
362+
require.NoError(t, wrappedStream.SendMsg(request))
363+
var observedResponse remoteexecution.ServerCapabilities
364+
require.NoError(t, wrappedStream.RecvMsg(&observedResponse))
365+
require.NoError(t, wrappedStream.RecvMsg(&observedResponse))
366+
367+
require.Len(t, events, 4)
368+
attributes := findEventAttributes(t, events, "out", 1)
369+
require.True(t, hasAttribute(attributes, attribute.String("request.instance_name", "default-scheduler")))
370+
attributes = findEventAttributes(t, events, "out", 2)
371+
require.True(t, hasAttribute(attributes, attribute.String("request.instance_name", "default-scheduler")))
372+
attributes = findEventAttributes(t, events, "in", 1)
373+
require.True(t, hasAttribute(attributes, attribute.Bool("response.execution_capabilities.exec_enabled", true)))
374+
attributes = findEventAttributes(t, events, "in", 2)
375+
require.True(t, hasAttribute(attributes, attribute.Bool("response.execution_capabilities.exec_enabled", true)))
376+
})
377+
378+
t.Run("InterceptStreamServer", func(t *testing.T) {
379+
span := mock.NewMockSpan(ctrl)
380+
ctxWithSpan := trace.ContextWithSpan(ctx, span)
381+
streamMethod := "/build.bazel.remote.execution.v2.Capabilities/StreamCapabilities"
382+
serverStream := mock.NewMockServerStream(ctrl)
383+
handler := mock.NewMockStreamHandler(ctrl)
384+
385+
extractor := bb_grpc.NewProtoTraceAttributesExtractor(map[string]*configuration.TracingMethodConfiguration{
386+
streamMethod: {
387+
AttributesFromFirstRequestMessage: []string{
388+
"instance_name",
389+
},
390+
AttributesFromFirstResponseMessage: []string{
391+
"execution_capabilities.exec_enabled",
392+
},
393+
},
394+
}, mock.NewMockErrorLogger(ctrl))
395+
396+
requestOne := &remoteexecution.GetCapabilitiesRequest{
397+
InstanceName: "first",
398+
}
399+
requestTwo := &remoteexecution.GetCapabilitiesRequest{
400+
InstanceName: "second",
401+
}
402+
responseOne := &remoteexecution.ServerCapabilities{
403+
ExecutionCapabilities: &remoteexecution.ExecutionCapabilities{
404+
ExecEnabled: true,
405+
},
406+
}
407+
responseTwo := &remoteexecution.ServerCapabilities{
408+
ExecutionCapabilities: &remoteexecution.ExecutionCapabilities{
409+
ExecEnabled: false,
410+
},
411+
}
412+
413+
span.EXPECT().IsRecording().Return(true).AnyTimes()
414+
serverStream.EXPECT().Context().Return(ctxWithSpan).AnyTimes()
415+
serverStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(
416+
func(m interface{}) error {
417+
proto.Merge(m.(proto.Message), requestOne)
418+
return nil
419+
})
420+
serverStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(
421+
func(m interface{}) error {
422+
proto.Merge(m.(proto.Message), requestTwo)
423+
return nil
424+
})
425+
serverStream.EXPECT().SendMsg(responseOne).Return(nil)
426+
serverStream.EXPECT().SendMsg(responseTwo).Return(nil)
427+
428+
span.EXPECT().SetAttributes([]attribute.KeyValue{
429+
attribute.String("request.instance_name", "first"),
430+
})
431+
span.EXPECT().SetAttributes([]attribute.KeyValue{
432+
attribute.Bool("response.execution_capabilities.exec_enabled", true),
433+
})
434+
435+
var events []trace.EventConfig
436+
span.EXPECT().AddEvent("grpc.message", gomock.Any()).AnyTimes().Do(
437+
func(_ string, options ...trace.EventOption) {
438+
events = append(events, trace.NewEventConfig(options...))
439+
})
440+
441+
handler.EXPECT().Call(nil, gomock.Any()).DoAndReturn(
442+
func(srv interface{}, stream grpc.ServerStream) error {
443+
var observedRequest remoteexecution.GetCapabilitiesRequest
444+
require.NoError(t, stream.RecvMsg(&observedRequest))
445+
require.NoError(t, stream.RecvMsg(&observedRequest))
446+
require.NoError(t, stream.SendMsg(responseOne))
447+
require.NoError(t, stream.SendMsg(responseTwo))
448+
return nil
449+
})
270450

271-
// TODO: Add testing coverage for streaming RPCs.
451+
require.NoError(t, extractor.InterceptStreamServer(nil, serverStream, &grpc.StreamServerInfo{
452+
FullMethod: streamMethod,
453+
}, handler.Call))
454+
455+
require.Len(t, events, 4)
456+
attributes := findEventAttributes(t, events, "in", 1)
457+
require.True(t, hasAttribute(attributes, attribute.String("request.instance_name", "first")))
458+
attributes = findEventAttributes(t, events, "in", 2)
459+
require.True(t, hasAttribute(attributes, attribute.String("request.instance_name", "second")))
460+
attributes = findEventAttributes(t, events, "out", 1)
461+
require.True(t, hasAttribute(attributes, attribute.Bool("response.execution_capabilities.exec_enabled", true)))
462+
attributes = findEventAttributes(t, events, "out", 2)
463+
require.True(t, hasAttribute(attributes, attribute.Bool("response.execution_capabilities.exec_enabled", false)))
464+
})
272465
}

0 commit comments

Comments
 (0)