Skip to content

Commit e850978

Browse files
feat: transfer service watch logics support sharding
--story=120702866
1 parent 301adc5 commit e850978

File tree

8 files changed

+121
-165
lines changed

8 files changed

+121
-165
lines changed

src/common/tablenames.go

+2
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,8 @@ var platformTableWithTenantMap = map[string]struct{}{
307307
BKTableNameAPITaskSyncHistory: {},
308308
BKTableNameFullSyncCond: {},
309309
BKTableNameCacheWatchToken: {},
310+
"SrcSyncDataToken": {},
311+
"SrcSyncDataCursor": {},
310312
}
311313

312314
// IsPlatformTableWithTenant returns if the target table is a platform table with tenant id field

src/source_controller/transfer-service/app/server.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,12 @@ func (s *TransferService) initResource(exSyncConfFile string) error {
120120
return fmt.Errorf("get mongo config failed, err: %v", err)
121121
}
122122

123-
if err = mongodb.InitClient("", &s.Config.Mongo); err != nil {
123+
cryptoConf, err := cc.Crypto("crypto")
124+
if err != nil {
125+
return fmt.Errorf("get crypto config failed, err: %v", err)
126+
}
127+
128+
if err = mongodb.SetShardingCli("", &s.Config.Mongo, cryptoConf); err != nil {
124129
return fmt.Errorf("init mongo client failed, err: %v", err)
125130
}
126131

@@ -129,7 +134,7 @@ func (s *TransferService) initResource(exSyncConfFile string) error {
129134
return fmt.Errorf("get watch mongo config failed, err: %v", err)
130135
}
131136

132-
if err = mongodb.InitClient("watch", &s.Config.WatchMongo); err != nil {
137+
if err = mongodb.SetWatchCli("watch", &s.Config.WatchMongo, cryptoConf); err != nil {
133138
return fmt.Errorf("init watch mongo client failed, err: %v", err)
134139
}
135140

src/source_controller/transfer-service/service/service.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ import (
2929
"configcenter/src/common/webservice/restfulservice"
3030
"configcenter/src/source_controller/transfer-service/app/options"
3131
"configcenter/src/source_controller/transfer-service/sync"
32-
"configcenter/src/storage/stream"
32+
"configcenter/src/storage/driver/mongodb"
33+
"configcenter/src/storage/stream/task"
34+
"configcenter/src/storage/stream/types"
3335
"configcenter/src/thirdparty/logplatform/opentelemetry"
3436

3537
"github.com/emicklei/go-restful/v3"
@@ -43,13 +45,16 @@ type Service struct {
4345

4446
// New Service
4547
func New(conf *options.Config, engine *backbone.Engine) (*Service, error) {
46-
loopW, err := stream.NewLoopStream(conf.Mongo.GetMongoConf(), engine.ServiceManageInterface)
47-
if err != nil {
48-
blog.Errorf("new loop stream failed, err: %v", err)
49-
return nil, err
48+
watchTaskOpt := &types.NewTaskOptions{
49+
StopNotifier: make(<-chan struct{}),
50+
}
51+
watchTask, taskErr := task.New(mongodb.Dal(), mongodb.Dal("watch"), engine.ServiceManageInterface, watchTaskOpt)
52+
if taskErr != nil {
53+
blog.Errorf("new watch task instance failed, err: %v", taskErr)
54+
return nil, taskErr
5055
}
5156

52-
syncer, err := sync.NewSyncer(conf, engine.ServiceManageInterface, loopW, engine.CoreAPI.CacheService(),
57+
syncer, err := sync.NewSyncer(conf, engine.ServiceManageInterface, watchTask, engine.CoreAPI.CacheService(),
5358
engine.Metric().Registry())
5459
if err != nil {
5560
blog.Errorf("new syncer failed, err: %v", err)

src/source_controller/transfer-service/sync/sync.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ import (
3434
"configcenter/src/source_controller/transfer-service/sync/medium"
3535
"configcenter/src/source_controller/transfer-service/sync/metadata"
3636
"configcenter/src/source_controller/transfer-service/sync/watch"
37+
"configcenter/src/storage/dal/mongo/sharding"
3738
"configcenter/src/storage/driver/mongodb"
38-
"configcenter/src/storage/stream"
39+
"configcenter/src/storage/stream/task"
3940

4041
"github.com/prometheus/client_golang/prometheus"
4142
"github.com/tidwall/gjson"
@@ -50,7 +51,7 @@ type Syncer struct {
5051
}
5152

5253
// NewSyncer new cmdb data syncer
53-
func NewSyncer(conf *options.Config, isMaster discovery.ServiceManageInterface, loopW stream.LoopInterface,
54+
func NewSyncer(conf *options.Config, isMaster discovery.ServiceManageInterface, task *task.Task,
5455
cacheCli cacheservice.CacheServiceClientInterface, reg prometheus.Registerer) (*Syncer, error) {
5556

5657
if !conf.Sync.EnableSync {
@@ -60,8 +61,8 @@ func NewSyncer(conf *options.Config, isMaster discovery.ServiceManageInterface,
6061
// check if id generator is enabled, can only start syncing when id generator is enabled
6162
configAdminCond := map[string]interface{}{"_id": common.ConfigAdminID}
6263
configAdminData := make(map[string]string)
63-
err := mongodb.Client().Table(common.BKTableNameSystem).Find(configAdminCond).Fields(common.ConfigAdminValueField).
64-
One(context.Background(), &configAdminData)
64+
err := mongodb.Shard(sharding.NewShardOpts().WithIgnoreTenant()).Table(common.BKTableNameSystem).
65+
Find(configAdminCond).Fields(common.ConfigAdminValueField).One(context.Background(), &configAdminData)
6566
if err != nil {
6667
blog.Errorf("get config admin data failed, err: %v, cond: %+v", err, configAdminCond)
6768
return nil, err
@@ -111,7 +112,7 @@ func NewSyncer(conf *options.Config, isMaster discovery.ServiceManageInterface,
111112
}
112113
}
113114

114-
err = syncer.run(conf, loopW, transMedium, cacheCli)
115+
err = syncer.run(conf, task, transMedium, cacheCli)
115116
if err != nil {
116117
return nil, err
117118
}
@@ -163,7 +164,7 @@ func parseDestExConf(conf *options.Config) (map[types.ResType]map[string][]optio
163164
return idRuleMap, innerDataIDMap
164165
}
165166

166-
func (s *Syncer) run(conf *options.Config, loopW stream.LoopInterface, transMedium medium.ClientI,
167+
func (s *Syncer) run(conf *options.Config, task *task.Task, transMedium medium.ClientI,
167168
cacheCli cacheservice.CacheServiceClientInterface) error {
168169

169170
switch conf.Sync.Role {
@@ -174,7 +175,7 @@ func (s *Syncer) run(conf *options.Config, loopW stream.LoopInterface, transMedi
174175
return nil
175176
}
176177

177-
watcher, err := watch.New(conf.Sync.Name, loopW, s.isMaster, s.metadata, cacheCli, transMedium)
178+
watcher, err := watch.New(conf.Sync.Name, task, s.isMaster, s.metadata, cacheCli, transMedium)
178179
if err != nil {
179180
blog.Errorf("new watcher failed, err: %v", err)
180181
return err

src/source_controller/transfer-service/sync/watch/token_handler.go

+53-44
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,26 @@ package watch
1919

2020
import (
2121
"context"
22+
"time"
2223

2324
synctypes "configcenter/pkg/synchronize/types"
2425
"configcenter/src/common"
2526
"configcenter/src/common/blog"
27+
"configcenter/src/common/http/rest"
2628
"configcenter/src/common/mapstr"
2729
"configcenter/src/common/metadata"
2830
"configcenter/src/common/watch"
31+
"configcenter/src/storage/dal/mongo/local"
2932
"configcenter/src/storage/driver/mongodb"
3033
"configcenter/src/storage/stream/types"
3134
)
3235

33-
const tokenTable = "cc_SrcSyncDataToken"
36+
const (
37+
tokenTable = "SrcSyncDataToken"
38+
cursorTable = "SrcSyncDataCursor"
39+
)
3440

35-
var _ types.TokenHandler = new(tokenHandler)
41+
var _ types.TaskTokenHandler = new(tokenHandler)
3642

3743
// tokenHandler is cmdb data syncer event token handler
3844
type tokenHandler struct {
@@ -44,62 +50,65 @@ func newTokenHandler(resource synctypes.ResType) *tokenHandler {
4450
return &tokenHandler{resource: resource}
4551
}
4652

47-
// tokenInfo is cmdb data syncer event token info
48-
type tokenInfo struct {
49-
Resource synctypes.ResType `bson:"resource"`
50-
Token string `bson:"token"`
51-
Cursor map[watch.CursorType]string `bson:"cursor"`
52-
StartAtTime *metadata.Time `bson:"start_at_time"`
53-
}
54-
5553
// SetLastWatchToken set last event watch token
56-
func (t *tokenHandler) SetLastWatchToken(ctx context.Context, token string) error {
54+
func (t *tokenHandler) SetLastWatchToken(ctx context.Context, uuid string, watchDB local.DB,
55+
token *types.TokenInfo) error {
56+
5757
tokenData := mapstr.MapStr{
58-
common.BKTokenField: token,
58+
common.BKTokenField: token,
59+
common.BKStartAtTimeField: token.StartAtTime,
5960
}
60-
return t.setWatchTokenInfo(ctx, tokenData)
61-
}
62-
63-
// GetStartWatchToken get event start watch token
64-
func (t *tokenHandler) GetStartWatchToken(ctx context.Context) (string, error) {
65-
info, err := t.getWatchTokenInfo(ctx, common.BKTokenField)
66-
if err != nil {
67-
return "", err
61+
filter := map[string]interface{}{
62+
"resource": watch.GenDBWatchTokenID(uuid, string(t.resource)),
6863
}
6964

70-
return info.Token, nil
65+
if err := watchDB.Table(tokenTable).Upsert(ctx, filter, tokenData); err != nil {
66+
blog.Errorf("set %s watch token info failed, data: %+v, err: %v", t.resource, tokenData, err)
67+
return err
68+
}
69+
return nil
7170
}
7271

73-
// resetWatchToken reset watch token and start watch time
74-
func (t *tokenHandler) resetWatchToken(startAtTime types.TimeStamp) error {
72+
// GetStartWatchToken get event start watch token
73+
func (t *tokenHandler) GetStartWatchToken(ctx context.Context, uuid string, watchDB local.DB) (*types.TokenInfo,
74+
error) {
75+
7576
filter := map[string]interface{}{
76-
"resource": t.resource,
77-
}
78-
data := mapstr.MapStr{
79-
common.BKCursorField: make(map[watch.CursorType]string),
80-
common.BKTokenField: "",
81-
common.BKStartAtTimeField: startAtTime,
77+
"resource": watch.GenDBWatchTokenID(uuid, string(t.resource)),
8278
}
8379

84-
if err := mongodb.Client("watch").Table(tokenTable).Upsert(context.Background(), filter, data); err != nil {
85-
blog.Errorf("reset %s watch token failed, data: %+v, err: %v", t.resource, data, err)
86-
return err
80+
info := new(types.TokenInfo)
81+
if err := watchDB.Table(tokenTable).Find(filter).One(ctx, &info); err != nil {
82+
if mongodb.IsNotFoundError(err) {
83+
return &types.TokenInfo{Token: "", StartAtTime: &types.TimeStamp{Sec: uint32(time.Now().Unix())}}, nil
84+
}
85+
blog.Errorf("get %s event watch token info failed, err: %v", t.resource, err)
86+
return nil, err
8787
}
88-
return nil
88+
89+
return info, nil
90+
}
91+
92+
// cursorInfo is cmdb data syncer event token info
93+
type cursorInfo struct {
94+
Resource synctypes.ResType `bson:"resource"`
95+
Cursor map[watch.CursorType]string `bson:"cursor"`
96+
StartAtTime *metadata.Time `bson:"start_at_time"`
8997
}
9098

91-
// getWatchTokenInfo get event watch token info
92-
func (t *tokenHandler) getWatchTokenInfo(ctx context.Context, fields ...string) (*tokenInfo, error) {
99+
// getWatchCursorInfo get event watch token info
100+
func (t *tokenHandler) getWatchCursorInfo(kit *rest.Kit) (*cursorInfo, error) {
93101
filter := map[string]interface{}{
94102
"resource": t.resource,
95103
}
96104

97-
info := new(tokenInfo)
98-
if err := mongodb.Client("watch").Table(tokenTable).Find(filter).Fields(fields...).One(ctx, &info); err != nil {
99-
if mongodb.Client("watch").IsNotFoundError(err) {
100-
return new(tokenInfo), nil
105+
info := new(cursorInfo)
106+
err := mongodb.Dal("watch").Shard(kit.ShardOpts()).Table(cursorTable).Find(filter).One(kit.Ctx, &info)
107+
if err != nil {
108+
if mongodb.IsNotFoundError(err) {
109+
return new(cursorInfo), nil
101110
}
102-
blog.Errorf("get %s event watch token info failed, err: %v", t.resource, err)
111+
blog.Errorf("get %s event watch token info failed, err: %v, rid: %s", t.resource, err, kit.Rid)
103112
return nil, err
104113
}
105114

@@ -110,14 +119,14 @@ func (t *tokenHandler) getWatchTokenInfo(ctx context.Context, fields ...string)
110119
return info, nil
111120
}
112121

113-
// getWatchTokenInfo get event watch token info
114-
func (t *tokenHandler) setWatchTokenInfo(ctx context.Context, data mapstr.MapStr) error {
122+
// setWatchCursorInfo get event watch token info
123+
func (t *tokenHandler) setWatchCursorInfo(kit *rest.Kit, data mapstr.MapStr) error {
115124
filter := map[string]interface{}{
116125
"resource": t.resource,
117126
}
118127

119-
if err := mongodb.Client("watch").Table(tokenTable).Upsert(ctx, filter, data); err != nil {
120-
blog.Errorf("set %s watch token info failed, data: %+v, err: %v", t.resource, data, err)
128+
if err := mongodb.Dal("watch").Shard(kit.ShardOpts()).Table(cursorTable).Upsert(kit.Ctx, filter, data); err != nil {
129+
blog.Errorf("set %s watch token info failed, data: %+v, err: %v, rid: %s", t.resource, data, err, kit.Rid)
121130
return err
122131
}
123132

src/source_controller/transfer-service/sync/watch/watch.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,19 @@ import (
2727
"configcenter/src/apimachinery/discovery"
2828
"configcenter/src/common"
2929
"configcenter/src/common/blog"
30+
"configcenter/src/common/http/rest"
3031
"configcenter/src/common/metadata"
3132
"configcenter/src/common/util"
3233
"configcenter/src/source_controller/transfer-service/sync/medium"
3334
syncmeta "configcenter/src/source_controller/transfer-service/sync/metadata"
3435
"configcenter/src/storage/driver/mongodb"
35-
"configcenter/src/storage/stream"
36+
"configcenter/src/storage/stream/task"
3637
)
3738

3839
// Watcher is cmdb data syncer event watcher
3940
type Watcher struct {
4041
name string
41-
loopW stream.LoopInterface
42+
task *task.Task
4243
isMaster discovery.ServiceManageInterface
4344
metadata *syncmeta.Metadata
4445
cacheCli cacheservice.CacheServiceClientInterface
@@ -47,7 +48,7 @@ type Watcher struct {
4748
}
4849

4950
// New new cmdb data syncer event watcher
50-
func New(name string, loopW stream.LoopInterface, isMaster discovery.ServiceManageInterface, meta *syncmeta.Metadata,
51+
func New(name string, task *task.Task, isMaster discovery.ServiceManageInterface, meta *syncmeta.Metadata,
5152
cacheCli cacheservice.CacheServiceClientInterface, transMedium medium.ClientI) (*Watcher, error) {
5253

5354
// create cmdb data syncer event watch token table
@@ -60,19 +61,19 @@ func New(name string, loopW stream.LoopInterface, isMaster discovery.ServiceMana
6061

6162
if !exists {
6263
err = mongodb.Client("watch").CreateTable(ctx, tokenTable)
63-
if err != nil && !mongodb.Client("watch").IsDuplicatedError(err) {
64+
if err != nil && !mongodb.IsDuplicatedError(err) {
6465
blog.Errorf("create %s table failed, err: %v", tokenTable, err)
6566
return nil, err
6667
}
6768

6869
for _, resType := range types.ListAllResTypeForIncrSync() {
69-
token := &tokenInfo{
70+
token := &cursorInfo{
7071
Resource: resType,
7172
StartAtTime: &metadata.Time{Time: time.Now()},
7273
}
7374

7475
err = mongodb.Client("watch").Table(tokenTable).Insert(ctx, token)
75-
if err != nil && !mongodb.Client("watch").IsDuplicatedError(err) {
76+
if err != nil && !mongodb.IsDuplicatedError(err) {
7677
blog.Errorf("init %s watch token failed, data: %+v, err: %v", resType, token, err)
7778
return nil, err
7879
}
@@ -82,7 +83,7 @@ func New(name string, loopW stream.LoopInterface, isMaster discovery.ServiceMana
8283
// generate cmdb data syncer event watcher
8384
watcher := &Watcher{
8485
name: name,
85-
loopW: loopW,
86+
task: task,
8687
isMaster: isMaster,
8788
metadata: meta,
8889
cacheCli: cacheCli,
@@ -103,7 +104,8 @@ func (w *Watcher) Watch() error {
103104
cursorTypes, exists := resTypeCursorMap[resType]
104105
if exists {
105106
for _, cursorType := range cursorTypes {
106-
go w.watchAPI(resType, cursorType)
107+
kit := rest.NewKit()
108+
go w.watchAPI(kit, resType, cursorType)
107109
}
108110
continue
109111
}

0 commit comments

Comments
 (0)