Skip to content

Commit 252d49d

Browse files
authored
fix: ChannelManager double assignment (#41837)
See also: #41876 --------- Signed-off-by: yangxuan <[email protected]>
1 parent f71930e commit 252d49d

File tree

7 files changed

+165
-62
lines changed

7 files changed

+165
-62
lines changed

internal/datacoord/channel.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ import (
2222
"go.uber.org/zap"
2323
"google.golang.org/protobuf/proto"
2424

25+
"github.com/cockroachdb/errors"
2526
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
2627
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
2728
"github.com/milvus-io/milvus/pkg/v2/log"
2829
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
30+
"github.com/milvus-io/milvus/pkg/v2/util/merr"
2931
)
3032

3133
type ROChannel interface {
@@ -191,15 +193,30 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *St
191193
return c
192194
}
193195

194-
func (c *StateChannel) TransitionOnSuccess(opID int64) {
196+
func (c *StateChannel) TransitionState(err error, opID int64) {
195197
if opID != c.Info.GetOpID() {
196-
log.Warn("Try to transit on success but opID not match, stay original state ",
198+
log.Warn("Try to transit state but opID not match, stay original state ",
197199
zap.Any("currentState", c.currentState),
198200
zap.String("channel", c.Name),
199201
zap.Int64("target opID", opID),
200202
zap.Int64("channel opID", c.Info.GetOpID()))
201203
return
202204
}
205+
206+
if err == nil {
207+
c.transitionOnSuccess()
208+
return
209+
}
210+
211+
if errors.Is(err, merr.ErrChannelReduplicate) {
212+
c.setState(ToRelease)
213+
return
214+
}
215+
216+
c.transitionOnFailure()
217+
}
218+
219+
func (c *StateChannel) transitionOnSuccess() {
203220
switch c.currentState {
204221
case Standby:
205222
c.setState(ToWatch)
@@ -216,21 +233,11 @@ func (c *StateChannel) TransitionOnSuccess(opID int64) {
216233
}
217234
}
218235

219-
func (c *StateChannel) TransitionOnFailure(opID int64) {
220-
if opID != c.Info.GetOpID() {
221-
log.Warn("Try to transit on failure but opID not match, stay original state",
222-
zap.Any("currentState", c.currentState),
223-
zap.String("channel", c.Name),
224-
zap.Int64("target opID", opID),
225-
zap.Int64("channel opID", c.Info.GetOpID()))
226-
return
227-
}
236+
func (c *StateChannel) transitionOnFailure() {
228237
switch c.currentState {
229-
case Watching:
230-
c.setState(Standby)
231-
case Releasing:
238+
case Watching, Releasing, ToWatch:
232239
c.setState(Standby)
233-
case Standby, ToWatch, Watched, ToRelease:
240+
case Standby, Watched, ToRelease:
234241
// Stay original state
235242
}
236243
}

internal/datacoord/channel_manager.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,14 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
156156
m.mu.Lock()
157157
nodeChannels := m.store.GetNodeChannelsBy(
158158
WithAllNodes(),
159-
func(ch *StateChannel) bool {
159+
func(ch *StateChannel) bool { // Channel with drop-mark
160160
return m.h.CheckShouldDropChannel(ch.GetName())
161161
})
162-
m.mu.Unlock()
163162

164163
for _, info := range nodeChannels {
165164
m.finishRemoveChannel(info.NodeID, lo.Values(info.Channels)...)
166165
}
166+
m.mu.Unlock()
167167

168168
if m.balanceCheckLoop != nil {
169169
log.Ctx(ctx).Info("starting channel balance loop")
@@ -238,6 +238,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
238238
zap.Array("updates", updates), zap.Error(err))
239239
}
240240

241+
// Speed up channel assignment
241242
// channel already written into meta, try to assign it to the cluster
242243
// not error is returned if failed, the assignment will retry later
243244
updates = m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
@@ -286,11 +287,8 @@ func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error {
286287
return nil
287288
}
288289

289-
// reassign reassigns a channel to another DataNode.
290+
// inner method, lock before using it, reassign reassigns a channel to another DataNode.
290291
func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error {
291-
m.mu.Lock()
292-
defer m.mu.Unlock()
293-
294292
updates := m.assignPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
295293
if updates != nil {
296294
return m.execute(updates)
@@ -436,15 +434,16 @@ func (m *ChannelManagerImpl) CheckLoop(ctx context.Context) {
436434
}
437435

438436
func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
439-
m.mu.RLock()
437+
m.mu.Lock()
440438
standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby))
441439
toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease))
442440
toChecks := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(Watching, Releasing))
443-
m.mu.RUnlock()
444441

445-
// Processing standby channels
446-
updatedStandbys := false
447-
updatedStandbys = m.advanceStandbys(ctx, standbys)
442+
// Reassigning standby channels in locks to avoid concurrent assignment with Watch, Remove, AddNode, DeleteNode
443+
updatedStandbys := m.advanceStandbys(ctx, standbys)
444+
m.mu.Unlock()
445+
446+
// RPCs stays out of locks
448447
updatedToCheckes := m.advanceToChecks(ctx, toChecks)
449448
updatedToNotifies := m.advanceToNotifies(ctx, toNotifies)
450449

@@ -453,9 +452,8 @@ func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
453452
}
454453
}
455454

