Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module github.com/apache/servicecomb-kie

require (
github.com/apache/servicecomb-service-center/eventbase v0.0.0-20240328150344-01abe81dc5d0
github.com/dgraph-io/ristretto v0.2.0
github.com/emicklei/go-restful v2.15.1-0.20220703112237-d9c71e118c95+incompatible
github.com/go-chassis/cari v0.9.1-0.20240328115504-88da93faaca7
github.com/go-chassis/etcdadpt v0.5.3-0.20240328092602-984e34b756fe
Expand All @@ -13,7 +14,7 @@ require (
github.com/gofrs/uuid v4.0.0+incompatible
github.com/hashicorp/serf v0.9.5
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.7.2
github.com/stretchr/testify v1.8.4
github.com/urfave/cli v1.22.4
go.etcd.io/etcd/api/v3 v3.5.4
go.mongodb.org/mongo-driver v1.5.1
Expand All @@ -35,7 +36,7 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fatih/color v1.10.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
Expand Down Expand Up @@ -125,7 +126,7 @@ require (
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
Expand Down
16 changes: 10 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,16 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ=
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/dgraph-io/ristretto v0.2.0 h1:XAfl+7cmoUDWW/2Lx8TGZQjjxIQ2Ley9DSf52dru4WE=
github.com/dgraph-io/ristretto v0.2.0/go.mod h1:8uBHCU/PBV4Ag0CJrP47b9Ofby5dqWNh4FicAdoqFNU=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down Expand Up @@ -577,17 +581,17 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down Expand Up @@ -852,8 +856,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
4 changes: 3 additions & 1 deletion pkg/stringutil/string_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package stringutil_test
import (
"testing"

"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/stretchr/testify/assert"

"github.com/apache/servicecomb-kie/pkg/stringutil"
)

func TestFormat(t *testing.T) {
Expand All @@ -35,6 +36,7 @@ func TestFormat(t *testing.T) {
})
t.Log(s)
assert.Equal(t, s, s2)
assert.Equal(t, s, "service=a::version=1")
s3 := stringutil.FormatMap(nil)
assert.Equal(t, "none", s3)
}
82 changes: 57 additions & 25 deletions server/datasource/etcd/kv/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"regexp"
"strings"
"sync"
"time"

"github.com/dgraph-io/ristretto"
"github.com/go-chassis/etcdadpt"
"github.com/go-chassis/foundation/backoff"
"github.com/go-chassis/go-archaius"
"github.com/go-chassis/openlog"
goCache "github.com/patrickmn/go-cache"
"go.etcd.io/etcd/api/v3/mvccpb"
Expand All @@ -19,6 +22,7 @@ import (
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/apache/servicecomb-kie/server/plugin/qms"
)

func Init() {
Expand All @@ -37,24 +41,38 @@ const (
)

type Cache struct {
timeOut time.Duration
client etcdadpt.Client
revision int64
kvIDCache sync.Map
kvDocCache *goCache.Cache
timeOut time.Duration
client etcdadpt.Client
revision int64
kvIDCache sync.Map // map[labels]map[kvId]struct{} 精确匹配某组labels的kvId集合
// 非精确匹配时,对每一个查找条件建立缓存,由于n个标签理论上有2^n个查找条件,因此需要限制缓存大小。
// 任意创建,删除操作,由于其频率较低,直接重建整个缓存,更新操作由于不影响此缓存与etcd一致性,不需额外处理
kvIDFuzzyCache *ristretto.Cache // map[labels]map[kvId]struct{} 模糊匹配某组labels的kvId集合
kvDocCache *goCache.Cache
}

func NewKvCache() *Cache {
quota := archaius.GetInt64(qms.QuotaConfigKey, qms.DefaultQuota)
fuzzyCache, err := ristretto.NewCache(&ristretto.Config{
MaxCost: quota * 10,
NumCounters: quota * 100,
BufferItems: 64,
})
if err != nil {
log.Fatalf("init kv cache fail, configuration error %v", err)
}

kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval)
return &Cache{
timeOut: etcdWatchTimeout,
client: etcdadpt.Instance(),
revision: 0,
kvDocCache: kvDocCache,
timeOut: etcdWatchTimeout,
client: etcdadpt.Instance(),
revision: 0,
kvDocCache: kvDocCache,
kvIDFuzzyCache: fuzzyCache,
}
}

func Enabled() bool {
func cacheEnabled() bool {
return kvCache != nil
}

Expand Down Expand Up @@ -106,15 +124,14 @@ func (kc *Cache) watch(ctx context.Context) error {
timoutCtx, cancel := context.WithTimeout(ctx, kc.timeOut)
defer cancel()

rev := kc.revision
opts := append(
etcdadpt.WatchPrefixOpOptions(prefixKvs),
etcdadpt.WithRev(kc.revision+1),
etcdadpt.WithWatchCallback(kc.watchCallBack),
)
err := kc.client.Watch(timoutCtx, opts...)
if err != nil {
openlog.Error(fmt.Sprintf("watch prefix %s failed, start rev: %d+1->%d->0, err %v", prefixKvs, rev, kc.revision, err))
openlog.Error(fmt.Sprintf("watch prefix %s failed, start rev: %d+1->%d->0, err %v", prefixKvs, kc.revision, kc.revision, err))
kc.revision = 0
}
return err
Expand All @@ -129,7 +146,7 @@ func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) {
return rsp, nil
}

func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error {
func (kc *Cache) watchCallBack(_ string, rsp *etcdadpt.Response) error {
if rsp == nil || len(rsp.Kvs) == 0 {
return fmt.Errorf("unknown event")
}
Expand All @@ -148,7 +165,7 @@ func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error {

func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
for _, kv := range rsp.Kvs {
kvDoc, err := kc.GetKvDoc(kv)
kvDoc, err := unmarshalKVDoc(kv)
if err != nil {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
continue
Expand All @@ -169,7 +186,7 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {

func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) {
for _, kv := range rsp.Kvs {
kvDoc, err := kc.GetKvDoc(kv)
kvDoc, err := unmarshalKVDoc(kv)
if err != nil {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
continue
Expand Down Expand Up @@ -197,6 +214,18 @@ func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) {
return kvIds, true
}

func (kc *Cache) LoadKvIDSetByFuzzyCache(cacheKey string) (*sync.Map, bool) {
val, ok := kc.kvIDFuzzyCache.Get(cacheKey)
if !ok {
return nil, false
}
kvIds, ok := val.(*sync.Map)
if !ok {
return nil, false
}
return kvIds, true
}

func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) {
kc.kvIDCache.Store(cacheKey, kvIds)
}
Expand All @@ -221,11 +250,7 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
kc.kvDocCache.Delete(kvID)
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
if !req.Opts.ExactLabels {
return nil, false, nil
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, error) {
openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
result := &model.KVResponse{
Data: []*model.KVDoc{},
Expand All @@ -234,9 +259,16 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool,
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, &sync.Map{})
return result, true, nil
return result, nil
}

return getKvDocsByIds(ctx, req, kvIds)
}

func getKvDocsByIds(ctx context.Context, req *CacheSearchReq, kvIds *sync.Map) (*model.KVResponse, error) {
result := &model.KVResponse{
Data: []*model.KVDoc{},
}
var docs []*model.KVDoc

var kvIdsLeft []string
Expand All @@ -250,7 +282,7 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool,
})
tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
if err != nil {
return nil, true, err
return nil, err
}
docs = append(docs, tpData...)

Expand All @@ -261,7 +293,7 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool,
}
}
result.Total = len(result.Data)
return result, true, nil
return result, nil
}

func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) ([]*model.KVDoc, error) {
Expand All @@ -286,7 +318,7 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
return
}

doc, err := kc.GetKvDoc(kv)
doc, err := unmarshalKVDoc(kv)
if err != nil {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
getKvErr = err
Expand Down Expand Up @@ -320,7 +352,7 @@ func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
return true
}

func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) {
func unmarshalKVDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) {
kvDoc := &model.KVDoc{}
err := json.Unmarshal(kv.Value, kvDoc)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions server/datasource/etcd/kv/kv_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/go-chassis/etcdadpt"
"github.com/go-chassis/go-archaius"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/mvccpb"
)
Expand All @@ -12,6 +13,10 @@ type args struct {
rsp *etcdadpt.Response
}

func init() {
archaius.Init()
}

func TestCachePut(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading
Loading