Skip to content

Commit baf77df

Browse files
committed
grpc: emit per-message events for streaming RPCs
1 parent 6869e65 commit baf77df

File tree

1 file changed

+75
-24
lines changed

1 file changed

+75
-24
lines changed

pkg/grpc/proto_trace_attributes_extractor.go

Lines changed: 75 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ func (pe *ProtoTraceAttributesExtractor) InterceptUnaryClient(ctx context.Contex
6666
if span == nil {
6767
return invoker(ctx, method, req, reply, cc, opts...)
6868
}
69-
me.processRequest(span, req)
69+
me.applyAttributes(span, me.requestAttributesFor(req))
7070
err := invoker(ctx, method, req, reply, cc, opts...)
7171
if err == nil {
72-
me.processResponse(span, reply)
72+
me.applyAttributes(span, me.responseAttributesFor(reply))
7373
}
7474
return err
7575
}
@@ -113,10 +113,10 @@ func (pe *ProtoTraceAttributesExtractor) InterceptUnaryServer(ctx context.Contex
113113
if span == nil {
114114
return handler(ctx, req)
115115
}
116-
me.processRequest(span, req)
116+
me.applyAttributes(span, me.requestAttributesFor(req))
117117
resp, err := handler(ctx, req)
118118
if err == nil {
119-
me.processResponse(span, resp)
119+
me.applyAttributes(span, me.responseAttributesFor(resp))
120120
}
121121
return resp, err
122122
}
@@ -158,22 +158,22 @@ type methodTraceAttributesExtractor struct {
158158
responseExtractor directionTraceAttributesExtractor
159159
}
160160

