Skip to content

Commit f2773ea

Browse files
feat: supports nacos service discovery (#651)
* support nacos application discovery * delete debug info * format import * add some comments * fix golangci-lint error * delete todo * fix some comment * update * change bool to struct{}
1 parent e77c3aa commit f2773ea

9 files changed

+432
-57
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package nacos
19+
20+
import (
21+
"strings"
22+
"sync"
23+
"time"
24+
)
25+
26+
import (
27+
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
28+
"github.com/nacos-group/nacos-sdk-go/vo"
29+
)
30+
31+
import (
32+
common2 "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
33+
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
34+
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
35+
"github.com/apache/dubbo-go-pixiu/pkg/logger"
36+
"github.com/apache/dubbo-go-pixiu/pkg/model"
37+
)
38+
39+
var _ registry.Listener = new(nacosAppListener)
40+
41+
type nacosAppListener struct {
42+
exit chan struct{}
43+
client naming_client.INamingClient
44+
regConf *model.Registry
45+
reg *NacosRegistry
46+
wg sync.WaitGroup
47+
addr string
48+
adapterListener common2.RegistryEventListener
49+
appInfoMap map[string]*applicationInfo
50+
}
51+
52+
// newNacosAppListener returns a new nacosAppListener with pre-defined path according to the registered type.
53+
func newNacosAppListener(client naming_client.INamingClient, reg *NacosRegistry, regConf *model.Registry, adapterListener common2.RegistryEventListener) registry.Listener {
54+
return &nacosAppListener{
55+
exit: make(chan struct{}),
56+
client: client,
57+
regConf: regConf,
58+
reg: reg,
59+
addr: regConf.Address,
60+
adapterListener: adapterListener,
61+
appInfoMap: map[string]*applicationInfo{},
62+
}
63+
}
64+
65+
func (n *nacosAppListener) Close() {
66+
close(n.exit)
67+
n.wg.Wait()
68+
}
69+
70+
func (n *nacosAppListener) WatchAndHandle() {
71+
n.wg.Add(1)
72+
go n.watch()
73+
}
74+
75+
func (n *nacosAppListener) watch() {
76+
defer n.wg.Done()
77+
var (
78+
failTimes int64 = 0
79+
delayTimer = time.NewTimer(ConnDelay * time.Duration(failTimes))
80+
)
81+
defer delayTimer.Stop()
82+
for {
83+
serviceList, err := n.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
84+
GroupName: n.regConf.Group,
85+
NameSpace: n.regConf.Namespace,
86+
PageSize: 100,
87+
})
88+
if err != nil {
89+
failTimes++
90+
logger.Infof("watching nacos interface with error{%v}", err)
91+
// Exit the watch if root node is in error
92+
// TODO: do not use zookeeper error
93+
if err == zookeeper.ErrNilNode {
94+
logger.Errorf("watching nacos services got errNilNode,so exit listen")
95+
return
96+
}
97+
if failTimes > MaxFailTimes {
98+
logger.Errorf("Error happens on nacos exceed max fail times: %s,so exit listen", MaxFailTimes)
99+
return
100+
}
101+
delayTimer.Reset(ConnDelay * time.Duration(failTimes))
102+
<-delayTimer.C
103+
continue
104+
}
105+
failTimes = 0
106+
if err := n.updateServiceList(serviceList.Doms); err != nil {
107+
logger.Errorf("update service list failed %s", err)
108+
}
109+
time.Sleep(time.Second * 5)
110+
}
111+
}
112+
113+
type applicationInfo struct {
114+
appName string
115+
listener *appServiceListener
116+
}
117+
118+
func (a *applicationInfo) String() string {
119+
return a.appName
120+
}
121+
122+
func fromServiceKey(serviceKey string) *applicationInfo {
123+
// if serviceKey contains ":" means it is a interface registry
124+
// we should ignore it
125+
if strings.Contains(serviceKey, ":") {
126+
return nil
127+
}
128+
return &applicationInfo{
129+
appName: serviceKey,
130+
}
131+
}
132+
133+
func (n *nacosAppListener) updateServiceList(serviceList []string) error {
134+
// add new service info and watch
135+
newServiceMap := make(map[string]struct{}, len(serviceList))
136+
137+
for _, v := range serviceList {
138+
appInfo := fromServiceKey(v)
139+
if appInfo == nil {
140+
// ignore interface registry
141+
continue
142+
}
143+
key := appInfo.String()
144+
newServiceMap[key] = struct{}{}
145+
if _, ok := n.appInfoMap[key]; !ok {
146+
l := newNacosAppSrvListener(n.client, n.adapterListener)
147+
l.wg.Add(1)
148+
149+
appInfo.listener = l
150+
n.appInfoMap[key] = appInfo
151+
152+
sub := &vo.SubscribeParam{
153+
ServiceName: appInfo.appName,
154+
SubscribeCallback: l.Callback,
155+
GroupName: n.regConf.Group,
156+
}
157+
158+
if err := n.client.Subscribe(sub); err != nil {
159+
logger.Errorf("subscribe listener with interfaceKey = %s, error = %s", l, err)
160+
}
161+
l.wg.Done()
162+
}
163+
}
164+
165+
// handle deleted service
166+
for k, v := range n.appInfoMap {
167+
if _, ok := newServiceMap[k]; !ok {
168+
delete(n.appInfoMap, k)
169+
v.listener.Close()
170+
}
171+
}
172+
173+
return nil
174+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package nacos
19+
20+
import (
21+
"fmt"
22+
"reflect"
23+
"strconv"
24+
"strings"
25+
"sync"
26+
)
27+
28+
import (
29+
dubboCommon "dubbo.apache.org/dubbo-go/v3/common"
30+
dubboConst "dubbo.apache.org/dubbo-go/v3/common/constant"
31+
dr "dubbo.apache.org/dubbo-go/v3/registry"
32+
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
33+
"dubbo.apache.org/dubbo-go/v3/remoting"
34+
"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
35+
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
36+
nacosModel "github.com/nacos-group/nacos-sdk-go/model"
37+
)
38+
39+
import (
40+
common2 "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
41+
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
42+
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
43+
"github.com/apache/dubbo-go-pixiu/pkg/logger"
44+
)
45+
46+
var _ registry.Listener = new(appServiceListener)
47+
48+
type appServiceListener struct {
49+
client naming_client.INamingClient
50+
instanceMap map[string]nacosModel.Instance
51+
cacheLock sync.Mutex
52+
53+
exit chan struct{}
54+
wg sync.WaitGroup
55+
adapterListener common2.RegistryEventListener
56+
}
57+
58+
func newNacosAppSrvListener(client naming_client.INamingClient, adapterListener common2.RegistryEventListener) *appServiceListener {
59+
return &appServiceListener{
60+
client: client,
61+
exit: make(chan struct{}),
62+
adapterListener: adapterListener,
63+
instanceMap: map[string]nacosModel.Instance{},
64+
}
65+
}
66+
67+
func (l *appServiceListener) WatchAndHandle() {
68+
panic("implement me")
69+
}
70+
71+
func (l *appServiceListener) Close() {
72+
close(l.exit)
73+
l.wg.Wait()
74+
}
75+
76+
func (l *appServiceListener) Callback(services []nacosModel.SubscribeService, err error) {
77+
if err != nil {
78+
logger.Errorf("nacos subscribe callback error:%s", err.Error())
79+
return
80+
}
81+
82+
addInstances := make([]nacosModel.Instance, 0, len(services))
83+
delInstances := make([]nacosModel.Instance, 0, len(services))
84+
updateInstances := make([]nacosModel.Instance, 0, len(services))
85+
newInstanceMap := make(map[string]nacosModel.Instance, len(services))
86+
87+
l.cacheLock.Lock()
88+
defer l.cacheLock.Unlock()
89+
for i := range services {
90+
if !services[i].Enable {
91+
// instance is not available, so ignore it
92+
continue
93+
}
94+
host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port))
95+
services[i].ServiceName = handleServiceName(services[i].ServiceName)
96+
instance := generateInstance(services[i])
97+
newInstanceMap[host] = instance
98+
if old, ok := l.instanceMap[host]; ok {
99+
// instance does not exist in cache, add it to cache
100+
addInstances = append(addInstances, instance)
101+
} else {
102+
if !reflect.DeepEqual(old, instance) {
103+
// instance is not different from cache, update it to cache
104+
updateInstances = append(updateInstances, instance)
105+
}
106+
}
107+
}
108+
109+
for host, inst := range l.instanceMap {
110+
if _, ok := newInstanceMap[host]; !ok {
111+
// cache instance does not exist in new instance list, remove it from cache
112+
delInstances = append(delInstances, inst)
113+
}
114+
}
115+
116+
l.instanceMap = newInstanceMap
117+
for i := range addInstances {
118+
newURLs := l.getURLs(addInstances[i])
119+
for _, url := range newURLs {
120+
l.handle(url, remoting.EventTypeAdd)
121+
}
122+
}
123+
for i := range delInstances {
124+
newURLs := l.getURLs(delInstances[i])
125+
for _, url := range newURLs {
126+
l.handle(url, remoting.EventTypeDel)
127+
}
128+
}
129+
for i := range updateInstances {
130+
newURLs := l.getURLs(updateInstances[i])
131+
for _, url := range newURLs {
132+
l.handle(url, remoting.EventTypeUpdate)
133+
}
134+
}
135+
}
136+
137+
func (l *appServiceListener) handle(url *dubboCommon.URL, action remoting.EventType) {
138+
logger.Infof("update begin, service event : %v %v", action, url)
139+
140+
// NOTE: _ is methods, we can not get methods by application discovery
141+
bkConfig, _, location, err := registry.ParseDubboString(url.String())
142+
if err != nil {
143+
logger.Errorf("parse dubbo url error = %s", err)
144+
return
145+
}
146+
147+
apiPattern := registry.GetAPIPattern(bkConfig)
148+
mappingParams := []config.MappingParam{
149+
{
150+
Name: "requestBody.values",
151+
MapTo: "opt.values",
152+
},
153+
{
154+
Name: "requestBody.types",
155+
MapTo: "opt.types",
156+
},
157+
}
158+
159+
api := registry.CreateAPIConfig(apiPattern, location, bkConfig, constant.AnyValue, mappingParams)
160+
if action == remoting.EventTypeDel {
161+
if err := l.adapterListener.OnRemoveAPI(api); err != nil {
162+
logger.Errorf("Error={%s} happens when try to remove api %s", err.Error(), api.Path)
163+
return
164+
}
165+
} else {
166+
if err := l.adapterListener.OnAddAPI(api); err != nil {
167+
logger.Errorf("Error={%s} happens when try to add api %s", err.Error(), api.Path)
168+
return
169+
}
170+
}
171+
}
172+
173+
func (l *appServiceListener) getURLs(nmis nacosModel.Instance) []*dubboCommon.URL {
174+
instance := toNacosInstance(nmis)
175+
metadata := instance.GetMetadata()
176+
metadataInfo, err := servicediscovery.GetMetadataInfo(instance.GetServiceName(), instance, metadata[dubboConst.ExportedServicesRevisionPropertyName])
177+
if err != nil {
178+
logger.Errorf("get instance metadata info error %v", err.Error())
179+
return nil
180+
}
181+
instance.SetServiceMetadata(metadataInfo)
182+
urls := make([]*dubboCommon.URL, 0, len(metadataInfo.Services))
183+
for _, service := range metadataInfo.Services {
184+
urls = append(urls, instance.ToURLs(service)...)
185+
}
186+
return urls
187+
}
188+
189+
// toNacosInstance convert to registry's service instance
190+
func toNacosInstance(nmis nacosModel.Instance) dr.ServiceInstance {
191+
md := make(map[string]string, len(nmis.Metadata))
192+
for k, v := range nmis.Metadata {
193+
md[k] = fmt.Sprint(v)
194+
}
195+
return &dr.DefaultServiceInstance{
196+
ID: nmis.InstanceId,
197+
ServiceName: nmis.ServiceName,
198+
Host: nmis.Ip,
199+
Port: int(nmis.Port),
200+
Enable: nmis.Enable,
201+
Healthy: nmis.Healthy,
202+
Metadata: md,
203+
}
204+
}
205+
206+
// group@@serviceName convert to serviceName
207+
func handleServiceName(serviceName string) string {
208+
parts := strings.Split(serviceName, "@@")
209+
if len(parts) > 1 {
210+
return parts[1]
211+
}
212+
return ""
213+
}

0 commit comments

Comments
 (0)