Skip to content

Commit 6d9b49f

Browse files
committed
feat: support polaris service registration discovery
1 parent 7cdc12b commit 6d9b49f

File tree

21 files changed

+1307
-37
lines changed

21 files changed

+1307
-37
lines changed

contrib/registry/polaris/go.mod

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module github.com/TarsCloud/TarsGo/contrib/registry/polaris
2+
3+
go 1.16
4+
5+
require (
6+
github.com/TarsCloud/TarsGo v1.3.9
7+
github.com/polarismesh/polaris-go v1.3.0
8+
)
9+
10+
replace github.com/TarsCloud/TarsGo v1.3.9 => ../../../

contrib/registry/polaris/go.sum

+685
Large diffs are not rendered by default.

contrib/registry/polaris/registry.go

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package polaris
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/TarsCloud/TarsGo/tars/protocol/res/endpointf"
8+
"github.com/TarsCloud/TarsGo/tars/registry"
9+
"github.com/TarsCloud/TarsGo/tars/util/endpoint"
10+
"github.com/polarismesh/polaris-go"
11+
)
12+
13+
const (
14+
endpointMeta = "endpoint"
15+
)
16+
17+
type polarisRegistry struct {
18+
namespace string
19+
provider polaris.ProviderAPI
20+
consumer polaris.ConsumerAPI
21+
}
22+
23+
type RegistryOption func(pr *polarisRegistry)
24+
25+
func WithNamespace(namespace string) RegistryOption {
26+
return func(pr *polarisRegistry) {
27+
pr.namespace = namespace
28+
}
29+
}
30+
31+
func New(provider polaris.ProviderAPI, opts ...RegistryOption) registry.Registry {
32+
consumer := polaris.NewConsumerAPIByContext(provider.SDKContext())
33+
pr := &polarisRegistry{namespace: "tars", provider: provider, consumer: consumer}
34+
for _, opt := range opts {
35+
opt(pr)
36+
}
37+
//pr.addMiddleware()
38+
return pr
39+
}
40+
41+
/*func (pr *polarisRegistry) addMiddleware() {
42+
tars.UseClientFilterMiddleware(func(next tars.ClientFilter) tars.ClientFilter {
43+
return func(ctx context.Context, msg *tars.Message, invoke tars.Invoke, timeout time.Duration) (err error) {
44+
start := time.Now()
45+
defer func() {
46+
delay := time.Since(start)
47+
retStatus := model.RetSuccess
48+
if msg.Resp.IRet != 0 {
49+
retStatus = model.RetFail
50+
}
51+
ret := &polaris.ServiceCallResult{
52+
ServiceCallResult: model.ServiceCallResult{
53+
EmptyInstanceGauge: model.EmptyInstanceGauge{},
54+
CalledInstance: nil, // todo: 怎么获取到或构造 Instance
55+
Method: msg.Req.SServantName + "." + msg.Req.SFuncName,
56+
RetStatus: retStatus,
57+
},
58+
}
59+
ret.SetDelay(delay)
60+
ret.SetRetCode(msg.Resp.IRet)
61+
if er := pr.consumer.UpdateServiceCallResult(ret); er != nil {
62+
TLOG.Errorf("do report service call result : %+v", er)
63+
}
64+
}()
65+
return next(ctx, msg, invoke, timeout)
66+
}
67+
})
68+
}*/
69+
70+
func (pr *polarisRegistry) Registry(_ context.Context, servant *registry.ServantInstance) error {
71+
instance := &polaris.InstanceRegisterRequest{}
72+
instance.Host = servant.Endpoint.Host
73+
instance.Port = int(servant.Endpoint.Port)
74+
instance.Protocol = &servant.Protocol
75+
instance.Namespace = pr.namespace
76+
instance.Service = servant.Servant
77+
if servant.Endpoint.Weight > 0 {
78+
weight := int(servant.Endpoint.Weight)
79+
instance.Weight = &weight
80+
}
81+
if servant.Endpoint.Timeout > 0 {
82+
timeout := time.Duration(servant.Endpoint.Timeout) * time.Millisecond
83+
instance.Timeout = &timeout
84+
}
85+
instance.Metadata = createMetadata(servant)
86+
_, err := pr.provider.RegisterInstance(instance)
87+
return err
88+
}
89+
90+
func (pr *polarisRegistry) Deregister(_ context.Context, servant *registry.ServantInstance) error {
91+
instance := &polaris.InstanceDeRegisterRequest{}
92+
instance.Namespace = pr.namespace
93+
instance.Service = servant.Servant
94+
instance.Host = servant.Endpoint.Host
95+
instance.Port = int(servant.Endpoint.Port)
96+
if servant.Endpoint.Timeout > 0 {
97+
timeout := time.Duration(servant.Endpoint.Timeout) * time.Millisecond
98+
instance.Timeout = &timeout
99+
}
100+
err := pr.provider.Deregister(instance)
101+
return err
102+
}
103+
104+
func (pr *polarisRegistry) QueryServant(_ context.Context, id string) (activeEp []endpointf.EndpointF, inactiveEp []endpointf.EndpointF, err error) {
105+
req := &polaris.GetAllInstancesRequest{}
106+
req.Namespace = pr.namespace
107+
req.Service = id
108+
resp, err := pr.consumer.GetAllInstances(req)
109+
if err != nil {
110+
return nil, nil, err
111+
}
112+
instances := resp.GetInstances()
113+
for _, ins := range instances {
114+
ep := endpoint.Parse(ins.GetMetadata()[endpointMeta])
115+
ep.Host = ins.GetHost()
116+
ep.Port = int32(ins.GetPort())
117+
epf := endpoint.Endpoint2tars(ep)
118+
if ins.IsHealthy() {
119+
activeEp = append(activeEp, epf)
120+
} else {
121+
inactiveEp = append(inactiveEp, epf)
122+
}
123+
}
124+
return activeEp, inactiveEp, err
125+
}
126+
127+
func (pr *polarisRegistry) QueryServantBySet(_ context.Context, id, setId string) (activeEp []endpointf.EndpointF, inactiveEp []endpointf.EndpointF, err error) {
128+
req := &polaris.GetInstancesRequest{}
129+
req.Namespace = pr.namespace
130+
req.Service = id
131+
req.Metadata = map[string]string{
132+
"internal-enable-set": "Y",
133+
"internal-set-name": setId,
134+
}
135+
resp, err := pr.consumer.GetInstances(req)
136+
if err != nil {
137+
return nil, nil, err
138+
}
139+
instances := resp.GetInstances()
140+
for _, ins := range instances {
141+
ep := endpoint.Parse(ins.GetMetadata()[endpointMeta])
142+
ep.Host = ins.GetHost()
143+
ep.Port = int32(ins.GetPort())
144+
epf := endpoint.Endpoint2tars(ep)
145+
if ins.IsHealthy() {
146+
activeEp = append(activeEp, epf)
147+
} else {
148+
inactiveEp = append(inactiveEp, epf)
149+
}
150+
}
151+
return activeEp, inactiveEp, err
152+
}
153+
154+
func createMetadata(servant *registry.ServantInstance) map[string]string {
155+
metadata := make(map[string]string)
156+
metadata["tarsVersion"] = servant.TarsVersion
157+
metadata["app"] = servant.App
158+
metadata["server"] = servant.Server
159+
metadata[endpointMeta] = servant.Endpoint.String()
160+
// polaris plugin
161+
metadata["internal-enable-set"] = "N"
162+
if servant.EnableSet {
163+
metadata["internal-enable-set"] = "Y"
164+
metadata["internal-set-name"] = servant.SetDivision
165+
}
166+
return metadata
167+
}

