Skip to content

Commit d39b8ad

Browse files
feat: watch db support sharding
--story=121164695
1 parent d86048f commit d39b8ad

File tree

11 files changed

+259
-190
lines changed

11 files changed

+259
-190
lines changed

docs/apidoc/inner/admin-server/get_sharding_db_config.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ GET /migrate/v3/find/system/sharding_db_config
1616
"permission": null,
1717
"data": {
1818
"master_db": "masteruuid",
19-
"for_new_tenant": "slave1uuid",
19+
"for_new_data": "slave1uuid",
2020
"slave_db": {
2121
"slave1uuid": {
2222
"name": "slave1",
@@ -53,7 +53,7 @@ GET /migrate/v3/find/system/sharding_db_config
5353
| 参数名称 | 参数类型 | 描述 |
5454
|----------------|-------------------|----------------------------|
5555
| master_db | string | 主库唯一标识 |
56-
| for_new_tenant | string | 指定新增租户数据写入哪个库,存储这个数据库的唯一标识 |
56+
| for_new_data | string | 指定新增租户数据写入哪个库,存储这个数据库的唯一标识 |
5757
| slave_db | map[string]object | 从库唯一标识->从库配置的映射 |
5858

5959
#### data.slave_db[key]

docs/apidoc/inner/admin-server/update_sharding_db_config.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ PUT /migrate/v3/update/system/sharding_db_config
1010

1111
| 参数名称 | 参数类型 | 必选 | 描述 |
1212
|-----------------|-------------------|----|---------------------------------------------|
13-
| for_new_tenant | string || 指定新增租户数据写入哪个库。对于存量数据库指定它的唯一标识。对于新增的从库指定它的名称 |
13+
| for_new_data | string || 指定新增租户数据写入哪个库。对于存量数据库指定它的唯一标识。对于新增的从库指定它的名称 |
1414
| create_slave_db | object array || 新增的从库配置数组 |
1515
| update_slave_db | map[string]object || 更新的从库唯一标识->从库配置的映射 |
1616

@@ -41,7 +41,7 @@ PUT /migrate/v3/update/system/sharding_db_config
4141

4242
```json
4343
{
44-
"for_new_tenant": "slave1uuid",
44+
"for_new_data": "slave1uuid",
4545
"create_slave_db": [
4646
{
4747
"name": "slave2",

src/common/tablenames.go

+4
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ const (
117117

118118
// BKTableNameTenantTemplate is the tenant template(public data that needs to be initialized for all tenants) table
119119
BKTableNameTenantTemplate = "TenantTemplate"
120+
121+
// BKTableNameWatchDBRelation is the db and watch db relation table
122+
BKTableNameWatchDBRelation = "WatchDBRelation"
120123
)
121124

122125
// AllTables is all table names, not include the sharding tables which is created dynamically,
@@ -281,6 +284,7 @@ var platformTableMap = map[string]struct{}{
281284
BKTableNameLastWatchEvent: {},
282285
BKTableNameAPITask: {},
283286
BKTableNameAPITaskSyncHistory: {},
287+
BKTableNameWatchDBRelation: {},
284288
}
285289

286290
// IsPlatformTable returns if the target table is a platform table

src/scene_server/admin_server/app/server.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package app
1616
import (
1717
"context"
1818
"fmt"
19-
"time"
2019

2120
iamcli "configcenter/src/ac/iam"
2221
"configcenter/src/common/auth"
@@ -31,7 +30,6 @@ import (
3130
"configcenter/src/scene_server/admin_server/iam"
3231
"configcenter/src/scene_server/admin_server/logics"
3332
svc "configcenter/src/scene_server/admin_server/service"
34-
"configcenter/src/storage/dal/mongo/sharding"
3533
"configcenter/src/storage/dal/redis"
3634
"configcenter/src/storage/driver/mongodb"
3735
"configcenter/src/thirdparty/monitor"
@@ -64,11 +62,10 @@ func Run(ctx context.Context, cancel context.CancelFunc, op *options.ServerOptio
6462
db := mongodb.Dal()
6563
process.Service.SetDB(db)
6664

67-
watchDB, err := sharding.NewDisableDBShardingMongo(process.Config.WatchDB.GetMongoConf(), time.Minute)
68-
if err != nil {
65+
if err = mongodb.SetWatchCli("", &process.Config.WatchDB, process.Config.Crypto); err != nil {
6966
return fmt.Errorf("connect watch mongo server failed, err: %v", err)
7067
}
71-
process.Service.SetWatchDB(watchDB)
68+
process.Service.SetWatchDB(mongodb.Dal())
7269

7370
cache, err := redis.NewFromConfig(process.Config.Redis)
7471
if err != nil {

src/scene_server/admin_server/service/sharding.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ func (s *Service) initShardingApi(api *restful.WebService) {
4646

4747
// ShardingDBConfig is the sharding db config for api
4848
type ShardingDBConfig struct {
49-
MasterDB string `json:"master_db"`
50-
ForNewTenant string `json:"for_new_tenant"`
51-
SlaveDB map[string]SlaveDBConfig `json:"slave_db"`
49+
MasterDB string `json:"master_db"`
50+
ForNewData string `json:"for_new_data"`
51+
SlaveDB map[string]SlaveDBConfig `json:"slave_db"`
5252
}
5353

5454
// SlaveDBConfig is the slave db config for api
@@ -69,9 +69,9 @@ func (s *Service) GetShardingDBConfig(req *restful.Request, resp *restful.Respon
6969
}
7070

7171
result := &ShardingDBConfig{
72-
MasterDB: conf.MasterDB,
73-
ForNewTenant: conf.ForNewTenant,
74-
SlaveDB: make(map[string]SlaveDBConfig),
72+
MasterDB: conf.MasterDB,
73+
ForNewData: conf.ForNewData,
74+
SlaveDB: make(map[string]SlaveDBConfig),
7575
}
7676

7777
for uuid, mongoConf := range conf.SlaveDB {
@@ -134,7 +134,7 @@ func (s *Service) getShardingDBConf(kit *rest.Kit) (*sharding.ShardingDBConf, er
134134

135135
// UpdateShardingDBReq is the update sharding db config request
136136
type UpdateShardingDBReq struct {
137-
ForNewTenant string `json:"for_new_tenant,omitempty"`
137+
ForNewData string `json:"for_new_data,omitempty"`
138138
CreateSlaveDB []SlaveDBConfig `json:"create_slave_db,omitempty"`
139139
UpdateSlaveDB map[string]UpdateSlaveDBInfo `json:"update_slave_db,omitempty"`
140140
}
@@ -177,8 +177,8 @@ func (s *Service) UpdateShardingDBConfig(req *restful.Request, resp *restful.Res
177177

178178
cond := map[string]any{common.MongoMetaID: common.ShardingDBConfID}
179179
updateData := map[string]any{
180-
"for_new_tenant": updateConf.ForNewTenant,
181-
"slave_db": updateConf.SlaveDB,
180+
"for_new_data": updateConf.ForNewData,
181+
"slave_db": updateConf.SlaveDB,
182182
}
183183
err = s.db.Shard(kit.SysShardOpts()).Table(common.BKTableNameSystem).Update(s.ctx, cond, updateData)
184184
if err != nil {
@@ -247,23 +247,23 @@ func (s *Service) genUpdatedShardingDBConf(kit *rest.Kit, dbConf *sharding.Shard
247247
}
248248

249249
// update new tenant db config, check if the new tenant db config exists
250-
if conf.ForNewTenant != "" {
250+
if conf.ForNewData != "" {
251251
// use uuid to specify the new tenant db config for db that already exists
252-
_, uuidExists := dbConf.SlaveDB[conf.ForNewTenant]
253-
if conf.ForNewTenant == dbConf.MasterDB || uuidExists {
254-
dbConf.ForNewTenant = conf.ForNewTenant
252+
_, uuidExists := dbConf.SlaveDB[conf.ForNewData]
253+
if conf.ForNewData == dbConf.MasterDB || uuidExists {
254+
dbConf.ForNewData = conf.ForNewData
255255
return dbConf, nil
256256
}
257257

258258
// use name to specify the new tenant db config for new db that doesn't have uuid before creation
259-
uuid, nameExists := nameUUIDMap[conf.ForNewTenant]
259+
uuid, nameExists := nameUUIDMap[conf.ForNewData]
260260
if nameExists {
261-
dbConf.ForNewTenant = uuid
261+
dbConf.ForNewData = uuid
262262
return dbConf, nil
263263
}
264264

265-
blog.Errorf("add new tenant db %s is invalid, rid: %s", conf.ForNewTenant, kit.Rid)
266-
return nil, kit.CCError.CCErrorf(common.CCErrCommParamsInvalid, "for_new_tenant")
265+
blog.Errorf("add new tenant db %s is invalid, rid: %s", conf.ForNewData, kit.Rid)
266+
return nil, kit.CCError.CCErrorf(common.CCErrCommParamsInvalid, "for_new_data")
267267
}
268268
return dbConf, nil
269269
}

src/source_controller/cacheservice/cache/cache.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@ import (
2727
"configcenter/src/storage/dal"
2828
"configcenter/src/storage/driver/mongodb"
2929
"configcenter/src/storage/driver/redis"
30-
"configcenter/src/storage/reflector"
3130
"configcenter/src/storage/stream"
3231
)
3332

3433
// NewCache new cache service
35-
func NewCache(reflector reflector.Interface, loopW stream.LoopInterface, isMaster discovery.ServiceManageInterface,
36-
watchDB dal.DB) (*ClientSet, error) {
34+
func NewCache(loopW stream.LoopInterface, isMaster discovery.ServiceManageInterface, watchDB dal.DB) (*ClientSet,
35+
error) {
3736

3837
if err := mainline.NewMainlineCache(loopW); err != nil {
3938
return nil, fmt.Errorf("new business cache failed, err: %v", err)

src/source_controller/cacheservice/service/service.go

+2-15
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"configcenter/src/source_controller/cacheservice/event/identifier"
3838
"configcenter/src/source_controller/coreservice/core"
3939
"configcenter/src/storage/dal/mongo/local"
40-
"configcenter/src/storage/reflector"
4140
"configcenter/src/storage/stream"
4241
"configcenter/src/thirdparty/logplatform/opentelemetry"
4342

@@ -94,37 +93,25 @@ func (s *cacheService) SetConfig(cfg options.Config, engine *backbone.Engine, er
9493
}
9594
s.authManager = extensions.NewAuthManager(engine.CoreAPI, iamCli)
9695

97-
loopW, loopErr := stream.NewLoopStream(s.cfg.Mongo.GetMongoConf(), engine.ServiceManageInterface)
96+
watcher, loopErr := stream.NewLoopStream(s.cfg.Mongo.GetMongoConf(), engine.ServiceManageInterface)
9897
if loopErr != nil {
9998
blog.Errorf("new loop stream failed, err: %v", loopErr)
10099
return loopErr
101100
}
102101

103-
event, eventErr := reflector.NewReflector(s.cfg.Mongo.GetMongoConf())
104-
if eventErr != nil {
105-
blog.Errorf("new reflector failed, err: %v", eventErr)
106-
return eventErr
107-
}
108-
109102
watchDB, dbErr := local.NewMgo(s.cfg.WatchMongo.GetMongoConf(), time.Minute)
110103
if dbErr != nil {
111104
blog.Errorf("new watch mongo client failed, err: %v", dbErr)
112105
return dbErr
113106
}
114107

115-
c, cacheErr := cacheop.NewCache(event, loopW, engine.ServiceManageInterface, watchDB)
108+
c, cacheErr := cacheop.NewCache(watcher, engine.ServiceManageInterface, watchDB)
116109
if cacheErr != nil {
117110
blog.Errorf("new cache instance failed, err: %v", cacheErr)
118111
return cacheErr
119112
}
120113
s.cacheSet = c
121114

122-
watcher, watchErr := stream.NewLoopStream(s.cfg.Mongo.GetMongoConf(), engine.ServiceManageInterface)
123-
if watchErr != nil {
124-
blog.Errorf("new loop watch stream failed, err: %v", watchErr)
125-
return watchErr
126-
}
127-
128115
ccDB, dbErr := local.NewMgo(s.cfg.Mongo.GetMongoConf(), time.Minute)
129116
if dbErr != nil {
130117
blog.Errorf("new cc mongo client failed, err: %v", dbErr)

src/storage/dal/mongo/local/db.go

-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"context"
2222

2323
"configcenter/src/common/metadata"
24-
"configcenter/src/storage/dal/redis"
2524
"configcenter/src/storage/dal/types"
2625
)
2726

@@ -60,8 +59,4 @@ type DB interface {
6059
CommitTransaction(context.Context, *metadata.TxnCapable) error
6160
// AbortTransaction 取消事务
6261
AbortTransaction(context.Context, *metadata.TxnCapable) (bool, error)
63-
64-
// InitTxnManager TxnID management of initial transaction
65-
// TODO 后续放到TenantDB里,只用初始化一次
66-
InitTxnManager(r redis.Client) error
6762
}

0 commit comments

Comments
 (0)