Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bcs-common/pkg/odm/drivers/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type Table interface {
// Aggregation aggregation operation
Aggregation(ctx context.Context, pipeline interface{}, result interface{}) error

AggregationWithOptions(ctx context.Context, pipeline interface{}, options map[string]interface{}, result interface{}) error

// Insert insert many data
Insert(ctx context.Context, docs []interface{}) (int, error)

Expand Down
79 changes: 79 additions & 0 deletions bcs-common/pkg/odm/drivers/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,85 @@ func (c *Collection) Aggregation(ctx context.Context, pipeline interface{}, resu
return cursor.All(ctx, result)
}

func (c *Collection) AggregationWithOptions(ctx context.Context, pipeline interface{}, options map[string]interface{}, result interface{}) error {
var err error
var cursor *mongo.Cursor
startTime := time.Now()
defer func() {
reportMongdbMetrics("aggregationWithOptions", err, startTime)
}()
aggregateOptions := mopt.AggregateOptions{}

// 处理hint选项
if hint, ok := options["hint"]; ok {
aggregateOptions.SetHint(hint)
}

// 处理allowDiskUse选项 - 允许聚合操作使用磁盘空间
if allowDiskUse, ok := options["allowDiskUse"]; ok {
if allowDiskUseBool, ok := allowDiskUse.(bool); ok {
aggregateOptions.SetAllowDiskUse(allowDiskUseBool)
}
}

// 处理batchSize选项 - 设置批处理大小
if batchSize, ok := options["batchSize"]; ok {
if batchSizeInt32, ok := batchSize.(int32); ok {
aggregateOptions.SetBatchSize(batchSizeInt32)
} else if batchSizeInt, ok := batchSize.(int); ok {
aggregateOptions.SetBatchSize(int32(batchSizeInt))
}
}

// 处理maxTimeMS选项 - 设置最大执行时间(毫秒)
if maxTimeMS, ok := options["maxTimeMS"]; ok {
if maxTimeMSInt64, ok := maxTimeMS.(int64); ok {
aggregateOptions.SetMaxTime(time.Duration(maxTimeMSInt64) * time.Millisecond)
} else if maxTimeMSInt, ok := maxTimeMS.(int); ok {
aggregateOptions.SetMaxTime(time.Duration(maxTimeMSInt) * time.Millisecond)
}
}

// 处理collation选项 - 设置排序规则
if collation, ok := options["collation"]; ok {
if collationOpt, ok := collation.(*mopt.Collation); ok {
aggregateOptions.SetCollation(collationOpt)
}
}

// 处理comment选项 - 设置操作注释
if comment, ok := options["comment"]; ok {
if commentStr, ok := comment.(string); ok {
aggregateOptions.SetComment(commentStr)
}
}

// 处理bypassDocumentValidation选项 - 绕过文档验证
if bypassValidation, ok := options["bypassDocumentValidation"]; ok {
if bypassValidationBool, ok := bypassValidation.(bool); ok {
aggregateOptions.SetBypassDocumentValidation(bypassValidationBool)
}
}

// 处理let选项 - 设置变量
if let, ok := options["let"]; ok {
if letDoc, ok := let.(bson.D); ok {
aggregateOptions.SetLet(letDoc)
}
}

cursor, err = c.mCli.Database(c.dbName).
Collection(c.collectionName).
Aggregate(ctx, pipeline, &aggregateOptions)
if err != nil {
return err
}
defer func() {
_ = cursor.Close(ctx)
}()
return cursor.All(ctx, result)
}

// Insert insert many data
func (c *Collection) Insert(ctx context.Context, docs []interface{}) (int, error) {
var ret *mongo.InsertManyResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ const (
resourceID = "resourceid"
taskID = "taskid"
createTime = "createtime"
clusterID = "clusterid"

// 索引名称常量
indexNameMainQuery = "idx_main_query"
indexNameClusterTime = "idx_cluster_time"
)

// ModelOperationLog database operation for operation_log
Expand All @@ -51,11 +56,20 @@ type ModelOperationLog struct {
var (
operationLogIndexes = []drivers.Index{
{
Name: tableName + "_idx",
Name: indexNameMainQuery,
Key: bson.D{
bson.E{Key: clusterID, Value: 1},
bson.E{Key: resourceType, Value: 1},
bson.E{Key: resourceID, Value: 1},
bson.E{Key: taskID, Value: 1},
bson.E{Key: createTime, Value: -1},
},
Unique: false,
},
{
Name: indexNameClusterTime,
Key: bson.D{
bson.E{Key: clusterID, Value: 1},
bson.E{Key: createTime, Value: -1},
},
Unique: false,
},
Expand Down Expand Up @@ -208,6 +222,13 @@ func (m *ModelOperationLog) ListAggreOperationLog(ctx context.Context, condSrc,
)

pipeline := make([]map[string]interface{}, 0)
aggreOptions := make(map[string]interface{})

// 根据查询条件自动选择合适的索引
indexHint := m.selectIndexHint(condSrc)
if indexHint != "" {
aggreOptions["hint"] = indexHint
}

// from src table filter
if len(condSrc) > 0 {
Expand Down Expand Up @@ -236,7 +257,7 @@ func (m *ModelOperationLog) ListAggreOperationLog(ctx context.Context, condSrc,

// count logs for conds
if opt.Count {
if err := m.db.Table(m.tableName).Aggregation(ctx, pipeline, &retTaskOpLogs); err != nil {
if err := m.db.Table(m.tableName).AggregationWithOptions(ctx, pipeline, aggreOptions, &retTaskOpLogs); err != nil {
return nil, err
}

Expand All @@ -258,8 +279,37 @@ func (m *ModelOperationLog) ListAggreOperationLog(ctx context.Context, condSrc,
"$limit": opt.Limit,
})
}
if err := m.db.Table(m.tableName).Aggregation(ctx, pipeline, &retTaskOpLogs); err != nil {
if err := m.db.Table(m.tableName).AggregationWithOptions(ctx, pipeline, aggreOptions, &retTaskOpLogs); err != nil {
return nil, err
}
return retTaskOpLogs, nil
}

// selectIndexHint 根据查询条件选择合适的索引hint
func (m *ModelOperationLog) selectIndexHint(condSrc []bson.E) string {
// 分析查询条件,选择最优索引
hasResourceType := false
hasClusterID := false

for _, cond := range condSrc {
switch cond.Key {
case resourceType:
hasResourceType = true
case clusterID:
hasClusterID = true
}
}

// 索引选择策略:
// 1. 如果有clusterID + resourceType,使用主索引
// 2. 如果只有clusterID,使用clusterID索引
// 3. 其他情况使用默认索引(让MongoDB自动选择)

if hasClusterID && hasResourceType {
return indexNameMainQuery
} else if hasClusterID {
return indexNameClusterTime
}

return ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ var (
{
Name: taskStepTableName + "_idx",
Key: bson.D{
bson.E{Key: resourceType, Value: 1},
bson.E{Key: resourceID, Value: 1},
bson.E{Key: taskID, Value: 1},
},
Unique: false,
Expand Down
Loading