
Commit 2232dfc

fix: Prevent Close from hanging on etcd reconnection (#45622)
issue: #45623

When etcd reconnects, the DataCoord rewatches DataNodes and calls ChannelManager.Startup again without closing the previous instance. Multiple contexts and goroutines accumulate, so Close hangs indefinitely waiting for untracked goroutines.

Root cause:
- Etcd reconnection triggers the rewatch flow and calls Startup again
- Startup was not idempotent, allowing repeated calls
- Multiple context cancellations and goroutines accumulated
- Close would wait indefinitely for untracked goroutines

Changes:
- Add a started field to ChannelManagerImpl
- Refactor Startup to detect and handle the restart scenario
- Add a state check in Close to prevent hanging

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
1 parent f1844c9 commit 2232dfc
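The fix described above boils down to a restart-safe start/close guard: a dedicated mutex plus a started flag, so that a repeated Startup first tears down the previous run and Close becomes a no-op once the manager is stopped. Below is a minimal, self-contained sketch of that pattern; the type and field names are simplified stand-ins for illustration, not the actual ChannelManagerImpl code.

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// manager is a simplified stand-in for a component like ChannelManagerImpl:
// it owns background goroutines tied to a cancellable context.
type manager struct {
    startupMu sync.Mutex // serializes Startup/Close
    started   bool

    cancel context.CancelFunc
    wg     sync.WaitGroup
}

// Startup is safe to call repeatedly: a second call first tears down the
// previous run, so only one set of goroutines is ever tracked.
func (m *manager) Startup(ctx context.Context) {
    m.startupMu.Lock()
    defer m.startupMu.Unlock()

    if m.started {
        m.doClose() // close the previous instance before restarting
    }

    ctx, cancel := context.WithCancel(ctx)
    m.wg.Add(1)
    go func() {
        defer m.wg.Done()
        <-ctx.Done() // placeholder for a background checker loop
    }()

    m.cancel = cancel
    m.started = true
}

// Close is idempotent: calling it on a stopped manager returns immediately.
func (m *manager) Close() {
    m.startupMu.Lock()
    defer m.startupMu.Unlock()
    m.doClose()
}

// doClose assumes startupMu is already held.
func (m *manager) doClose() {
    if !m.started {
        return
    }
    if m.cancel != nil {
        m.cancel()
        m.wg.Wait()
    }
    m.started = false
}

func main() {
    m := &manager{}
    ctx := context.Background()

    m.Startup(ctx)
    m.Startup(ctx) // simulated etcd reconnection: restart instead of leaking
    m.Startup(ctx)

    done := make(chan struct{})
    go func() { m.Close(); close(done) }()

    select {
    case <-done:
        fmt.Println("Close returned promptly")
    case <-time.After(2 * time.Second):
        fmt.Println("Close hung") // what the started guard prevents
    }
    m.Close() // second Close is a no-op
}
```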

File tree

3 files changed (+172, -6 lines)

- internal/datacoord/channel_manager.go
- internal/datacoord/channel_manager_test.go
- internal/datacoord/server.go


internal/datacoord/channel_manager.go

Lines changed: 35 additions & 1 deletion
@@ -84,6 +84,10 @@ type ChannelManagerImpl struct {
     legacyNodes typeutil.UniqueSet

     lastActiveTimestamp time.Time
+
+    // Idempotency and restart support
+    startupMu sync.Mutex // Protects Startup/Close operations
+    started   bool
 }

 // ChannelBGChecker are goroutining running background
@@ -130,7 +134,19 @@ func NewChannelManager(
 }

 func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes []int64) error {
-    ctx, m.cancel = context.WithCancel(ctx)
+    m.startupMu.Lock()
+    defer m.startupMu.Unlock()
+
+    if m.started {
+        // Already started, need to close first then restart
+        m.doClose()
+    }
+
+    return m.doStartup(ctx, legacyNodes, allNodes)
+}
+
+func (m *ChannelManagerImpl) doStartup(ctx context.Context, legacyNodes, allNodes []int64) error {
+    ctx, cancel := context.WithCancel(ctx)

     m.legacyNodes = typeutil.NewUniqueSet(legacyNodes...)

@@ -165,6 +181,10 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes []int64) error {
     }
     m.mu.Unlock()

+    // All operations succeeded, now set the state
+    m.cancel = cancel
+    m.started = true
+
     if m.balanceCheckLoop != nil {
         log.Ctx(ctx).Info("starting channel balance loop")
         m.wg.Add(1)
@@ -184,10 +204,24 @@
 }

 func (m *ChannelManagerImpl) Close() {
+    m.startupMu.Lock()
+    defer m.startupMu.Unlock()
+    m.doClose()
+}
+
+// doClose is the internal implementation of Close without acquiring startupMu.
+// It should only be called when startupMu is already held.
+func (m *ChannelManagerImpl) doClose() {
+    if !m.started {
+        return
+    }
+
     if m.cancel != nil {
         m.cancel()
         m.wg.Wait()
     }
+
+    m.started = false
 }

 func (m *ChannelManagerImpl) AddNode(nodeID UniqueID) error {
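For context on why the old flow could hang: each call to the original Startup overwrote m.cancel, so after a reconnect-driven restart the goroutines from the first run were still counted in the WaitGroup but could no longer be cancelled, and Close blocked forever in wg.Wait(). The following standalone sketch reproduces that failure mode with simplified names (not the Milvus code), bounded by a timeout so the demo itself terminates.

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// leakyManager mimics the pre-fix flow: every Startup overwrites cancel,
// so goroutines from earlier calls can no longer be signalled to stop.
type leakyManager struct {
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

func (m *leakyManager) Startup(parent context.Context) {
    ctx, cancel := context.WithCancel(parent)
    m.cancel = cancel // overwrites the previous cancel func

    m.wg.Add(1)
    go func() {
        defer m.wg.Done()
        <-ctx.Done() // stand-in for the background checker loop
    }()
}

func (m *leakyManager) Close() {
    if m.cancel != nil {
        m.cancel() // only stops the goroutine from the latest Startup
    }
    m.wg.Wait() // blocks forever on goroutines whose cancel func was lost
}

func main() {
    m := &leakyManager{}
    m.Startup(context.Background())
    m.Startup(context.Background()) // e.g. triggered by an etcd reconnect

    done := make(chan struct{})
    go func() { m.Close(); close(done) }()

    select {
    case <-done:
        fmt.Println("Close returned")
    case <-time.After(2 * time.Second):
        fmt.Println("Close is hanging: one goroutine was never cancelled")
    }
}
```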

internal/datacoord/channel_manager_test.go

Lines changed: 137 additions & 0 deletions
@@ -904,3 +904,140 @@ func (s *ChannelManagerSuite) TestGetChannelWatchInfos() {
     infos = cm.GetChannelWatchInfos()
     s.Equal(0, len(infos))
 }
+
+func (s *ChannelManagerSuite) TestStartupIdempotency() {
+    s.Run("repeated Startup calls should be idempotent", func() {
+        chNodes := map[string]int64{
+            "ch1": 1,
+            "ch2": 1,
+        }
+        s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
+        s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Maybe()
+
+        m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc, withCheckerV2())
+        s.Require().NoError(err)
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        var (
+            legacyNodes = []int64{1}
+            allNodes    = []int64{1, 2}
+        )
+
+        // First Startup
+        err = m.Startup(ctx, legacyNodes, allNodes)
+        s.NoError(err)
+        s.True(m.started)
+        s.checkAssignment(m, 1, "ch1", Legacy)
+        s.checkAssignment(m, 1, "ch2", Legacy)
+
+        // Wait a bit for goroutine to start
+        // Second Startup - should close and restart
+        err = m.Startup(ctx, legacyNodes, allNodes)
+        s.NoError(err)
+        s.True(m.started)
+        s.checkAssignment(m, 1, "ch1", Legacy)
+        s.checkAssignment(m, 1, "ch2", Legacy)
+
+        // Third Startup - should still work
+        err = m.Startup(ctx, legacyNodes, allNodes)
+        s.NoError(err)
+        s.True(m.started)
+    })
+}
+
+func (s *ChannelManagerSuite) TestStartupAfterClose() {
+    s.Run("Startup after Close should restart successfully", func() {
+        chNodes := map[string]int64{
+            "ch1": 1,
+            "ch2": 1,
+        }
+        s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
+        s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Maybe()
+
+        m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc, withCheckerV2())
+        s.Require().NoError(err)
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        var (
+            legacyNodes = []int64{1}
+            allNodes    = []int64{1}
+        )
+
+        // First Startup
+        err = m.Startup(ctx, legacyNodes, allNodes)
+        s.NoError(err)
+        s.True(m.started)
+        s.checkAssignment(m, 1, "ch1", Legacy)
+        s.checkAssignment(m, 1, "ch2", Legacy)
+
+        // Close
+        m.Close()
+        s.False(m.started)
+
+        // Startup again after Close
+        ctx2, cancel2 := context.WithCancel(context.Background())
+        defer cancel2()
+
+        err = m.Startup(ctx2, legacyNodes, allNodes)
+        s.NoError(err)
+        s.True(m.started)
+        s.checkAssignment(m, 1, "ch1", Legacy)
+        s.checkAssignment(m, 1, "ch2", Legacy)
+
+        // Close again
+        m.Close()
+        s.False(m.started)
+    })
+}
+
+func (s *ChannelManagerSuite) TestCloseIdempotency() {
+    s.Run("multiple Close calls should be idempotent", func() {
+        chNodes := map[string]int64{
+            "ch1": 1,
+        }
+        s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
+        s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Maybe()
+
+        m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc, withCheckerV2())
+        s.Require().NoError(err)
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        // Startup first
+        err = m.Startup(ctx, []int64{1}, []int64{1})
+        s.NoError(err)
+        s.True(m.started)
+
+        // First Close
+        m.Close()
+        s.False(m.started)
+
+        // Second Close - should be safe
+        m.Close()
+        s.False(m.started)
+
+        // Third Close - should still be safe
+        m.Close()
+        s.False(m.started)
+    })
+
+    s.Run("Close without Startup should be safe", func() {
+        s.prepareMeta(nil, 0)
+        m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
+        s.Require().NoError(err)
+
+        s.False(m.started)
+
+        // Close without Startup should not panic
+        s.NotPanics(func() {
+            m.Close()
+        })
+
+        s.False(m.started)
+    })
+}
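Not part of this change, but one way to guard against regressions in this class of bug is a goroutine-leak check around the package's tests, for example with go.uber.org/goleak. A hypothetical sketch, assuming the package does not already define a TestMain:

```go
package datacoord

import (
    "testing"

    "go.uber.org/goleak"
)

// TestMain fails the package's tests if goroutines started during the run
// (for example, an un-closed channel manager's background loops) are still
// alive at exit.
func TestMain(m *testing.M) {
    goleak.VerifyTestMain(m)
}
```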

internal/datacoord/server.go

Lines changed: 0 additions & 5 deletions
@@ -649,11 +649,6 @@ func (s *Server) rewatchDataNodes(sessions map[string]*sessionutil.Session) error {
         datanodes = append(datanodes, info)
     }

-    // if err := s.nodeManager.Startup(s.ctx, datanodes); err != nil {
-    //     log.Warn("DataCoord failed to add datanode", zap.Error(err))
-    //     return err
-    // }
-
     log.Info("DataCoord Cluster Manager start up")
     if err := s.cluster.Startup(s.ctx, datanodes); err != nil {
         log.Warn("DataCoord Cluster Manager failed to start up", zap.Error(err))
