Skip to content

Commit f572905

Browse files
committed
fix: ChannelManager double assignment
This PR fixes double assignment of channels and the recurrence of dropped channels, amends extra channel meta, and refreshes incorrect state with DN through the following edits: 1. Widen the lock scope so that advanceStandbys runs while holding the lock, to avoid concurrent assignment. 2. Transfer ToWatch to ToRelease if DN returns ErrChannelReduplicate. 3. Remove duplicated channels when recovering. Signed-off-by: yangxuan <[email protected]>
1 parent b59a2d6 commit f572905

File tree

7 files changed

+175
-49
lines changed

7 files changed

+175
-49
lines changed

internal/datacoord/channel.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,9 @@ func (c *StateChannel) TransitionOnFailure(opID int64) {
226226
return
227227
}
228228
switch c.currentState {
229-
case Watching:
230-
c.setState(Standby)
231-
case Releasing:
229+
case Watching, Releasing, ToWatch:
232230
c.setState(Standby)
233-
case Standby, ToWatch, Watched, ToRelease:
231+
case Standby, Watched, ToRelease:
234232
// Stay original state
235233
}
236234
}

internal/datacoord/channel_manager.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,14 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
156156
m.mu.Lock()
157157
nodeChannels := m.store.GetNodeChannelsBy(
158158
WithAllNodes(),
159-
func(ch *StateChannel) bool {
159+
func(ch *StateChannel) bool { // Channel with drop-mark
160160
return m.h.CheckShouldDropChannel(ch.GetName())
161161
})
162-
m.mu.Unlock()
163162

164163
for _, info := range nodeChannels {
165164
m.finishRemoveChannel(info.NodeID, lo.Values(info.Channels)...)
166165
}
166+
m.mu.Unlock()
167167

168168
if m.balanceCheckLoop != nil {
169169
log.Ctx(ctx).Info("starting channel balance loop")
@@ -238,6 +238,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
238238
zap.Array("updates", updates), zap.Error(err))
239239
}
240240

241+
// Speed up channel assignment
241242
// channel already written into meta, try to assign it to the cluster
242243
// not error is returned if failed, the assignment will retry later
243244
updates = m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
@@ -286,11 +287,8 @@ func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error {
286287
return nil
287288
}
288289

289-
// reassign reassigns a channel to another DataNode.
290+
// inner method, lock before using it, reassign reassigns a channel to another DataNode.
290291
func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error {
291-
m.mu.Lock()
292-
defer m.mu.Unlock()
293-
294292
updates := m.assignPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
295293
if updates != nil {
296294
return m.execute(updates)
@@ -436,15 +434,16 @@ func (m *ChannelManagerImpl) CheckLoop(ctx context.Context) {
436434
}
437435

438436
func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
439-
m.mu.RLock()
437+
m.mu.Lock()
440438
standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby))
441439
toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease))
442440
toChecks := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(Watching, Releasing))
443-
m.mu.RUnlock()
444441

445-
// Processing standby channels
446-
updatedStandbys := false
447-
updatedStandbys = m.advanceStandbys(ctx, standbys)
442+
// Reassigning standby channels in locks to avoid concurrent assignment with Watch, Remove, AddNode, DeleteNode
443+
updatedStandbys := m.advanceStandbys(ctx, standbys)
444+
m.mu.Unlock()
445+
446+
// RPCs stays out of locks
448447
updatedToCheckes := m.advanceToChecks(ctx, toChecks)
449448
updatedToNotifies := m.advanceToNotifies(ctx, toNotifies)
450449

@@ -453,9 +452,8 @@ func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
453452
}
454453
}
455454

