Skip to content

Commit 41a7b81

Browse files
committed
feat(grpc): adds support for grpc parsing.
1 parent faa50ce commit 41a7b81

File tree

7 files changed

+250
-16
lines changed

7 files changed

+250
-16
lines changed

go.mod

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,16 @@ require (
77
github.com/eapache/go-resiliency v1.1.0 // indirect
88
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
99
github.com/eapache/queue v1.1.0 // indirect
10-
github.com/gogo/googleapis v1.1.0 // indirect
1110
github.com/gogo/protobuf v1.2.0
1211
github.com/golang/protobuf v1.4.2
1312
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
1413
github.com/gorilla/context v1.1.1 // indirect
1514
github.com/gorilla/mux v1.6.2
16-
github.com/lyft/protoc-gen-validate v0.0.13 // indirect
1715
github.com/onsi/ginkgo v1.7.0
1816
github.com/onsi/gomega v1.4.3
1917
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1 // indirect
2018
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
2119
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
22-
golang.org/x/net v0.0.0-20190311183353-d8887717615a
2320
google.golang.org/grpc v1.30.0
2421
google.golang.org/protobuf v1.25.0
2522
)

go.sum

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
1515
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
1616
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
1717
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
18-
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
1918
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
2019
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
2120
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
2221
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
2322
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
2423
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
25-
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
2624
github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
2725
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
2826
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
@@ -46,14 +44,14 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
4644
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
4745
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
4846
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
47+
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
4948
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
5049
github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8=
5150
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
5251
github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk=
5352
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
5453
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
5554
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
56-
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
5755
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
5856
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
5957
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -93,6 +91,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
9391
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
9492
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
9593
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
94+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
9695
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
9796
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
9897
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -102,16 +101,12 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
102101
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
103102
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
104103
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
105-
google.golang.org/grpc v1.20.0 h1:DlsSIrgEBuZAUFJcta2B5i/lzeHHbnfkNFAfFXLVFYQ=
106-
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
107104
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
108105
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
109106
google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
110107
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
111108
google.golang.org/grpc v1.30.0 h1:M5a8xTlYTxwMn5ZFkwhRabsygDY5G8TYLyQDBxJNAxE=
112109
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
113-
google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200728012721-8b7764bddbca h1:5sEGdC+FhvT/NORrmnMIGyEB7NcWkbMplI2bxi9IG4c=
114-
google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200728012721-8b7764bddbca/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
115110
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
116111
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
117112
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

middleware/grpc/client.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
type clientHandler struct {
2929
tracer *zipkin.Tracer
3030
remoteServiceName string
31+
handleRPCParser handleRPCParser
3132
}
3233

3334
// A ClientOption can be passed to NewClientHandler to customize the returned handler.
@@ -41,6 +42,30 @@ func WithRemoteServiceName(name string) ClientOption {
4142
}
4243
}
4344

