fix: ChannelManager double assignment #41837

Merged: 4 commits, May 23, 2025
37 changes: 22 additions & 15 deletions internal/datacoord/channel.go
@@ -22,10 +22,12 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)

type ROChannel interface {
@@ -191,15 +193,30 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *St
return c
}

func (c *StateChannel) TransitionOnSuccess(opID int64) {
func (c *StateChannel) TransitionState(err error, opID int64) {
if opID != c.Info.GetOpID() {
log.Warn("Try to transit on success but opID not match, stay original state ",
log.Warn("Try to transit state but opID not match, stay original state ",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}

if err == nil {
c.transitionOnSuccess()
return
}

if errors.Is(err, merr.ErrChannelReduplicate) {
c.setState(ToRelease)
return
}

c.transitionOnFailure()
}

func (c *StateChannel) transitionOnSuccess() {
switch c.currentState {
case Standby:
c.setState(ToWatch)
@@ -216,21 +233,11 @@ func (c *StateChannel) TransitionOnSuccess(opID int64) {
}
}

func (c *StateChannel) TransitionOnFailure(opID int64) {
if opID != c.Info.GetOpID() {
log.Warn("Try to transit on failure but opID not match, stay original state",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}
func (c *StateChannel) transitionOnFailure() {
switch c.currentState {
case Watching:
c.setState(Standby)
case Releasing:
case Watching, Releasing, ToWatch:
c.setState(Standby)
case Standby, ToWatch, Watched, ToRelease:
case Standby, Watched, ToRelease:
// Stay original state
}
}
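Note: for readers unfamiliar with the channel state machine, the following standalone Go sketch (not part of this diff; all types are simplified stand-ins, and errChannelReduplicate stands in for merr.ErrChannelReduplicate) illustrates how the single TransitionState entry point dispatches on the error: nil advances the state, a duplicate-channel error forces ToRelease, and any other error sends in-flight states back to Standby.

// Simplified, standalone sketch of the dispatch above; the real types live in
// internal/datacoord and are richer than shown here.
package main

import (
	"errors"
	"fmt"
)

type ChannelState int

const (
	Standby ChannelState = iota
	ToWatch
	Watching
	Watched
	ToRelease
	Releasing
)

// errChannelReduplicate is a stand-in for merr.ErrChannelReduplicate.
var errChannelReduplicate = errors.New("channel reduplicates")

type stateChannel struct {
	state ChannelState
	opID  int64
}

// transitionState mirrors the shape of StateChannel.TransitionState:
// nil error -> success transition, duplicate-channel error -> ToRelease,
// any other error -> fall back to Standby for in-flight states.
func (c *stateChannel) transitionState(err error, opID int64) {
	if opID != c.opID {
		return // stale result, keep the current state
	}
	switch {
	case err == nil:
		// success path (only the Standby case from the diff is modeled here)
		if c.state == Standby {
			c.state = ToWatch
		}
	case errors.Is(err, errChannelReduplicate):
		c.state = ToRelease
	default:
		// failure path: Watching, Releasing and ToWatch fall back to Standby
		if c.state == Watching || c.state == Releasing || c.state == ToWatch {
			c.state = Standby
		}
	}
}

func main() {
	c := &stateChannel{state: Watching, opID: 7}
	c.transitionState(errChannelReduplicate, 7)
	fmt.Println(c.state == ToRelease) // true
}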
46 changes: 25 additions & 21 deletions internal/datacoord/channel_manager.go
@@ -156,14 +156,14 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
m.mu.Lock()
nodeChannels := m.store.GetNodeChannelsBy(
WithAllNodes(),
func(ch *StateChannel) bool {
func(ch *StateChannel) bool { // Channel with drop-mark
return m.h.CheckShouldDropChannel(ch.GetName())
})
m.mu.Unlock()

for _, info := range nodeChannels {
m.finishRemoveChannel(info.NodeID, lo.Values(info.Channels)...)
}
m.mu.Unlock()

if m.balanceCheckLoop != nil {
log.Ctx(ctx).Info("starting channel balance loop")
@@ -238,6 +238,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
zap.Array("updates", updates), zap.Error(err))
}

// Speed up channel assignment
// channel already written into meta, try to assign it to the cluster
// no error is returned if it fails; the assignment will be retried later
updates = m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
@@ -286,11 +287,8 @@ func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error {
return nil
}

// reassign reassigns a channel to another DataNode.
// reassign reassigns a channel to another DataNode. Inner method; the caller must hold the lock.
func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error {
m.mu.Lock()
defer m.mu.Unlock()

updates := m.assignPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
if updates != nil {
return m.execute(updates)
@@ -436,15 +434,16 @@ func (m *ChannelManagerImpl) CheckLoop(ctx context.Context) {
}

func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
m.mu.RLock()
m.mu.Lock()
standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby))
toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease))
toChecks := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(Watching, Releasing))
m.mu.RUnlock()

// Processing standby channels
updatedStandbys := false
updatedStandbys = m.advanceStandbys(ctx, standbys)
// Reassign standby channels under the lock to avoid concurrent assignment with Watch, Remove, AddNode, DeleteNode
updatedStandbys := m.advanceStandbys(ctx, standbys)
m.mu.Unlock()

// RPCs stay outside the lock
updatedToCheckes := m.advanceToChecks(ctx, toChecks)
updatedToNotifies := m.advanceToNotifies(ctx, toNotifies)

@@ -453,9 +452,8 @@ func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
}
}

// inner method; the caller must hold the lock
func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWChannel) {
m.mu.Lock()
defer m.mu.Unlock()
for _, ch := range channels {
if err := m.removeChannel(nodeID, ch); err != nil {
log.Warn("Failed to remove channel", zap.Any("channel", ch), zap.Error(err))
@@ -469,6 +467,7 @@ func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWCha
}
}

// inner method; the caller must hold the lock
func (m *ChannelManagerImpl) advanceStandbys(ctx context.Context, standbys []*NodeChannelInfo) bool {
var advanced bool = false
for _, nodeAssign := range standbys {
@@ -576,7 +575,7 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
}

m.mu.Lock()
m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
m.store.UpdateState(err, nodeID, res.ch, res.opID)
m.mu.Unlock()
}

@@ -592,9 +591,9 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
}

type poolResult struct {
successful bool
ch RWChannel
opID int64
err error
ch RWChannel
opID int64
}

func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
@@ -620,10 +619,14 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
future := getOrCreateIOPool().Submit(func() (any, error) {
successful, got := m.Check(ctx, nodeID, tmpWatchInfo)
if got {
var err error
if !successful {
err = errors.New("operation in progress")
}
return poolResult{
successful: successful,
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
err: err,
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
}, nil
}
return nil, errors.New("Got results with no progress")
@@ -636,7 +639,7 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
if err == nil {
m.mu.Lock()
result := got.(poolResult)
m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
m.store.UpdateState(result.err, nodeID, result.ch, result.opID)
m.mu.Unlock()

advanced = true
@@ -712,6 +715,7 @@ func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *data
return false, false
}

// inner method; the caller must hold the lock
func (m *ChannelManagerImpl) execute(updates *ChannelOpSet) error {
for _, op := range updates.ops {
if op.Type != Delete {
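Note: the following standalone sketch (illustrative names only, not Milvus APIs) shows the lock scoping this change enforces in AdvanceChannelState: shared assignment state is snapshotted and standby channels are reassigned under the mutex, while the RPC-bound notify/check phases run outside it.

// Standalone sketch of the lock scoping used in AdvanceChannelState above.
package main

import (
	"context"
	"fmt"
	"sync"
)

type manager struct {
	mu       sync.Mutex
	standbys []string
	pending  []string
}

func (m *manager) advanceStandbys(_ context.Context, chs []string) bool {
	// Reassignment mutates shared assignment state, so it stays inside the
	// critical section (mirrors the PR moving it under m.mu).
	return len(chs) > 0
}

func (m *manager) notifyOverRPC(_ context.Context, ch string) error {
	// Network call: deliberately performed without holding the mutex.
	fmt.Println("notify", ch)
	return nil
}

func (m *manager) advanceChannelState(ctx context.Context) {
	m.mu.Lock()
	standbys := append([]string(nil), m.standbys...)
	toNotify := append([]string(nil), m.pending...)
	m.advanceStandbys(ctx, standbys)
	m.mu.Unlock()

	for _, ch := range toNotify {
		_ = m.notifyOverRPC(ctx, ch) // RPCs stay outside the lock
	}
}

func main() {
	m := &manager{standbys: []string{"ch1"}, pending: []string{"ch2"}}
	m.advanceChannelState(context.Background())
}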
29 changes: 27 additions & 2 deletions internal/datacoord/channel_manager_test.go
@@ -378,6 +378,31 @@ func (s *ChannelManagerSuite) TestFindWatcher() {
func (s *ChannelManagerSuite) TestAdvanceChannelState() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.Run("advance towatch dn watched to torelease", func() {
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error {
s.Require().Equal(1, len(req.GetInfos()))
switch req.GetInfos()[0].GetVchan().GetChannelName() {
case "ch2":
return merr.WrapErrChannelReduplicate("ch2")
default:
return nil
}
}).Twice()
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)

m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", ToRelease)
})
s.Run("advance statndby with no available nodes", func() {
chNodes := map[string]int64{
"ch1": bufferID,
@@ -680,8 +705,8 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.checkAssignment(m, 1, "ch2", ToWatch)

m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
s.checkAssignment(m, 1, "ch1", Standby)
s.checkAssignment(m, 1, "ch2", Standby)
})
s.Run("advance to release channels notify success", func() {
chNodes := map[string]int64{
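Note: the new test case relies on the wrapped duplicate-channel error still matching the sentinel through errors.Is, which is what routes ch2 to ToRelease. A standalone sketch of that matching, with a stand-in sentinel and wrapper in place of merr.ErrChannelReduplicate and merr.WrapErrChannelReduplicate:

// Standalone sketch of sentinel matching through error wrapping.
package main

import (
	"errors"
	"fmt"
)

var errChannelReduplicate = errors.New("channel reduplicates")

// wrapErrChannelReduplicate plays the role of merr.WrapErrChannelReduplicate.
func wrapErrChannelReduplicate(name string) error {
	return fmt.Errorf("channel %s: %w", name, errChannelReduplicate)
}

func main() {
	err := wrapErrChannelReduplicate("ch2")
	fmt.Println(errors.Is(err, errChannelReduplicate)) // true -> ToRelease
}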
42 changes: 31 additions & 11 deletions internal/datacoord/channel_store.go
@@ -72,7 +72,7 @@
Update(op *ChannelOpSet) error

// UpdateState is used by StateChannelStore only
UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
UpdateState(err error, nodeID int64, channel RWChannel, opID int64)
// SegLegacyChannelByNode is used by StateChannelStore only
SetLegacyChannelByNode(nodeIDs ...int64)

@@ -339,6 +339,8 @@
if err != nil {
return err
}

dupChannel := []*StateChannel{}
for i := 0; i < len(keys); i++ {
k := keys[i]
v := values[i]
@@ -353,14 +355,36 @@
}
reviseVChannelInfo(info.GetVchan())

c.AddNode(nodeID)

channelName := info.GetVchan().GetChannelName()
channel := NewStateChannelByWatchInfo(nodeID, info)

if c.HasChannel(channelName) {
dupChannel = append(dupChannel, channel)
log.Warn("channel store detects duplicated channel, skip recovering it",
zap.Int64("nodeID", nodeID),
zap.String("channel", channelName))
continue
}

c.AddNode(nodeID)
c.channelsInfo[nodeID].AddChannel(channel)
log.Info("channel store reload channel",
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
log.Info("channel store reloads channel from meta",
zap.Int64("nodeID", nodeID),
zap.String("channel", channelName))
metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels)))
}

for _, channel := range dupChannel {
log.Warn("channel store clearing duplicated channel",
zap.String("channel", channel.GetName()), zap.Int64("nodeID", channel.assignedNode))
chOp := NewChannelOpSet(NewChannelOp(channel.assignedNode, Delete, channel))
if err := c.Update(chOp); err != nil {
log.Warn("channel store failed to remove duplicated channel, will retry later",
zap.String("channel", channel.GetName()),
zap.Int64("nodeID", channel.assignedNode),
zap.Error(err))
}

}
log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan()))
return nil
}
Expand All @@ -375,15 +399,11 @@
}
}

func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
func (c *StateChannelStore) UpdateState(err error, nodeID int64, channel RWChannel, opID int64) {
channelName := channel.GetName()
if cInfo, ok := c.channelsInfo[nodeID]; ok {
if stateChannel, ok := cInfo.Channels[channelName]; ok {
if isSuccessful {
stateChannel.(*StateChannel).TransitionOnSuccess(opID)
} else {
stateChannel.(*StateChannel).TransitionOnFailure(opID)
}
stateChannel.(*StateChannel).TransitionState(err, opID)
}
}
}
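Note: a standalone sketch (simplified stand-in types, not the actual channel store) of the reload-time deduplication added above: the first recovered assignment of a channel wins, and later duplicates are queued for removal via delete ops instead of being recovered.

// Standalone sketch of first-wins deduplication during reload.
package main

import "fmt"

type assignment struct {
	nodeID  int64
	channel string
}

func dedupOnReload(recovered []assignment) (kept, dropped []assignment) {
	seen := map[string]bool{}
	for _, a := range recovered {
		if seen[a.channel] {
			dropped = append(dropped, a) // queued for a Delete channel op
			continue
		}
		seen[a.channel] = true
		kept = append(kept, a)
	}
	return kept, dropped
}

func main() {
	kept, dropped := dedupOnReload([]assignment{
		{1, "ch1"}, {2, "ch1"}, {2, "ch2"},
	})
	fmt.Println(len(kept), len(dropped)) // 2 1
}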