455+
// inner method need lock
456456
func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWChannel) {
457-
m.mu.Lock()
458-
defer m.mu.Unlock()
459457
for _, ch := range channels {
460458
if err := m.removeChannel(nodeID, ch); err != nil {
461459
log.Warn("Failed to remove channel", zap.Any("channel", ch), zap.Error(err))
@@ -469,6 +467,7 @@ func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWCha
469467
}
470468
}
471469

470+
// inner method need locks
472471
func (m *ChannelManagerImpl) advanceStandbys(ctx context.Context, standbys []*NodeChannelInfo) bool {
473472
var advanced bool = false
474473
for _, nodeAssign := range standbys {
@@ -562,21 +561,26 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
562561
got, err := f.Await()
563562
res := got.(poolResult)
564563

564+
action := OnSuccess
565565
if err != nil {
566566
log.Ctx(ctx).Warn("Failed to notify channel operations to datanode",
567567
zap.Int64("assignment", nodeAssign.NodeID),
568568
zap.Int("operation count", channelCount),
569569
zap.String("channel name", res.ch.GetName()),
570570
zap.Error(err),
571571
)
572+
action = OnFailure
573+
if err == merr.ErrChannelReduplicate {
574+
action = OnNotifyDuplicate
575+
}
572576
failedChannels++
573577
} else {
574578
succeededChannels++
575579
advanced = true
576580
}
577581

578582
m.mu.Lock()
579-
m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
583+
m.store.UpdateState(action, nodeID, res.ch, res.opID)
580584
m.mu.Unlock()
581585
}
582586

@@ -636,7 +640,11 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
636640
if err == nil {
637641
m.mu.Lock()
638642
result := got.(poolResult)
639-
m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
643+
action := OnSuccess
644+
if !result.successful {
645+
action = OnFailure
646+
}
647+
m.store.UpdateState(action, nodeID, result.ch, result.opID)
640648
m.mu.Unlock()
641649

642650
advanced = true
@@ -712,6 +720,7 @@ func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *data
712720
return false, false
713721
}
714722

723+
// inner method need lock
715724
func (m *ChannelManagerImpl) execute(updates *ChannelOpSet) error {
716725
for _, op := range updates.ops {
717726
if op.Type != Delete {

internal/datacoord/channel_manager_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,31 @@ func (s *ChannelManagerSuite) TestFindWatcher() {
378378
func (s *ChannelManagerSuite) TestAdvanceChannelState() {
379379
ctx, cancel := context.WithCancel(context.Background())
380380
defer cancel()
381+
s.Run("advance towatch dn watched to torelease", func() {
382+
chNodes := map[string]int64{
383+
"ch1": 1,
384+
"ch2": 1,
385+
}
386+
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
387+
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).
388+
RunAndReturn(func(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error {
389+
s.Require().Equal(1, len(req.GetInfos()))
390+
switch req.GetInfos()[0].GetVchan().GetChannelName() {
391+
case "ch2":
392+
return merr.WrapErrChannelReduplicate("ch2")
393+
default:
394+
return nil
395+
}
396+
}).Twice()
397+
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
398+
s.Require().NoError(err)
399+
s.checkAssignment(m, 1, "ch1", ToWatch)
400+
s.checkAssignment(m, 1, "ch2", ToWatch)
401+
402+
m.AdvanceChannelState(ctx)
403+
s.checkAssignment(m, 1, "ch1", Watching)
404+
s.checkAssignment(m, 1, "ch2", ToRelease)
405+
})
381406
s.Run("advance statndby with no available nodes", func() {
382407
chNodes := map[string]int64{
383408
"ch1": bufferID,

internal/datacoord/channel_store.go

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type RWChannelStore interface {
7272
Update(op *ChannelOpSet) error
7373

7474
// UpdateState is used by StateChannelStore only
75-
UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
75+
UpdateState(action Action, nodeID int64, channel RWChannel, opID int64)
7676
// SegLegacyChannelByNode is used by StateChannelStore only
7777
SetLegacyChannelByNode(nodeIDs ...int64)
7878

@@ -339,6 +339,8 @@ func (c *StateChannelStore) Reload() error {
339339
if err != nil {
340340
return err
341341
}
342+
343+
dupChannel := []*StateChannel{}
342344
for i := 0; i < len(keys); i++ {
343345
k := keys[i]
344346
v := values[i]
@@ -353,14 +355,36 @@ func (c *StateChannelStore) Reload() error {
353355
}
354356
reviseVChannelInfo(info.GetVchan())
355357

356-
c.AddNode(nodeID)
357-
358+
channelName := info.GetVchan().GetChannelName()
358359
channel := NewStateChannelByWatchInfo(nodeID, info)
360+
361+
if c.HasChannel(channelName) {
362+
dupChannel = append(dupChannel, channel)
363+
log.Warn("channel store detects duplicated channel, skip recovering it",
364+
zap.Int64("nodeID", nodeID),
365+
zap.String("channel", channelName))
366+
continue
367+
}
368+
369+
c.AddNode(nodeID)
359370
c.channelsInfo[nodeID].AddChannel(channel)
360-
log.Info("channel store reload channel",
361-
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
371+
log.Info("channel store reloads channel from meta",
372+
zap.Int64("nodeID", nodeID),
373+
zap.String("channel", channelName))
362374
metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels)))
363375
}
376+
377+
for _, channel := range dupChannel {
378+
log.Warn("channel store clearing duplicated channel",
379+
zap.String("channel", channel.GetName()), zap.Int64("nodeID", channel.assignedNode))
380+
chOp := NewChannelOpSet(NewChannelOp(channel.assignedNode, Delete, channel))
381+
if err := c.Update(chOp); err != nil {
382+
log.Warn("channel store failed to remove duplicated channel, will retry later",
383+
zap.String("channel", channel.GetName()),
384+
zap.Int64("nodeID", channel.assignedNode),
385+
zap.Error(err))
386+
}
387+
}
364388
log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan()))
365389
return nil
366390
}
@@ -375,14 +399,39 @@ func (c *StateChannelStore) AddNode(nodeID int64) {
375399
}
376400
}
377401

378-
func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
402+
func (c *StateChannelStore) SetState(targetState ChannelState, nodeID int64, channel RWChannel, opID int64) {
403+
channelName := channel.GetName()
404+
if cInfo, ok := c.channelsInfo[nodeID]; ok {
405+
if stateChannel, ok := cInfo.Channels[channelName]; ok {
406+
stateChannel.(*StateChannel).setState(targetState)
407+
}
408+
}
409+
}
410+
411+
type Action string
412+
413+
const (
414+
OnSuccess Action = "OnSuccess"
415+
OnFailure Action = "OnFailure"
416+
OnNotifyDuplicate Action = "OnNotifyDuplicate" // notify ToWatch to DataNode already subscribed to the channel
417+
)
418+
419+
func (c *StateChannelStore) UpdateState(action Action, nodeID int64, channel RWChannel, opID int64) {
379420
channelName := channel.GetName()
380421
if cInfo, ok := c.channelsInfo[nodeID]; ok {
381422
if stateChannel, ok := cInfo.Channels[channelName]; ok {
382-
if isSuccessful {
423+
switch action {
424+
case OnSuccess:
383425
stateChannel.(*StateChannel).TransitionOnSuccess(opID)
384-
} else {
426+
case OnFailure:
385427
stateChannel.(*StateChannel).TransitionOnFailure(opID)
428+
case OnNotifyDuplicate:
429+
stateChannel.(*StateChannel).setState(ToRelease)
430+
default:
431+
log.Warn("unknown action", zap.Any("action", action),
432+
zap.Int64("nodeID", nodeID),
433+
zap.String("channel", channelName),
434+
zap.Int64("opID", opID))
386435
}
387436
}
388437
}

internal/datacoord/channel_store_test.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -421,12 +421,18 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
421421
tests := []struct {
422422
description string
423423

424-
inSuccess bool
424+
inAction Action
425425
inChannelState ChannelState
426426
outChannelState ChannelState
427427
}{
428-
{"input standby, fail", false, Standby, Standby},
429-
{"input standby, success", true, Standby, ToWatch},
428+
{"input standby fail", OnFailure, Standby, Standby},
429+
{"input standby success", OnSuccess, Standby, ToWatch},
430+
{"input towatch duplicate", OnNotifyDuplicate, ToWatch, ToRelease},
431+
{"input towatch fail", OnFailure, ToWatch, Standby},
432+
{"input towatch success", OnSuccess, ToWatch, Watching},
433+
{"input torelease duplicate", OnNotifyDuplicate, ToRelease, ToRelease},
434+
{"input torelease fail", OnFailure, ToRelease, ToRelease},
435+
{"input torelease success", OnSuccess, ToRelease, Releasing},
430436
}
431437

432438
for _, test := range tests {
@@ -443,12 +449,45 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
443449
},
444450
}
445451

446-
store.UpdateState(test.inSuccess, bufferID, channel, 0)
452+
store.UpdateState(test.inAction, bufferID, channel, 0)
447453
s.Equal(test.outChannelState, channel.currentState)
448454
})
449455
}
450456
}
451457

458+
func (s *StateChannelStoreSuite) TestReloadDupCh() {
459+
s.mockTxn.ExpectedCalls = nil
460+
461+
tests := []struct {
462+
channelName string
463+
nodeID int64
464+
}{
465+
{"ch1", 1},
466+
{"ch1", bufferID},
467+
{"ch1", 2},
468+
}
469+
470+
var keys, values []string
471+
for _, test := range tests {
472+
keys = append(keys, fmt.Sprintf("channel_store/%d/%s", test.nodeID, test.channelName))
473+
info := generateWatchInfo(test.channelName, datapb.ChannelWatchState_WatchSuccess)
474+
bs, err := proto.Marshal(info)
475+
s.Require().NoError(err)
476+
values = append(values, string(bs))
477+
}
478+
s.mockTxn.EXPECT().LoadWithPrefix(mock.Anything, mock.AnythingOfType("string")).Return(keys, values, nil)
479+
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(2)
480+
481+
store := NewStateChannelStore(s.mockTxn)
482+
err := store.Reload()
483+
s.Require().NoError(err)
484+
485+
s.True(store.HasChannel("ch1"))
486+
s.ElementsMatch([]int64{1}, store.GetNodes())
487+
s.EqualValues(1, store.GetNodeChannelCount(1))
488+
s.EqualValues(0, store.GetNodeChannelCount(2))
489+
}
490+
452491
func (s *StateChannelStoreSuite) TestReload() {
453492
type item struct {
454493
nodeID int64

0 commit comments

Comments
 (0)