161-
func (me *methodTraceAttributesExtractor) processRequest(span trace.Span, req interface{}) {
161+
func (me *methodTraceAttributesExtractor) requestAttributesFor(req interface{}) []attribute.KeyValue {
162162
me.requestOnce.Do(func() {
163163
// First time we see an RPC message going from the
164164
// client to the server.
165165
me.requestExtractor.initialize("request", me.requestAttributes, req, me.errorLogger)
166166
})
167-
me.requestExtractor.gatherAttributes(span, req)
167+
return me.requestExtractor.extractAttributes(req)
168168
}
169169

170-
func (me *methodTraceAttributesExtractor) processResponse(span trace.Span, resp interface{}) {
170+
func (me *methodTraceAttributesExtractor) responseAttributesFor(resp interface{}) []attribute.KeyValue {
171171
me.responseOnce.Do(func() {
172172
// First time we see an RPC message going from the
173173
// server to the client.
174174
me.responseExtractor.initialize("response", me.responseAttributes, resp, me.errorLogger)
175175
})
176-
me.responseExtractor.gatherAttributes(span, resp)
176+
return me.responseExtractor.extractAttributes(resp)
177177
}
178178

179179
// methodTraceAttributesExtractor is the bookkeeping that needs to be
@@ -199,15 +199,16 @@ func (de *directionTraceAttributesExtractor) initialize(attributePrefix string,
199199
}
200200
}
201201

202-
func (de *directionTraceAttributesExtractor) gatherAttributes(span trace.Span, m interface{}) {
203-
if len(de.attributeExtractors) > 0 {
204-
mProtoReflect := m.(proto.Message).ProtoReflect()
205-
attributes := make([]attribute.KeyValue, 0, len(de.attributeExtractors))
206-
for _, attributeExtractor := range de.attributeExtractors {
207-
attributes = attributeExtractor(mProtoReflect, attributes)
208-
}
209-
span.SetAttributes(attributes...)
202+
func (de *directionTraceAttributesExtractor) extractAttributes(m interface{}) []attribute.KeyValue {
203+
if len(de.attributeExtractors) == 0 {
204+
return nil
205+
}
206+
mProtoReflect := m.(proto.Message).ProtoReflect()
207+
attributes := make([]attribute.KeyValue, 0, len(de.attributeExtractors))
208+
for _, attributeExtractor := range de.attributeExtractors {
209+
attributes = attributeExtractor(mProtoReflect, attributes)
210210
}
211+
return attributes
211212
}
212213

213214
// attributeExtractor is a function type that is capable of extracting a
@@ -400,14 +401,22 @@ type attributeExtractingClientStream struct {
400401
span trace.Span
401402
gotFirstRequest bool
402403
gotFirstResponse bool
404+
requestIndex uint64
405+
responseIndex uint64
403406
}
404407

405408
func (cs *attributeExtractingClientStream) SendMsg(m interface{}) error {
409+
attributes := cs.method.requestAttributesFor(m)
406410
if !cs.gotFirstRequest {
407411
cs.gotFirstRequest = true
408-
cs.method.processRequest(cs.span, m)
412+
cs.method.applyAttributes(cs.span, attributes)
413+
}
414+
err := cs.ClientStream.SendMsg(m)
415+
if err == nil {
416+
cs.requestIndex++
417+
addMessageEvent(cs.span, "out", cs.requestIndex, attributes)
409418
}
410-
return cs.ClientStream.SendMsg(m)
419+
return err
411420
}
412421

413422
func (cs *attributeExtractingClientStream) RecvMsg(m interface{}) error {
@@ -416,10 +425,18 @@ func (cs *attributeExtractingClientStream) RecvMsg(m interface{}) error {
416425
return err
417426
}
418427
cs.gotFirstResponse = true
419-
cs.method.processResponse(cs.span, m)
428+
attributes := cs.method.responseAttributesFor(m)
429+
cs.method.applyAttributes(cs.span, attributes)
430+
cs.responseIndex++
431+
addMessageEvent(cs.span, "in", cs.responseIndex, attributes)
420432
return nil
421433
}
422-
return cs.ClientStream.RecvMsg(m)
434+
if err := cs.ClientStream.RecvMsg(m); err != nil {
435+
return err
436+
}
437+
cs.responseIndex++
438+
addMessageEvent(cs.span, "in", cs.responseIndex, cs.method.responseAttributesFor(m))
439+
return nil
423440
}
424441

425442
// attributeExtractingServerStream is a decorator for grpc.ServerStream
@@ -431,6 +448,8 @@ type attributeExtractingServerStream struct {
431448
span trace.Span
432449
gotFirstRequest bool
433450
gotFirstResponse bool
451+
requestIndex uint64
452+
responseIndex uint64
434453
}
435454

436455
func (cs *attributeExtractingServerStream) RecvMsg(m interface{}) error {
@@ -439,16 +458,48 @@ func (cs *attributeExtractingServerStream) RecvMsg(m interface{}) error {
439458
return err
440459
}
441460
cs.gotFirstRequest = true
442-
cs.method.processRequest(cs.span, m)
461+
attributes := cs.method.requestAttributesFor(m)
462+
cs.method.applyAttributes(cs.span, attributes)
463+
cs.requestIndex++
464+
addMessageEvent(cs.span, "in", cs.requestIndex, attributes)
443465
return nil
444466
}
445-
return cs.ServerStream.RecvMsg(m)
467+
if err := cs.ServerStream.RecvMsg(m); err != nil {
468+
return err
469+
}
470+
cs.requestIndex++
471+
addMessageEvent(cs.span, "in", cs.requestIndex, cs.method.requestAttributesFor(m))
472+
return nil
446473
}
447474

448475
func (cs *attributeExtractingServerStream) SendMsg(m interface{}) error {
476+
attributes := cs.method.responseAttributesFor(m)
449477
if !cs.gotFirstResponse {
450478
cs.gotFirstResponse = true
451-
cs.method.processResponse(cs.span, m)
479+
cs.method.applyAttributes(cs.span, attributes)
480+
}
481+
err := cs.ServerStream.SendMsg(m)
482+
if err == nil {
483+
cs.responseIndex++
484+
addMessageEvent(cs.span, "out", cs.responseIndex, attributes)
485+
}
486+
return err
487+
}
488+
489+
func (me *methodTraceAttributesExtractor) applyAttributes(span trace.Span, attributes []attribute.KeyValue) {
490+
if span == nil || !span.IsRecording() || len(attributes) == 0 {
491+
return
492+
}
493+
span.SetAttributes(attributes...)
494+
}
495+
496+
func addMessageEvent(span trace.Span, direction string, index uint64, attributes []attribute.KeyValue) {
497+
if span == nil || !span.IsRecording() {
498+
return
452499
}
453-
return cs.ServerStream.SendMsg(m)
500+
attributes = append(attributes,
501+
attribute.String("grpc.message.direction", direction),
502+
attribute.Int64("grpc.message.index", int64(index)),
503+
)
504+
span.AddEvent("grpc.message", trace.WithAttributes(attributes...))
454505
}

0 commit comments

Comments
 (0)