Skip to content

watch调整为按库监听 #8296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: feature-tenant
Choose a base branch
from
4 changes: 2 additions & 2 deletions docs/apidoc/inner/admin-server/get_sharding_db_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ GET /migrate/v3/find/system/sharding_db_config
"permission": null,
"data": {
"master_db": "masteruuid",
"for_new_tenant": "slave1uuid",
"for_new_data": "slave1uuid",
"slave_db": {
"slave1uuid": {
"name": "slave1",
Expand Down Expand Up @@ -53,7 +53,7 @@ GET /migrate/v3/find/system/sharding_db_config
| 参数名称 | 参数类型 | 描述 |
|----------------|-------------------|----------------------------|
| master_db | string | 主库唯一标识 |
| for_new_tenant | string | 指定新增租户数据写入哪个库,存储这个数据库的唯一标识 |
| for_new_data | string | 指定新增租户数据写入哪个库,存储这个数据库的唯一标识 |
| slave_db | map[string]object | 从库唯一标识->从库配置的映射 |

#### data.slave_db[key]
Expand Down
4 changes: 2 additions & 2 deletions docs/apidoc/inner/admin-server/update_sharding_db_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ PUT /migrate/v3/update/system/sharding_db_config

| 参数名称 | 参数类型 | 必选 | 描述 |
|-----------------|-------------------|----|---------------------------------------------|
| for_new_tenant | string | 否 | 指定新增租户数据写入哪个库。对于存量数据库指定它的唯一标识。对于新增的从库指定它的名称 |
| for_new_data | string | 否 | 指定新增租户数据写入哪个库。对于存量数据库指定它的唯一标识。对于新增的从库指定它的名称 |
| create_slave_db | object array | 否 | 新增的从库配置数组 |
| update_slave_db | map[string]object | 否 | 更新的从库唯一标识->从库配置的映射 |

Expand Down Expand Up @@ -41,7 +41,7 @@ PUT /migrate/v3/update/system/sharding_db_config

```json
{
"for_new_tenant": "slave1uuid",
"for_new_data": "slave1uuid",
"create_slave_db": [
{
"name": "slave2",
Expand Down
17 changes: 0 additions & 17 deletions docs/db/other.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,6 @@
| _id | ObjectId | 数据唯一ID |
| host_snap | NumberLong | gse数据入库的stream_to_id |

## cc_DelArchive

#### 作用

用于归档被删除的数据

#### 表结构

| 字段 | 类型 | 描述 |
|-------------|----------|--------|
| _id | ObjectId | 数据唯一ID |
| oid | String | 事件ID |
| coll | String | 所操作的表 |
| detail | Object | 操作数据详情 |
| create_time | ISODate | 创建时间 |
| last_time | ISODate | 最后更新时间 |

## cc_idgenerator

#### 作用
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/full-sync-cond/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

const (
// BKTableNameFullSyncCond is the full synchronization cache condition table
BKTableNameFullSyncCond = "FullSyncCond"
BKTableNameFullSyncCond = common.BKTableNameFullSyncCond
)

// FullSyncCond is the full synchronization cache condition
Expand All @@ -42,7 +42,7 @@ type FullSyncCond struct {
IsAll bool `json:"is_all" bson:"is_all"`
Interval int `json:"interval" bson:"interval"`
Condition *filter.Expression `json:"condition,omitempty" bson:"condition,omitempty"`
TenantID string `json:"tenant_id" bson:"tenant_id"`
TenantID string `json:"-" bson:"tenant_id"`
}

// full sync cond field names
Expand Down
14 changes: 7 additions & 7 deletions pkg/cache/general/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,24 @@ func (k *Key) Resource() ResType {
}

// DetailKey returns the general resource detail redis key generated by id key and extra keys
func (k *Key) DetailKey(idKey string, key ...string) string {
return k.namespace + "detail:" + k.genDetailKey(idKey, key...)
func (k *Key) DetailKey(tenantID string, idKey string, key ...string) string {
return k.namespace + "detail:" + tenantID + ":" + k.genDetailKey(idKey, key...)
}

// UniqueKey generates a unique key of the specified type, this cache key stores the unique identifier of the resource
func (k *Key) UniqueKey(typ, key string) string {
return k.namespace + typ + ":" + key
func (k *Key) UniqueKey(typ, tenantID, key string) string {
return k.namespace + typ + ":" + tenantID + ":" + key
}

// IDListKey is a redis zset(sorted set) key to store all the related data ids, which is used to page id quickly,
// without use mongodb's sort method, which is much more expensive.
// This key's ttl is defined where it is used, it might not be the same with the detail cache's ttl.
// NOTE: if the resource has sub resource, the id list key must contain the sub resource
func (k *Key) IDListKey(key ...string) string {
func (k *Key) IDListKey(tenantID string, key ...string) string {
if len(key) == 0 {
return k.namespace + "id_list"
return k.namespace + "id_list:" + tenantID
}
return k.namespace + "id_list:" + strings.Join(key, ":")
return k.namespace + "id_list:" + tenantID + ":" + strings.Join(key, ":")
}

// IDListTempKey is used to store the id list during refresh,
Expand Down
15 changes: 0 additions & 15 deletions pkg/cache/general/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,6 @@ var (
MainlineInstKey = NewKey(MainlineInstance, 6*time.Hour, [2]int{0, 30 * 60}, genIDKeyByID, genDetailKeyWithoutSubRes)
// InstAsstKey is the instance association detail cache key
InstAsstKey = NewKey(InstAsst, 6*time.Hour, [2]int{0, 30 * 60}, genIDKeyByID, genDetailKeyWithoutSubRes)
// KubeClusterKey is the detail cache key
KubeClusterKey = newGeneralKey(KubeCluster, 6*time.Hour, [2]int{0, 30 * 60})
// KubeNodeKey is the detail cache key
KubeNodeKey = newGeneralKey(KubeNode, 6*time.Hour, [2]int{0, 30 * 60})
// KubeNamespaceKey is the detail cache key
KubeNamespaceKey = newGeneralKey(KubeNamespace, 6*time.Hour, [2]int{0, 30 * 60})
// KubeWorkloadKey is the detail cache key
KubeWorkloadKey = newGeneralKey(KubeWorkload, 6*time.Hour, [2]int{0, 30 * 60})
// KubePodKey is the detail cache key
KubePodKey = newGeneralKey(KubePod, 6*time.Hour, [2]int{0, 30 * 60})
)

// newGeneralKey new general Key
Expand Down Expand Up @@ -93,11 +83,6 @@ var cacheKeyMap = map[ResType]*Key{
ObjectInstance: ObjInstKey,
MainlineInstance: MainlineInstKey,
InstAsst: InstAsstKey,
KubeCluster: KubeClusterKey,
KubeNode: KubeNodeKey,
KubeNamespace: KubeNamespaceKey,
KubeWorkload: KubeWorkloadKey,
KubePod: KubePodKey,
}

// GetCacheKeyByResType get general resource detail cache key by resource type
Expand Down
5 changes: 0 additions & 5 deletions pkg/cache/general/mapping/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ var cursorTypeMap = map[general.ResType]watch.CursorType{
general.ObjectInstance: watch.ObjectBase,
general.MainlineInstance: watch.MainlineInstance,
general.InstAsst: watch.InstAsst,
general.KubeCluster: watch.KubeCluster,
general.KubeNode: watch.KubeNode,
general.KubeNamespace: watch.KubeNamespace,
general.KubeWorkload: watch.KubeWorkload,
general.KubePod: watch.KubePod,
}

// GetCursorTypeByResType get event watch cursor type by resource type
Expand Down
11 changes: 0 additions & 11 deletions pkg/cache/general/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,6 @@ const (
MainlineInstance ResType = "mainline_instance"
// InstAsst is the resource type for instance association cache, its sub resource specifies the associated object id
InstAsst ResType = "inst_asst"
// KubeCluster is the resource type for kube cluster cache
KubeCluster ResType = "kube_cluster"
// KubeNode is the resource type for kube node cache
KubeNode ResType = "kube_node"
// KubeNamespace is the resource type for kube namespace cache
KubeNamespace ResType = "kube_namespace"
// KubeWorkload is the resource type for kube workload cache, its sub resource specifies the workload type
KubeWorkload ResType = "kube_workload"
// KubePod is the resource type for kube pod cache, its event detail is pod info with containers in it
KubePod ResType = "kube_pod"
)

// SupportedResTypeMap is a map whose key is resource type that is supported by general resource cache
Expand All @@ -82,7 +72,6 @@ var ResTypeHasSubResMap = map[ResType]struct{}{
ObjectInstance: {},
MainlineInstance: {},
InstAsst: {},
KubeWorkload: {},
}

// ValidateWithSubRes validate ResType with sub resource
Expand Down
138 changes: 138 additions & 0 deletions pkg/tenant/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Tencent is pleased to support the open source community by making
* 蓝鲸智云 - 配置平台 (BlueKing - Configuration System) available.
* Copyright (C) 2017 THL A29 Limited,
* a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* We undertake not to change the open source license (MIT license) applicable
* to the current version of the project delivered to anyone in the future.
*/

package tenant

import (
"sync"

"configcenter/pkg/tenant/types"
)

var (
prevTenantInfo = make(map[string]types.Tenant)
tenantEventChannels = make(map[string]chan TenantEvent)
tenantEventChLock sync.Mutex
)

// TenantEvent is the tenant event info
type TenantEvent struct {
EventType EventType
Tenant types.Tenant
}

// EventType is the tenant event type
type EventType string

const (
// Create is the create or enable tenant event type
Create EventType = "create"
// Delete is the delete or disable tenant event type
Delete EventType = "delete"
)

// NewTenantEventChan generate a new tenant event chan
func NewTenantEventChan(name string) <-chan TenantEvent {
tenantEventChLock.Lock()
defer tenantEventChLock.Unlock()

if ch, exists := tenantEventChannels[name]; exists {
return ch
}

eventChan := make(chan TenantEvent)
tenantEventChannels[name] = eventChan
go func() {
for _, tenant := range allTenants {
if tenant.Status == types.EnabledStatus {
eventChan <- TenantEvent{
EventType: Create,
Tenant: tenant,
}
}
}
}()
return eventChan
}

// RemoveTenantEventChan remove tenant event chan
func RemoveTenantEventChan(name string) {
tenantEventChLock.Lock()
defer tenantEventChLock.Unlock()

ch, exists := tenantEventChannels[name]
if !exists {
return
}

close(ch)
delete(tenantEventChannels, name)
}

// generateAndPushTenantEvent compare the tenant with the previous tenant info to generate and push event
func generateAndPushTenantEvent(tenants []types.Tenant) {
tenantEventChLock.Lock()
defer tenantEventChLock.Unlock()

prevTenantMap := make(map[string]types.Tenant)

for _, tenant := range tenants {
tenantID := tenant.TenantID
prevTenantMap[tenantID] = tenant

prevTenant, exists := prevTenantInfo[tenantID]
if !exists {
if tenant.Status != types.EnabledStatus {
continue
}

for _, eventChan := range tenantEventChannels {
eventChan <- TenantEvent{
EventType: Create,
Tenant: tenant,
}
}
continue
}

if prevTenant.Status != tenant.Status {
eventType := Create
if tenant.Status == types.DisabledStatus {
eventType = Delete
}
for _, eventChan := range tenantEventChannels {
eventChan <- TenantEvent{
EventType: eventType,
Tenant: tenant,
}
}
}

delete(prevTenantInfo, tenantID)
}

for _, tenant := range prevTenantInfo {
for _, eventChan := range tenantEventChannels {
eventChan <- TenantEvent{
EventType: Delete,
Tenant: tenant,
}
}
}

prevTenantInfo = prevTenantMap
}
3 changes: 2 additions & 1 deletion pkg/tenant/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func SetTenant(tenant []types.Tenant) {
for _, t := range allTenants {
tenantMap[t.TenantID] = &t
}

lock.Unlock()
generateAndPushTenantEvent(allTenants)
}

func refreshTenantInfo() error {
Expand All @@ -110,6 +110,7 @@ func refreshTenantInfo() error {
if db != nil {
tenants, err = GetAllTenantsFromDB(context.Background(), db)
if err != nil {
blog.Errorf("get all tenants from db failed, err: %v", err)
return err
}
}
Expand Down
13 changes: 0 additions & 13 deletions src/ac/parser/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,6 @@ func (ps *parseStream) watch() *parseStream {
}
authResource.InstanceID = model.ID
}
case watch.KubeWorkload:
body, err := ps.RequestCtx.getRequestBody()
if err != nil {
ps.err = err
return ps
}

// use sub resource(corresponding to the kind of the workload) for authorization if it is set
// if sub resource is not set, verify authorization of the resource(which means all sub resources)
subResource := gjson.GetBytes(body, "bk_filter."+common.BKSubResourceField)
if subResource.Exists() {
authResource.InstanceIDEx = subResource.String()
}
}

ps.Attribute.Resources = append(ps.Attribute.Resources, authResource)
Expand Down
4 changes: 2 additions & 2 deletions src/apimachinery/refresh/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ func (r *refresh) RefreshTenant(moduleName string) ([]types.Tenant, error) {

case commontypes.CC_MODULE_APISERVER:
r.capability.Discover = r.disc.ApiServer()

case commontypes.CC_MODULE_TASK:
r.capability.Discover = r.disc.TaskServer()

case commontypes.CC_MODULE_CACHESERVICE:
r.capability.Discover = r.disc.CacheService()
default:
return nil, fmt.Errorf("unsupported refresh module: %s", moduleName)
}
Expand Down
20 changes: 20 additions & 0 deletions src/common/http/rest/kit.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,26 @@ func NewKit() *Kit {
return NewKitFromHeader(headerutil.GenDefaultHeader(), errors.GetGlobalCCError())
}

// WithCtx set kit context
func (kit *Kit) WithCtx(ctx context.Context) *Kit {
kit.Ctx = ctx
return kit
}

// WithTenant set kit tenant
func (kit *Kit) WithTenant(tenantID string) *Kit {
kit.TenantID = tenantID
httpheader.SetTenantID(kit.Header, tenantID)
return kit
}

// WithRid set kit rid
func (kit *Kit) WithRid(rid string) *Kit {
kit.Rid = rid
httpheader.SetRid(kit.Header, rid)
return kit
}

// ShardOpts returns sharding options
func (kit *Kit) ShardOpts() sharding.ShardOpts {
return sharding.NewShardOpts().WithTenant(kit.TenantID)
Expand Down
Loading