Skip to content

Commit 8ccf8e0

Browse files
committed
feat: operationlog index
1 parent 40d47ba commit 8ccf8e0

File tree

4 files changed

+135
-6
lines changed

4 files changed

+135
-6
lines changed

bcs-common/pkg/odm/drivers/db.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ type Table interface {
165165
// Aggregation aggregation operation
166166
Aggregation(ctx context.Context, pipeline interface{}, result interface{}) error
167167

168+
AggregationWithOptions(ctx context.Context, pipeline interface{}, options map[string]interface{}, result interface{}) error
169+
168170
// Insert insert many data
169171
Insert(ctx context.Context, docs []interface{}) (int, error)
170172

bcs-common/pkg/odm/drivers/mongo/mongo.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,85 @@ func (c *Collection) Aggregation(ctx context.Context, pipeline interface{}, resu
306306
return cursor.All(ctx, result)
307307
}
308308

309+
func (c *Collection) AggregationWithOptions(ctx context.Context, pipeline interface{}, options map[string]interface{}, result interface{}) error {
310+
var err error
311+
var cursor *mongo.Cursor
312+
startTime := time.Now()
313+
defer func() {
314+
reportMongdbMetrics("aggregationWithOptions", err, startTime)
315+
}()
316+
aggregateOptions := mopt.AggregateOptions{}
317+
318+
// 处理hint选项
319+
if hint, ok := options["hint"]; ok {
320+
aggregateOptions.SetHint(hint)
321+
}
322+
323+
// 处理allowDiskUse选项 - 允许聚合操作使用磁盘空间
324+
if allowDiskUse, ok := options["allowDiskUse"]; ok {
325+
if allowDiskUseBool, ok := allowDiskUse.(bool); ok {
326+
aggregateOptions.SetAllowDiskUse(allowDiskUseBool)
327+
}
328+
}
329+
330+
// 处理batchSize选项 - 设置批处理大小
331+
if batchSize, ok := options["batchSize"]; ok {
332+
if batchSizeInt32, ok := batchSize.(int32); ok {
333+
aggregateOptions.SetBatchSize(batchSizeInt32)
334+
} else if batchSizeInt, ok := batchSize.(int); ok {
335+
aggregateOptions.SetBatchSize(int32(batchSizeInt))
336+
}
337+
}
338+
339+
// 处理maxTimeMS选项 - 设置最大执行时间(毫秒)
340+
if maxTimeMS, ok := options["maxTimeMS"]; ok {
341+
if maxTimeMSInt64, ok := maxTimeMS.(int64); ok {
342+
aggregateOptions.SetMaxTime(time.Duration(maxTimeMSInt64) * time.Millisecond)
343+
} else if maxTimeMSInt, ok := maxTimeMS.(int); ok {
344+
aggregateOptions.SetMaxTime(time.Duration(maxTimeMSInt) * time.Millisecond)
345+
}
346+
}
347+
348+
// 处理collation选项 - 设置排序规则
349+
if collation, ok := options["collation"]; ok {
350+
if collationOpt, ok := collation.(*mopt.Collation); ok {
351+
aggregateOptions.SetCollation(collationOpt)
352+
}
353+
}
354+
355+
// 处理comment选项 - 设置操作注释
356+
if comment, ok := options["comment"]; ok {
357+
if commentStr, ok := comment.(string); ok {
358+
aggregateOptions.SetComment(commentStr)
359+
}
360+
}
361+
362+
// 处理bypassDocumentValidation选项 - 绕过文档验证
363+
if bypassValidation, ok := options["bypassDocumentValidation"]; ok {
364+
if bypassValidationBool, ok := bypassValidation.(bool); ok {
365+
aggregateOptions.SetBypassDocumentValidation(bypassValidationBool)
366+
}
367+
}
368+
369+
// 处理let选项 - 设置变量
370+
if let, ok := options["let"]; ok {
371+
if letDoc, ok := let.(bson.D); ok {
372+
aggregateOptions.SetLet(letDoc)
373+
}
374+
}
375+
376+
cursor, err = c.mCli.Database(c.dbName).
377+
Collection(c.collectionName).
378+
Aggregate(ctx, pipeline, &aggregateOptions)
379+
if err != nil {
380+
return err
381+
}
382+
defer func() {
383+
_ = cursor.Close(ctx)
384+
}()
385+
return cursor.All(ctx, result)
386+
}
387+
309388
// Insert insert many data
310389
func (c *Collection) Insert(ctx context.Context, docs []interface{}) (int, error) {
311390
var ret *mongo.InsertManyResult

bcs-services/bcs-cluster-manager/internal/store/operationlog/operationlog.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ const (
3737
resourceID = "resourceid"
3838
taskID = "taskid"
3939
createTime = "createtime"
40+
clusterID = "clusterid"
41+
42+
// 索引名称常量
43+
IndexNameMainQuery = "operationlog_idx_main_query"
44+
IndexNameClusterTime = "operationlog_idx_cluster_time"
4045
)
4146

4247
// ModelOperationLog database operation for operation_log
@@ -51,11 +56,20 @@ type ModelOperationLog struct {
5156
var (
5257
operationLogIndexes = []drivers.Index{
5358
{
54-
Name: tableName + "_idx",
59+
Name: IndexNameMainQuery,
5560
Key: bson.D{
61+
bson.E{Key: clusterID, Value: 1},
5662
bson.E{Key: resourceType, Value: 1},
5763
bson.E{Key: resourceID, Value: 1},
58-
bson.E{Key: taskID, Value: 1},
64+
bson.E{Key: createTime, Value: -1},
65+
},
66+
Unique: false,
67+
},
68+
{
69+
Name: IndexNameClusterTime,
70+
Key: bson.D{
71+
bson.E{Key: clusterID, Value: 1},
72+
bson.E{Key: createTime, Value: -1},
5973
},
6074
Unique: false,
6175
},
@@ -208,6 +222,13 @@ func (m *ModelOperationLog) ListAggreOperationLog(ctx context.Context, condSrc,
208222
)
209223

210224
pipeline := make([]map[string]interface{}, 0)
225+
aggreOptions := make(map[string]interface{})
226+
227+
// 根据查询条件自动选择合适的索引
228+
indexHint := m.selectIndexHint(condSrc)
229+
if indexHint != "" {
230+
aggreOptions["hint"] = indexHint
231+
}
211232

212233
// from src table filter
213234
if len(condSrc) > 0 {
@@ -236,7 +257,7 @@ func (m *ModelOperationLog) ListAggreOperationLog(ctx context.Context, condSrc,
236257

237258
// count logs for conds
238259
if opt.Count {
239-
if err := m.db.Table(m.tableName).Aggregation(ctx, pipeline, &retTaskOpLogs); err != nil {
260+
if err := m.db.Table(m.tableName).AggregationWithOptions(ctx, pipeline, aggreOptions, &retTaskOpLogs); err != nil {
240261
return nil, err
241262
}
242263

@@ -258,8 +279,37 @@ func (m *ModelOperationLog) ListAggreOperationLog(ctx context.Context, condSrc,
258279
"$limit": opt.Limit,
259280
})
260281
}
261-
if err := m.db.Table(m.tableName).Aggregation(ctx, pipeline, &retTaskOpLogs); err != nil {
282+
if err := m.db.Table(m.tableName).AggregationWithOptions(ctx, pipeline, aggreOptions, &retTaskOpLogs); err != nil {
262283
return nil, err
263284
}
264285
return retTaskOpLogs, nil
265286
}
287+
288+
// selectIndexHint 根据查询条件选择合适的索引hint
289+
func (m *ModelOperationLog) selectIndexHint(condSrc []bson.E) string {
290+
// 分析查询条件,选择最优索引
291+
hasResourceType := false
292+
hasClusterID := false
293+
294+
for _, cond := range condSrc {
295+
switch cond.Key {
296+
case resourceType:
297+
hasResourceType = true
298+
case clusterID:
299+
hasClusterID = true
300+
}
301+
}
302+
303+
// 索引选择策略:
304+
// 1. 如果有clusterID + resourceType,使用主索引
305+
// 2. 如果只有clusterID,使用clusterID索引
306+
// 3. 其他情况使用默认索引(让MongoDB自动选择)
307+
308+
if hasClusterID && hasResourceType {
309+
return IndexNameMainQuery
310+
} else if hasClusterID {
311+
return IndexNameClusterTime
312+
}
313+
314+
return ""
315+
}

bcs-services/bcs-cluster-manager/internal/store/operationlog/tasksteplog.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ var (
4646
{
4747
Name: taskStepTableName + "_idx",
4848
Key: bson.D{
49-
bson.E{Key: resourceType, Value: 1},
50-
bson.E{Key: resourceID, Value: 1},
5149
bson.E{Key: taskID, Value: 1},
5250
},
5351
Unique: false,

0 commit comments

Comments
 (0)