45+
// WithClientInPayloadParser adds a parser for the stats.InPayload to be able to access
46+
// the request payload
47+
func WithClientInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ClientOption {
48+
return func(h *clientHandler) {
49+
h.handleRPCParser.inPayload = parser
50+
}
51+
}
52+
53+
// WithClientInTrailerParser adds a parser for the stats.InTrailer to be able to access
54+
// the request trailer
55+
func WithClientInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ClientOption {
56+
return func(h *clientHandler) {
57+
h.handleRPCParser.inTrailer = parser
58+
}
59+
}
60+
61+
// WithClientInHeaderParser adds a parser for the stats.InHeader to be able to access
62+
// the request payload
63+
func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ClientOption {
64+
return func(h *clientHandler) {
65+
h.handleRPCParser.inHeader = parser
66+
}
67+
}
68+
4469
// NewClientHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
4570
// tracing to a gRPC client. The gRPC method name is used as the span name and by default the only
4671
// tags are the gRPC status code if the call fails.
@@ -67,7 +92,7 @@ func (c *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con
6792

6893
// HandleRPC implements per-RPC tracing and stats instrumentation.
6994
func (c *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
70-
handleRPC(ctx, rs)
95+
handleRPC(ctx, rs, c.handleRPCParser)
7196
}
7297

7398
// TagRPC implements per-RPC context management.

middleware/grpc/grpc_suite_test.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package grpc_test
1717
import (
1818
"context"
1919
"errors"
20+
"log"
2021
"net"
2122
"testing"
2223

@@ -26,6 +27,7 @@ import (
2627
"google.golang.org/grpc/codes"
2728
"google.golang.org/grpc/metadata"
2829
"google.golang.org/grpc/status"
30+
"google.golang.org/grpc/test/bufconn"
2931

3032
"github.com/openzipkin/zipkin-go"
3133
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
@@ -121,7 +123,7 @@ func (g *sequentialIdGenerator) reset() {
121123
g.nextSpanId = g.start
122124
}
123125

124-
type TestHelloService struct{
126+
type TestHelloService struct {
125127
service.UnimplementedHelloServiceServer
126128
}
127129

@@ -158,3 +160,37 @@ func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest)
158160

159161
return resp, nil
160162
}
163+
164+
func initListener(s *grpc.Server) func(context.Context, string) (net.Conn, error) {
165+
const bufSize = 1024 * 1024
166+
167+
listener := bufconn.Listen(bufSize)
168+
bufDialer := func(context.Context, string) (net.Conn, error) {
169+
return listener.Dial()
170+
}
171+
172+
go func() {
173+
if err := s.Serve(listener); err != nil {
174+
log.Fatalf("Server exited with error: %v", err)
175+
}
176+
}()
177+
178+
return bufDialer
179+
}
180+
181+
func createTracer(joinSpans bool) (*zipkin.Tracer, func() []model.SpanModel) {
182+
recorder := recorder.NewReporter()
183+
ep, _ := zipkin.NewEndpoint("grpc-server", "")
184+
185+
serverIdGenerator = newSequentialIdGenerator(0x1000000)
186+
187+
tracer, _ := zipkin.NewTracer(
188+
recorder,
189+
zipkin.WithLocalEndpoint(ep),
190+
zipkin.WithSharedSpans(joinSpans),
191+
zipkin.WithIDGenerator(serverIdGenerator),
192+
)
193+
return tracer, func() []model.SpanModel {
194+
return recorder.Flush()
195+
}
196+
}

middleware/grpc/server.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ import (
2525
)
2626

2727
type serverHandler struct {
28-
tracer *zipkin.Tracer
29-
defaultTags map[string]string
28+
tracer *zipkin.Tracer
29+
defaultTags map[string]string
30+
handleRPCParser handleRPCParser
3031
}
3132

3233
// A ServerOption can be passed to NewServerHandler to customize the returned handler.
@@ -39,6 +40,30 @@ func ServerTags(tags map[string]string) ServerOption {
3940
}
4041
}
4142

43+
// WithServerInPayloadParser adds a parser for the stats.InPayload to be able to access
44+
// the request payload
45+
func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ServerOption {
46+
return func(h *serverHandler) {
47+
h.handleRPCParser.inPayload = parser
48+
}
49+
}
50+
51+
// WithserverInTrailerParser adds a parser for the stats.InTrailer to be able to access
52+
// the request trailer
53+
func WithserverInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ServerOption {
54+
return func(h *serverHandler) {
55+
h.handleRPCParser.inTrailer = parser
56+
}
57+
}
58+
59+
// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access
60+
// the request payload
61+
func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption {
62+
return func(h *serverHandler) {
63+
h.handleRPCParser.inHeader = parser
64+
}
65+
}
66+
4267
// NewServerHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
4368
// tracing to a gRPC server. The gRPC method name is used as the span name and by default the only
4469
// tags are the gRPC status code if the call fails. Use ServerTags to add additional tags that
@@ -66,7 +91,7 @@ func (s *serverHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con
6691

6792
// HandleRPC implements per-RPC tracing and stats instrumentation.
6893
func (s *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
69-
handleRPC(ctx, rs)
94+
handleRPC(ctx, rs, s.handleRPCParser)
7095
}
7196

7297
// TagRPC implements per-RPC context management.
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright 2019 The OpenZipkin Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package grpc_test
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"google.golang.org/grpc"
22+
"google.golang.org/grpc/metadata"
23+
"google.golang.org/grpc/stats"
24+
25+
"github.com/openzipkin/zipkin-go"
26+
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
27+
"github.com/openzipkin/zipkin-go/model"
28+
service "github.com/openzipkin/zipkin-go/proto/testing"
29+
)
30+
31+
func TestGRPCServerCreatesASpanAndContext(t *testing.T) {
32+
tracer, flusher := createTracer(false)
33+
34+
s := grpc.NewServer(
35+
grpc.StatsHandler(
36+
zipkingrpc.NewServerHandler(
37+
tracer,
38+
zipkingrpc.ServerTags(map[string]string{"default": "tag"}),
39+
),
40+
),
41+
)
42+
defer s.Stop()
43+
44+
service.RegisterHelloServiceServer(s, &TestHelloService{})
45+
46+
dialer := initListener(s)
47+
48+
ctx := context.Background()
49+
conn, err := grpc.DialContext(
50+
ctx,
51+
"bufnet",
52+
grpc.WithContextDialer(dialer),
53+
grpc.WithInsecure(),
54+
)
55+
if err != nil {
56+
t.Fatalf("Failed to dial bufnet: %v", err)
57+
}
58+
defer conn.Close()
59+
60+
client := service.NewHelloServiceClient(conn)
61+
62+
_, err = client.Hello(ctx, &service.HelloRequest{
63+
Payload: "Hello",
64+
})
65+
if err != nil {
66+
t.Fatalf("unexpected error: %v", err)
67+
}
68+
69+
spans := flusher()
70+
if want, have := 1, len(spans); want != have {
71+
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
72+
}
73+
74+
span := spans[0]
75+
if want, have := model.Server, span.Kind; want != have {
76+
t.Errorf("unexpected kind, want %q, have %q", want, have)
77+
}
78+
}
79+
80+
func TestGRPCServerCanAccessToHeaders(t *testing.T) {
81+
tracer, flusher := createTracer(false)
82+
83+
s := grpc.NewServer(
84+
grpc.StatsHandler(
85+
zipkingrpc.NewServerHandler(
86+
tracer,
87+
zipkingrpc.ServerTags(map[string]string{"default": "tag"}),
88+
zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) {
89+
if want, have := "test_value", inHeader.Header.Get("test_key")[0]; want != have {
90+
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
91+
}
92+
}),
93+
zipkingrpc.WithServerInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) {
94+
if want, have := "test_value", inTrailer.Trailer.Get("test_key")[0]; want != have {
95+
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
96+
}
97+
}),
98+
),
99+
),
100+
)
101+
defer s.Stop()
102+
103+
service.RegisterHelloServiceServer(s, &TestHelloService{})
104+
105+
dialer := initListener(s)
106+
107+
ctx := context.Background()
108+
conn, err := grpc.DialContext(
109+
ctx,
110+
"bufnet",
111+
grpc.WithContextDialer(dialer),
112+
grpc.WithInsecure(),
113+
)
114+
if err != nil {
115+
t.Fatalf("Failed to dial bufnet: %v", err)
116+
}
117+
defer conn.Close()
118+
119+
client := service.NewHelloServiceClient(conn)
120+
121+
ctx = metadata.AppendToOutgoingContext(ctx, "test_key", "test_value")
122+
_, err = client.Hello(ctx, &service.HelloRequest{
123+
Payload: "Hello",
124+
})
125+
if err != nil {
126+
t.Fatalf("unexpected error: %v", err)
127+
}
128+
129+
spans := flusher()
130+
if want, have := 1, len(spans); want != have {
131+
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
132+
}
133+
134+
span := spans[0]
135+
if want, have := model.Server, span.Kind; want != have {
136+
t.Errorf("unexpected kind, want %q, have %q", want, have)
137+
}
138+
}

0 commit comments

Comments
 (0)