Skip to content

Commit 5cb3d57

Browse files
committed
maintainer: harden drain target add path
1 parent 2b2b7ec commit 5cb3d57

File tree

5 files changed

+164
-34
lines changed

5 files changed

+164
-34
lines changed

maintainer/maintainer_manager_maintainers.go

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/pingcap/ticdc/heartbeatpb"
2424
"github.com/pingcap/ticdc/pkg/common"
2525
"github.com/pingcap/ticdc/pkg/config"
26+
"github.com/pingcap/ticdc/pkg/errors"
2627
"github.com/pingcap/ticdc/pkg/liveness"
2728
"github.com/pingcap/ticdc/pkg/messaging"
2829
"github.com/pingcap/ticdc/pkg/node"
@@ -71,8 +72,7 @@ func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest)
7172
return nil
7273
}
7374

74-
target, epoch := m.getDispatcherDrainTarget()
75-
return m.maintainers.handleAddMaintainer(req, target, epoch)
75+
return m.maintainers.handleAddMaintainer(req, m.getDispatcherDrainTarget)
7676
}
7777

7878
// onRemoveMaintainerRequest delegates changefeed removal to the maintainer part.
@@ -86,10 +86,20 @@ func (m *Manager) onDispatchMaintainerRequest(
8686
msg *messaging.TargetMessage,
8787
) *heartbeatpb.MaintainerStatus {
8888
if m.coordinatorID != msg.From {
89-
log.Warn("ignore invalid coordinator id",
90-
zap.Any("request", msg),
91-
zap.Any("coordinatorID", m.coordinatorID),
92-
zap.Stringer("from", msg.From))
89+
fields := []zap.Field{
90+
zap.String("type", msg.Type.String()),
91+
zap.Stringer("coordinatorID", m.coordinatorID),
92+
zap.Stringer("from", msg.From),
93+
}
94+
switch msg.Type {
95+
case messaging.TypeAddMaintainerRequest:
96+
changefeedID := common.NewChangefeedIDFromPB(msg.Message[0].(*heartbeatpb.AddMaintainerRequest).Id)
97+
fields = append(fields, zap.Stringer("changefeedID", changefeedID))
98+
case messaging.TypeRemoveMaintainerRequest:
99+
changefeedID := common.NewChangefeedIDFromPB(msg.Message[0].(*heartbeatpb.RemoveMaintainerRequest).Id)
100+
fields = append(fields, zap.Stringer("changefeedID", changefeedID))
101+
}
102+
log.Warn("ignore invalid coordinator id", fields...)
93103
return nil
94104
}
95105
switch msg.Type {
@@ -155,33 +165,42 @@ func (p *managerMaintainerSet) buildBootstrapResponse() *heartbeatpb.Coordinator
155165
// with the latest node-scoped dispatcher drain target.
156166
func (p *managerMaintainerSet) handleAddMaintainer(
157167
req *heartbeatpb.AddMaintainerRequest,
158-
target node.ID,
159-
epoch uint64,
168+
getDrainTarget func() (node.ID, uint64),
160169
) *heartbeatpb.MaintainerStatus {
161170
changefeedID := common.NewChangefeedIDFromPB(req.Id)
162-
_, ok := p.registry.Load(changefeedID)
163-
if ok {
171+
if _, ok := p.registry.Load(changefeedID); ok {
164172
return nil
165173
}
166174

167175
info := &config.ChangeFeedInfo{}
168-
err := json.Unmarshal(req.Config, info)
169-
if err != nil {
170-
log.Panic("decode changefeed fail", zap.Error(err))
176+
if err := json.Unmarshal(req.Config, info); err != nil {
177+
log.Error("ignore add maintainer request with invalid config",
178+
zap.Stringer("changefeedID", changefeedID),
179+
zap.Int("configBytes", len(req.Config)),
180+
zap.Error(err))
181+
return nil
171182
}
172183
if req.CheckpointTs == 0 {
173-
log.Panic("add maintainer with invalid checkpointTs",
184+
log.Error("ignore add maintainer request with invalid checkpointTs",
174185
zap.Stringer("changefeedID", changefeedID),
175-
zap.Uint64("checkpointTs", req.CheckpointTs),
176-
zap.Any("info", info))
186+
zap.Uint64("checkpointTs", req.CheckpointTs))
187+
return nil
177188
}
178-
179189
maintainer := NewMaintainer(changefeedID, p.conf, info, p.nodeInfo, p.taskScheduler, req.CheckpointTs, req.IsNewChangefeed, req.KeyspaceId)
180-
// Seed the maintainer with the manager-level drain snapshot before its event
181-
// loop starts so late additions still honor an already-active drain target.
182-
maintainer.SetDispatcherDrainTarget(target, epoch)
183-
p.registry.Store(changefeedID, maintainer)
184-
maintainer.pushEvent(&Event{changefeedID: changefeedID, eventType: EventInit})
190+
registered, loaded := p.registry.LoadOrStore(changefeedID, maintainer)
191+
if loaded {
192+
// Duplicate add requests can race on the same changefeed. Drop the loser and
193+
// stop the redundant maintainer immediately so background goroutines do not leak.
194+
maintainer.Close()
195+
return nil
196+
}
197+
198+
registeredMaintainer := registered.(*Maintainer)
199+
// Register the maintainer before seeding the drain snapshot so concurrent
200+
// manager-level drain fanout can always observe it in the registry.
201+
target, epoch := getDrainTarget()
202+
registeredMaintainer.SetDispatcherDrainTarget(target, epoch)
203+
registeredMaintainer.pushEvent(&Event{changefeedID: changefeedID, eventType: EventInit})
185204
return nil
186205
}
187206

@@ -276,7 +295,7 @@ func (p *managerMaintainerSet) dispatchMaintainerMessage(
276295
}
277296
select {
278297
case <-ctx.Done():
279-
return ctx.Err()
298+
return errors.Trace(ctx.Err())
280299
default:
281300
maintainer := c.(*Maintainer)
282301
maintainer.pushEvent(&Event{

maintainer/maintainer_manager_node.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -163,16 +163,15 @@ func (m *Manager) onSetDispatcherDrainTargetRequest(msg *messaging.TargetMessage
163163

164164
req := msg.Message[0].(*heartbeatpb.SetDispatcherDrainTargetRequest)
165165
target := node.ID(req.TargetNodeId)
166-
if !m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
167-
return
168-
}
169-
170-
log.Info("dispatcher drain target updated",
171-
zap.Stringer("targetNodeID", target),
172-
zap.Uint64("targetEpoch", req.TargetEpoch))
173-
m.maintainers.applyDispatcherDrainTarget(target, req.TargetEpoch)
174-
// A manager-level heartbeat is the authoritative acknowledgement that this
175-
// node has applied the latest drain target, even when no maintainers exist.
166+
if m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
167+
log.Info("dispatcher drain target updated",
168+
zap.Stringer("targetNodeID", target),
169+
zap.Uint64("targetEpoch", req.TargetEpoch))
170+
m.maintainers.applyDispatcherDrainTarget(target, req.TargetEpoch)
171+
}
172+
// A manager-level heartbeat is the authoritative acknowledgement of the
173+
// latest local drain snapshot, even when this request is a retry or stale
174+
// duplicate and no maintainer update is needed.
176175
m.sendNodeHeartbeat(true)
177176
}
178177

maintainer/node_liveness_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
package maintainer
1414

1515
import (
16+
"encoding/json"
1617
"testing"
1718

1819
"github.com/pingcap/ticdc/heartbeatpb"
20+
"github.com/pingcap/ticdc/pkg/common"
1921
appcontext "github.com/pingcap/ticdc/pkg/common/context"
2022
"github.com/pingcap/ticdc/pkg/config"
2123
"github.com/pingcap/ticdc/pkg/liveness"
@@ -187,4 +189,55 @@ func TestSetDispatcherDrainTargetSendsNodeHeartbeatAck(t *testing.T) {
187189
hb = apply("", 1)
188190
require.Equal(t, "", hb.DispatcherDrainTargetNodeId)
189191
require.Equal(t, uint64(1), hb.DispatcherDrainTargetEpoch)
192+
193+
// Same-epoch reactivation is rejected locally, but retries should still get
194+
// an immediate heartbeat that reflects the latest applied snapshot.
195+
hb = apply("n2", 1)
196+
require.Equal(t, "", hb.DispatcherDrainTargetNodeId)
197+
require.Equal(t, uint64(1), hb.DispatcherDrainTargetEpoch)
198+
}
199+
200+
func TestAddMaintainerIgnoreInvalidConfig(t *testing.T) {
201+
mc := messaging.NewMockMessageCenter()
202+
appcontext.SetService(appcontext.MessageCenter, mc)
203+
204+
var nodeLiveness liveness.Liveness
205+
m := NewMaintainerManager(&node.Info{ID: node.ID("n1")}, &config.SchedulerConfig{}, &nodeLiveness)
206+
207+
changefeedID := common.NewChangeFeedIDWithName("cf-invalid-config", common.DefaultKeyspaceName)
208+
status := m.onAddMaintainerRequest(&heartbeatpb.AddMaintainerRequest{
209+
Id: changefeedID.ToPB(),
210+
Config: []byte("not-json"),
211+
CheckpointTs: 10,
212+
})
213+
require.Nil(t, status)
214+
215+
_, ok := m.GetMaintainerForChangefeed(changefeedID)
216+
require.False(t, ok)
217+
}
218+
219+
func TestAddMaintainerIgnoreInvalidCheckpointTs(t *testing.T) {
220+
mc := messaging.NewMockMessageCenter()
221+
appcontext.SetService(appcontext.MessageCenter, mc)
222+
223+
var nodeLiveness liveness.Liveness
224+
m := NewMaintainerManager(&node.Info{ID: node.ID("n1")}, &config.SchedulerConfig{}, &nodeLiveness)
225+
226+
changefeedID := common.NewChangeFeedIDWithName("cf-invalid-checkpoint", common.DefaultKeyspaceName)
227+
info := &config.ChangeFeedInfo{
228+
ChangefeedID: changefeedID,
229+
Config: config.GetDefaultReplicaConfig(),
230+
}
231+
data, err := json.Marshal(info)
232+
require.NoError(t, err)
233+
234+
status := m.onAddMaintainerRequest(&heartbeatpb.AddMaintainerRequest{
235+
Id: changefeedID.ToPB(),
236+
Config: data,
237+
CheckpointTs: 0,
238+
})
239+
require.Nil(t, status)
240+
241+
_, ok := m.GetMaintainerForChangefeed(changefeedID)
242+
require.False(t, ok)
190243
}

maintainer/scheduler/drain_common.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ func (s *DrainState) SetSelfNodeID(id node.ID) {
5858
}
5959

6060
// SetDispatcherDrainTarget applies the newest drain target visible to the
61-
// changefeed. Older epochs are ignored so scheduler state does not regress.
61+
// changefeed. Older epochs are ignored so scheduler state does not regress, and
62+
// same-epoch updates follow the manager-level monotonic rule: only clearing a
63+
// non-empty target is allowed.
6264
func (s *DrainState) SetDispatcherDrainTarget(target node.ID, epoch uint64) {
6365
if s == nil {
6466
return
@@ -68,6 +70,15 @@ func (s *DrainState) SetDispatcherDrainTarget(target node.ID, epoch uint64) {
6870
if epoch < s.targetEpoch {
6971
return
7072
}
73+
if epoch == s.targetEpoch {
74+
if target == s.targetNodeID {
75+
return
76+
}
77+
if target.IsEmpty() && !s.targetNodeID.IsEmpty() {
78+
s.targetNodeID = target
79+
}
80+
return
81+
}
7182
s.targetNodeID = target
7283
s.targetEpoch = epoch
7384
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package scheduler
15+
16+
import (
17+
"testing"
18+
19+
"github.com/pingcap/ticdc/pkg/node"
20+
"github.com/stretchr/testify/require"
21+
)
22+
23+
func TestDrainStateRejectSameEpochReactivation(t *testing.T) {
24+
state := NewDrainState()
25+
26+
state.SetDispatcherDrainTarget(node.ID("n1"), 1)
27+
target, epoch := state.DispatcherDrainTarget()
28+
require.Equal(t, node.ID("n1"), target)
29+
require.Equal(t, uint64(1), epoch)
30+
31+
state.SetDispatcherDrainTarget("", 1)
32+
target, epoch = state.DispatcherDrainTarget()
33+
require.Equal(t, node.ID(""), target)
34+
require.Equal(t, uint64(1), epoch)
35+
36+
// A stale add path snapshot must not reactivate the target after the same
37+
// epoch has already been cleared.
38+
state.SetDispatcherDrainTarget(node.ID("n1"), 1)
39+
target, epoch = state.DispatcherDrainTarget()
40+
require.Equal(t, node.ID(""), target)
41+
require.Equal(t, uint64(1), epoch)
42+
43+
// A newer epoch still wins.
44+
state.SetDispatcherDrainTarget(node.ID("n2"), 2)
45+
target, epoch = state.DispatcherDrainTarget()
46+
require.Equal(t, node.ID("n2"), target)
47+
require.Equal(t, uint64(2), epoch)
48+
}

0 commit comments

Comments
 (0)