455+
// inner method need lock
456456
func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWChannel) {
457-
m.mu.Lock()
458-
defer m.mu.Unlock()
459457
for _, ch := range channels {
460458
if err := m.removeChannel(nodeID, ch); err != nil {
461459
log.Warn("Failed to remove channel", zap.Any("channel", ch), zap.Error(err))
@@ -469,6 +467,7 @@ func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWCha
469467
}
470468
}
471469

470+
// inner method need locks
472471
func (m *ChannelManagerImpl) advanceStandbys(ctx context.Context, standbys []*NodeChannelInfo) bool {
473472
var advanced bool = false
474473
for _, nodeAssign := range standbys {
@@ -576,7 +575,7 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
576575
}
577576

578577
m.mu.Lock()
579-
m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
578+
m.store.UpdateState(err, nodeID, res.ch, res.opID)
580579
m.mu.Unlock()
581580
}
582581

@@ -592,9 +591,9 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
592591
}
593592

594593
type poolResult struct {
595-
successful bool
596-
ch RWChannel
597-
opID int64
594+
err error
595+
ch RWChannel
596+
opID int64
598597
}
599598

600599
func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
@@ -620,10 +619,14 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
620619
future := getOrCreateIOPool().Submit(func() (any, error) {
621620
successful, got := m.Check(ctx, nodeID, tmpWatchInfo)
622621
if got {
622+
var err error
623+
if !successful {
624+
err = errors.New("operation in progress")
625+
}
623626
return poolResult{
624-
successful: successful,
625-
ch: innerCh,
626-
opID: tmpWatchInfo.GetOpID(),
627+
err: err,
628+
ch: innerCh,
629+
opID: tmpWatchInfo.GetOpID(),
627630
}, nil
628631
}
629632
return nil, errors.New("Got results with no progress")
@@ -636,7 +639,7 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
636639
if err == nil {
637640
m.mu.Lock()
638641
result := got.(poolResult)
639-
m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
642+
m.store.UpdateState(result.err, nodeID, result.ch, result.opID)
640643
m.mu.Unlock()
641644

642645
advanced = true
@@ -712,6 +715,7 @@ func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *data
712715
return false, false
713716
}
714717

718+
// inner method need lock
715719
func (m *ChannelManagerImpl) execute(updates *ChannelOpSet) error {
716720
for _, op := range updates.ops {
717721
if op.Type != Delete {

internal/datacoord/channel_manager_test.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,31 @@ func (s *ChannelManagerSuite) TestFindWatcher() {
378378
func (s *ChannelManagerSuite) TestAdvanceChannelState() {
379379
ctx, cancel := context.WithCancel(context.Background())
380380
defer cancel()
381+
s.Run("advance towatch dn watched to torelease", func() {
382+
chNodes := map[string]int64{
383+
"ch1": 1,
384+
"ch2": 1,
385+
}
386+
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
387+
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).
388+
RunAndReturn(func(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error {
389+
s.Require().Equal(1, len(req.GetInfos()))
390+
switch req.GetInfos()[0].GetVchan().GetChannelName() {
391+
case "ch2":
392+
return merr.WrapErrChannelReduplicate("ch2")
393+
default:
394+
return nil
395+
}
396+
}).Twice()
397+
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
398+
s.Require().NoError(err)
399+
s.checkAssignment(m, 1, "ch1", ToWatch)
400+
s.checkAssignment(m, 1, "ch2", ToWatch)
401+
402+
m.AdvanceChannelState(ctx)
403+
s.checkAssignment(m, 1, "ch1", Watching)
404+
s.checkAssignment(m, 1, "ch2", ToRelease)
405+
})
381406
s.Run("advance statndby with no available nodes", func() {
382407
chNodes := map[string]int64{
383408
"ch1": bufferID,
@@ -680,8 +705,8 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
680705
s.checkAssignment(m, 1, "ch2", ToWatch)
681706

682707
m.AdvanceChannelState(ctx)
683-
s.checkAssignment(m, 1, "ch1", ToWatch)
684-
s.checkAssignment(m, 1, "ch2", ToWatch)
708+
s.checkAssignment(m, 1, "ch1", Standby)
709+
s.checkAssignment(m, 1, "ch2", Standby)
685710
})
686711
s.Run("advance to release channels notify success", func() {
687712
chNodes := map[string]int64{

internal/datacoord/channel_store.go

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type RWChannelStore interface {
7272
Update(op *ChannelOpSet) error
7373

7474
// UpdateState is used by StateChannelStore only
75-
UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
75+
UpdateState(err error, nodeID int64, channel RWChannel, opID int64)
7676
// SegLegacyChannelByNode is used by StateChannelStore only
7777
SetLegacyChannelByNode(nodeIDs ...int64)
7878

@@ -339,6 +339,8 @@ func (c *StateChannelStore) Reload() error {
339339
if err != nil {
340340
return err
341341
}
342+
343+
dupChannel := []*StateChannel{}
342344
for i := 0; i < len(keys); i++ {
343345
k := keys[i]
344346
v := values[i]
@@ -353,14 +355,36 @@ func (c *StateChannelStore) Reload() error {
353355
}
354356
reviseVChannelInfo(info.GetVchan())
355357

356-
c.AddNode(nodeID)
357-
358+
channelName := info.GetVchan().GetChannelName()
358359
channel := NewStateChannelByWatchInfo(nodeID, info)
360+
361+
if c.HasChannel(channelName) {
362+
dupChannel = append(dupChannel, channel)
363+
log.Warn("channel store detects duplicated channel, skip recovering it",
364+
zap.Int64("nodeID", nodeID),
365+
zap.String("channel", channelName))
366+
continue
367+
}
368+
369+
c.AddNode(nodeID)
359370
c.channelsInfo[nodeID].AddChannel(channel)
360-
log.Info("channel store reload channel",
361-
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
371+
log.Info("channel store reloads channel from meta",
372+
zap.Int64("nodeID", nodeID),
373+
zap.String("channel", channelName))
362374
metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels)))
363375
}
376+
377+
for _, channel := range dupChannel {
378+
log.Warn("channel store clearing duplicated channel",
379+
zap.String("channel", channel.GetName()), zap.Int64("nodeID", channel.assignedNode))
380+
chOp := NewChannelOpSet(NewChannelOp(channel.assignedNode, Delete, channel))
381+
if err := c.Update(chOp); err != nil {
382+
log.Warn("channel store failed to remove duplicated channel, will retry later",
383+
zap.String("channel", channel.GetName()),
384+
zap.Int64("nodeID", channel.assignedNode),
385+
zap.Error(err))
386+
}
387+
}
364388
log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan()))
365389
return nil
366390
}
@@ -375,15 +399,11 @@ func (c *StateChannelStore) AddNode(nodeID int64) {
375399
}
376400
}
377401

378-
func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
402+
func (c *StateChannelStore) UpdateState(err error, nodeID int64, channel RWChannel, opID int64) {
379403
channelName := channel.GetName()
380404
if cInfo, ok := c.channelsInfo[nodeID]; ok {
381405
if stateChannel, ok := cInfo.Channels[channelName]; ok {
382-
if isSuccessful {
383-
stateChannel.(*StateChannel).TransitionOnSuccess(opID)
384-
} else {
385-
stateChannel.(*StateChannel).TransitionOnFailure(opID)
386-
}
406+
stateChannel.(*StateChannel).TransitionState(err, opID)
387407
}
388408
}
389409
}

0 commit comments

Comments
 (0)