Skip to content

Commit cbe2761

Browse files
authored
fix: Fix L0 segment duplicate load task generation during channel balance (#44700)
issue: #44699 Fix the issue where L0 segment checking logic incorrectly identifies L0 segments as missing when they exist on multiple delegators during channel balance process, which blocks sealed segment loading and target progression. Changes include: - Replace GetLatestShardLeaderByFilter with GetByFilter to check all delegators instead of only the latest leader - Iterate through all delegator views to identify which ones lack the L0 segment The original logic only checked the latest shard leader, causing false positive detection of missing L0 segments when they actually exist on other delegators in the same channel during balance operations. This led to continuous generation of duplicate L0 segment load tasks, preventing normal sealed segment loading flow. Signed-off-by: Wei Liu <wei.liu@zilliz.com>
1 parent 03223dd commit cbe2761

File tree

2 files changed

+232
-103
lines changed

2 files changed

+232
-103
lines changed

internal/querycoordv2/checkers/segment_checker.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
3838
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
3939
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
40+
"github.com/milvus-io/milvus/pkg/v2/util/merr"
4041
)
4142

4243
const initialTargetVersion = int64(0)
@@ -267,20 +268,29 @@ func (c *SegmentChecker) getSealedSegmentDiff(
267268
}
268269

269270
isSegmentLack := func(segment *datapb.SegmentInfo) bool {
270-
node, existInDist := distMap[segment.ID]
271-
272271
if segment.GetLevel() == datapb.SegmentLevel_L0 {
273-
// the L0 segments have to been in the same node as the channel watched
274-
leader := c.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))
272+
// Note: In the original design, all segments are forwarded through serviceable delegators to target workers for loading.
273+
// However, L0 segments are always an exception and need to be loaded directly on the delegator.
274+
// during balance channel, we should check each delegator's view to see if it lacks the l0 segment
275+
// cause same channel may have different delegators during balance channel progress, and which last for a long time if memory is not enough
276+
views := c.dist.LeaderViewManager.GetByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))
277+
if len(views) == 0 {
278+
msg := "no shard leader for the l0 segment to execute loading"
279+
err := merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "shard delegator not found")
280+
log.Warn(msg, zap.Error(err))
281+
return false
282+
}
275283

276-
// if the leader node's version doesn't match load l0 segment's requirement, skip it
277-
if leader != nil && checkLeaderVersion(leader, segment.ID) {
278-
l0WithWrongLocation := node != leader.ID
279-
return !existInDist || l0WithWrongLocation
284+
// find delegator which lack of l0 segment
285+
for _, view := range views {
286+
if _, ok := view.Segments[segment.ID]; !ok && checkLeaderVersion(view, segment.ID) {
287+
return true
288+
}
280289
}
281290
return false
282291
}
283292

293+
_, existInDist := distMap[segment.ID]
284294
return !existInDist
285295
}
286296

internal/querycoordv2/checkers/segment_checker_test.go

Lines changed: 214 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -872,115 +872,234 @@ func TestSegmentCheckerSuite(t *testing.T) {
872872
}
873873

