Skip to content

Commit f2fa468

Browse files
committed
fix: ChannelManager double assignment
Signed-off-by: yangxuan <[email protected]>
1 parent 1f5f8a4 commit f2fa468

File tree

5 files changed

+94
-40
lines changed

5 files changed

+94
-40
lines changed

internal/datacoord/channel_manager.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
156156
m.mu.Lock()
157157
nodeChannels := m.store.GetNodeChannelsBy(
158158
WithAllNodes(),
159-
func(ch *StateChannel) bool {
159+
func(ch *StateChannel) bool { // Channel with drop-mark
160160
return m.h.CheckShouldDropChannel(ch.GetName())
161161
})
162162
m.mu.Unlock()
@@ -238,6 +238,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
238238
zap.Array("updates", updates), zap.Error(err))
239239
}
240240

241+
// Speed up channel assignment
241242
// channel already written into meta, try to assign it to the cluster
242243
// no error is returned if this fails; the assignment will be retried later
243244
updates = m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
@@ -286,11 +287,8 @@ func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error {
286287
return nil
287288
}
288289

289-
// reassign reassigns a channel to another DataNode.
290+
// reassign reassigns a channel to another DataNode. Inner method: the caller must hold the lock before calling it.
290291
func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error {
291-
m.mu.Lock()
292-
defer m.mu.Unlock()
293-
294292
updates := m.assignPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
295293
if updates != nil {
296294
return m.execute(updates)
@@ -436,15 +434,16 @@ func (m *ChannelManagerImpl) CheckLoop(ctx context.Context) {
436434
}
437435

438436
func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
439-
m.mu.RLock()
437+
m.mu.Lock()
440438
standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby))
441439
toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease))
442440
toChecks := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(Watching, Releasing))
443-
m.mu.RUnlock()
444441

445-
// Processing standby channels
446-
updatedStandbys := false
447-
updatedStandbys = m.advanceStandbys(ctx, standbys)
442+
// Reassign standby channels while holding the lock to avoid concurrent assignment with Watch, Remove, AddNode, DeleteNode
443+
updatedStandbys := m.advanceStandbys(ctx, standbys)
444+
m.mu.Unlock()
445+
446+
// RPCs stay outside the lock
448447
updatedToCheckes := m.advanceToChecks(ctx, toChecks)
449448
updatedToNotifies := m.advanceToNotifies(ctx, toNotifies)
450449

@@ -469,6 +468,7 @@ func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWCha
469468
}
470469
}
471470

471+
// inner method; the caller must hold the lock
472472
func (m *ChannelManagerImpl) advanceStandbys(ctx context.Context, standbys []*NodeChannelInfo) bool {
473473
var advanced bool = false
474474
for _, nodeAssign := range standbys {
@@ -562,21 +562,26 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
562562
got, err := f.Await()
563563
res := got.(poolResult)
564564

565+
action := OnSuccess
565566
if err != nil {
566567
log.Ctx(ctx).Warn("Failed to notify channel operations to datanode",
567568
zap.Int64("assignment", nodeAssign.NodeID),
568569
zap.Int("operation count", channelCount),
569570
zap.String("channel name", res.ch.GetName()),
570571
zap.Error(err),
571572
)
573+
action = OnFailure
574+
if err == merr.ErrChannelReduplicate {
575+
action = OnNotifyDuplicate
576+
}
572577
failedChannels++
573578
} else {
574579
succeededChannels++
575580
advanced = true
576581
}
577582

578583
m.mu.Lock()
579-
m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
584+
m.store.UpdateState(action, nodeID, res.ch, res.opID)
580585
m.mu.Unlock()
581586
}
582587

@@ -636,7 +641,11 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
636641
if err == nil {
637642
m.mu.Lock()
638643
result := got.(poolResult)
639-
m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
644+
action := OnSuccess
645+
if !result.successful {
646+
action = OnFailure
647+
}
648+
m.store.UpdateState(action, nodeID, result.ch, result.opID)
640649
m.mu.Unlock()
641650

642651
advanced = true

internal/datacoord/channel_store.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type RWChannelStore interface {
7272
Update(op *ChannelOpSet) error
7373

7474
// UpdateState is used by StateChannelStore only
75-
UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
75+
UpdateState(action Action, nodeID int64, channel RWChannel, opID int64)
7676
// SetLegacyChannelByNode is used by StateChannelStore only
7777
SetLegacyChannelByNode(nodeIDs ...int64)
7878

@@ -355,10 +355,18 @@ func (c *StateChannelStore) Reload() error {
355355

356356
c.AddNode(nodeID)
357357

358+
channelName := info.GetVchan().GetChannelName()
359+
if c.HasChannel(channelName) {
360+
log.Warn("channel store detects duplicated channel, skip recovering it",
361+
zap.Int64("nodeID", nodeID),
362+
zap.String("channel", channelName))
363+
}
364+
358365
channel := NewStateChannelByWatchInfo(nodeID, info)
359366
c.channelsInfo[nodeID].AddChannel(channel)
360-
log.Info("channel store reload channel",
361-
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
367+
log.Info("channel store reloads channel from meta",
368+
zap.Int64("nodeID", nodeID),
369+
zap.String("channel", channelName))
362370
metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels)))
363371
}
364372
log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan()))
@@ -375,14 +383,39 @@ func (c *StateChannelStore) AddNode(nodeID int64) {
375383
}
376384
}
377385