examples/PolarisServer/.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
PolarisServer
2+
*.log

examples/PolarisServer/HelloObj.tars

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module TestApp
2+
{
3+
interface HelloObj
4+
{
5+
int Add(int a,int b,out int c); // Some example function
6+
int Sub(int a,int b,out int c); // Some example function
7+
};
8+
};
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package main
2+
3+
import (
4+
"context"
5+
)
6+
7+
// HelloObjImp servant implementation
8+
type HelloObjImp struct {
9+
}
10+
11+
// Init servant init
12+
func (imp *HelloObjImp) Init() error {
13+
//initialize servant here:
14+
//...
15+
return nil
16+
}
17+
18+
// Destroy servant destroy
19+
func (imp *HelloObjImp) Destroy() {
20+
//destroy servant here:
21+
//...
22+
}
23+
24+
func (imp *HelloObjImp) Add(ctx context.Context, a int32, b int32, c *int32) (int32, error) {
25+
//Doing something in your function
26+
//...
27+
return 0, nil
28+
}
29+
func (imp *HelloObjImp) Sub(ctx context.Context, a int32, b int32, c *int32) (int32, error) {
30+
//Doing something in your function
31+
//...
32+
return 0, nil
33+
}
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
7+
"github.com/polarismesh/polaris-go"
8+
9+
pr "github.com/TarsCloud/TarsGo/contrib/registry/polaris"
10+
"github.com/TarsCloud/TarsGo/tars"
11+
12+
"polarisserver/tars-protocol/TestApp"
13+
)
14+
15+
func main() {
16+
//provider, err := polaris.NewProviderAPI()
17+
// 或者使用以下方法,则不需要创建配置文件
18+
provider, err := polaris.NewProviderAPIByAddress("127.0.0.1:8091")
19+
if err != nil {
20+
log.Fatalf("fail to create providerAPI, err is %v", err)
21+
}
22+
defer provider.Destroy()
23+
// 注册中心
24+
tars.SetRegistry(pr.New(provider, pr.WithNamespace("tars")))
25+
26+
comm := tars.NewCommunicator()
27+
obj := fmt.Sprintf("TestApp.PolarisServer.HelloObj")
28+
app := new(TestApp.HelloObj)
29+
comm.StringToProxy(obj, app)
30+
var out, i int32
31+
i = 123
32+
ret, err := app.Add(i, i*2, &out)
33+
if err != nil {
34+
fmt.Println(err)
35+
return
36+
}
37+
fmt.Println(ret, out)
38+
}
+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<tars>
2+
<application>
3+
enableset=y
4+
setdivision=public.ali.1
5+
<server>
6+
app=TestApp
7+
server=PolarisServer
8+
local=tcp -h 127.0.0.1 -p 10014 -t 30000
9+
logpath=/tmp
10+
<TestApp.PolarisServer.HelloObjAdapter>
11+
allow
12+
endpoint=tcp -h 127.0.0.1 -p 10015 -t 60000
13+
handlegroup=TestApp.PolarisServer.HelloObjAdapter
14+
maxconns=200000
15+
protocol=tars
16+
queuecap=10000
17+
queuetimeout=60000
18+
servant=TestApp.PolarisServer.HelloObj
19+
shmcap=0
20+
shmkey=0
21+
threads=1
22+
</TestApp.PolarisServer.HelloObjAdapter>
23+
</server>
24+
</application>
25+
</tars>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/TarsCloud/TarsGo/tars"
7+
"github.com/TarsCloud/TarsGo/tars/protocol/res/adminf"
8+
)
9+
10+
func main() {
11+
comm := tars.NewCommunicator()
12+
obj := "TestApp.PolarisServer.HelloObjObj@tcp -h 127.0.0.1 -p 10014 -t 60000"
13+
app := new(adminf.AdminF)
14+
comm.StringToProxy(obj, app)
15+
ret, err := app.Notify("tars.dumpstack")
16+
if err != nil {
17+
fmt.Println(err)
18+
return
19+
}
20+
fmt.Println(ret)
21+
}

