Skip to content

Commit d7e7d87

Browse files
chore: ci & review bugfix
--story=120702860
1 parent 26ae2a5 commit d7e7d87

File tree

21 files changed

+306
-251
lines changed

21 files changed

+306
-251
lines changed

pkg/tenant/event.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ import (
2626
var (
2727
prevTenantInfo = make(map[string]types.Tenant)
2828
tenantEventChannels = make(map[string]chan TenantEvent)
29-
tenantEventChLock sync.RWMutex
29+
tenantEventChLock sync.Mutex
3030
)
3131

3232
// TenantEvent is the tenant event info
3333
type TenantEvent struct {
3434
EventType EventType
35-
TenantID string
35+
Tenant types.Tenant
3636
}
3737

3838
// EventType is the tenant event type
@@ -54,14 +54,14 @@ func NewTenantEventChan(name string) <-chan TenantEvent {
5454
return ch
5555
}
5656

57-
eventChan := make(chan TenantEvent, 1)
57+
eventChan := make(chan TenantEvent)
5858
tenantEventChannels[name] = eventChan
5959
go func() {
6060
for _, tenant := range allTenants {
6161
if tenant.Status == types.EnabledStatus {
6262
eventChan <- TenantEvent{
6363
EventType: Create,
64-
TenantID: tenant.TenantID,
64+
Tenant: tenant,
6565
}
6666
}
6767
}
@@ -85,8 +85,8 @@ func RemoveTenantEventChan(name string) {
8585

8686
// generateAndPushTenantEvent compare the tenant with the previous tenant info to generate and push event
8787
func generateAndPushTenantEvent(tenants []types.Tenant) {
88-
tenantEventChLock.RLock()
89-
defer tenantEventChLock.RUnlock()
88+
tenantEventChLock.Lock()
89+
defer tenantEventChLock.Unlock()
9090

9191
prevTenantMap := make(map[string]types.Tenant)
9292

@@ -99,7 +99,7 @@ func generateAndPushTenantEvent(tenants []types.Tenant) {
9999
for _, eventChan := range tenantEventChannels {
100100
eventChan <- TenantEvent{
101101
EventType: Create,
102-
TenantID: tenantID,
102+
Tenant: tenant,
103103
}
104104
}
105105
continue
@@ -113,19 +113,19 @@ func generateAndPushTenantEvent(tenants []types.Tenant) {
113113
for _, eventChan := range tenantEventChannels {
114114
eventChan <- TenantEvent{
115115
EventType: eventType,
116-
TenantID: tenantID,
116+
Tenant: tenant,
117117
}
118118
}
119119
}
120120

121121
delete(prevTenantInfo, tenantID)
122122
}
123123

124-
for tenantID := range prevTenantInfo {
124+
for _, tenant := range prevTenantInfo {
125125
for _, eventChan := range tenantEventChannels {
126126
eventChan <- TenantEvent{
127127
EventType: Delete,
128-
TenantID: tenantID,
128+
Tenant: tenant,
129129
}
130130
}
131131
}

pkg/tenant/tenant.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ func SetTenant(tenant []types.Tenant) {
9999
for _, t := range allTenants {
100100
tenantMap[t.TenantID] = &t
101101
}
102-
generateAndPushTenantEvent(allTenants)
103102
lock.Unlock()
103+
generateAndPushTenantEvent(allTenants)
104104
}
105105

106106
func refreshTenantInfo() error {

src/apimachinery/refresh/api.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ func (r *refresh) RefreshTenant(moduleName string) ([]types.Tenant, error) {
3434

3535
case commontypes.CC_MODULE_APISERVER:
3636
r.capability.Discover = r.disc.ApiServer()
37-
3837
case commontypes.CC_MODULE_TASK:
3938
r.capability.Discover = r.disc.TaskServer()
40-
39+
case commontypes.CC_MODULE_CACHESERVICE:
40+
r.capability.Discover = r.disc.CacheService()
4141
default:
4242
return nil, fmt.Errorf("unsupported refresh module: %s", moduleName)
4343
}

src/scene_server/admin_server/logics/tenant.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@
1818
package logics
1919

2020
import (
21+
"context"
2122
"fmt"
2223

2324
"configcenter/pkg/tenant"
24-
"configcenter/pkg/tenant/types"
2525
"configcenter/src/apimachinery"
2626
"configcenter/src/common/blog"
2727
"configcenter/src/common/http/rest"
2828
commontypes "configcenter/src/common/types"
29+
"configcenter/src/storage/dal"
2930
"configcenter/src/storage/dal/mongo/local"
31+
"configcenter/src/storage/dal/mongo/sharding"
3032
)
3133

3234
// NewTenantInterface get new tenant cli interface
@@ -52,19 +54,24 @@ func GetNewTenantCli(kit *rest.Kit, cli interface{}) (local.DB, string, error) {
5254
}
5355

5456
// RefreshTenants refresh tenant info, skip tenant verify for apiserver
55-
func RefreshTenants(coreAPI apimachinery.ClientSetInterface) error {
57+
func RefreshTenants(coreAPI apimachinery.ClientSetInterface, db dal.Dal) error {
58+
tenants, err := tenant.GetAllTenantsFromDB(context.Background(),
59+
db.Shard(sharding.NewShardOpts().WithIgnoreTenant()))
60+
if err != nil {
61+
blog.Errorf("get all tenants failed, err: %v", err)
62+
return err
63+
}
64+
tenant.SetTenant(tenants)
5665

57-
var tenants []types.Tenant
58-
var err error
59-
needRefreshServer := []string{commontypes.CC_MODULE_APISERVER, commontypes.CC_MODULE_TASK}
66+
needRefreshServer := []string{commontypes.CC_MODULE_APISERVER, commontypes.CC_MODULE_TASK,
67+
commontypes.CC_MODULE_CACHESERVICE}
6068
for _, module := range needRefreshServer {
61-
tenants, err = coreAPI.Refresh().RefreshTenant(module)
69+
_, err = coreAPI.Refresh().RefreshTenant(module)
6270
if err != nil {
6371
blog.Errorf("refresh tenant info failed, module: %s, err: %v", module, err)
6472
return err
6573
}
6674
}
6775

68-
tenant.SetTenant(tenants)
6976
return nil
7077
}

src/scene_server/admin_server/service/migrate.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,9 @@ func (s *Service) migrateDatabase(req *restful.Request, resp *restful.Response)
8686
return
8787
}
8888

89-
if err = logics.RefreshTenants(s.CoreAPI); err != nil {
89+
// refresh tenants, ignore refresh tenants error
90+
if err = logics.RefreshTenants(s.CoreAPI, s.db); err != nil {
9091
blog.Errorf("refresh tenant failed, err: %v", err)
91-
result := &metadata.RespError{
92-
Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()),
93-
}
94-
resp.WriteError(http.StatusInternalServerError, result)
9592
}
9693

9794
if err = s.createWatchDBChainCollections(kit); err != nil {
@@ -129,14 +126,12 @@ func (s *Service) createWatchDBChainCollections(kit *rest.Kit) error {
129126
}
130127

131128
err = tenant.ExecForAllTenants(func(tenantID string) error {
132-
// TODO 在新增租户初始化时同时增加watch相关表,并刷新cache的tenant
133129
return s.addTenantWatchToken(kit.NewKit().WithTenant(tenantID), cursorType, key)
134130
})
135131
if err != nil {
136132
return err
137133
}
138134

139-
// TODO 在新增DB时同时增加db relation和token数据
140135
err = s.createWatchTokenForEventKey(kit, key, watchDBToDBRelation)
141136
if err != nil {
142137
return err

src/scene_server/admin_server/service/tenant.go

+28-54
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
tenanttmp "configcenter/pkg/types/tenant-template"
2828
"configcenter/src/common"
2929
"configcenter/src/common/blog"
30-
httpheader "configcenter/src/common/http/header"
3130
"configcenter/src/common/http/rest"
3231
"configcenter/src/common/index"
3332
"configcenter/src/common/metadata"
@@ -43,28 +42,22 @@ import (
4342
)
4443

4544
func (s *Service) addTenant(req *restful.Request, resp *restful.Response) {
46-
rHeader := req.Request.Header
47-
defErr := s.CCErr.CreateDefaultCCErrorIf(httpheader.GetLanguage(rHeader))
48-
kit := rest.NewKitFromHeader(rHeader, s.CCErr)
45+
kit := rest.NewKitFromHeader(req.Request.Header, s.CCErr)
4946

5047
if !s.Config.EnableMultiTenantMode {
5148
blog.Errorf("multi-tenant mode is not enabled, cannot add tenant, rid: %s", kit.Rid)
52-
result := &metadata.RespError{
53-
Msg: defErr.Errorf(common.CCErrCommAddTenantErr,
54-
fmt.Errorf("multi-tenant mode is not enabled, cannot add tenant")),
55-
}
56-
resp.WriteError(http.StatusInternalServerError, result)
49+
resp.WriteError(http.StatusInternalServerError, &metadata.RespError{
50+
Msg: kit.CCError.New(common.CCErrCommAddTenantErr, "multi-tenant mode is not enabled, cannot add tenant")})
5751
return
5852
}
5953

6054
_, exist := tenant.GetTenant(kit.TenantID)
6155
if exist {
62-
// add watch token for new tenant
63-
// TODO 如果租户已经存在的情况下也调一下,防止之前新增租户了但是这个失败了
56+
// add watch token for new tenant in case tenant is created without watch tokens
6457
if err := s.addWatchTokenForNewTenant(kit); err != nil {
6558
blog.Errorf("add watch token for new tenant %s failed, err: %v, rid: %s", kit.TenantID, err, kit.Rid)
66-
result := &metadata.RespError{Msg: defErr.Errorf(common.CCErrCommAddTenantErr, err.Error())}
67-
resp.WriteError(http.StatusInternalServerError, result)
59+
resp.WriteError(http.StatusInternalServerError, &metadata.RespError{
60+
Msg: kit.CCError.New(common.CCErrCommAddTenantErr, err.Error())})
6861
return
6962
}
7063
resp.WriteEntity(metadata.NewSuccessResp("tenant exist"))
@@ -76,10 +69,8 @@ func (s *Service) addTenant(req *restful.Request, resp *restful.Response) {
7669
tenants, err := apigwcli.Client().User().GetTenants(kit.Ctx, kit.Header)
7770
if err != nil {
7871
blog.Errorf("get tenants from bk-user failed, err: %v, rid: %s", err, kit.Rid)
79-
result := &metadata.RespError{
80-
Msg: defErr.Errorf(common.CCErrCommAddTenantErr, fmt.Errorf("get tenants from bk-user failed")),
81-
}
82-
resp.WriteError(http.StatusInternalServerError, result)
72+
resp.WriteError(http.StatusInternalServerError, &metadata.RespError{
73+
Msg: kit.CCError.New(common.CCErrCommAddTenantErr, "get tenants from bk-user failed")})
8374
}
8475

8576
tenantMap := make(map[string]user.Status)
@@ -89,51 +80,42 @@ func (s *Service) addTenant(req *restful.Request, resp *restful.Response) {
8980

9081
if status, ok := tenantMap[kit.TenantID]; !ok || status != user.EnabledStatus {
9182
blog.Errorf("tenant %s invalid, rid: %s", kit.TenantID, kit.Rid)
92-
result := &metadata.RespError{
93-
Msg: defErr.Errorf(common.CCErrCommAddTenantErr,
94-
fmt.Errorf("tenant %s invalid", kit.TenantID)),
95-
}
96-
resp.WriteError(http.StatusInternalServerError, result)
83+
resp.WriteError(http.StatusInternalServerError, &metadata.RespError{
84+
Msg: kit.CCError.Errorf(common.CCErrCommAddTenantErr, fmt.Sprintf("tenant %s invalid", kit.TenantID))})
9785
return
9886
}
9987
}
10088

89+
if err := s.addTenantData(kit); err != nil {
90+
resp.WriteError(http.StatusInternalServerError, &metadata.RespError{
91+
Msg: kit.CCError.New(common.CCErrCommAddTenantErr, err.Error())})
92+
return
93+
}
94+
95+
resp.WriteEntity(metadata.NewSuccessResp("add tenant success"))
96+
}
97+
98+
func (s *Service) addTenantData(kit *rest.Kit) error {
10199
cli, dbUUID, err := logics.GetNewTenantCli(kit, mongodb.Dal())
102100
if err != nil {
103101
blog.Errorf("get new tenant db failed, err: %v, rid: %s", err, kit.Rid)
104-
result := &metadata.RespError{
105-
Msg: defErr.Errorf(common.CCErrCommAddTenantErr, fmt.Errorf("get new tenant db failed")),
106-
}
107-
resp.WriteError(http.StatusInternalServerError, result)
108-
return
102+
return err
109103
}
110104

111105
if err = addTableIndexes(kit, cli); err != nil {
112106
blog.Errorf("create table and indexes for tenant %s failed, err: %v, rid: %s", kit.TenantID, err, kit.Rid)
113-
result := &metadata.RespError{
114-
Msg: defErr.Errorf(common.CCErrCommAddTenantErr, err.Error()),
115-
}
116-
resp.WriteError(http.StatusInternalServerError, result)
117-
return
107+
return err
118108
}
119109

120110
// add default area
121111
if err = addDefaultArea(kit, cli); err != nil {
122112
blog.Errorf("add default area failed, err: %v, rid: %s", err, kit.Rid)
123-
result := &metadata.RespError{
124-
Msg: defErr.Errorf(common.CCErrCommAddTenantErr, err.Error()),
125-
}
126-
resp.WriteError(http.StatusInternalServerError, result)
127-
return
113+
return err
128114
}
129115

130116
if err = addDataFromTemplate(kit, cli); err != nil {
131117
blog.Errorf("create init data for tenant %s failed, err: %v", kit.TenantID, err)
132-
result := &metadata.RespError{
133-
Msg: defErr.Errorf(common.CCErrCommAddTenantErr, err.Error()),
134-
}
135-
resp.WriteError(http.StatusInternalServerError, result)
136-
return
118+
return err
137119
}
138120

139121
// add tenant db relation
@@ -145,28 +127,20 @@ func (s *Service) addTenant(req *restful.Request, resp *restful.Response) {
145127
err = mongodb.Shard(kit.SysShardOpts()).Table(common.BKTableNameTenant).Insert(kit.Ctx, data)
146128
if err != nil {
147129
blog.Errorf("add tenant db relations failed, err: %v, rid: %s", err, kit.Rid)
148-
result := &metadata.RespError{
149-
Msg: defErr.Errorf(common.CCErrCommAddTenantErr, err.Error()),
150-
}
151-
resp.WriteError(http.StatusInternalServerError, result)
152-
return
130+
return err
153131
}
154132

155133
// refresh tenants, ignore refresh tenants error
156-
if err = logics.RefreshTenants(s.CoreAPI); err != nil {
134+
if err = logics.RefreshTenants(s.CoreAPI, s.db); err != nil {
157135
blog.Errorf("refresh tenants failed, err: %v, rid: %s", err, kit.Rid)
158136
}
159137

160138
// add watch token for new tenant
161-
// TODO 如果租户已经存在的情况下也调一下,防止之前新增租户了但是这个失败了
162139
if err = s.addWatchTokenForNewTenant(kit); err != nil {
163140
blog.Errorf("add watch token for new tenant %s failed, err: %v, rid: %s", kit.TenantID, err, kit.Rid)
164-
result := &metadata.RespError{Msg: defErr.Errorf(common.CCErrCommAddTenantErr, err.Error())}
165-
resp.WriteError(http.StatusInternalServerError, result)
166-
return
141+
return err
167142
}
168-
169-
resp.WriteEntity(metadata.NewSuccessResp("add tenant success"))
143+
return nil
170144
}
171145

172146
func (s *Service) addWatchTokenForNewTenant(kit *rest.Kit) error {

src/scene_server/task_server/service/service.go

-19
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"configcenter/src/common/webservice/restfulservice"
3636
"configcenter/src/scene_server/task_server/app/options"
3737
"configcenter/src/scene_server/task_server/logics"
38-
"configcenter/src/storage/dal/mongo/sharding"
3938
"configcenter/src/storage/dal/redis"
4039
"configcenter/src/storage/driver/mongodb"
4140
"configcenter/src/thirdparty/logplatform/opentelemetry"
@@ -175,24 +174,6 @@ func (s *Service) RefreshTenants(req *restful.Request, resp *restful.Response) {
175174
}
176175

177176
tenant.SetTenant(tenants)
178-
// refresh tenant db map
179-
shardingMongoManager, ok := mongodb.Dal().(*sharding.ShardingMongoManager)
180-
if !ok {
181-
blog.Errorf("convert to ShardingMongoManager failed, err: %v, rid: %s", err, kit.Rid)
182-
result := &metadata.RespError{
183-
Msg: defErr.Errorf(common.CCErrCommRefreshTenantErr, fmt.Errorf("get sharding mongo manager failed")),
184-
}
185-
resp.WriteError(http.StatusInternalServerError, result)
186-
}
187-
188-
if err = shardingMongoManager.RefreshTenantDBMap(); err != nil {
189-
blog.Errorf("refresh tenant db map failed, err: %v, rid: %s", err, kit.Rid)
190-
result := &metadata.RespError{
191-
Msg: defErr.Errorf(common.CCErrCommRefreshTenantErr, fmt.Errorf("get sharding mongo manager failed")),
192-
}
193-
resp.WriteError(http.StatusInternalServerError, result)
194-
return
195-
}
196177

197178
resp.WriteEntity(metadata.NewSuccessResp(tenants))
198179
}

src/source_controller/cacheservice/cache/biz-topo/watch/brief.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (w *briefWatcher) watchEvents(cursorType watch.CursorType) error {
7373
case tenant.Create:
7474
loopEventChan <- loop.TenantEvent{
7575
EventType: watch.Create,
76-
TenantID: e.TenantID,
76+
TenantID: e.Tenant.TenantID,
7777
WatchOpts: &watch.WatchEventOptions{
7878
EventTypes: []watch.EventType{watch.Create, watch.Delete},
7979
Fields: []string{common.BKAppIDField},
@@ -83,7 +83,7 @@ func (w *briefWatcher) watchEvents(cursorType watch.CursorType) error {
8383
case tenant.Delete:
8484
loopEventChan <- loop.TenantEvent{
8585
EventType: watch.Delete,
86-
TenantID: e.TenantID,
86+
TenantID: e.Tenant.TenantID,
8787
}
8888
}
8989
}

0 commit comments

Comments
 (0)