Skip to content

Commit b7a66d0

Browse files
anuraagabasvanbeek
authored andcommitted
Implement gRPC server zipkin handler. (#96)
Implement server zipkin handler.
1 parent 07e0ab8 commit b7a66d0

File tree

9 files changed

+376
-77
lines changed

9 files changed

+376
-77
lines changed

middleware/grpc/client.go

Lines changed: 10 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
1-
// +build go1.9
2-
31
package grpc
42

53
import (
64
"context"
7-
"strings"
85

9-
"google.golang.org/grpc/codes"
106
"google.golang.org/grpc/metadata"
117
"google.golang.org/grpc/stats"
12-
"google.golang.org/grpc/status"
138

149
"github.com/openzipkin/zipkin-go"
1510
"github.com/openzipkin/zipkin-go/model"
@@ -18,28 +13,23 @@ import (
1813

1914
type clientHandler struct {
2015
tracer *zipkin.Tracer
21-
rpcHandlers []RPCHandler
2216
remoteServiceName string
2317
}
2418

2519
// A ClientOption can be passed to NewClientHandler to customize the returned handler.
2620
type ClientOption func(*clientHandler)
2721

28-
// A RPCHandler can be registered using WithRPCHandler to intercept calls to HandleRPC of a
29-
// handler for additional span customization.
30-
type RPCHandler func(span zipkin.Span, rpcStats stats.RPCStats)
31-
32-
// WithRPCHandler allows one to add custom logic for handling a stats.RPCStats, e.g.,
33-
// to add additional tags.
34-
func WithRPCHandler(handler RPCHandler) ClientOption {
22+
// WithRemoteServiceName will set the value for the remote endpoint's service name on
23+
// all spans.
24+
func WithRemoteServiceName(name string) ClientOption {
3525
return func(c *clientHandler) {
36-
c.rpcHandlers = append(c.rpcHandlers, handler)
26+
c.remoteServiceName = name
3727
}
3828
}
3929

4030
// NewClientHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
4131
// tracing to a gRPC client. The gRPC method name is used as the span name and by default the only
42-
// tags are the gRPC status code if the call fails. Use WithRPCHandler to add additional tags.
32+
// tags are the gRPC status code if the call fails.
4333
func NewClientHandler(tracer *zipkin.Tracer, options ...ClientOption) stats.Handler {
4434
c := &clientHandler{
4535
tracer: tracer,
@@ -63,36 +53,18 @@ func (c *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con
6353

6454
// HandleRPC implements per-RPC tracing and stats instrumentation.
6555
func (c *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
66-
span := zipkin.SpanFromContext(ctx)
67-
68-
for _, h := range c.rpcHandlers {
69-
h(span, rs)
70-
}
71-
72-
switch rs := rs.(type) {
73-
case *stats.End:
74-
s, ok := status.FromError(rs.Error)
75-
// rs.Error should always be convertable to a status, this is just a defensive check.
76-
if ok {
77-
if s.Code() != codes.OK {
78-
// Uppercase for consistency with Brave
79-
c := strings.ToUpper(s.Code().String())
80-
span.Tag("grpc.status_code", c)
81-
zipkin.TagError.Set(span, c)
82-
}
83-
} else {
84-
zipkin.TagError.Set(span, rs.Error.Error())
85-
}
86-
span.Finish()
87-
}
56+
handleRPC(ctx, rs)
8857
}
8958

9059
// TagRPC implements per-RPC context management.
9160
func (c *clientHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context {
9261
var span zipkin.Span
9362

63+
ep := remoteEndpointFromContext(ctx, c.remoteServiceName)
64+
9465
name := spanName(rti)
95-
span, ctx = c.tracer.StartSpanFromContext(ctx, name, zipkin.Kind(model.Client))
66+
span, ctx = c.tracer.StartSpanFromContext(ctx, name, zipkin.Kind(model.Client), zipkin.RemoteEndpoint(ep))
67+
9668
md, ok := metadata.FromOutgoingContext(ctx)
9769
if ok {
9870
md = md.Copy()

middleware/grpc/client_test.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
// +build go1.9
2-
31
package grpc_test
42

53
import (
@@ -9,10 +7,10 @@ import (
97
"github.com/onsi/gomega"
108
"google.golang.org/grpc"
119
"google.golang.org/grpc/metadata"
12-
"google.golang.org/grpc/stats"
1310

1411
"github.com/openzipkin/zipkin-go"
1512
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
13+
"github.com/openzipkin/zipkin-go/model"
1614
"github.com/openzipkin/zipkin-go/propagation/b3"
1715
service "github.com/openzipkin/zipkin-go/proto/testing"
1816
"github.com/openzipkin/zipkin-go/reporter/recorder"
@@ -32,7 +30,7 @@ var _ = ginkgo.Describe("gRPC Client", func() {
3230
reporter = recorder.NewReporter()
3331
ep, _ := zipkin.NewEndpoint("grpcClient", "")
3432
tracer, err = zipkin.NewTracer(
35-
reporter, zipkin.WithLocalEndpoint(ep), zipkin.WithIDGenerator(newSequentialIdGenerator()))
33+
reporter, zipkin.WithLocalEndpoint(ep), zipkin.WithIDGenerator(newSequentialIdGenerator(1)))
3634
gomega.Expect(tracer, err).ToNot(gomega.BeNil())
3735
})
3836

@@ -56,8 +54,12 @@ var _ = ginkgo.Describe("gRPC Client", func() {
5654

5755
spans := reporter.Flush()
5856
gomega.Expect(spans).To(gomega.HaveLen(1))
59-
gomega.Expect(spans[0].Name).To(gomega.Equal("zipkin.testing.HelloService.Hello"))
60-
gomega.Expect(spans[0].Tags).To(gomega.BeEmpty())
57+
58+
span := spans[0]
59+
gomega.Expect(span.Kind).To(gomega.Equal(model.Client))
60+
gomega.Expect(span.Name).To(gomega.Equal("zipkin.testing.HelloService.Hello"))
61+
gomega.Expect(span.RemoteEndpoint).To(gomega.BeNil())
62+
gomega.Expect(span.Tags).To(gomega.BeEmpty())
6163
})
6264

6365
ginkgo.It("propagates trace context", func() {
@@ -94,7 +96,7 @@ var _ = ginkgo.Describe("gRPC Client", func() {
9496
})
9597
})
9698

97-
ginkgo.Context("with custom RPCHandler", func() {
99+
ginkgo.Context("with remote service name", func() {
98100
ginkgo.BeforeEach(func() {
99101
var err error
100102

@@ -103,21 +105,18 @@ var _ = ginkgo.Describe("gRPC Client", func() {
103105
grpc.WithInsecure(),
104106
grpc.WithStatsHandler(zipkingrpc.NewClientHandler(
105107
tracer,
106-
zipkingrpc.WithRPCHandler(func(span zipkin.Span, rpcStats stats.RPCStats) {
107-
span.Tag("custom", "tag")
108-
}))))
108+
zipkingrpc.WithRemoteServiceName("remoteService"))))
109109
gomega.Expect(conn, err).ToNot(gomega.BeNil())
110110
client = service.NewHelloServiceClient(conn)
111111
})
112112

113-
ginkgo.It("calls custom RPCHandler", func() {
113+
ginkgo.It("has remote service name", func() {
114114
resp, err := client.Hello(context.Background(), &service.HelloRequest{Payload: "Hello"})
115115
gomega.Expect(resp, err).ToNot(gomega.BeNil())
116116

117117
spans := reporter.Flush()
118118
gomega.Expect(spans).To(gomega.HaveLen(1))
119-
gomega.Expect(spans[0].Tags).To(gomega.HaveLen(1))
120-
gomega.Expect(spans[0].Tags).To(gomega.HaveKeyWithValue("custom", "tag"))
119+
gomega.Expect(spans[0].RemoteEndpoint.ServiceName).To(gomega.Equal("remoteService"))
121120
})
122121
})
123122
})

middleware/grpc/doc.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
// +build go1.9
2-
31
/*
42
Package grpc contains several gRPC handlers which can be used for instrumenting calls with Zipkin.
53
*/

middleware/grpc/grpc_suite_test.go

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
// +build go1.9
2-
31
package grpc_test
42

53
import (
@@ -15,13 +13,23 @@ import (
1513
"google.golang.org/grpc/metadata"
1614
"google.golang.org/grpc/status"
1715

16+
"github.com/openzipkin/zipkin-go"
17+
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
1818
"github.com/openzipkin/zipkin-go/model"
19+
"github.com/openzipkin/zipkin-go/propagation/b3"
1920
service "github.com/openzipkin/zipkin-go/proto/testing"
21+
"github.com/openzipkin/zipkin-go/reporter/recorder"
2022
)
2123

2224
var (
25+
serverIdGenerator *sequentialIdGenerator
26+
serverReporter *recorder.ReporterRecorder
27+
2328
server *grpc.Server
2429
serverAddr string
30+
31+
customServer *grpc.Server
32+
customServerAddr string
2533
)
2634

2735
func TestGrpc(t *testing.T) {
@@ -30,28 +38,53 @@ func TestGrpc(t *testing.T) {
3038
}
3139

3240
var _ = ginkgo.BeforeSuite(func() {
41+
var err error
42+
43+
serverReporter = recorder.NewReporter()
44+
ep, _ := zipkin.NewEndpoint("grpcServer", "")
45+
serverIdGenerator = newSequentialIdGenerator(0x1000000)
46+
tracer, err := zipkin.NewTracer(
47+
serverReporter, zipkin.WithLocalEndpoint(ep), zipkin.WithIDGenerator(serverIdGenerator), zipkin.WithSharedSpans(false))
48+
3349
lis, err := net.Listen("tcp", ":0")
3450
gomega.Expect(lis, err).ToNot(gomega.BeNil(), "failed to listen to tcp port")
3551

36-
server = grpc.NewServer()
52+
server = grpc.NewServer(grpc.StatsHandler(zipkingrpc.NewServerHandler(tracer)))
3753
service.RegisterHelloServiceServer(server, &TestHelloService{})
3854
go func() {
3955
_ = server.Serve(lis)
4056
}()
4157
serverAddr = lis.Addr().String()
58+
59+
customLis, err := net.Listen("tcp", ":0")
60+
gomega.Expect(customLis, err).ToNot(gomega.BeNil(), "failed to listen to tcp port")
61+
62+
tracer, err = zipkin.NewTracer(
63+
serverReporter, zipkin.WithLocalEndpoint(ep), zipkin.WithIDGenerator(serverIdGenerator), zipkin.WithSharedSpans(true))
64+
customServer = grpc.NewServer(grpc.StatsHandler(zipkingrpc.NewServerHandler(
65+
tracer,
66+
zipkingrpc.ServerTags(map[string]string{"default": "tag"}))))
67+
service.RegisterHelloServiceServer(customServer, &TestHelloService{})
68+
go func() {
69+
_ = customServer.Serve(customLis)
70+
}()
71+
customServerAddr = customLis.Addr().String()
4272
})
4373

4474
var _ = ginkgo.AfterSuite(func() {
4575
server.Stop()
76+
customServer.Stop()
77+
_ = serverReporter.Close()
4678
})
4779

4880
type sequentialIdGenerator struct {
4981
nextTraceId uint64
5082
nextSpanId uint64
83+
start uint64
5184
}
5285

53-
func newSequentialIdGenerator() *sequentialIdGenerator {
54-
return &sequentialIdGenerator{1, 1}
86+
func newSequentialIdGenerator(start uint64) *sequentialIdGenerator {
87+
return &sequentialIdGenerator{start, start, start}
5588
}
5689

5790
func (g *sequentialIdGenerator) SpanID(traceID model.TraceID) model.ID {
@@ -69,27 +102,43 @@ func (g *sequentialIdGenerator) TraceID() model.TraceID {
69102
return id
70103
}
71104

105+
func (g *sequentialIdGenerator) reset() {
106+
g.nextTraceId = g.start
107+
g.nextSpanId = g.start
108+
}
109+
72110
type TestHelloService struct{}
73111

74112
func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) {
75113
if req.Payload == "fail" {
76114
return nil, status.Error(codes.Aborted, "fail")
77115
}
78116

117+
resp := &service.HelloResponse{
118+
Payload: "World",
119+
Metadata: map[string]string{},
120+
SpanContext: map[string]string{},
121+
}
122+
79123
md, ok := metadata.FromIncomingContext(ctx)
80124
if !ok {
81125
return nil, errors.New("could not parse incoming metadata")
82126
}
83127

84-
resp := &service.HelloResponse{
85-
Payload: "World",
86-
Metadata: map[string]string{},
87-
}
88-
89128
for k := range md {
90129
// Just append the first value for a key for simplicity since we don't use multi-value headers.
91130
resp.GetMetadata()[k] = md[k][0]
92131
}
93132

133+
span := zipkin.SpanFromContext(ctx)
134+
if span != nil {
135+
spanCtx := span.Context()
136+
resp.GetSpanContext()[b3.SpanID] = spanCtx.ID.String()
137+
resp.GetSpanContext()[b3.TraceID] = spanCtx.TraceID.String()
138+
if spanCtx.ParentID != nil {
139+
resp.GetSpanContext()[b3.ParentSpanID] = spanCtx.ParentID.String()
140+
}
141+
}
142+
94143
return resp, nil
95144
}

middleware/grpc/server.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"github.com/openzipkin/zipkin-go"
6+
"github.com/openzipkin/zipkin-go/model"
7+
"github.com/openzipkin/zipkin-go/propagation/b3"
8+
"google.golang.org/grpc/metadata"
9+
"google.golang.org/grpc/stats"
10+
)
11+
12+
type serverHandler struct {
13+
tracer *zipkin.Tracer
14+
defaultTags map[string]string
15+
}
16+
17+
// A ServerOption can be passed to NewServerHandler to customize the returned handler.
18+
type ServerOption func(*serverHandler)
19+
20+
// ServerTags adds default Tags to inject into server spans.
21+
func ServerTags(tags map[string]string) ServerOption {
22+
return func(h *serverHandler) {
23+
h.defaultTags = tags
24+
}
25+
}
26+
27+
// NewServerHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
28+
// tracing to a gRPC server. The gRPC method name is used as the span name and by default the only
29+
// tags are the gRPC status code if the call fails. Use ServerTags to add additional tags that
30+
// should be applied to all spans.
31+
func NewServerHandler(tracer *zipkin.Tracer, options ...ServerOption) stats.Handler {
32+
c := &serverHandler{
33+
tracer: tracer,
34+
}
35+
for _, option := range options {
36+
option(c)
37+
}
38+
return c
39+
}
40+
41+
// HandleConn exists to satisfy gRPC stats.Handler.
42+
func (s *serverHandler) HandleConn(ctx context.Context, cs stats.ConnStats) {
43+
// no-op
44+
}
45+
46+
// TagConn exists to satisfy gRPC stats.Handler.
47+
func (s *serverHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
48+
// no-op
49+
return ctx
50+
}
51+
52+
// HandleRPC implements per-RPC tracing and stats instrumentation.
53+
func (s *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
54+
handleRPC(ctx, rs)
55+
}
56+
57+
// TagRPC implements per-RPC context management.
58+
func (s *serverHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context {
59+
md, ok := metadata.FromIncomingContext(ctx)
60+
// In practice, ok never seems to be false but add a defensive check.
61+
if !ok {
62+
md = metadata.New(nil)
63+
}
64+
65+
name := spanName(rti)
66+
67+
sc := s.tracer.Extract(b3.ExtractGRPC(&md))
68+
69+
span := s.tracer.StartSpan(name, zipkin.Kind(model.Server), zipkin.Parent(sc), zipkin.RemoteEndpoint(remoteEndpointFromContext(ctx, "")))
70+
71+
for k, v := range s.defaultTags {
72+
span.Tag(k, v)
73+
}
74+
75+
return zipkin.NewContext(ctx, span)
76+
}

0 commit comments

Comments
 (0)