Skip to content

Commit 1282b04

Browse files
saiwlximenzaoshi
saiwl
authored andcommitted
fix(grpcproxy): support grpc context propagation in grpc proxy;fix the problem that watch exits when recvLoop exits but sendLoop still running
Signed-off-by: ximenzaoshi <[email protected]>
1 parent 7b9013d commit 1282b04

File tree

2 files changed

+20
-3
lines changed

2 files changed

+20
-3
lines changed

server/etcdmain/grpc_proxy.go

+16
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"google.golang.org/grpc"
4141
"google.golang.org/grpc/grpclog"
4242
"google.golang.org/grpc/keepalive"
43+
"google.golang.org/grpc/metadata"
4344

4445
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
4546
"go.etcd.io/etcd/client/pkg/v3/logutil"
@@ -422,6 +423,20 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
422423
return cmux.New(l)
423424
}
424425

426+
func contextPropagationUnaryServerInterceptor() grpc.UnaryServerInterceptor {
427+
return func(
428+
ctx context.Context,
429+
req interface{},
430+
info *grpc.UnaryServerInfo,
431+
handler grpc.UnaryHandler,
432+
) (interface{}, error) {
433+
if md, ok := metadata.FromIncomingContext(ctx); ok {
434+
ctx = metadata.NewOutgoingContext(ctx, md)
435+
}
436+
return handler(ctx, req)
437+
}
438+
}
439+
425440
func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
426441
if grpcProxyEnableOrdering {
427442
vf := ordering.NewOrderViolationSwitchEndpointClosure(client)
@@ -467,6 +482,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
467482
}
468483
grpcChainUnaryList := []grpc.UnaryServerInterceptor{
469484
grpc_prometheus.UnaryServerInterceptor,
485+
contextPropagationUnaryServerInterceptor(),
470486
}
471487
if grpcProxyEnableLogging {
472488
grpcChainStreamList = append(grpcChainStreamList,

server/proxy/grpcproxy/watch.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
123123

124124
// post to stopc => terminate server stream; can't use a waitgroup
125125
// since all goroutines will only terminate after Watch() exits.
126-
stopc := make(chan struct{}, 3)
126+
stopc := make(chan struct{}, 2)
127+
leaderc := make(chan struct{}, 1)
127128
go func() {
128129
defer func() { stopc <- struct{}{} }()
129130
wps.recvLoop()
@@ -134,15 +135,15 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
134135
}()
135136
// tear down watch if leader goes down or entire watch proxy is terminated
136137
go func() {
137-
defer func() { stopc <- struct{}{} }()
138+
defer func() { leaderc <- struct{}{} }()
138139
select {
139140
case <-lostLeaderC:
140141
case <-ctx.Done():
141142
case <-wp.ctx.Done():
142143
}
143144
}()
144145

145-
<-stopc
146+
<-leaderc
146147
cancel()
147148

148149
// recv/send may only shutdown after function exits;

0 commit comments

Comments
 (0)