378-
func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
386+
func (c *StateChannelStore) SetState(targetState ChannelState, nodeID int64, channel RWChannel, opID int64) {
387+
channelName := channel.GetName()
388+
if cInfo, ok := c.channelsInfo[nodeID]; ok {
389+
if stateChannel, ok := cInfo.Channels[channelName]; ok {
390+
stateChannel.(*StateChannel).setState(targetState)
391+
}
392+
}
393+
}
394+
395+
type Action string
396+
397+
const (
398+
OnSuccess Action = "OnSuccess"
399+
OnFailure Action = "OnFailure"
400+
OnNotifyDuplicate Action = "OnNotifyDuplicate" // notify ToWatch to DataNode already subscribed to the channel
401+
)
402+
403+
func (c *StateChannelStore) UpdateState(action Action, nodeID int64, channel RWChannel, opID int64) {
379404
channelName := channel.GetName()
380405
if cInfo, ok := c.channelsInfo[nodeID]; ok {
381406
if stateChannel, ok := cInfo.Channels[channelName]; ok {
382-
if isSuccessful {
407+
switch action {
408+
case OnSuccess:
383409
stateChannel.(*StateChannel).TransitionOnSuccess(opID)
384-
} else {
410+
case OnFailure:
385411
stateChannel.(*StateChannel).TransitionOnFailure(opID)
412+
case OnNotifyDuplicate:
413+
stateChannel.(*StateChannel).setState(ToRelease)
414+
default:
415+
log.Warn("unknown action", zap.Any("action", action),
416+
zap.Int64("nodeID", nodeID),
417+
zap.String("channel", channelName),
418+
zap.Int64("opID", opID))
386419
}
387420
}
388421
}

internal/datacoord/channel_store_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -421,12 +421,18 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
421421
tests := []struct {
422422
description string
423423

424-
inSuccess bool
424+
inAction Action
425425
inChannelState ChannelState
426426
outChannelState ChannelState
427427
}{
428-
{"input standby, fail", false, Standby, Standby},
429-
{"input standby, success", true, Standby, ToWatch},
428+
{"input standby, fail", OnFailure, Standby, Standby},
429+
{"input standby, success", OnSuccess, Standby, ToWatch},
430+
{"input towatch, duplicate", OnNotifyDuplicate, ToWatch, ToRelease},
431+
{"input towatch, fail", OnFailure, ToWatch, Standby},
432+
{"input towatch, success", OnSuccess, ToWatch, Watching},
433+
{"input torelease, duplicate", OnNotifyDuplicate, ToRelease, ToRelease},
434+
{"input torelease, fail", OnFailure, ToRelease, ToRelease},
435+
{"input torelease, success", OnSuccess, ToRelease, Releasing},
430436
}
431437

432438
for _, test := range tests {
@@ -443,7 +449,7 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
443449
},
444450
}
445451

446-
store.UpdateState(test.inSuccess, bufferID, channel, 0)
452+
store.UpdateState(test.inAction, bufferID, channel, 0)
447453
s.Equal(test.outChannelState, channel.currentState)
448454
})
449455
}

internal/datacoord/mock_channel_store.go

Lines changed: 18 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/datanode/channel/channel_manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ func (m *ChannelManagerImpl) Submit(info *datapb.ChannelWatchInfo) error {
105105
return nil
106106
}
107107

108+
// DataNode is already watching this channel under a different OpID
109+
if info.GetState() == datapb.ChannelWatchState_ToWatch &&
110+
m.fgManager.HasFlowgraph(channel) {
111+
return merr.WrapErrChannelReduplicate(channel)
112+
}
113+
108114
if info.GetState() == datapb.ChannelWatchState_ToRelease &&
109115
!m.fgManager.HasFlowgraph(channel) {
110116
log.Warn("Release op already finished, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))

0 commit comments

Comments
 (0)