examples/PolarisServer/go.mod

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
module polarisserver
2+
3+
go 1.17
4+
5+
require (
6+
github.com/TarsCloud/TarsGo v1.3.6
7+
github.com/polarismesh/polaris-go v1.2.0-beta.3
8+
)
9+
10+
require (
11+
github.com/beorn7/perks v1.0.1 // indirect
12+
github.com/cespare/xxhash/v2 v2.1.2 // indirect
13+
github.com/dlclark/regexp2 v1.7.0 // indirect
14+
github.com/gin-contrib/sse v0.1.0 // indirect
15+
github.com/gin-gonic/gin v1.8.1 // indirect
16+
github.com/go-playground/locales v0.14.0 // indirect
17+
github.com/go-playground/universal-translator v0.18.0 // indirect
18+
github.com/go-playground/validator/v10 v10.10.0 // indirect
19+
github.com/goccy/go-json v0.9.7 // indirect
20+
github.com/golang/protobuf v1.5.2 // indirect
21+
github.com/google/uuid v1.3.0 // indirect
22+
github.com/hashicorp/errwrap v1.0.0 // indirect
23+
github.com/hashicorp/go-multierror v1.1.1 // indirect
24+
github.com/json-iterator/go v1.1.12 // indirect
25+
github.com/leodido/go-urn v1.2.1 // indirect
26+
github.com/mattn/go-isatty v0.0.14 // indirect
27+
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
28+
github.com/mitchellh/go-homedir v1.1.0 // indirect
29+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
30+
github.com/modern-go/reflect2 v1.0.2 // indirect
31+
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
32+
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
33+
github.com/prometheus/client_golang v1.12.1 // indirect
34+
github.com/prometheus/client_model v0.2.0 // indirect
35+
github.com/prometheus/common v0.32.1 // indirect
36+
github.com/prometheus/procfs v0.7.3 // indirect
37+
github.com/spaolacci/murmur3 v1.1.0 // indirect
38+
github.com/ugorji/go/codec v1.2.7 // indirect
39+
go.uber.org/atomic v1.7.0 // indirect
40+
go.uber.org/automaxprocs v1.5.1 // indirect
41+
go.uber.org/multierr v1.6.0 // indirect
42+
go.uber.org/zap v1.21.0 // indirect
43+
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
44+
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
45+
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
46+
golang.org/x/text v0.3.7 // indirect
47+
google.golang.org/genproto v0.0.0-20220504150022-98cd25cafc72 // indirect
48+
google.golang.org/grpc v1.46.2 // indirect
49+
google.golang.org/protobuf v1.28.0 // indirect
50+
gopkg.in/yaml.v2 v2.4.0 // indirect
51+
)
52+
53+
replace github.com/TarsCloud/TarsGo => ../../

0 commit comments

Comments
 (0)