Skip to content

Commit 2669d14

Browse files
authored
refactor: Remove balance constraints between channel and segment tasks (milvus-io#42177)
issue: milvus-io#42176 Remove the mutual exclusion constraints between channel and segment balance tasks to allow them to run concurrently. Changes include: - Remove permitBalanceChannel() and permitBalanceSegment() methods from RoundRobinBalancer - Update ChannelLevelScoreBalancer, MultiTargetBalancer, RowCountBasedBalancer, and ScoreBasedBalancer to remove constraint checks - Allow segment balance tasks to proceed even when channel balance tasks are running - Update test cases to reflect new behavior where balance tasks no longer block each other This change improves the efficiency of load balancing by removing unnecessary coordination overhead between different types of balance operations. Signed-off-by: Wei Liu <[email protected]>
1 parent 6d2ad51 commit 2669d14

File tree

6 files changed

+13
-24
lines changed

6 files changed

+13
-24
lines changed

internal/querycoordv2/balance/balance.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,6 @@ func (b *RoundRobinBalancer) BalanceReplica(ctx context.Context, replica *meta.R
155155
return nil, nil
156156
}
157157

158-
func (b *RoundRobinBalancer) permitBalanceChannel(collectionID int64) bool {
159-
return b.scheduler.GetSegmentTaskNum(task.WithCollectionID2TaskFilter(collectionID), task.WithTaskTypeFilter(task.TaskTypeMove)) == 0
160-
}
161-
162-
func (b *RoundRobinBalancer) permitBalanceSegment(collectionID int64) bool {
163-
return b.scheduler.GetChannelTaskNum(task.WithCollectionID2TaskFilter(collectionID), task.WithTaskTypeFilter(task.TaskTypeMove)) == 0
164-
}
165-
166158
func (b *RoundRobinBalancer) getNodes(nodes []int64) []*session.NodeInfo {
167159
ret := make([]*session.NodeInfo, 0, len(nodes))
168160
for _, n := range nodes {

internal/querycoordv2/balance/channel_level_score_balancer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,19 +135,19 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
135135
zap.Any("available nodes", rwNodes),
136136
)
137137
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
138-
if b.permitBalanceChannel(replica.GetCollectionID()) && !streamingutil.IsStreamingServiceEnabled() {
138+
if !streamingutil.IsStreamingServiceEnabled() {
139139
channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, channelName, rwNodes, roNodes)...)
140140
}
141141

142-
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
142+
if len(channelPlans) == 0 {
143143
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, channelName, rwNodes, roNodes)...)
144144
}
145145
} else {
146-
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) && !streamingutil.IsStreamingServiceEnabled() {
146+
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && !streamingutil.IsStreamingServiceEnabled() {
147147
channelPlans = append(channelPlans, b.genChannelPlan(ctx, replica, channelName, rwNodes)...)
148148
}
149149

150-
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
150+
if len(channelPlans) == 0 {
151151
segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, br, replica, channelName, rwNodes)...)
152152
}
153153
}

internal/querycoordv2/balance/multi_target_balance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ func (b *MultiTargetBalancer) balanceChannels(ctx context.Context, br *balanceRe
503503
rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes()
504504
}
505505

506-
if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
506+
if len(rwNodes) == 0 {
507507
return nil
508508
}
509509

@@ -525,7 +525,7 @@ func (b *MultiTargetBalancer) balanceSegments(ctx context.Context, replica *meta
525525
rwNodes := replica.GetRWNodes()
526526
roNodes := replica.GetRONodes()
527527

528-
if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) {
528+
if len(rwNodes) == 0 {
529529
return nil
530530
}
531531
// print current distribution before generating plans

internal/querycoordv2/balance/rowcount_based_balancer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (b *RowCountBasedBalancer) balanceChannels(ctx context.Context, br *balance
213213
rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes()
214214
}
215215

216-
if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
216+
if len(rwNodes) == 0 {
217217
return nil
218218
}
219219
if len(roNodes) != 0 {
@@ -234,7 +234,7 @@ func (b *RowCountBasedBalancer) balanceSegments(ctx context.Context, replica *me
234234
rwNodes := replica.GetRWNodes()
235235
roNodes := replica.GetRONodes()
236236

237-
if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) {
237+
if len(rwNodes) == 0 {
238238
return nil
239239
}
240240
// print current distribution before generating plans

internal/querycoordv2/balance/score_based_balancer.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ func (b *ScoreBasedBalancer) balanceChannels(ctx context.Context, br *balanceRep
485485
rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes()
486486
}
487487

488-
if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
488+
if len(rwNodes) == 0 {
489489
return nil
490490
}
491491

@@ -515,9 +515,6 @@ func (b *ScoreBasedBalancer) balanceSegments(ctx context.Context, br *balanceRep
515515
br.AddRecord(StrRecord("no rwNodes to balance"))
516516
return nil
517517
}
518-
if !b.permitBalanceSegment(replica.GetCollectionID()) {
519-
return nil
520-
}
521518
// print current distribution before generating plans
522519
if len(roNodes) != 0 {
523520
if !stoppingBalance {

internal/querycoordv2/balance/score_based_balancer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,14 +1203,14 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceSegmentAndChannel() {
12031203
segmentPlans, _ := suite.getCollectionBalancePlans(balancer, collectionID)
12041204
suite.Equal(len(segmentPlans), 2)
12051205

1206-
// mock balance channel is executing, expect to generate 0 balance segment task
1206+
// mock balance channel is executing, expect to generate 2 balance segment task, balance segment won't be blocked by channel balance
12071207
suite.mockScheduler.ExpectedCalls = nil
12081208
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(1).Maybe()
12091209
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
12101210
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
12111211
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
12121212
segmentPlans, _ = suite.getCollectionBalancePlans(balancer, collectionID)
1213-
suite.Equal(len(segmentPlans), 0)
1213+
suite.Equal(len(segmentPlans), 2)
12141214

12151215
// set unbalance channel distribution
12161216
balancer.dist.ChannelDistManager.Update(1, []*meta.DmChannel{
@@ -1228,14 +1228,14 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceSegmentAndChannel() {
12281228
_, channelPlans := suite.getCollectionBalancePlans(balancer, collectionID)
12291229
suite.Equal(len(channelPlans), 2)
12301230

1231-
// mock balance channel is executing, expect to generate 0 balance segment task
1231+
// mock balance channel is executing, expect to generate 2 balance segment task, balance segment won't be blocked by channel balance
12321232
suite.mockScheduler.ExpectedCalls = nil
12331233
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
12341234
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(1).Maybe()
12351235
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
12361236
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
12371237
_, channelPlans = suite.getCollectionBalancePlans(balancer, collectionID)
1238-
suite.Equal(len(channelPlans), 0)
1238+
suite.Equal(len(channelPlans), 2)
12391239
}
12401240

12411241
func (suite *ScoreBasedBalancerTestSuite) TestBalanceChannelOnMultiCollections() {

0 commit comments

Comments
 (0)