Skip to content

Commit 49b6236

Browse files
committed
feat: adaptation for gRPC
1 parent d7b32c4 commit 49b6236

File tree

1 file changed

+82
-0
lines changed

1 file changed

+82
-0
lines changed

pkg/adapters/grpc/traffic.go

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/alibaba/sentinel-golang/core/route"
8+
"github.com/alibaba/sentinel-golang/core/route/base"
9+
"github.com/google/uuid"
10+
"google.golang.org/grpc"
11+
"net"
12+
"strings"
13+
)
14+
15+
var (
16+
connToBaggage map[string]map[string]string = make(map[string]map[string]string)
17+
cm *route.ClusterManager = nil
18+
)
19+
20+
func NewDialer(id string) func(context.Context, string) (net.Conn, error) {
21+
return func(ctx context.Context, addr string) (net.Conn, error) {
22+
parts := strings.Split(addr, "/")
23+
if len(parts) != 2 {
24+
return nil, errors.New("invalid address format")
25+
}
26+
tc := &base.TrafficContext{
27+
ServiceName: parts[0],
28+
MethodName: parts[1],
29+
Headers: make(map[string]string),
30+
}
31+
32+
instance, err := cm.GetOne(tc)
33+
34+
if err != nil {
35+
return nil, err
36+
}
37+
if instance == nil {
38+
return nil, errors.New("no matched provider")
39+
}
40+
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%v", instance.Host, instance.Port))
41+
if err != nil {
42+
return nil, err
43+
}
44+
connToBaggage[id] = tc.Baggage
45+
46+
return conn, nil
47+
}
48+
}
49+
50+
func NewTrafficUnaryIntercepter(connId string) grpc.DialOption {
51+
return grpc.WithUnaryInterceptor(
52+
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
53+
newCtx := ctx
54+
if baggage, ok := connToBaggage[connId]; ok {
55+
// TODO modify the request
56+
_ = baggage
57+
_ = newCtx
58+
}
59+
return invoker(newCtx, method, req, reply, cc, opts...)
60+
})
61+
}
62+
63+
func NewTrafficStreamIntercepter(connId string) grpc.DialOption {
64+
return grpc.WithStreamInterceptor(
65+
func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
66+
newCtx := ctx
67+
if baggage, ok := connToBaggage[connId]; ok {
68+
// TODO modify the request
69+
_ = baggage
70+
_ = newCtx
71+
}
72+
return streamer(newCtx, desc, cc, method, opts...)
73+
})
74+
}
75+
76+
func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
77+
id := uuid.New().String()
78+
opts = append(opts, grpc.WithContextDialer(NewDialer(id)))
79+
opts = append(opts, NewTrafficUnaryIntercepter(id))
80+
opts = append(opts, NewTrafficStreamIntercepter(id))
81+
return grpc.Dial(addr, opts...)
82+
}

0 commit comments

Comments
 (0)