Skip to content

Commit 044f744

Browse files
committed
fix: ReleasePartition cause delegator unserviceable.
issue: milvus-io#42098 milvus-io#42404 related to: #milvus-io#42009 milvus-io#41937 Implement new method to handle partition removal from next target without directly modifying current target. Changes include: - Add RemovePartitionFromNextTarget method and deprecate RemovePartition - Update target_observer to use new method for ReleasePartition operations - Add unit tests and mock methods for new functionality This ensures that all changes to next target will propagates to delegator's query view. Signed-off-by: Wei Liu <[email protected]>
1 parent f55f900 commit 044f744

File tree

4 files changed

+99
-4
lines changed

4 files changed

+99
-4
lines changed

internal/querycoordv2/meta/mock_target_manager.go

Lines changed: 49 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/querycoordv2/meta/target_manager.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type TargetManagerInterface interface {
5454
UpdateCollectionNextTarget(ctx context.Context, collectionID int64) error
5555
RemoveCollection(ctx context.Context, collectionID int64)
5656
RemovePartition(ctx context.Context, collectionID int64, partitionIDs ...int64)
57+
RemovePartitionFromNextTarget(ctx context.Context, collectionID int64, partitionIDs ...int64)
5758
GetGrowingSegmentsByCollection(ctx context.Context, collectionID int64, scope TargetScope) typeutil.UniqueSet
5859
GetGrowingSegmentsByChannel(ctx context.Context, collectionID int64, channelName string, scope TargetScope) typeutil.UniqueSet
5960
GetSealedSegmentsByCollection(ctx context.Context, collectionID int64, scope TargetScope) map[int64]*datapb.SegmentInfo
@@ -229,6 +230,7 @@ func (mgr *TargetManager) RemoveCollection(ctx context.Context, collectionID int
229230

230231
// RemovePartition removes all segment in the given partition,
231232
// NOTE: this doesn't remove any channel even the given one is the only partition
233+
// Deprecated: use RemovePartitionFromNextTarget instead @weiliu1031
232234
func (mgr *TargetManager) RemovePartition(ctx context.Context, collectionID int64, partitionIDs ...int64) {
233235
log := log.With(zap.Int64("collectionID", collectionID),
234236
zap.Int64s("PartitionIDs", partitionIDs))
@@ -266,6 +268,32 @@ func (mgr *TargetManager) RemovePartition(ctx context.Context, collectionID int6
266268
}
267269
}
268270

271+
// remove partition from next target
272+
// NOTE: don't edit current target directly, it will be updated by target observer, which push the new next target as current target
273+
// need the full progress to update next target to current target, so the query view on delegator could be updated when current target is updated
274+
func (mgr *TargetManager) RemovePartitionFromNextTarget(ctx context.Context, collectionID int64, partitionIDs ...int64) {
275+
log := log.With(zap.Int64("collectionID", collectionID),
276+
zap.Int64s("PartitionIDs", partitionIDs))
277+
278+
partitionSet := typeutil.NewUniqueSet(partitionIDs...)
279+
280+
log.Info("remove partition from next target")
281+
oleNextTarget := mgr.next.getCollectionTarget(collectionID)
282+
if oleNextTarget != nil {
283+
newTarget := mgr.removePartitionFromCollectionTarget(oleNextTarget, partitionSet)
284+
if newTarget != nil {
285+
mgr.next.updateCollectionTarget(collectionID, newTarget)
286+
log.Info("finish to remove partition from next target for collection",
287+
zap.Int64s("segments", newTarget.GetAllSegmentIDs()),
288+
zap.Strings("channels", newTarget.GetAllDmChannelNames()))
289+
} else {
290+
log.Info("all partitions have been released, release the collection current target now")
291+
mgr.current.removeCollectionTarget(collectionID)
292+
mgr.next.removeCollectionTarget(collectionID)
293+
}
294+
}
295+
}
296+
269297
func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *CollectionTarget, partitionSet typeutil.UniqueSet) *CollectionTarget {
270298
segments := make(map[int64]*datapb.SegmentInfo)
271299
for _, segment := range oldTarget.GetAllSegments() {

internal/querycoordv2/meta/target_manager_test.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,6 @@ func (suite *TargetManagerSuite) TestUpdateNextTarget() {
243243
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil)
244244
err := suite.mgr.UpdateCollectionNextTarget(ctx, collectionID)
245245
suite.NoError(err)
246-
247-
err = suite.mgr.UpdateCollectionNextTarget(ctx, collectionID)
248-
suite.NoError(err)
249246
}
250247

251248
func (suite *TargetManagerSuite) TestRemovePartition() {
@@ -263,6 +260,27 @@ func (suite *TargetManagerSuite) TestRemovePartition() {
263260
suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(ctx, collectionID, CurrentTarget))
264261
}
265262

263+
func (suite *TargetManagerSuite) TestRemovePartitionFromNextTarget() {
264+
ctx := suite.ctx
265+
collectionID := int64(1000)
266+
ret := suite.mgr.UpdateCollectionCurrentTarget(ctx, collectionID)
267+
suite.True(ret)
268+
269+
err := suite.mgr.UpdateCollectionNextTarget(ctx, collectionID)
270+
suite.NoError(err)
271+
272+
suite.assertSegments(suite.getAllSegment(collectionID, suite.partitions[collectionID]), suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, NextTarget))
273+
suite.assertChannels(suite.channels[collectionID], suite.mgr.GetDmChannelsByCollection(ctx, collectionID, NextTarget))
274+
suite.assertSegments(suite.getAllSegment(collectionID, suite.partitions[collectionID]), suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, CurrentTarget))
275+
suite.assertChannels(suite.channels[collectionID], suite.mgr.GetDmChannelsByCollection(ctx, collectionID, CurrentTarget))
276+
277+
suite.mgr.RemovePartitionFromNextTarget(ctx, collectionID, 100)
278+
suite.assertSegments([]int64{3, 4}, suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, NextTarget))
279+
suite.assertChannels(suite.channels[collectionID], suite.mgr.GetDmChannelsByCollection(ctx, collectionID, NextTarget))
280+
suite.assertSegments(suite.getAllSegment(collectionID, suite.partitions[collectionID]), suite.mgr.GetSealedSegmentsByCollection(ctx, collectionID, CurrentTarget))
281+
suite.assertChannels(suite.channels[collectionID], suite.mgr.GetDmChannelsByCollection(ctx, collectionID, CurrentTarget))
282+
}
283+
266284
func (suite *TargetManagerSuite) TestRemoveCollection() {
267285
ctx := suite.ctx
268286
collectionID := int64(1000)

internal/querycoordv2/observers/target_observer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
225225
ob.keylocks.Unlock(req.CollectionID)
226226
req.Notifier <- nil
227227
case ReleasePartition:
228-
ob.targetMgr.RemovePartition(ctx, req.CollectionID, req.PartitionIDs...)
228+
ob.targetMgr.RemovePartitionFromNextTarget(ctx, req.CollectionID, req.PartitionIDs...)
229229
req.Notifier <- nil
230230
}
231231
log.Info("manually trigger update target done",

0 commit comments

Comments
 (0)