Skip to content

Commit cace75b

Browse files
chore: ci & review bugfix
--story=120702860
1 parent c05bbd0 commit cace75b

File tree

31 files changed

+400
-316
lines changed

31 files changed

+400
-316
lines changed

pkg/tenant/event.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ import (
2626
var (
2727
prevTenantInfo = make(map[string]types.Tenant)
2828
tenantEventChannels = make(map[string]chan TenantEvent)
29-
tenantEventChLock sync.RWMutex
29+
tenantEventChLock sync.Mutex
3030
)
3131

3232
// TenantEvent is the tenant event info
3333
type TenantEvent struct {
3434
EventType EventType
35-
TenantID string
35+
Tenant types.Tenant
3636
}
3737

3838
// EventType is the tenant event type
@@ -54,14 +54,14 @@ func NewTenantEventChan(name string) <-chan TenantEvent {
5454
return ch
5555
}
5656

57-
eventChan := make(chan TenantEvent, 1)
57+
eventChan := make(chan TenantEvent)
5858
tenantEventChannels[name] = eventChan
5959
go func() {
6060
for _, tenant := range allTenants {
6161
if tenant.Status == types.EnabledStatus {
6262
eventChan <- TenantEvent{
6363
EventType: Create,
64-
TenantID: tenant.TenantID,
64+
Tenant: tenant,
6565
}
6666
}
6767
}
@@ -85,8 +85,8 @@ func RemoveTenantEventChan(name string) {
8585

8686
// generateAndPushTenantEvent compare the tenant with the previous tenant info to generate and push event
8787
func generateAndPushTenantEvent(tenants []types.Tenant) {
88-
tenantEventChLock.RLock()
89-
defer tenantEventChLock.RUnlock()
88+
tenantEventChLock.Lock()
89+
defer tenantEventChLock.Unlock()
9090

9191
prevTenantMap := make(map[string]types.Tenant)
9292

@@ -95,11 +95,15 @@ func generateAndPushTenantEvent(tenants []types.Tenant) {
9595
prevTenantMap[tenantID] = tenant
9696

9797
prevTenant, exists := prevTenantInfo[tenantID]
98-
if !exists && tenant.Status == types.EnabledStatus {
98+
if !exists {
99+
if tenant.Status != types.EnabledStatus {
100+
continue
101+
}
102+
99103
for _, eventChan := range tenantEventChannels {
100104
eventChan <- TenantEvent{
101105
EventType: Create,
102-
TenantID: tenantID,
106+
Tenant: tenant,
103107
}
104108
}
105109
continue
@@ -113,19 +117,19 @@ func generateAndPushTenantEvent(tenants []types.Tenant) {
113117
for _, eventChan := range tenantEventChannels {
114118
eventChan <- TenantEvent{
115119
EventType: eventType,
116-
TenantID: tenantID,
120+
Tenant: tenant,
117121
}
118122
}
119123
}
120124

121125
delete(prevTenantInfo, tenantID)
122126
}
123127

124-
for tenantID := range prevTenantInfo {
128+
for _, tenant := range prevTenantInfo {
125129
for _, eventChan := range tenantEventChannels {
126130
eventChan <- TenantEvent{
127131
EventType: Delete,
128-
TenantID: tenantID,
132+
Tenant: tenant,
129133
}
130134
}
131135
}

pkg/tenant/tenant.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ func SetTenant(tenant []types.Tenant) {
9999
for _, t := range allTenants {
100100
tenantMap[t.TenantID] = &t
101101
}
102-
generateAndPushTenantEvent(allTenants)
103102
lock.Unlock()
103+
generateAndPushTenantEvent(allTenants)
104104
}
105105

106106
func refreshTenantInfo() error {

src/apimachinery/refresh/api.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ func (r *refresh) RefreshTenant(moduleName string) ([]types.Tenant, error) {
3434

3535
case commontypes.CC_MODULE_APISERVER:
3636
r.capability.Discover = r.disc.ApiServer()
37-
3837
case commontypes.CC_MODULE_TASK:
3938
r.capability.Discover = r.disc.TaskServer()
40-
39+
case commontypes.CC_MODULE_CACHESERVICE:
40+
r.capability.Discover = r.disc.CacheService()
4141
default:
4242
return nil, fmt.Errorf("unsupported refresh module: %s", moduleName)
4343
}

src/common/tablenames.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ const (
6363
BKTableNameHistory = "History"
6464
BKTableNameHostFavorite = "HostFavourite"
6565
BKTableNameAuditLog = "AuditLog"
66-
BKTableNamePlatformAuditLog = "PlatformAuditLog"
6766
BKTableNameUserAPI = "UserAPI"
6867
BKTableNameDynamicGroup = "DynamicGroup"
6968
BKTableNameUserCustom = "UserCustom"
@@ -288,12 +287,14 @@ var platformTableMap = map[string]struct{}{
288287
BKTableNameIDgenerator: {},
289288
BKTableNameTenant: {},
290289
BKTableNameTenantTemplate: {},
291-
BKTableNamePlatformAuditLog: {},
292290
BKTableNameWatchToken: {},
293291
BKTableNameAPITask: {},
294292
BKTableNameAPITaskSyncHistory: {},
295293
BKTableNameWatchDBRelation: {},
296294
BKTableNameFullSyncCond: {},
295+
BKTableNameCacheWatchToken: {},
296+
"SrcSyncDataToken": {},
297+
"SrcSyncDataCursor": {},
297298
}
298299

299300
// IsPlatformTable returns if the target table is a platform table

src/scene_server/admin_server/logics/tenant.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@
1818
package logics
1919

2020
import (
21+
"context"
2122
"fmt"
2223

2324
"configcenter/pkg/tenant"
24-
"configcenter/pkg/tenant/types"
2525
"configcenter/src/apimachinery"
2626
"configcenter/src/common/blog"
2727
"configcenter/src/common/http/rest"
2828
commontypes "configcenter/src/common/types"
29+
"configcenter/src/storage/dal"
2930
"configcenter/src/storage/dal/mongo/local"
31+
"configcenter/src/storage/dal/mongo/sharding"
3032
)
3133

3234
// NewTenantInterface get new tenant cli interface
@@ -52,19 +54,24 @@ func GetNewTenantCli(kit *rest.Kit, cli interface{}) (local.DB, string, error) {
5254
}
5355

5456
// RefreshTenants refresh tenant info, skip tenant verify for apiserver
55-
func RefreshTenants(coreAPI apimachinery.ClientSetInterface) error {
57+
func RefreshTenants(coreAPI apimachinery.ClientSetInterface, db dal.Dal) error {
58+
tenants, err := tenant.GetAllTenantsFromDB(context.Background(),
59+
db.Shard(sharding.NewShardOpts().WithIgnoreTenant()))
60+
if err != nil {
61+
blog.Errorf("get all tenants failed, err: %v", err)
62+
return err
63+
}
64+
tenant.SetTenant(tenants)
5665

57-
var tenants []types.Tenant
58-
var err error
59-
needRefreshServer := []string{commontypes.CC_MODULE_APISERVER, commontypes.CC_MODULE_TASK}
66+
needRefreshServer := []string{commontypes.CC_MODULE_APISERVER, commontypes.CC_MODULE_TASK,
67+
commontypes.CC_MODULE_CACHESERVICE}
6068
for _, module := range needRefreshServer {
61-
tenants, err = coreAPI.Refresh().RefreshTenant(module)
69+
_, err = coreAPI.Refresh().RefreshTenant(module)
6270
if err != nil {
6371
blog.Errorf("refresh tenant info failed, module: %s, err: %v", module, err)
6472
return err
6573
}
6674
}
6775

68-
tenant.SetTenant(tenants)
6976
return nil
7077
}

src/scene_server/admin_server/service/migrate.go

+16-14
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ func (s *Service) migrateDatabase(req *restful.Request, resp *restful.Response)
8686
return
8787
}
8888

89+
// refresh tenants, ignore refresh tenants error
90+
if err = logics.RefreshTenants(s.CoreAPI, s.db); err != nil {
91+
blog.Errorf("refresh tenant failed, err: %v", err)
92+
}
93+
8994
if err = s.createWatchDBChainCollections(kit); err != nil {
9095
blog.Errorf("create watch db chain collections failed, err: %v", err)
9196
resp.WriteError(http.StatusInternalServerError, &metadata.RespError{
@@ -94,11 +99,6 @@ func (s *Service) migrateDatabase(req *restful.Request, resp *restful.Response)
9499
return
95100
}
96101

97-
// refresh tenants, ignore refresh tenants error
98-
if err = logics.RefreshTenants(s.CoreAPI); err != nil {
99-
blog.Errorf("refresh tenant failed, err: %v", err)
100-
}
101-
102102
resp.WriteEntity(metadata.NewSuccessResp(result))
103103
}
104104

@@ -126,14 +126,12 @@ func (s *Service) createWatchDBChainCollections(kit *rest.Kit) error {
126126
}
127127

128128
err = tenant.ExecForAllTenants(func(tenantID string) error {
129-
// TODO 在新增租户初始化时同时增加watch相关表,并刷新cache的tenant
130129
return s.addTenantWatchToken(kit.NewKit().WithTenant(tenantID), cursorType, key)
131130
})
132131
if err != nil {
133132
return err
134133
}
135134

136-
// TODO 在新增DB时同时增加db relation和token数据
137135
err = s.createWatchTokenForEventKey(kit, key, watchDBToDBRelation)
138136
if err != nil {
139137
return err
@@ -321,16 +319,20 @@ func (s *Service) createWatchIndexes(kit *rest.Kit, cursorType watch.CursorType,
321319
existIdxMap[index.Name] = true
322320
}
323321

322+
createIndexes := make([]daltypes.Index, 0)
324323
for _, index := range indexes {
325-
if _, exist := existIdxMap[index.Name]; exist {
326-
continue
324+
if _, exist := existIdxMap[index.Name]; !exist {
325+
createIndexes = append(createIndexes, index)
327326
}
327+
}
328+
if len(createIndexes) == 0 {
329+
return nil
330+
}
328331

329-
err = s.watchDB.Shard(kit.ShardOpts()).Table(key.ChainCollection()).CreateIndex(s.ctx, index)
330-
if err != nil && !mongodb.IsDuplicatedError(err) {
331-
blog.Errorf("create indexes for table %s failed, err: %v, rid: %s", key.ChainCollection(), err, kit.Rid)
332-
return err
333-
}
332+
err = s.watchDB.Shard(kit.ShardOpts()).Table(key.ChainCollection()).BatchCreateIndexes(s.ctx, createIndexes)
333+
if err != nil && !mongodb.IsDuplicatedError(err) {
334+
blog.Errorf("create indexes for table %s failed, err: %v, rid: %s", key.ChainCollection(), err, kit.Rid)
335+
return err
334336
}
335337
return nil
336338
}

src/scene_server/admin_server/service/sharding.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func (s *Service) genUpdatedShardingDBConf(kit *rest.Kit, dbConf *sharding.Shard
262262
return dbConf, nil
263263
}
264264

265-
blog.Errorf("add new tenant db %s is invalid, rid: %s", conf.ForNewData, kit.Rid)
265+
blog.Errorf("for new data db %s is invalid, rid: %s", conf.ForNewData, kit.Rid)
266266
return nil, kit.CCError.CCErrorf(common.CCErrCommParamsInvalid, "for_new_data")
267267
}
268268
return dbConf, nil
@@ -292,7 +292,7 @@ func (s *Service) genDBSlaveConf(kit *rest.Kit, name string, disabled bool, conf
292292
}
293293

294294
func (s *Service) saveUpdateShardingDBAudit(kit *rest.Kit, preConf, curConf *sharding.ShardingDBConf) error {
295-
id, err := s.db.Shard(kit.SysShardOpts()).NextSequence(kit.Ctx, common.BKTableNamePlatformAuditLog)
295+
id, err := s.db.Shard(kit.SysShardOpts()).NextSequence(kit.Ctx, common.BKTableNameAuditLog)
296296
if err != nil {
297297
blog.Errorf("generate next audit log id failed, err: %v, rid: %s", err, kit.Rid)
298298
return err

0 commit comments

Comments
 (0)