874874
func TestGetSealedSegmentDiff_WithL0SegmentCheck(t *testing.T) {
875-
mockey.PatchConvey("TestGetSealedSegmentDiff_WithL0SegmentCheck", t, func() {
876-
// Test case 1: L0 segment exists
877-
t.Run("L0_segment_exists", func(t *testing.T) {
878-
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
879-
return true // L0 segment exists
880-
}
875+
// Test case 1: L0 segment exists
876+
t.Run("L0_segment_exists", func(t *testing.T) {
877+
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
878+
return true // L0 segment exists
879+
}
881880

882-
// Create test segments
883-
segments := []*datapb.SegmentInfo{
884-
{
885-
ID: 1,
886-
CollectionID: 1,
887-
PartitionID: 1,
888-
Level: datapb.SegmentLevel_L0,
889-
},
890-
{
891-
ID: 2,
892-
CollectionID: 1,
893-
PartitionID: 1,
894-
Level: datapb.SegmentLevel_L1,
895-
},
896-
}
881+
// Create test segments
882+
segments := []*datapb.SegmentInfo{
883+
{
884+
ID: 1,
885+
CollectionID: 1,
886+
PartitionID: 1,
887+
Level: datapb.SegmentLevel_L0,
888+
},
889+
{
890+
ID: 2,
891+
CollectionID: 1,
892+
PartitionID: 1,
893+
Level: datapb.SegmentLevel_L1,
894+
},
895+
}
897896

898-
// Filter L0 segments with existence check
899-
var level0Segments []*datapb.SegmentInfo
900-
for _, segment := range segments {
901-
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
902-
level0Segments = append(level0Segments, segment)
903-
}
897+
// Filter L0 segments with existence check
898+
var level0Segments []*datapb.SegmentInfo
899+
for _, segment := range segments {
900+
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
901+
level0Segments = append(level0Segments, segment)
904902
}
903+
}
905904

906-
// Verify: L0 segment should be included
907-
assert.Equal(t, 1, len(level0Segments))
908-
assert.Equal(t, int64(1), level0Segments[0].GetID())
909-
})
905+
// Verify: L0 segment should be included
906+
assert.Equal(t, 1, len(level0Segments))
907+
assert.Equal(t, int64(1), level0Segments[0].GetID())
908+
})
910909

911-
// Test case 2: L0 segment does not exist
912-
t.Run("L0_segment_not_exists", func(t *testing.T) {
913-
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
914-
return false // L0 segment does not exist
915-
}
910+
// Test case 2: L0 segment does not exist
911+
t.Run("L0_segment_not_exists", func(t *testing.T) {
912+
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
913+
return false // L0 segment does not exist
914+
}
916915

917-
// Create test segments
918-
segments := []*datapb.SegmentInfo{
919-
{
920-
ID: 1,
921-
CollectionID: 1,
922-
PartitionID: 1,
923-
Level: datapb.SegmentLevel_L0,
924-
},
925-
{
926-
ID: 2,
927-
CollectionID: 1,
928-
PartitionID: 1,
929-
Level: datapb.SegmentLevel_L1,
930-
},
931-
}
916+
// Create test segments
917+
segments := []*datapb.SegmentInfo{
918+
{
919+
ID: 1,
920+
CollectionID: 1,
921+
PartitionID: 1,
922+
Level: datapb.SegmentLevel_L0,
923+
},
924+
{
925+
ID: 2,
926+
CollectionID: 1,
927+
PartitionID: 1,
928+
Level: datapb.SegmentLevel_L1,
929+
},
930+
}
932931

933-
// Filter L0 segments with existence check
934-
var level0Segments []*datapb.SegmentInfo
935-
for _, segment := range segments {
936-
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
937-
level0Segments = append(level0Segments, segment)
938-
}
932+
// Filter L0 segments with existence check
933+
var level0Segments []*datapb.SegmentInfo
934+
for _, segment := range segments {
935+
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
936+
level0Segments = append(level0Segments, segment)
939937
}
938+
}
939+
940+
// Verify: L0 segment should be filtered out
941+
assert.Equal(t, 0, len(level0Segments))
942+
})
943+
944+
// Test case 3: Mixed L0 segments, only some exist
945+
t.Run("Mixed_L0_segments", func(t *testing.T) {
946+
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
947+
return segmentID == 1 // Only segment 1 exists
948+
}
940949

941-
// Verify: L0 segment should be filtered out
942-
assert.Equal(t, 0, len(level0Segments))
943-
})
950+
// Create test segments
951+
segments := []*datapb.SegmentInfo{
952+
{
953+
ID: 1,
954+
CollectionID: 1,
955+
PartitionID: 1,
956+
Level: datapb.SegmentLevel_L0,
957+
},
958+
{
959+
ID: 2,
960+
CollectionID: 1,
961+
PartitionID: 1,
962+
Level: datapb.SegmentLevel_L0,
963+
},
964+
{
965+
ID: 3,
966+
CollectionID: 1,
967+
PartitionID: 1,
968+
Level: datapb.SegmentLevel_L1,
969+
},
970+
}
944971

