diff --git a/clients/cache/simple_cache.go b/clients/cache/simple_cache.go new file mode 100644 index 00000000..5886652d --- /dev/null +++ b/clients/cache/simple_cache.go @@ -0,0 +1,177 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cache + +import ( + "sync" +) + +type ICache[K comparable, V any] interface { + Load(key K) (V, bool) + Store(key K, value V) + LoadOrStore(key K, value V) (V, bool) + LoadOrStoreFunc(key K, apply func() V) (V, bool) + LoadAndDelete(key K) (V, bool) + Delete(key K) + Range(func(key K, value V) bool) + Size() int + Empty() bool +} + +type IComputeCache[K comparable, V any] interface { + ICache[K, V] + Compute(key K, apply func(value V) V) V + ComputeIfAbsent(key K, apply func() V) V + ComputeIfPresent(key K, apply func(value V) V) V +} + +// SimpleCache k,v must both be comparable +type SimpleCache[K comparable, V any] struct { + locker sync.RWMutex + m sync.Map +} + +func NewCache[K comparable, V any]() *SimpleCache[K, V] { + return &SimpleCache[K, V]{} +} + +func (s *SimpleCache[K, V]) Load(key K) (V, bool) { + value, ok := s.m.Load(key) + if ok { + return value.(V), ok + } + var empty V + return empty, ok +} + +func (s *SimpleCache[K, V]) Store(key K, value V) { + s.locker.RLock() + defer s.locker.RUnlock() + s.m.Store(key, value) +} + +func (s *SimpleCache[K, V]) LoadOrStore(key K, value V) (V, bool) { + s.locker.RLock() + defer s.locker.RUnlock() + actual, loaded := s.m.LoadOrStore(key, value) + return actual.(V), loaded + +} + +func (s *SimpleCache[K, V]) LoadOrStoreFunc(key K, apply func() V) (V, bool) { + actual, loaded := s.m.Load(key) + if loaded { + return actual.(V), loaded + } + s.locker.Lock() + defer s.locker.Unlock() + actual, loaded = s.m.Load(key) + if loaded { + return actual.(V), loaded + } + value := apply() + s.m.Store(key, value) + return value, loaded + +} + +func (s *SimpleCache[K, V]) LoadAndDelete(key K) (V, bool) { + s.locker.RLock() + defer s.locker.RUnlock() + value, loaded := s.m.LoadAndDelete(key) + if loaded { + return value.(V), loaded + } + var empty V + return empty, loaded +} + +func (s *SimpleCache[K, V]) Delete(key K) { + s.locker.RLock() + defer s.locker.RUnlock() + s.m.Delete(key) +} + +func (s *SimpleCache[K, V]) Range(f func(key K, value V) bool) { + s.m.Range(func(key, value any) bool { + return f(key.(K), value.(V)) + }) +} + +func (s *SimpleCache[K, V]) Size() int { + count := 0 + s.m.Range(func(key, value any) bool { + count++ + return true + }) + return count +} + +func (s *SimpleCache[K, V]) Empty() bool { + empty := true + s.m.Range(func(key, value any) bool { + empty = false + return false + }) + return empty +} +func (s *SimpleCache[K, V]) Compute(key K, apply func(value V) V) V { + s.locker.Lock() + defer s.locker.Unlock() + old, ok := s.m.Load(key) + var empty, newValue V + if !ok { + newValue = apply(empty) + } else { + newValue = apply(old.(V)) + } + s.m.Store(key, newValue) + return newValue +} + +func (s *SimpleCache[K, V]) ComputeIfAbsent(key K, apply func() V) V { + old, ok := s.m.Load(key) + if ok { + return old.(V) + } + s.locker.Lock() + defer s.locker.Unlock() + old, ok = s.m.Load(key) + if ok { + return old.(V) + } + newValue := apply() + s.m.Store(key, newValue) + return newValue +} + +func (s *SimpleCache[K, V]) ComputeIfPresent(key K, apply func(value V) V) V { + var empty V + _, ok := s.m.Load(key) + if !ok { + return empty + } + s.locker.Lock() + defer s.locker.Unlock() + old, ok := s.m.Load(key) + if !ok { + return empty + } + newValue := apply(old.(V)) + s.m.Store(key, newValue) + return newValue +} diff --git a/clients/cache/simple_cache_test.go b/clients/cache/simple_cache_test.go new file mode 100644 index 00000000..5111b0bd --- /dev/null +++ b/clients/cache/simple_cache_test.go @@ -0,0 +1,321 @@ +package cache + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +type valueDemo struct { + name string +} + +var cacheNotPoint IComputeCache[string, string] +var cachePoint IComputeCache[string, *valueDemo] + +func TestMain(t *testing.M) { + cacheNotPoint = NewCache[string, string]() + cachePoint = NewCache[string, *valueDemo]() + t.Run() +} + +func TestComputeNotPoint(t *testing.T) { + defer cacheNotPoint.Delete("TestComputeNotPoint") + v := "not empty" + count := 0 + computeFunc := func(value string) string { + count++ + if len(value) == 0 { + return v + } + return value + } + result1 := cacheNotPoint.Compute("TestComputeNotPoint", computeFunc) + result2 := cacheNotPoint.Compute("TestComputeNotPoint", computeFunc) + at := assert.New(t) + at.EqualValues(2, count) + at.EqualValues(result1, result2) + at.EqualValues(result1, v) +} + +func TestComputePoint(t *testing.T) { + defer cachePoint.Delete("TestComputePoint") + v := &valueDemo{name: "init"} + count := 0 + computeFunc := func(value *valueDemo) *valueDemo { + count++ + if value == nil { + return v + } + return value + } + result1 := cachePoint.Compute("TestComputePoint", computeFunc) + result2 := cachePoint.Compute("TestComputePoint", computeFunc) + at := assert.New(t) + at.EqualValues(2, count) + at.Equal(result1, result2) + at.Equal(result1, v) + v.name = "finish" + at.Equal(v, result1) +} + +func TestComputeIfAbsentNotPoint(t *testing.T) { + defer cacheNotPoint.Delete("TestComputeIfAbsentNotPoint1") + defer cacheNotPoint.Delete("TestComputeIfAbsentNotPoint2") + init := "init" + v := "computed" + count := 0 + computeFunc := func() string { + count++ + return v + } + cacheNotPoint.Store("TestComputeIfAbsentNotPoint1", init) + result1 := cacheNotPoint.ComputeIfAbsent("TestComputeIfAbsentNotPoint1", computeFunc) + result2 := cacheNotPoint.ComputeIfAbsent("TestComputeIfAbsentNotPoint2", computeFunc) + at := assert.New(t) + at.EqualValues(1, count) + at.EqualValues(init, result1) + at.NotEqualValues(result1, v) + at.EqualValues(v, result2) +} + +func TestComputeIfAbsentPoint(t *testing.T) { + defer cachePoint.Delete("TestComputeIfAbsentPoint1") + defer cachePoint.Delete("TestComputeIfAbsentPoint2") + init := &valueDemo{name: "init"} + cachePoint.Store("TestComputeIfAbsentPoint1", init) + compute := &valueDemo{name: "compute"} + count := 0 + computeFunc := func() *valueDemo { + count++ + return compute + } + result1 := cachePoint.ComputeIfAbsent("TestComputeIfAbsentPoint1", computeFunc) + result2 := cachePoint.ComputeIfAbsent("TestComputeIfAbsentPoint2", computeFunc) + at := assert.New(t) + at.EqualValues(1, count) + at.Equal(result1, init) + at.Equal(result2, compute) + compute.name = "finish" + at.Equal(compute, result2) +} + +func TestComputeIfPresentNotPoint(t *testing.T) { + defer cacheNotPoint.Delete("TestComputeIfPresentNotPoint1") + defer cacheNotPoint.Delete("TestComputeIfPresentNotPoint2") + init := "init" + v := "computed" + count := 0 + computeFunc := func(value string) string { + count++ + return value + v + } + cacheNotPoint.Store("TestComputeIfPresentNotPoint1", init) + result1 := cacheNotPoint.ComputeIfPresent("TestComputeIfPresentNotPoint1", computeFunc) + result2 := cacheNotPoint.ComputeIfPresent("TestComputeIfPresentNotPoint2", computeFunc) + at := assert.New(t) + at.EqualValues(1, count) + at.EqualValues(init+v, result1) + at.Empty(result2) +} + +func TestComputeIfPresentPoint(t *testing.T) { + defer cachePoint.Delete("TestComputeIfPresentPoint1") + defer cachePoint.Delete("TestComputeIfPresentPoint2") + init := &valueDemo{name: "init"} + cachePoint.Store("TestComputeIfPresentPoint1", init) + count := 0 + computeFunc := func(v *valueDemo) *valueDemo { + count++ + v.name = "compute" + return v + } + result1 := cachePoint.ComputeIfPresent("TestComputeIfPresentPoint1", computeFunc) + result2 := cachePoint.ComputeIfPresent("TestComputeIfPresentPoint2", computeFunc) + at := assert.New(t) + at.EqualValues(1, count) + at.Equal(init, result1) + at.Nil(result2) + init.name = "finish" + at.Equal(init, result1) +} + +func TestCRUDNotPoint(t *testing.T) { + var empty string + load, ok := cacheNotPoint.Load("TestCRUDNotPoint1") + at := assert.New(t) + at.EqualValues(empty, load) + at.False(ok) + at.Equal(0, cacheNotPoint.Size()) + at.True(cacheNotPoint.Empty()) + + cacheNotPoint.Store("TestCRUDNotPoint1", "1") + load, ok = cacheNotPoint.Load("TestCRUDNotPoint1") + at.True(ok) + at.Equal("1", load) + at.Equal(1, cacheNotPoint.Size()) + at.False(cacheNotPoint.Empty()) + cacheNotPoint.Delete("TestCRUDNotPoint1") + at.Equal(0, cacheNotPoint.Size()) + at.True(cacheNotPoint.Empty()) + + load, ok = cacheNotPoint.Load("TestCRUDNotPoint1") + at.EqualValues(empty, load) + at.False(ok) + + at.NotPanics(func() { cacheNotPoint.Delete("TestCRUDNotPoint2") }) + at.Equal(0, cacheNotPoint.Size()) + at.True(cacheNotPoint.Empty()) +} + +func TestCRUDPoint(t *testing.T) { + load, ok := cachePoint.Load("TestCRUDPoint1") + at := assert.New(t) + at.Nil(load) + at.False(ok) + at.Equal(0, cachePoint.Size()) + at.True(cachePoint.Empty()) + + v := &valueDemo{name: "init"} + cachePoint.Store("TestCRUDPoint1", v) + load, ok = cachePoint.Load("TestCRUDPoint1") + at.True(ok) + at.Equal(v, load) + at.Equal(1, cachePoint.Size()) + at.False(cachePoint.Empty()) + + cachePoint.Delete("TestCRUDPoint1") + at.Equal(0, cachePoint.Size()) + at.True(cachePoint.Empty()) + + load, ok = cachePoint.Load("TestCRUDPoint1") + at.Nil(load) + at.False(ok) + + at.NotPanics(func() { cachePoint.Delete("TestCRUDPoint2") }) + at.Equal(0, cachePoint.Size()) + at.True(cachePoint.Empty()) +} + +func TestLoadAndOpsNotPoint(t *testing.T) { + at := assert.New(t) + value, loaded := cacheNotPoint.LoadOrStore("TestLoadAndOpsNotPoint1", "1") + at.False(loaded) + at.Equal("1", value) + + value, loaded = cacheNotPoint.LoadOrStore("TestLoadAndOpsNotPoint1", "2") + at.True(loaded) + at.Equal("1", value) + + value, deleted := cacheNotPoint.LoadAndDelete("TestLoadAndOpsNotPoint1") + at.True(deleted) + at.Equal("1", value) + + value, deleted = cacheNotPoint.LoadAndDelete("TestLoadAndOpsNotPoint1") + at.False(deleted) + at.Empty(value) + + value, deleted = cacheNotPoint.LoadAndDelete("TestLoadAndOpsNotPoint2") + at.False(deleted) + at.Empty(value) + + value, loaded = cacheNotPoint.LoadOrStoreFunc("TestLoadAndOpsNotPoint1", func() string { + return "2" + }) + at.False(loaded) + at.Equal("2", value) + + value, loaded = cacheNotPoint.LoadOrStoreFunc("TestLoadAndOpsNotPoint1", func() string { + return "3" + }) + at.True(loaded) + at.Equal("2", value) + cacheNotPoint.Delete("TestLoadAndOpsNotPoint1") +} + +func TestLoadAndOpsPoint(t *testing.T) { + at := assert.New(t) + init := &valueDemo{name: "init"} + changed := &valueDemo{name: "changed"} + value, loaded := cachePoint.LoadOrStore("TestLoadAndOpsPoint1", init) + at.False(loaded) + at.Equal(init, value) + + value, loaded = cachePoint.LoadOrStore("TestLoadAndOpsPoint1", changed) + at.True(loaded) + at.Equal(init, value) + + value, deleted := cachePoint.LoadAndDelete("TestLoadAndOpsPoint1") + at.True(deleted) + at.Equal(init, value) + + value, deleted = cachePoint.LoadAndDelete("TestLoadAndOpsPoint1") + at.False(deleted) + at.Nil(value) + + value, deleted = cachePoint.LoadAndDelete("TestLoadAndOpsPoint2") + at.False(deleted) + at.Nil(value) + + value, loaded = cachePoint.LoadOrStoreFunc("TestLoadAndOpsPoint1", func() *valueDemo { + return init + }) + at.False(loaded) + at.Equal(init, value) + + value, loaded = cachePoint.LoadOrStoreFunc("TestLoadAndOpsPoint1", func() *valueDemo { + return changed + }) + at.True(loaded) + at.Equal(init, value) + cachePoint.Delete("TestLoadAndOpsPoint1") +} + +func TestRangeNotPoint(t *testing.T) { + at := assert.New(t) + called := 0 + rangeFunc := func(key string, value string) bool { + called++ + return true + } + cacheNotPoint.Range(rangeFunc) + + at.Zero(called) + + cacheNotPoint.Store("TestRangeNotPoint1", "1") + cacheNotPoint.Range(rangeFunc) + at.Equal(1, called) + + cacheNotPoint.Store("TestRangeNotPoint2", "2") + at.Equal(2, cacheNotPoint.Size()) + called = 0 + cacheNotPoint.Range(func(key string, value string) bool { + called++ + return false + }) + at.Equal(1, called) +} + +func TestRangePoint(t *testing.T) { + at := assert.New(t) + called := 0 + rangeFunc := func(key string, value *valueDemo) bool { + called++ + return true + } + cachePoint.Range(rangeFunc) + + at.Zero(called) + + cachePoint.Store("TestRangePoint1", &valueDemo{}) + cachePoint.Range(rangeFunc) + at.Equal(1, called) + + cachePoint.Store("TestRangePoint2", &valueDemo{}) + at.Equal(2, cachePoint.Size()) + called = 0 + cachePoint.Range(func(key string, value *valueDemo) bool { + called++ + return false + }) + at.Equal(1, called) +} diff --git a/clients/naming_client/naming_cache/service_info_holder.go b/clients/naming_client/naming_cache/service_info_holder.go index 5ddaf0f6..66e26f04 100644 --- a/clients/naming_client/naming_cache/service_info_holder.go +++ b/clients/naming_client/naming_cache/service_info_holder.go @@ -17,6 +17,7 @@ package naming_cache import ( + "github.com/nacos-group/nacos-sdk-go/v2/vo" "os" "reflect" "sort" @@ -115,11 +116,11 @@ func (s *ServiceInfoHolder) GetServiceInfo(serviceName, groupName, clusters stri return model.Service{}, ok } -func (s *ServiceInfoHolder) RegisterCallback(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) { +func (s *ServiceInfoHolder) RegisterCallback(serviceName string, clusters string, callbackFunc *vo.SubscribeCallbackFunc) { s.subCallback.AddCallbackFunc(serviceName, clusters, callbackFunc) } -func (s *ServiceInfoHolder) DeregisterCallback(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) { +func (s *ServiceInfoHolder) DeregisterCallback(serviceName string, clusters string, callbackFunc *vo.SubscribeCallbackFunc) { s.subCallback.RemoveCallbackFunc(serviceName, clusters, callbackFunc) } diff --git a/clients/naming_client/naming_cache/subscribe_callback.go b/clients/naming_client/naming_cache/subscribe_callback.go index 1f4964db..b4866b28 100644 --- a/clients/naming_client/naming_cache/subscribe_callback.go +++ b/clients/naming_client/naming_cache/subscribe_callback.go @@ -17,6 +17,7 @@ package naming_cache import ( + "github.com/nacos-group/nacos-sdk-go/v2/vo" "sync" "github.com/nacos-group/nacos-sdk-go/v2/clients/cache" @@ -26,53 +27,60 @@ import ( ) type SubscribeCallback struct { - callbackFuncMap cache.ConcurrentMap + callbackFuncMap cache.ICache[string, []*vo.SubscribeCallbackFunc] mux *sync.Mutex } func NewSubscribeCallback() *SubscribeCallback { - return &SubscribeCallback{callbackFuncMap: cache.NewConcurrentMap(), mux: new(sync.Mutex)} + return &SubscribeCallback{callbackFuncMap: cache.NewCache[string, []*vo.SubscribeCallbackFunc](), mux: new(sync.Mutex)} } func (ed *SubscribeCallback) IsSubscribed(serviceName, clusters string) bool { key := util.GetServiceCacheKey(serviceName, clusters) - _, ok := ed.callbackFuncMap.Get(key) + _, ok := ed.callbackFuncMap.Load(key) return ok } -func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) { +func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackFunc *vo.SubscribeCallbackFunc) { key := util.GetServiceCacheKey(serviceName, clusters) defer ed.mux.Unlock() ed.mux.Lock() - var funcSlice []*func(services []model.Instance, err error) - old, ok := ed.callbackFuncMap.Get(key) + var funcSlice []*vo.SubscribeCallbackFunc + old, ok := ed.callbackFuncMap.Load(key) if ok { - funcSlice = append(funcSlice, old.([]*func(services []model.Instance, err error))...) + funcSlice = append(funcSlice, old...) } funcSlice = append(funcSlice, callbackFunc) - ed.callbackFuncMap.Set(key, funcSlice) + ed.callbackFuncMap.Store(key, funcSlice) } -func (ed *SubscribeCallback) RemoveCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) { +func (ed *SubscribeCallback) RemoveCallbackFunc(serviceName string, clusters string, callbackFunc *vo.SubscribeCallbackFunc) { logger.Info("removing " + serviceName + " with " + clusters + " to listener map") key := util.GetServiceCacheKey(serviceName, clusters) - funcs, ok := ed.callbackFuncMap.Get(key) - if ok && funcs != nil { - var newFuncs []*func(services []model.Instance, err error) - for _, funcItem := range funcs.([]*func(services []model.Instance, err error)) { + funcs, ok := ed.callbackFuncMap.Load(key) + if ok { + var newFuncs []*vo.SubscribeCallbackFunc + for _, funcItem := range funcs { if funcItem != callbackFunc { newFuncs = append(newFuncs, funcItem) } } - ed.callbackFuncMap.Set(key, newFuncs) + if len(newFuncs) == 0 { + ed.callbackFuncMap.Delete(key) + return + } + ed.callbackFuncMap.Store(key, newFuncs) } } func (ed *SubscribeCallback) ServiceChanged(cacheKey string, service *model.Service) { - funcs, ok := ed.callbackFuncMap.Get(cacheKey) + funcs, ok := ed.callbackFuncMap.Load(cacheKey) if ok { - for _, funcItem := range funcs.([]*func(services []model.Instance, err error)) { + for _, funcItem := range funcs { + if *funcItem == nil { + continue + } (*funcItem)(service.Hosts, nil) } } diff --git a/clients/naming_client/naming_cache/subscribe_callback_test.go b/clients/naming_client/naming_cache/subscribe_callback_test.go index fce3354a..3f07de60 100644 --- a/clients/naming_client/naming_cache/subscribe_callback_test.go +++ b/clients/naming_client/naming_cache/subscribe_callback_test.go @@ -30,65 +30,26 @@ import ( ) func TestEventDispatcher_AddCallbackFuncs(t *testing.T) { - service := model.Service{ - Clusters: strings.Join([]string{"default"}, ","), - CacheMillis: 10000, - Checksum: "abcd", - LastRefTime: uint64(time.Now().Unix()), - } - var hosts []model.Instance - host := model.Instance{ - Enable: true, - InstanceId: "123", - Port: 8080, - Ip: "127.0.0.1", - Weight: 10, - ServiceName: "public@@Test", - ClusterName: strings.Join([]string{"default"}, ","), - } - hosts = append(hosts, host) - service.Hosts = hosts - ed := NewSubscribeCallback() param := vo.SubscribeParam{ ServiceName: "Test", Clusters: []string{"default"}, GroupName: "public", SubscribeCallback: func(services []model.Instance, err error) { - fmt.Println(util.ToJsonString(ed.callbackFuncMap)) + fmt.Println(util.ToJsonString(services)) }, } ed.AddCallbackFunc(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), ¶m.SubscribeCallback) key := util.GetServiceCacheKey(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ",")) - for k, v := range ed.callbackFuncMap.Items() { + ed.callbackFuncMap.Range(func(k string, value []*vo.SubscribeCallbackFunc) bool { assert.Equal(t, key, k, "key should be equal!") - funcs := v.([]*func(services []model.Instance, err error)) - assert.Equal(t, len(funcs), 1) - assert.Equal(t, funcs[0], ¶m.SubscribeCallback, "callback function must be equal!") - - } + assert.Equal(t, len(value), 1) + assert.Equal(t, (value)[0], ¶m.SubscribeCallback, "callback function must be equal!") + return true + }) } func TestEventDispatcher_RemoveCallbackFuncs(t *testing.T) { - service := model.Service{ - Clusters: strings.Join([]string{"default"}, ","), - CacheMillis: 10000, - Checksum: "abcd", - LastRefTime: uint64(time.Now().Unix()), - } - var hosts []model.Instance - host := model.Instance{ - Enable: true, - InstanceId: "123", - Port: 8080, - Ip: "127.0.0.1", - Weight: 10, - ServiceName: "public@@Test", - ClusterName: strings.Join([]string{"default"}, ","), - } - hosts = append(hosts, host) - service.Hosts = hosts - ed := NewSubscribeCallback() param := vo.SubscribeParam{ ServiceName: "Test", @@ -99,7 +60,7 @@ func TestEventDispatcher_RemoveCallbackFuncs(t *testing.T) { }, } ed.AddCallbackFunc(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), ¶m.SubscribeCallback) - assert.Equal(t, len(ed.callbackFuncMap.Items()), 1, "callback funcs map length should be 1") + assert.Equal(t, ed.callbackFuncMap.Size(), 1, "callback funcs map length should be 1") param2 := vo.SubscribeParam{ ServiceName: "Test", @@ -110,22 +71,22 @@ func TestEventDispatcher_RemoveCallbackFuncs(t *testing.T) { }, } ed.AddCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), ¶m2.SubscribeCallback) - assert.Equal(t, len(ed.callbackFuncMap.Items()), 1, "callback funcs map length should be 2") + assert.Equal(t, ed.callbackFuncMap.Size(), 1, "callback funcs map length should be 2") - for k, v := range ed.callbackFuncMap.Items() { - log.Printf("key:%s,%d", k, len(v.([]*func(services []model.Instance, err error)))) - } + ed.callbackFuncMap.Range(func(k string, v []*vo.SubscribeCallbackFunc) bool { + log.Printf("key:%s,%d", k, len(v)) + return true + }) ed.RemoveCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), ¶m2.SubscribeCallback) key := util.GetServiceCacheKey(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ",")) - for k, v := range ed.callbackFuncMap.Items() { + ed.callbackFuncMap.Range(func(k string, v []*vo.SubscribeCallbackFunc) bool { assert.Equal(t, key, k, "key should be equal!") - funcs := v.([]*func(services []model.Instance, err error)) - assert.Equal(t, len(funcs), 1) - assert.Equal(t, funcs[0], ¶m.SubscribeCallback, "callback function must be equal!") - - } + assert.Equal(t, len(v), 1) + assert.Equal(t, (v)[0], ¶m.SubscribeCallback, "callback function must be equal!") + return true + }) } func TestSubscribeCallback_ServiceChanged(t *testing.T) { @@ -170,6 +131,48 @@ func TestSubscribeCallback_ServiceChanged(t *testing.T) { }, } ed.AddCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), ¶m2.SubscribeCallback) - cacheKey := util.GetServiceCacheKey(util.GetGroupName(service.Name, service.GroupName), service.Clusters) + cacheKey := util.GetServiceCacheKey(service.Name, service.Clusters) ed.ServiceChanged(cacheKey, &service) } + +func TestServiceChangedWithNilCallback(t *testing.T) { + ed := NewSubscribeCallback() + param := vo.SubscribeParam{ + ServiceName: "Test", + Clusters: []string{"default"}, + GroupName: "public", + SubscribeCallback: func(services []model.Instance, err error) { + log.Printf("func1:%s \n", util.ToJsonString(services)) + }, + } + ed.AddCallbackFunc(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), ¶m.SubscribeCallback) + param1 := vo.SubscribeParam{ + ServiceName: "Test", + Clusters: []string{"default"}, + GroupName: "public", + SubscribeCallback: nil, + } + ed.AddCallbackFunc(util.GetGroupName(param1.ServiceName, param1.GroupName), strings.Join(param1.Clusters, ","), ¶m1.SubscribeCallback) + service := model.Service{ + Name: "public@@Test", + Clusters: strings.Join([]string{"default"}, ","), + CacheMillis: 10000, + Checksum: "abcd", + LastRefTime: uint64(time.Now().Unix()), + Hosts: []model.Instance{ + { + Enable: true, + InstanceId: "123", + Port: 8080, + Ip: "127.0.0.1", + Weight: 10, + ServiceName: "public@@Test", + ClusterName: strings.Join([]string{"default"}, ","), + }, + }, + } + at := assert.New(t) + at.NotPanics(func() { + ed.ServiceChanged(util.GetServiceCacheKey(service.Name, service.Clusters), &service) + }) +} diff --git a/clients/naming_client/naming_client.go b/clients/naming_client/naming_client.go index a2257e1a..d147b8ae 100644 --- a/clients/naming_client/naming_client.go +++ b/clients/naming_client/naming_client.go @@ -35,6 +35,8 @@ import ( "github.com/nacos-group/nacos-sdk-go/v2/vo" ) +var _ INamingClient = (*NamingClient)(nil) + // NamingClient ... type NamingClient struct { nacos_client.INacosClient @@ -92,9 +94,9 @@ func initLogger(clientConfig constant.ClientConfig) error { } // RegisterInstance ... -func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) { +func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) error { if param.ServiceName == "" { - return false, errors.New("serviceName cannot be empty!") + return errors.New("serviceName cannot be empty!") } if len(param.GroupName) == 0 { param.GroupName = constant.DEFAULT_GROUP @@ -115,20 +117,20 @@ func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, return sc.serviceProxy.RegisterInstance(param.ServiceName, param.GroupName, instance) } -func (sc *NamingClient) BatchRegisterInstance(param vo.BatchRegisterInstanceParam) (bool, error) { +func (sc *NamingClient) BatchRegisterInstance(param vo.BatchRegisterInstanceParam) error { if param.ServiceName == "" { - return false, errors.New("serviceName cannot be empty!") + return errors.New("serviceName cannot be empty!") } if len(param.GroupName) == 0 { param.GroupName = constant.DEFAULT_GROUP } if len(param.Instances) == 0 { - return false, errors.New("instances cannot be empty!") + return errors.New("instances cannot be empty!") } var modelInstances []model.Instance for _, param := range param.Instances { if !param.Ephemeral { - return false, errors.Errorf("Batch registration does not allow persistent instance registration! instance:%+v", param) + return errors.Errorf("Batch registration does not allow persistent instance registration! instance:%+v", param) } modelInstances = append(modelInstances, model.Instance{ Ip: param.Ip, @@ -146,7 +148,7 @@ func (sc *NamingClient) BatchRegisterInstance(param vo.BatchRegisterInstancePara } // DeregisterInstance ... -func (sc *NamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) { +func (sc *NamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) error { if len(param.GroupName) == 0 { param.GroupName = constant.DEFAULT_GROUP } @@ -160,9 +162,9 @@ func (sc *NamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bo } // UpdateInstance ... -func (sc *NamingClient) UpdateInstance(param vo.UpdateInstanceParam) (bool, error) { +func (sc *NamingClient) UpdateInstance(param vo.UpdateInstanceParam) error { if param.ServiceName == "" { - return false, errors.New("serviceName cannot be empty!") + return errors.New("serviceName cannot be empty!") } if len(param.GroupName) == 0 { param.GroupName = constant.DEFAULT_GROUP diff --git a/clients/naming_client/naming_client_interface.go b/clients/naming_client/naming_client_interface.go index a07675db..80b0e6e9 100644 --- a/clients/naming_client/naming_client_interface.go +++ b/clients/naming_client/naming_client_interface.go @@ -37,14 +37,14 @@ type INamingClient interface { // ServiceName require // GroupName optional,default:DEFAULT_GROUP // Ephemeral optional - RegisterInstance(param vo.RegisterInstanceParam) (bool, error) + RegisterInstance(param vo.RegisterInstanceParam) error // BatchRegisterInstance use to batch register instance // ClusterName optional,default:DEFAULT // ServiceName require // GroupName optional,default:DEFAULT_GROUP // Instances require,batch register instance list (serviceName, groupName in instances do not need to be set) - BatchRegisterInstance(param vo.BatchRegisterInstanceParam) (bool, error) + BatchRegisterInstance(param vo.BatchRegisterInstanceParam) error // DeregisterInstance use to deregister instance // Ip required @@ -54,7 +54,7 @@ type INamingClient interface { // ServiceName require // GroupName optional,default:DEFAULT_GROUP // Ephemeral optional - DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) + DeregisterInstance(param vo.DeregisterInstanceParam) error // UpdateInstance use to update instance // Ip require @@ -67,7 +67,7 @@ type INamingClient interface { // ServiceName require // GroupName optional,default:DEFAULT_GROUP // Ephemeral optional - UpdateInstance(param vo.UpdateInstanceParam) (bool, error) + UpdateInstance(param vo.UpdateInstanceParam) error // GetService use to get service // ServiceName require diff --git a/clients/naming_client/naming_client_test.go b/clients/naming_client/naming_client_test.go index 58ac40e6..e63f2bff 100644 --- a/clients/naming_client/naming_client_test.go +++ b/clients/naming_client/naming_client_test.go @@ -17,6 +17,7 @@ package naming_client import ( + "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_proxy" "testing" "github.com/nacos-group/nacos-sdk-go/v2/common/http_agent" @@ -34,21 +35,24 @@ var clientConfigTest = *constant.NewClientConfig( constant.WithNotLoadCacheAtStart(true), ) -var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos")) +var ( + serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos")) + _ naming_proxy.INamingProxy = (*MockNamingProxy)(nil) +) type MockNamingProxy struct { } -func (m *MockNamingProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { - return true, nil +func (m *MockNamingProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) error { + return nil } -func (m *MockNamingProxy) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) (bool, error) { - return true, nil +func (m *MockNamingProxy) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) error { + return nil } -func (m *MockNamingProxy) DeregisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { - return true, nil +func (m *MockNamingProxy) DeregisterInstance(serviceName string, groupName string, instance model.Instance) error { + return nil } func (m *MockNamingProxy) GetServiceList(pageNo uint32, pageSize uint32, groupName, namespaceId string, selector *model.ExpressionSelector) (model.ServiceList, error) { @@ -83,18 +87,17 @@ func NewTestNamingClient() *NamingClient { return client } func Test_RegisterServiceInstance_withoutGroupName(t *testing.T) { - success, err := NewTestNamingClient().RegisterInstance(vo.RegisterInstanceParam{ + err := NewTestNamingClient().RegisterInstance(vo.RegisterInstanceParam{ ServiceName: "DEMO", Ip: "10.0.0.10", Port: 80, Ephemeral: false, }) assert.Equal(t, nil, err) - assert.Equal(t, true, success) } func Test_RegisterServiceInstance_withGroupName(t *testing.T) { - success, err := NewTestNamingClient().RegisterInstance(vo.RegisterInstanceParam{ + err := NewTestNamingClient().RegisterInstance(vo.RegisterInstanceParam{ ServiceName: "DEMO", Ip: "10.0.0.10", Port: 80, @@ -102,11 +105,10 @@ func Test_RegisterServiceInstance_withGroupName(t *testing.T) { Ephemeral: false, }) assert.Equal(t, nil, err) - assert.Equal(t, true, success) } func Test_RegisterServiceInstance_withCluster(t *testing.T) { - success, err := NewTestNamingClient().RegisterInstance(vo.RegisterInstanceParam{ + err := NewTestNamingClient().RegisterInstance(vo.RegisterInstanceParam{ ServiceName: "DEMO", Ip: "10.0.0.10", Port: 80, @@ -115,21 +117,19 @@ func Test_RegisterServiceInstance_withCluster(t *testing.T) { Ephemeral: false, }) assert.Equal(t, nil, err) - assert.Equal(t, true, success) } func TestNamingProxy_DeregisterService_WithoutGroupName(t *testing.T) { - success, err := NewTestNamingClient().DeregisterInstance(vo.DeregisterInstanceParam{ + err := NewTestNamingClient().DeregisterInstance(vo.DeregisterInstanceParam{ ServiceName: "DEMO5", Ip: "10.0.0.10", Port: 80, Ephemeral: true, }) assert.Equal(t, nil, err) - assert.Equal(t, true, success) } func TestNamingProxy_DeregisterService_WithGroupName(t *testing.T) { - success, err := NewTestNamingClient().DeregisterInstance(vo.DeregisterInstanceParam{ + err := NewTestNamingClient().DeregisterInstance(vo.DeregisterInstanceParam{ ServiceName: "DEMO6", Ip: "10.0.0.10", Port: 80, @@ -137,7 +137,6 @@ func TestNamingProxy_DeregisterService_WithGroupName(t *testing.T) { Ephemeral: true, }) assert.Equal(t, nil, err) - assert.Equal(t, true, success) } func TestNamingClient_SelectOneHealthyInstance_SameWeight(t *testing.T) { diff --git a/clients/naming_client/naming_grpc/connection_event_listener.go b/clients/naming_client/naming_grpc/connection_event_listener.go deleted file mode 100644 index c4f9937e..00000000 --- a/clients/naming_client/naming_grpc/connection_event_listener.go +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright 1999-2020 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package naming_grpc - -import ( - "strings" - - "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_proxy" - - "github.com/nacos-group/nacos-sdk-go/v2/clients/cache" - "github.com/nacos-group/nacos-sdk-go/v2/common/constant" - "github.com/nacos-group/nacos-sdk-go/v2/common/logger" - "github.com/nacos-group/nacos-sdk-go/v2/model" - "github.com/nacos-group/nacos-sdk-go/v2/util" -) - -type ConnectionEventListener struct { - clientProxy naming_proxy.INamingProxy - registeredInstanceCached cache.ConcurrentMap - subscribes cache.ConcurrentMap -} - -func NewConnectionEventListener(clientProxy naming_proxy.INamingProxy) *ConnectionEventListener { - return &ConnectionEventListener{ - clientProxy: clientProxy, - registeredInstanceCached: cache.NewConcurrentMap(), - subscribes: cache.NewConcurrentMap(), - } -} - -func (c *ConnectionEventListener) OnConnected() { - c.redoSubscribe() - c.redoRegisterEachService() -} - -func (c *ConnectionEventListener) OnDisConnect() { - -} - -func (c *ConnectionEventListener) redoSubscribe() { - for _, key := range c.subscribes.Keys() { - info := strings.Split(key, constant.SERVICE_INFO_SPLITER) - var err error - var service model.Service - if len(info) > 2 { - service, err = c.clientProxy.Subscribe(info[1], info[0], info[2]) - } else { - service, err = c.clientProxy.Subscribe(info[1], info[0], "") - } - - if err != nil { - logger.Warnf("redo subscribe service:%s faild:%+v", info[1], err) - return - } - - grpcProxy, ok := c.clientProxy.(*NamingGrpcProxy) - if !ok { - return - } - grpcProxy.serviceInfoHolder.ProcessService(&service) - } -} - -func (c *ConnectionEventListener) redoRegisterEachService() { - for k, v := range c.registeredInstanceCached.Items() { - info := strings.Split(k, constant.SERVICE_INFO_SPLITER) - serviceName := info[1] - groupName := info[0] - if instance, ok := v.(model.Instance); ok { - if _, err := c.clientProxy.RegisterInstance(serviceName, groupName, instance); err != nil { - logger.Warnf("redo register service:%s groupName:%s faild:%s", info[1], info[0], err.Error()) - continue - } - } - if instances, ok := v.([]model.Instance); ok { - if _, err := c.clientProxy.BatchRegisterInstance(serviceName, groupName, instances); err != nil { - logger.Warnf("redo batch register service:%s groupName:%s faild:%s", info[1], info[0], err.Error()) - continue - } - } - } -} - -func (c *ConnectionEventListener) CacheInstanceForRedo(serviceName, groupName string, instance model.Instance) { - key := util.GetGroupName(serviceName, groupName) - c.registeredInstanceCached.Set(key, instance) -} - -func (c *ConnectionEventListener) CacheInstancesForRedo(serviceName, groupName string, instances []model.Instance) { - key := util.GetGroupName(serviceName, groupName) - c.registeredInstanceCached.Set(key, instances) -} - -func (c *ConnectionEventListener) RemoveInstanceForRedo(serviceName, groupName string, instance model.Instance) { - key := util.GetGroupName(serviceName, groupName) - _, ok := c.registeredInstanceCached.Get(key) - if !ok { - return - } - c.registeredInstanceCached.Remove(key) -} - -func (c *ConnectionEventListener) CacheSubscriberForRedo(fullServiceName, clusters string) { - key := util.GetServiceCacheKey(fullServiceName, clusters) - if !c.IsSubscriberCached(key) { - c.subscribes.Set(key, struct{}{}) - } -} - -func (c *ConnectionEventListener) IsSubscriberCached(key string) bool { - _, ok := c.subscribes.Get(key) - return ok -} - -func (c *ConnectionEventListener) RemoveSubscriberForRedo(fullServiceName, clusters string) { - c.subscribes.Remove(util.GetServiceCacheKey(fullServiceName, clusters)) -} diff --git a/clients/naming_client/naming_grpc/connection_event_listener_test.go b/clients/naming_client/naming_grpc/connection_event_listener_test.go deleted file mode 100644 index 118d388f..00000000 --- a/clients/naming_client/naming_grpc/connection_event_listener_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package naming_grpc - -import ( - "testing" - - "github.com/golang/mock/gomock" - "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_proxy" - "github.com/nacos-group/nacos-sdk-go/v2/util" -) - -func TestRedoSubscribe(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockProxy := naming_proxy.NewMockINamingProxy(ctrl) - evListener := NewConnectionEventListener(mockProxy) - - cases := []struct { - serviceName string - groupName string - clusters string - }{ - {"service-a", "group-a", ""}, - {"service-b", "group-b", "cluster-b"}, - } - - for _, v := range cases { - fullServiceName := util.GetGroupName(v.serviceName, v.groupName) - evListener.CacheSubscriberForRedo(fullServiceName, v.clusters) - mockProxy.EXPECT().Subscribe(v.serviceName, v.groupName, v.clusters) - evListener.redoSubscribe() - evListener.RemoveSubscriberForRedo(fullServiceName, v.clusters) - } -} diff --git a/clients/naming_client/naming_grpc/naming_grpc_proxy.go b/clients/naming_client/naming_grpc/naming_grpc_proxy.go index bc885391..acbadc80 100644 --- a/clients/naming_client/naming_grpc/naming_grpc_proxy.go +++ b/clients/naming_client/naming_grpc/naming_grpc_proxy.go @@ -18,6 +18,8 @@ package naming_grpc import ( "context" + "fmt" + "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_proxy" "time" "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_cache" @@ -38,10 +40,12 @@ type NamingGrpcProxy struct { clientConfig constant.ClientConfig nacosServer *nacos_server.NacosServer rpcClient rpc.IRpcClient - eventListener *ConnectionEventListener + redoService IRedoService serviceInfoHolder *naming_cache.ServiceInfoHolder } +var _ naming_proxy.INamingProxy = (*NamingGrpcProxy)(nil) + // NewNamingGrpcProxy create naming grpc proxy func NewNamingGrpcProxy(ctx context.Context, clientCfg constant.ClientConfig, nacosServer *nacos_server.NacosServer, serviceInfoHolder *naming_cache.ServiceInfoHolder) (*NamingGrpcProxy, error) { @@ -75,8 +79,8 @@ func NewNamingGrpcProxy(ctx context.Context, clientCfg constant.ClientConfig, na return &rpc_request.NotifySubscriberRequest{NamingRequest: &rpc_request.NamingRequest{}} }, &rpc.NamingPushRequestHandler{ServiceInfoHolder: serviceInfoHolder}) - srvProxy.eventListener = NewConnectionEventListener(&srvProxy) - rpcClient.RegisterConnectionListener(srvProxy.eventListener) + srvProxy.redoService = NewRedoService(ctx, &srvProxy) + rpcClient.RegisterConnectionListener(srvProxy.redoService) return &srvProxy, nil } @@ -86,47 +90,67 @@ func (proxy *NamingGrpcProxy) requestToServer(request rpc_request.IRequest) (rpc proxy.nacosServer.InjectSign(request, request.GetHeaders(), proxy.clientConfig) proxy.nacosServer.InjectSecurityInfo(request.GetHeaders()) response, err := proxy.rpcClient.GetRpcClient().Request(request, int64(proxy.clientConfig.TimeoutMs)) + if err == nil && response != nil && !response.IsSuccess() { + return nil, fmt.Errorf("nacos server response type [%s] error code=%d, message=[%s]", response.GetResponseType(), response.GetErrorCode(), response.GetMessage()) + } monitor.GetNamingRequestMonitor(constant.GRPC, request.GetRequestType(), rpc_response.GetGrpcResponseStatusCode(response)).Observe(float64(time.Now().Nanosecond() - start.Nanosecond())) return response, err } // RegisterInstance ... -func (proxy *NamingGrpcProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { - logger.Infof("register instance namespaceId:<%s>,serviceName:<%s> with instance:<%s>", +func (proxy *NamingGrpcProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) error { + logger.Infof("register instance namespaceId:<%s>,serviceName:<%s>, groupName:<%s> with instance:<%s>", proxy.clientConfig.NamespaceId, serviceName, util.ToJsonString(instance)) - proxy.eventListener.CacheInstanceForRedo(serviceName, groupName, instance) + proxy.redoService.CacheInstanceForRedo(serviceName, groupName, instance) + return proxy.DoRegisterInstance(serviceName, groupName, instance) +} + +func (proxy *NamingGrpcProxy) DoRegisterInstance(serviceName string, groupName string, instance model.Instance) error { instanceRequest := rpc_request.NewInstanceRequest(proxy.clientConfig.NamespaceId, serviceName, groupName, "registerInstance", instance) - response, err := proxy.requestToServer(instanceRequest) + _, err := proxy.requestToServer(instanceRequest) if err != nil { - return false, err + return err } - return response.IsSuccess(), err + + proxy.redoService.InstanceRegistered(serviceName, groupName) + return nil } // BatchRegisterInstance ... -func (proxy *NamingGrpcProxy) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) (bool, error) { - logger.Infof("batch register instance namespaceId:<%s>,serviceName:<%s> with instance:<%s>", - proxy.clientConfig.NamespaceId, serviceName, util.ToJsonString(instances)) - proxy.eventListener.CacheInstancesForRedo(serviceName, groupName, instances) +func (proxy *NamingGrpcProxy) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) error { + logger.Infof("batch register instance namespaceId:<%s>,serviceName:<%s>, groupName:<%s> with instance:<%s>", + proxy.clientConfig.NamespaceId, serviceName, groupName, util.ToJsonString(instances)) + proxy.redoService.CacheInstancesForRedo(serviceName, groupName, instances) + return proxy.DoBatchRegisterInstance(serviceName, groupName, instances) +} + +func (proxy *NamingGrpcProxy) DoBatchRegisterInstance(serviceName, groupName string, instances []model.Instance) error { batchInstanceRequest := rpc_request.NewBatchInstanceRequest(proxy.clientConfig.NamespaceId, serviceName, groupName, "batchRegisterInstance", instances) - response, err := proxy.requestToServer(batchInstanceRequest) + _, err := proxy.requestToServer(batchInstanceRequest) if err != nil { - return false, err + return err } - return response.IsSuccess(), err + proxy.redoService.InstanceRegistered(serviceName, groupName) + return nil } -// DeregisterInstance ... -func (proxy *NamingGrpcProxy) DeregisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { - logger.Infof("deregister instance namespaceId:<%s>,serviceName:<%s> with instance:<%s:%d@%s>", - proxy.clientConfig.NamespaceId, serviceName, instance.Ip, instance.Port, instance.ClusterName) +func (proxy *NamingGrpcProxy) DoDeRegisterInstance(serviceName, groupName string, instance model.Instance) error { instanceRequest := rpc_request.NewInstanceRequest(proxy.clientConfig.NamespaceId, serviceName, groupName, "deregisterInstance", instance) - response, err := proxy.requestToServer(instanceRequest) - proxy.eventListener.RemoveInstanceForRedo(serviceName, groupName, instance) + _, err := proxy.requestToServer(instanceRequest) if err != nil { - return false, err + return err } - return response.IsSuccess(), err + proxy.redoService.InstanceDeRegistered(serviceName, groupName) + return nil +} + +// DeregisterInstance ... +func (proxy *NamingGrpcProxy) DeregisterInstance(serviceName, groupName string, instance model.Instance) error { + logger.Infof("deregister instance namespaceId:<%s>,serviceName:<%s>, groupName:<%s> with instance:<%s:%d@%s>", + proxy.clientConfig.NamespaceId, serviceName, instance.Ip, instance.Port, instance.ClusterName) + proxy.redoService.InstanceDeRegister(serviceName, groupName) + return proxy.DoDeRegisterInstance(serviceName, groupName, instance) + } // GetServiceList ... @@ -169,14 +193,18 @@ func (proxy *NamingGrpcProxy) QueryInstancesOfService(serviceName, groupName, cl } func (proxy *NamingGrpcProxy) IsSubscribed(serviceName, groupName string, clusters string) bool { - return proxy.eventListener.IsSubscriberCached(util.GetServiceCacheKey(util.GetGroupName(serviceName, groupName), clusters)) + return proxy.redoService.IsSubscriberCached(serviceName, groupName, clusters) } // Subscribe ... func (proxy *NamingGrpcProxy) Subscribe(serviceName, groupName string, clusters string) (model.Service, error) { logger.Infof("Subscribe Service namespaceId:<%s>, serviceName:<%s>, groupName:<%s>, clusters:<%s>", proxy.clientConfig.NamespaceId, serviceName, groupName, clusters) - proxy.eventListener.CacheSubscriberForRedo(util.GetGroupName(serviceName, groupName), clusters) + proxy.redoService.CacheSubscriberForRedo(serviceName, groupName, clusters) + return proxy.DoSubscribe(serviceName, groupName, clusters) +} + +func (proxy *NamingGrpcProxy) DoSubscribe(serviceName, groupName string, clusters string) (model.Service, error) { request := rpc_request.NewSubscribeServiceRequest(proxy.clientConfig.NamespaceId, serviceName, groupName, clusters, true) request.Headers["app"] = proxy.clientConfig.AppName @@ -184,6 +212,8 @@ func (proxy *NamingGrpcProxy) Subscribe(serviceName, groupName string, clusters if err != nil { return model.Service{}, err } + + proxy.redoService.SubscribeRegistered(serviceName, groupName, clusters) subscribeServiceResponse := response.(*rpc_response.SubscribeServiceResponse) return subscribeServiceResponse.ServiceInfo, nil } @@ -192,9 +222,17 @@ func (proxy *NamingGrpcProxy) Subscribe(serviceName, groupName string, clusters func (proxy *NamingGrpcProxy) Unsubscribe(serviceName, groupName, clusters string) error { logger.Infof("Unsubscribe Service namespaceId:<%s>, serviceName:<%s>, groupName:<%s>, clusters:<%s>", proxy.clientConfig.NamespaceId, serviceName, groupName, clusters) - proxy.eventListener.RemoveSubscriberForRedo(util.GetGroupName(serviceName, groupName), clusters) + proxy.redoService.SubscribeDeRegister(serviceName, groupName, clusters) + return proxy.DoUnSubscribe(serviceName, groupName, clusters) +} + +// DoUnSubscribe ... +func (proxy *NamingGrpcProxy) DoUnSubscribe(serviceName, groupName, clusters string) error { _, err := proxy.requestToServer(rpc_request.NewSubscribeServiceRequest(proxy.clientConfig.NamespaceId, serviceName, groupName, clusters, false)) + if err == nil { + proxy.redoService.SubscribeDeRegistered(serviceName, groupName, clusters) + } return err } diff --git a/clients/naming_client/naming_grpc/naming_grpc_proxy_test.go b/clients/naming_client/naming_grpc/naming_grpc_proxy_test.go index a190fd29..deee9de8 100644 --- a/clients/naming_client/naming_grpc/naming_grpc_proxy_test.go +++ b/clients/naming_client/naming_grpc/naming_grpc_proxy_test.go @@ -5,16 +5,16 @@ import "github.com/nacos-group/nacos-sdk-go/v2/model" type MockNamingGrpc struct { } -func (m *MockNamingGrpc) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { - return true, nil +func (m *MockNamingGrpc) RegisterInstance(serviceName string, groupName string, instance model.Instance) error { + return nil } -func (m *MockNamingGrpc) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) (bool, error) { - return true, nil +func (m *MockNamingGrpc) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) error { + return nil } -func (m *MockNamingGrpc) DeregisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { - return true, nil +func (m *MockNamingGrpc) DeregisterInstance(serviceName string, groupName string, instance model.Instance) error { + return nil } func (m *MockNamingGrpc) GetServiceList(pageNo uint32, pageSize uint32, groupName string, selector *model.ExpressionSelector) (model.ServiceList, error) { diff --git a/clients/naming_client/naming_grpc/redo.go b/clients/naming_client/naming_grpc/redo.go new file mode 100644 index 00000000..1c46f70c --- /dev/null +++ b/clients/naming_client/naming_grpc/redo.go @@ -0,0 +1,172 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package naming_grpc + +import ( + "github.com/nacos-group/nacos-sdk-go/v2/model" + "go.uber.org/atomic" + "sync" +) + +const ( + none Type = iota + register + unregister + remove +) + +type ( + Type int32 + IRedoData interface { + SetRegistered(bool) + SetUnRegistering(bool) + Registered() + Unregistered() + SetExpectRegistered(bool) + IsNeedRedo() bool + GetRedoType() Type + } + Data struct { + locker *sync.Mutex + ServiceName string + GroupName string + registered *atomic.Bool + unregistering *atomic.Bool + expectRegistered *atomic.Bool + } + + InstanceRedoData struct { + *Data + value model.Instance + } + + BatchInstancesRedoData struct { + *Data + value []model.Instance + } + + SubscribeRedoData struct { + *Data + cluster string + } +) + +func (r *Data) IsNeedRedo() bool { + return none != r.GetRedoType() +} + +func (r *Data) GetRedoType() Type { + r.locker.Lock() + defer r.locker.Unlock() + if r.registered.Load() && !r.unregistering.Load() { + if r.expectRegistered.Load() { + return none + } + return unregister + } else if r.registered.Load() && r.unregistering.Load() { + return unregister + } else if !r.registered.Load() && !r.unregistering.Load() { + return register + } else if r.expectRegistered.Load() { + return register + } else { + return remove + } +} + +func (r *Data) SetRegistered(b bool) { + r.registered.Store(b) +} + +func (r *Data) SetUnRegistering(b bool) { + r.unregistering.Store(b) +} + +func (r *Data) SetExpectRegistered(b bool) { + r.expectRegistered.Store(b) +} + +func (r *Data) Registered() { + r.locker.Lock() + defer r.locker.Unlock() + r.registered.Store(true) + r.unregistering.Store(false) +} + +func (r *Data) Unregistered() { + r.locker.Lock() + defer r.locker.Unlock() + r.registered.Store(false) + r.unregistering.Store(true) +} + +func (i *InstanceRedoData) Get() model.Instance { + return i.value +} + +func (i *BatchInstancesRedoData) Get() []model.Instance { + return i.value +} + +func (i *SubscribeRedoData) Get() string { + return i.cluster +} + +func NewInstanceRedoData(service, group string, ins model.Instance) *InstanceRedoData { + n := &InstanceRedoData{value: ins} + d := &Data{ + locker: &sync.Mutex{}, + ServiceName: service, + GroupName: group, + registered: &atomic.Bool{}, + unregistering: &atomic.Bool{}, + expectRegistered: &atomic.Bool{}, + } + d.expectRegistered.Store(true) + n.Data = d + return n + +} + +func NewBatchInstancesRedoData(service, group string, ins []model.Instance) *BatchInstancesRedoData { + d := &Data{ + locker: &sync.Mutex{}, + ServiceName: service, + GroupName: group, + registered: &atomic.Bool{}, + unregistering: &atomic.Bool{}, + expectRegistered: &atomic.Bool{}, + } + d.expectRegistered.Store(true) + n := &BatchInstancesRedoData{value: ins, Data: d} + return n +} + +func NewSubscribeRedoData(service, group, cluster string) *SubscribeRedoData { + d := &Data{ + locker: &sync.Mutex{}, + ServiceName: service, + GroupName: group, + registered: &atomic.Bool{}, + unregistering: &atomic.Bool{}, + expectRegistered: &atomic.Bool{}, + } + d.expectRegistered.Store(true) + n := &SubscribeRedoData{cluster: cluster, Data: d} + + return n +} diff --git a/clients/naming_client/naming_grpc/redo_service.go b/clients/naming_client/naming_grpc/redo_service.go new file mode 100644 index 00000000..b38d1626 --- /dev/null +++ b/clients/naming_client/naming_grpc/redo_service.go @@ -0,0 +1,266 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package naming_grpc + +import ( + "context" + "github.com/nacos-group/nacos-sdk-go/v2/common/constant" + "github.com/nacos-group/nacos-sdk-go/v2/common/remote/rpc" + "go.uber.org/atomic" + + "time" + + "github.com/nacos-group/nacos-sdk-go/v2/clients/cache" + "github.com/nacos-group/nacos-sdk-go/v2/common/logger" + "github.com/nacos-group/nacos-sdk-go/v2/model" + "github.com/nacos-group/nacos-sdk-go/v2/util" +) + +type ( + IRedoService interface { + rpc.IConnectionEventListener + CacheInstanceForRedo(serviceName, groupName string, instance model.Instance) + + CacheInstancesForRedo(serviceName, groupName string, instances []model.Instance) + + CacheSubscriberForRedo(service, group, clusters string) + + InstanceDeRegister(service, group string) + + SubscribeDeRegister(service, group, clusters string) + + InstanceRegistered(service, group string) + + InstanceDeRegistered(service, group string) + + SubscribeRegistered(service, group, clusters string) + + SubscribeDeRegistered(service, group, clusters string) + + RemoveInstanceForRedo(serviceName, groupName string) + + RemoveSubscriberForRedo(service, group, clusters string) + + IsSubscriberCached(service, group, clusters string) bool + + IsConnected() bool + + FindNeedRedoData() []IRedoData + } + + IWeRedoTask interface { + DoRedo() + } + + WeRedoService struct { + registeredRedoInstanceCached cache.IComputeCache[string, IRedoData] + ctx context.Context + connected atomic.Bool + task IWeRedoTask + } + WeRedoTask struct { + clientProxy *NamingGrpcProxy + redoService IRedoService + } +) + +const ( + connected = iota + disconnected +) + +func NewRedoService(ctx context.Context, clientProxy *NamingGrpcProxy) *WeRedoService { + w := &WeRedoService{ + ctx: ctx, + registeredRedoInstanceCached: cache.NewCache[string, IRedoData](), + connected: atomic.Bool{}, + } + w.task = &WeRedoTask{clientProxy: clientProxy, redoService: w} + go w.scheduleRedo() + return w +} + +func (c *WeRedoService) scheduleRedo() { + ticker := time.NewTicker(time.Second * 3) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.task.DoRedo() + } + } +} + +func getSubscribeCacheKey(service, clusters string) string { + return service + constant.SERVICE_INFO_SPLITER + clusters +} + +func (c *WeRedoService) OnConnected() { + c.connected.Store(true) + logger.Infof("redo notice connection connected") +} + +func (c *WeRedoService) OnDisConnect() { + c.connected.Store(false) + logger.Infof("redo notice connection disconnected") + c.registeredRedoInstanceCached.Range(func(key string, value IRedoData) bool { + value.SetRegistered(false) + return true + }) +} + +func (c *WeRedoService) CacheInstanceForRedo(serviceName, groupName string, instance model.Instance) { + key := util.GetGroupName(serviceName, groupName) + c.registeredRedoInstanceCached.Store(key, NewInstanceRedoData(serviceName, groupName, instance)) +} + +func (c *WeRedoService) CacheInstancesForRedo(serviceName, groupName string, instances []model.Instance) { + key := util.GetGroupName(serviceName, groupName) + c.registeredRedoInstanceCached.Store(key, NewBatchInstancesRedoData(serviceName, groupName, instances)) +} + +func (c *WeRedoService) CacheSubscriberForRedo(service, group, clusters string) { + c.registeredRedoInstanceCached.Store(getSubscribeCacheKey(util.GetGroupName(service, group), clusters), NewSubscribeRedoData(service, group, clusters)) +} + +func (c *WeRedoService) InstanceRegistered(service, group string) { + c.registeredRedoInstanceCached.ComputeIfPresent(util.GetGroupName(service, group), func(value IRedoData) IRedoData { + value.Registered() + return value + }) +} + +func (c *WeRedoService) SubscribeRegistered(service, group, clusters string) { + c.registeredRedoInstanceCached.ComputeIfPresent(getSubscribeCacheKey(util.GetGroupName(service, group), clusters), func(value IRedoData) IRedoData { + value.Registered() + return value + }) +} + +func (c *WeRedoService) RemoveInstanceForRedo(serviceName, groupName string) { + c.registeredRedoInstanceCached.Delete(util.GetGroupName(serviceName, groupName)) +} + +func (c *WeRedoService) RemoveSubscriberForRedo(service, group, clusters string) { + c.registeredRedoInstanceCached.Delete(getSubscribeCacheKey(util.GetGroupName(service, group), clusters)) +} + +func (c *WeRedoService) IsSubscriberCached(service, group, clusters string) bool { + _, ok := c.registeredRedoInstanceCached.Load(getSubscribeCacheKey(util.GetGroupName(service, group), clusters)) + return ok +} + +func (c *WeRedoService) InstanceDeRegister(service, group string) { + c.registeredRedoInstanceCached.ComputeIfPresent(util.GetGroupName(service, group), func(value IRedoData) IRedoData { + value.SetUnRegistering(true) + value.SetExpectRegistered(false) + return value + }) +} + +func (c *WeRedoService) SubscribeDeRegister(service, group, clusters string) { + c.registeredRedoInstanceCached.ComputeIfPresent(getSubscribeCacheKey(util.GetGroupName(service, group), clusters), func(value IRedoData) IRedoData { + value.SetUnRegistering(true) + value.SetExpectRegistered(false) + return value + }) +} + +func (c *WeRedoService) InstanceDeRegistered(service, group string) { + c.registeredRedoInstanceCached.ComputeIfPresent(util.GetGroupName(service, group), func(value IRedoData) IRedoData { + value.Unregistered() + return value + }) +} + +func (c *WeRedoService) SubscribeDeRegistered(service, group, clusters string) { + c.registeredRedoInstanceCached.ComputeIfPresent(getSubscribeCacheKey(util.GetGroupName(service, group), clusters), func(value IRedoData) IRedoData { + value.Unregistered() + return value + }) +} + +func (c *WeRedoService) IsConnected() bool { + return c.connected.Load() +} + +func (c *WeRedoService) FindNeedRedoData() []IRedoData { + var result []IRedoData + c.registeredRedoInstanceCached.Range(func(key string, value IRedoData) bool { + if value.IsNeedRedo() { + result = append(result, value) + } + return true + }) + return result +} + +func (w *WeRedoTask) DoRedo() { + if !w.redoService.IsConnected() { + logger.Debug("gRPC connection status is disconnected, skip current redo task") + return + } + needRedoData := w.redoService.FindNeedRedoData() + for _, value := range needRedoData { + redoType := value.GetRedoType() + logger.Infof("redo task will process type %T, target %v", value, redoType) + switch t := value.(type) { + case *InstanceRedoData: + switch redoType { + case register: + if err := w.clientProxy.DoRegisterInstance(t.ServiceName, t.GroupName, t.Get()); err != nil { + logger.Warnf("redo register service:%s groupName:%s failed:%s", t.ServiceName, t.GroupName, err.Error()) + } + case unregister: + if err := w.clientProxy.DoDeRegisterInstance(t.ServiceName, t.GroupName, model.Instance{}); err != nil { + logger.Warnf("redo register service:%s groupName:%s failed:%s", t.ServiceName, t.GroupName, err.Error()) + } + case remove: + w.redoService.RemoveInstanceForRedo(t.ServiceName, t.GroupName) + } + case *BatchInstancesRedoData: + switch redoType { + case register: + if err := w.clientProxy.DoBatchRegisterInstance(t.ServiceName, t.GroupName, t.Get()); err != nil { + logger.Warnf("redo register service:%s groupName:%s failed:%s", t.ServiceName, t.GroupName, err.Error()) + } + case unregister: + if err := w.clientProxy.DoDeRegisterInstance(t.ServiceName, t.GroupName, model.Instance{}); err != nil { + logger.Warnf("redo register service:%s groupName:%s failed:%s", t.ServiceName, t.GroupName, err.Error()) + } + case remove: + w.redoService.RemoveInstanceForRedo(t.ServiceName, t.GroupName) + } + case *SubscribeRedoData: + switch redoType { + case register: + _, err := w.clientProxy.DoSubscribe(t.ServiceName, t.GroupName, t.Get()) + if err != nil { + logger.Warnf("redo register service:%s groupName:%s failed:%s", t.ServiceName, t.GroupName, err.Error()) + } + case unregister: + if err := w.clientProxy.DoUnSubscribe(t.ServiceName, t.GroupName, t.Get()); err != nil { + logger.Warnf("redo register service:%s groupName:%s failed:%s", t.ServiceName, t.GroupName, err.Error()) + } + case remove: + w.redoService.RemoveSubscriberForRedo(t.ServiceName, t.GroupName, t.cluster) + } + } + } +} diff --git a/clients/naming_client/naming_grpc/redo_service_test.go b/clients/naming_client/naming_grpc/redo_service_test.go new file mode 100644 index 00000000..157a57c3 --- /dev/null +++ b/clients/naming_client/naming_grpc/redo_service_test.go @@ -0,0 +1,155 @@ +package naming_grpc + +import ( + "context" + "github.com/nacos-group/nacos-sdk-go/v2/model" + "github.com/nacos-group/nacos-sdk-go/v2/util" + "testing" + + . "github.com/agiledragon/gomonkey/v2" + . "github.com/smartystreets/goconvey/convey" +) + +func TestRedoSubscribe(t *testing.T) { + Convey("to subscriber", t, func() { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + subCalled := false + var mock *NamingGrpcProxy + evListener := NewRedoService(ctx, mock) + patch := ApplyMethod(mock, "DoSubscribe", func(_ *NamingGrpcProxy, serviceName, groupName, clusters string) (model.Service, error) { + subCalled = true + evListener.SubscribeRegistered(serviceName, groupName, clusters) + return model.Service{}, nil + }) + defer patch.Reset() + unSubCalled := false + patch = ApplyMethod(mock, "DoUnSubscribe", func(_ *NamingGrpcProxy, serviceName, groupName, clusters string) error { + unSubCalled = true + evListener.SubscribeDeRegistered(serviceName, groupName, clusters) + return nil + }) + + subscribeCases := []struct { + serviceName string + groupName string + clusters string + }{ + {"service-a", "group-a", ""}, + {"service-b", "group-b", "cluster-b"}, + } + + for _, v := range subscribeCases { + evListener.CacheSubscriberForRedo(v.serviceName, v.groupName, v.clusters) + evListener.OnConnected() + evListener.task.DoRedo() + + So(subCalled, ShouldBeTrue) + subCalled = false + evListener.SubscribeDeRegister(v.serviceName, v.groupName, v.clusters) + evListener.task.DoRedo() + So(unSubCalled, ShouldBeTrue) + + evListener.task.DoRedo() + So(evListener.IsSubscriberCached(v.serviceName, v.groupName, v.clusters), ShouldBeFalse) + evListener.RemoveSubscriberForRedo(v.serviceName, v.groupName, v.clusters) + } + }) + +} + +func TestRedoInstance(t *testing.T) { + Convey("to instance", t, func() { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + registerCalled := false + var mock *NamingGrpcProxy + evListener := NewRedoService(ctx, mock) + patch := ApplyMethod(mock, "DoRegisterInstance", func(_ *NamingGrpcProxy, serviceName, groupName string, instance model.Instance) error { + registerCalled = true + evListener.InstanceRegistered(serviceName, groupName) + return nil + }) + defer patch.Reset() + deregisterCalled := false + patch = ApplyMethod(mock, "DoDeRegisterInstance", func(_ *NamingGrpcProxy, serviceName, groupName string, instance model.Instance) error { + deregisterCalled = true + evListener.InstanceDeRegistered(serviceName, groupName) + return nil + }) + + instanceCases := []struct { + serviceName string + groupName string + ins model.Instance + }{ + {"service-a", "group-a", model.Instance{}}, + {"service-b", "group-b", model.Instance{}}, + } + + for _, v := range instanceCases { + evListener.CacheInstanceForRedo(v.serviceName, v.groupName, v.ins) + evListener.OnConnected() + evListener.task.DoRedo() + + So(registerCalled, ShouldBeTrue) + registerCalled = false + evListener.InstanceDeRegister(v.serviceName, v.groupName) + evListener.task.DoRedo() + So(deregisterCalled, ShouldBeTrue) + + evListener.task.DoRedo() + _, ok := evListener.registeredRedoInstanceCached.Load(util.GetGroupName(v.serviceName, v.groupName)) + So(ok, ShouldBeFalse) + evListener.RemoveInstanceForRedo(v.serviceName, v.groupName) + } + }) +} + +func TestRedoInstances(t *testing.T) { + Convey("to instance", t, func() { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + registerCalled := false + var mock *NamingGrpcProxy + evListener := NewRedoService(ctx, mock) + patch := ApplyMethod(mock, "DoBatchRegisterInstance", func(_ *NamingGrpcProxy, serviceName, groupName string, instances []model.Instance) error { + registerCalled = true + evListener.InstanceRegistered(serviceName, groupName) + return nil + }) + defer patch.Reset() + deregisterCalled := false + patch = ApplyMethod(mock, "DoDeRegisterInstance", func(_ *NamingGrpcProxy, serviceName, groupName string, instance model.Instance) error { + deregisterCalled = true + evListener.InstanceDeRegistered(serviceName, groupName) + return nil + }) + + instanceCases := []struct { + serviceName string + groupName string + ins []model.Instance + }{ + {"service-a", "group-a", []model.Instance{}}, + {"service-b", "group-b", []model.Instance{}}, + } + + for _, v := range instanceCases { + evListener.CacheInstancesForRedo(v.serviceName, v.groupName, v.ins) + evListener.OnConnected() + evListener.task.DoRedo() + + So(registerCalled, ShouldBeTrue) + registerCalled = false + evListener.InstanceDeRegister(v.serviceName, v.groupName) + evListener.task.DoRedo() + So(deregisterCalled, ShouldBeTrue) + + evListener.task.DoRedo() + _, ok := evListener.registeredRedoInstanceCached.Load(util.GetGroupName(v.serviceName, v.groupName)) + So(ok, ShouldBeFalse) + evListener.RemoveInstanceForRedo(v.serviceName, v.groupName) + } + }) +} diff --git a/clients/naming_client/naming_http/naming_http_proxy.go b/clients/naming_client/naming_http/naming_http_proxy.go index f6f2ad5c..47b1c96e 100644 --- a/clients/naming_client/naming_http/naming_http_proxy.go +++ b/clients/naming_client/naming_http/naming_http_proxy.go @@ -18,6 +18,7 @@ package naming_http import ( "context" + "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_proxy" "net/http" "strconv" "time" @@ -34,6 +35,8 @@ import ( "github.com/nacos-group/nacos-sdk-go/v2/util" ) +var _ naming_proxy.INamingProxy = (*NamingHttpProxy)(nil) + // NamingHttpProxy ... type NamingHttpProxy struct { clientConfig constant.ClientConfig @@ -59,7 +62,7 @@ func NewNamingHttpProxy(ctx context.Context, clientCfg constant.ClientConfig, na } // RegisterInstance ... -func (proxy *NamingHttpProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { +func (proxy *NamingHttpProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) error { logger.Infof("register instance namespaceId:<%s>,serviceName:<%s> with instance:<%s>", proxy.clientConfig.NamespaceId, serviceName, util.ToJsonString(instance)) serviceName = util.GetGroupName(serviceName, groupName) @@ -78,7 +81,7 @@ func (proxy *NamingHttpProxy) RegisterInstance(serviceName string, groupName str params["ephemeral"] = strconv.FormatBool(instance.Ephemeral) _, err := proxy.nacosServer.ReqApi(constant.SERVICE_PATH, params, http.MethodPost, proxy.clientConfig) if err != nil { - return false, err + return err } if instance.Ephemeral { beatInfo := &model.BeatInfo{ @@ -93,15 +96,15 @@ func (proxy *NamingHttpProxy) RegisterInstance(serviceName string, groupName str } proxy.beatReactor.AddBeatInfo(util.GetGroupName(serviceName, groupName), beatInfo) } - return true, nil + return nil } -func (proxy *NamingHttpProxy) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) (bool, error) { +func (proxy *NamingHttpProxy) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) error { panic("implement me") } // DeregisterInstance ... -func (proxy *NamingHttpProxy) DeregisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { +func (proxy *NamingHttpProxy) DeregisterInstance(serviceName string, groupName string, instance model.Instance) error { serviceName = util.GetGroupName(serviceName, groupName) logger.Infof("deregister instance namespaceId:<%s>,serviceName:<%s> with instance:<%s:%d@%s>", proxy.clientConfig.NamespaceId, serviceName, instance.Ip, instance.Port, instance.ClusterName) @@ -115,9 +118,9 @@ func (proxy *NamingHttpProxy) DeregisterInstance(serviceName string, groupName s params["ephemeral"] = strconv.FormatBool(instance.Ephemeral) _, err := proxy.nacosServer.ReqApi(constant.SERVICE_PATH, params, http.MethodDelete, proxy.clientConfig) if err != nil { - return false, err + return err } - return true, nil + return nil } // GetServiceList ... diff --git a/clients/naming_client/naming_proxy/proxy_interface.go b/clients/naming_client/naming_proxy/proxy_interface.go index dea2b0b1..ebd956a5 100644 --- a/clients/naming_client/naming_proxy/proxy_interface.go +++ b/clients/naming_client/naming_proxy/proxy_interface.go @@ -22,11 +22,11 @@ import ( // INamingProxy ... type INamingProxy interface { - RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) + RegisterInstance(serviceName string, groupName string, instance model.Instance) error - BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) (bool, error) + BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) error - DeregisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) + DeregisterInstance(serviceName string, groupName string, instance model.Instance) error GetServiceList(pageNo uint32, pageSize uint32, groupName, namespaceId string, selector *model.ExpressionSelector) (model.ServiceList, error) diff --git a/clients/naming_client/naming_proxy/proxy_interface_mock.go b/clients/naming_client/naming_proxy/proxy_interface_mock.go index cb069653..b2adc83a 100644 --- a/clients/naming_client/naming_proxy/proxy_interface_mock.go +++ b/clients/naming_client/naming_proxy/proxy_interface_mock.go @@ -17,6 +17,8 @@ type MockINamingProxy struct { recorder *MockINamingProxyMockRecorder } +var _ INamingProxy = new(MockINamingProxy) + // MockINamingProxyMockRecorder is the mock recorder for MockINamingProxy. type MockINamingProxyMockRecorder struct { mock *MockINamingProxy @@ -35,12 +37,11 @@ func (m *MockINamingProxy) EXPECT() *MockINamingProxyMockRecorder { } // BatchRegisterInstance mocks base method. -func (m *MockINamingProxy) BatchRegisterInstance(serviceName, groupName string, instances []model.Instance) (bool, error) { +func (m *MockINamingProxy) BatchRegisterInstance(serviceName, groupName string, instances []model.Instance) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "BatchRegisterInstance", serviceName, groupName, instances) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(error) + return ret0 } // BatchRegisterInstance indicates an expected call of BatchRegisterInstance. @@ -62,12 +63,11 @@ func (mr *MockINamingProxyMockRecorder) CloseClient() *gomock.Call { } // DeregisterInstance mocks base method. -func (m *MockINamingProxy) DeregisterInstance(serviceName, groupName string, instance model.Instance) (bool, error) { +func (m *MockINamingProxy) DeregisterInstance(serviceName, groupName string, instance model.Instance) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeregisterInstance", serviceName, groupName, instance) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(error) + return ret0 } // DeregisterInstance indicates an expected call of DeregisterInstance. @@ -107,12 +107,11 @@ func (mr *MockINamingProxyMockRecorder) QueryInstancesOfService(serviceName, gro } // RegisterInstance mocks base method. -func (m *MockINamingProxy) RegisterInstance(serviceName, groupName string, instance model.Instance) (bool, error) { +func (m *MockINamingProxy) RegisterInstance(serviceName, groupName string, instance model.Instance) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RegisterInstance", serviceName, groupName, instance) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(error) + return ret0 } // RegisterInstance indicates an expected call of RegisterInstance. diff --git a/clients/naming_client/naming_proxy_delegate.go b/clients/naming_client/naming_proxy_delegate.go index 3b0d0b63..44b0ee0a 100644 --- a/clients/naming_client/naming_proxy_delegate.go +++ b/clients/naming_client/naming_proxy_delegate.go @@ -31,6 +31,8 @@ import ( "github.com/nacos-group/nacos-sdk-go/v2/util" ) +var _ naming_proxy.INamingProxy = (*NamingProxyDelegate)(nil) + // NamingProxyDelegate ... type NamingProxyDelegate struct { httpClientProxy *naming_http.NamingHttpProxy @@ -82,15 +84,15 @@ func (proxy *NamingProxyDelegate) getExecuteClientProxy(instance model.Instance) return namingProxy } -func (proxy *NamingProxyDelegate) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { +func (proxy *NamingProxyDelegate) RegisterInstance(serviceName string, groupName string, instance model.Instance) error { return proxy.getExecuteClientProxy(instance).RegisterInstance(serviceName, groupName, instance) } -func (proxy *NamingProxyDelegate) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) (bool, error) { +func (proxy *NamingProxyDelegate) BatchRegisterInstance(serviceName string, groupName string, instances []model.Instance) error { return proxy.grpcClientProxy.BatchRegisterInstance(serviceName, groupName, instances) } -func (proxy *NamingProxyDelegate) DeregisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) { +func (proxy *NamingProxyDelegate) DeregisterInstance(serviceName string, groupName string, instance model.Instance) error { return proxy.getExecuteClientProxy(instance).DeregisterInstance(serviceName, groupName, instance) } diff --git a/example/service/service_client_example.go b/example/service/service_client_example.go index 83c49641..e253e150 100644 --- a/example/service/service_client_example.go +++ b/example/service/service_client_example.go @@ -25,35 +25,35 @@ import ( ) func registerServiceInstance(client naming_client.INamingClient, param vo.RegisterInstanceParam) { - success, err := client.RegisterInstance(param) - if !success || err != nil { + err := client.RegisterInstance(param) + if err != nil { panic("RegisterServiceInstance failed!" + err.Error()) } - fmt.Printf("RegisterServiceInstance,param:%+v,result:%+v \n\n", param, success) + fmt.Printf("RegisterServiceInstance,param:%+v,result: success \n\n", param) } func batchRegisterServiceInstance(client naming_client.INamingClient, param vo.BatchRegisterInstanceParam) { - success, err := client.BatchRegisterInstance(param) - if !success || err != nil { + err := client.BatchRegisterInstance(param) + if err != nil { panic("BatchRegisterServiceInstance failed!" + err.Error()) } - fmt.Printf("BatchRegisterServiceInstance,param:%+v,result:%+v \n\n", param, success) + fmt.Printf("BatchRegisterServiceInstance,param:%+v,result: success \n\n", param) } func deRegisterServiceInstance(client naming_client.INamingClient, param vo.DeregisterInstanceParam) { - success, err := client.DeregisterInstance(param) - if !success || err != nil { + err := client.DeregisterInstance(param) + if err != nil { panic("DeRegisterServiceInstance failed!" + err.Error()) } - fmt.Printf("DeRegisterServiceInstance,param:%+v,result:%+v \n\n", param, success) + fmt.Printf("DeRegisterServiceInstance,param:%+v,result: success \n\n", param) } func updateServiceInstance(client naming_client.INamingClient, param vo.UpdateInstanceParam) { - success, err := client.UpdateInstance(param) - if !success || err != nil { + err := client.UpdateInstance(param) + if err != nil { panic("UpdateInstance failed!" + err.Error()) } - fmt.Printf("UpdateServiceInstance,param:%+v,result:%+v \n\n", param, success) + fmt.Printf("UpdateServiceInstance,param:%+v,result: success \n\n", param) } func getService(client naming_client.INamingClient, param vo.GetServiceParam) { diff --git a/go.mod b/go.mod index 09d8efbf..a97d2d11 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/nacos-group/nacos-sdk-go/v2 go 1.18 require ( + github.com/agiledragon/gomonkey/v2 v2.11.0 github.com/alibabacloud-go/tea v1.1.17 github.com/aliyun/alibaba-cloud-sdk-go v1.61.1800 github.com/aliyun/alibabacloud-dkms-gcs-go-sdk v0.2.2 @@ -12,6 +13,7 @@ require ( github.com/golang/protobuf v1.5.3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.2 + github.com/smartystreets/goconvey v1.8.1 github.com/stretchr/testify v1.8.1 go.uber.org/zap v1.21.0 golang.org/x/sync v0.1.0 @@ -27,8 +29,10 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gopherjs/gopherjs v1.17.2 // indirect github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/kr/pretty v0.1.0 // indirect github.com/kr/text v0.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect @@ -38,7 +42,8 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect - go.uber.org/atomic v1.7.0 // indirect + github.com/smarty/assertions v1.15.0 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.17.0 // indirect diff --git a/go.sum b/go.sum index a1614907..ef459ad3 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/agiledragon/gomonkey/v2 v2.11.0 h1:5oxSgA+tC1xuGsrIorR+sYiziYltmJyEZ9qA25b6l5U= +github.com/agiledragon/gomonkey/v2 v2.11.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -145,6 +147,9 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= +github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -159,6 +164,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -213,6 +220,12 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY= +github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY= +github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -234,8 +247,9 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= @@ -392,6 +406,7 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= diff --git a/vo/service_param.go b/vo/service_param.go index 35387ccd..7a97b74f 100644 --- a/vo/service_param.go +++ b/vo/service_param.go @@ -72,11 +72,13 @@ type GetAllServiceInfoParam struct { PageSize uint32 `param:"pageSize"` //optional,default:10 } +type SubscribeCallbackFunc func(services []model.Instance, err error) + type SubscribeParam struct { - ServiceName string `param:"serviceName"` //required - Clusters []string `param:"clusters"` //optional - GroupName string `param:"groupName"` //optional,default:DEFAULT_GROUP - SubscribeCallback func(services []model.Instance, err error) //required + ServiceName string `param:"serviceName"` //required + Clusters []string `param:"clusters"` //optional + GroupName string `param:"groupName"` //optional,default:DEFAULT_GROUP + SubscribeCallback SubscribeCallbackFunc //required } type SelectAllInstancesParam struct {