945-
// Test case 3: Mixed L0 segments, only some exist
946-
t.Run("Mixed_L0_segments", func(t *testing.T) {
947-
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
948-
return segmentID == 1 // Only segment 1 exists
972+
// Filter L0 segments with existence check
973+
var level0Segments []*datapb.SegmentInfo
974+
for _, segment := range segments {
975+
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
976+
level0Segments = append(level0Segments, segment)
949977
}
978+
}
950979

951-
// Create test segments
952-
segments := []*datapb.SegmentInfo{
953-
{
954-
ID: 1,
955-
CollectionID: 1,
956-
PartitionID: 1,
957-
Level: datapb.SegmentLevel_L0,
958-
},
959-
{
960-
ID: 2,
961-
CollectionID: 1,
962-
PartitionID: 1,
963-
Level: datapb.SegmentLevel_L0,
964-
},
965-
{
966-
ID: 3,
967-
CollectionID: 1,
968-
PartitionID: 1,
969-
Level: datapb.SegmentLevel_L1,
980+
// Verify: Only existing L0 segment should be included
981+
assert.Equal(t, 1, len(level0Segments))
982+
assert.Equal(t, int64(1), level0Segments[0].GetID())
983+
})
984+
}
985+
986+
// createTestSegmentChecker creates a test SegmentChecker with mocked dependencies
987+
func createTestSegmentChecker() (*SegmentChecker, *meta.Meta, *meta.DistributionManager, *session.NodeManager) {
988+
nodeMgr := session.NewNodeManager()
989+
metaManager := meta.NewMeta(nil, nil, nodeMgr)
990+
distManager := meta.NewDistributionManager()
991+
targetManager := meta.NewTargetManager(nil, metaManager)
992+
getBalancerFunc := func() balance.Balance { return balance.NewScoreBasedBalancer(nil, nil, nil, nil, nil) }
993+
checkSegmentExist := func(ctx context.Context, collectionID int64, segmentID int64) bool {
994+
return true
995+
}
996+
checker := NewSegmentChecker(metaManager, distManager, targetManager, nodeMgr, getBalancerFunc, checkSegmentExist)
997+
return checker, metaManager, distManager, nodeMgr
998+
}
999+
1000+
func TestGetSealedSegmentDiff_L0SegmentMultipleDelegators(t *testing.T) {
1001+
defer mockey.UnPatchAll()
1002+
1003+
ctx := context.Background()
1004+
1005+
// Setup test data
1006+
collectionID := int64(1)
1007+
replicaID := int64(1)
1008+
partitionID := int64(1)
1009+
segmentID := int64(1)
1010+
channel := "test-insert-channel"
1011+
nodeID1 := int64(1)
1012+
nodeID2 := int64(2)
1013+
1014+
// Create test components
1015+
checker, _, _, _ := createTestSegmentChecker()
1016+
1017+
// Mock GetSealedSegmentsByCollection to return L0 segment
1018+
mockey.Mock((*meta.TargetManager).GetSealedSegmentsByCollection).To(func(ctx context.Context, collectionID int64, scope meta.TargetScope) map[int64]*datapb.SegmentInfo {
1019+
if scope == meta.CurrentTarget {
1020+
return map[int64]*datapb.SegmentInfo{
1021+
segmentID: {
1022+
ID: segmentID,
1023+
CollectionID: collectionID,
1024+
PartitionID: partitionID,
1025+
InsertChannel: channel,
1026+
Level: datapb.SegmentLevel_L0,
9701027
},
9711028
}
1029+
}
1030+
return make(map[int64]*datapb.SegmentInfo)
1031+
}).Build()
1032+
1033+
// Mock IsNextTargetExist to return false
1034+
mockey.Mock((*meta.TargetManager).IsNextTargetExist).Return(false).Build()
1035+
1036+
// Mock meta manager methods to avoid direct meta manipulation
1037+
// Mock Get method to return the test replica
1038+
testReplica := utils.CreateTestReplica(replicaID, collectionID, []int64{nodeID1, nodeID2})
1039+
mockey.Mock((*meta.ReplicaManager).Get).To(func(ctx context.Context, rid int64) *meta.Replica {
1040+
if rid == replicaID {
1041+
return testReplica
1042+
}
1043+
return nil
1044+
}).Build()
1045+
1046+
// Mock GetCollection to return test collection
1047+
testCollection := utils.CreateTestCollection(collectionID, 1)
1048+
mockey.Mock((*meta.Meta).GetCollection).To(func(ctx context.Context, cid int64) *meta.Collection {
1049+
if cid == collectionID {
1050+
return testCollection
1051+
}
1052+
return nil
1053+
}).Build()
1054+
1055+
// Mock NodeManager Get method to return compatible node versions
1056+
mockey.Mock((*session.NodeManager).Get).To(func(nodeID int64) *session.NodeInfo {
1057+
if nodeID == nodeID1 || nodeID == nodeID2 {
1058+
return session.NewNodeInfo(session.ImmutableNodeInfo{
1059+
NodeID: nodeID,
1060+
Address: "localhost",
1061+
Hostname: "localhost",
1062+
Version: common.Version,
1063+
})
1064+
}
1065+
return nil
1066+
}).Build()
9721067

973-
// Filter L0 segments with existence check
974-
var level0Segments []*datapb.SegmentInfo
975-
for _, segment := range segments {
976-
if segment.GetLevel() == datapb.SegmentLevel_L0 && checkSegmentExist(context.Background(), segment.GetCollectionID(), segment.GetID()) {
977-
level0Segments = append(level0Segments, segment)
978-
}
979-
}
1068+
// Mock SegmentDistManager GetByFilter to return empty distribution initially
1069+
mockey.Mock((*meta.SegmentDistManager).GetByFilter).Return([]*meta.Segment{}).Build()
9801070

981-
// Verify: Only existing L0 segment should be included
982-
assert.Equal(t, 1, len(level0Segments))
983-
assert.Equal(t, int64(1), level0Segments[0].GetID())
984-
})
985-
})
1071+
// Test case 1: Multiple delegators, one lacks the L0 segment
1072+
leaderView1 := utils.CreateTestLeaderView(nodeID1, collectionID, channel,
1073+
map[int64]int64{segmentID: nodeID1}, map[int64]*meta.Segment{}) // Has the segment
1074+
leaderView2 := utils.CreateTestLeaderView(nodeID2, collectionID, channel,
1075+
map[int64]int64{}, map[int64]*meta.Segment{}) // Missing the segment
1076+
1077+
// Mock LeaderViewManager GetByFilter to return leader views for L0 segment checking
1078+
mockGetByFilter := mockey.Mock((*meta.LeaderViewManager).GetByFilter).To(func(filters ...meta.LeaderViewFilter) []*meta.LeaderView {
1079+
// Return both leader views for L0 segment checking
1080+
return []*meta.LeaderView{leaderView1, leaderView2}
1081+
}).Build()
1082+
toLoad, toRelease := checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
1083+
mockGetByFilter.Release()
1084+
1085+
// Verify: L0 segment should be loaded for the delegator that lacks it
1086+
assert.Len(t, toLoad, 1, "Should load L0 segment for delegator that lacks it")
1087+
assert.Equal(t, segmentID, toLoad[0].GetID(), "Should load the correct L0 segment")
1088+
assert.Empty(t, toRelease, "Should not release any segments")
1089+
1090+
// Test case 2: All delegators have the L0 segment
1091+
leaderView2WithSegment := utils.CreateTestLeaderView(nodeID2, collectionID, channel,
1092+
map[int64]int64{segmentID: nodeID2}, map[int64]*meta.Segment{}) // Now has the segment
1093+
1094+
// Update the mock to return leader views where both have the segment
1095+
mockey.Mock((*meta.LeaderViewManager).GetByFilter).To(func(filters ...meta.LeaderViewFilter) []*meta.LeaderView {
1096+
// Return both leader views, both now have the L0 segment
1097+
return []*meta.LeaderView{leaderView1, leaderView2WithSegment}
1098+
}).Build()
1099+
1100+
toLoad, toRelease = checker.getSealedSegmentDiff(ctx, collectionID, replicaID)
1101+
1102+
// Verify: No segments should be loaded when all delegators have the L0 segment
1103+
assert.Empty(t, toLoad, "Should not load L0 segment when all delegators have it")
1104+
assert.Empty(t, toRelease, "Should not release any segments")
9861105
}

0 commit comments

Comments
 (0)