Skip to content

Commit 19400c6

Browse files
authored
maintainer,tests: clamp dispatcher StartTs to committed checkpoint (pingcap#4548)
close pingcap#3846
1 parent fc9050e commit 19400c6

File tree

7 files changed

+231
-12
lines changed

7 files changed

+231
-12
lines changed

maintainer/maintainer.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus {
375375
status := &heartbeatpb.MaintainerStatus{
376376
ChangefeedID: m.changefeedID.ToPB(),
377377
State: heartbeatpb.ComponentState(m.scheduleState.Load()),
378-
CheckpointTs: m.getWatermark().CheckpointTs,
378+
CheckpointTs: m.controller.spanController.GetMaintainerCommittedCheckpointTs(),
379379
Err: runningErrors,
380380
BootstrapDone: m.initialized.Load(),
381381
LastSyncedTs: m.getWatermark().LastSyncedTs,
@@ -657,6 +657,11 @@ func (m *Maintainer) calCheckpointTs(ctx context.Context) {
657657
// CRITICAL SECTION: Calculate checkpointTs with proper ordering to prevent race condition
658658
newWatermark, canUpdate := m.calculateNewCheckpointTs()
659659
if canUpdate {
660+
m.controller.spanController.AdvanceMaintainerCommittedCheckpointTs(newWatermark.CheckpointTs)
661+
if m.enableRedo && m.controller.redoSpanController != nil {
662+
// Redo dispatchers must not start below the changefeed committed checkpoint.
663+
m.controller.redoSpanController.AdvanceMaintainerCommittedCheckpointTs(newWatermark.CheckpointTs)
664+
}
660665
m.setWatermark(*newWatermark)
661666
m.updateMetrics()
662667
}

maintainer/maintainer_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121

2222
"github.com/pingcap/log"
2323
"github.com/pingcap/ticdc/heartbeatpb"
24+
"github.com/pingcap/ticdc/maintainer/replica"
25+
"github.com/pingcap/ticdc/maintainer/span"
2426
"github.com/pingcap/ticdc/maintainer/testutil"
2527
"github.com/pingcap/ticdc/pkg/common"
2628
appcontext "github.com/pingcap/ticdc/pkg/common/context"
@@ -33,6 +35,7 @@ import (
3335
"github.com/pingcap/ticdc/server/watcher"
3436
"github.com/pingcap/ticdc/utils/threadpool"
3537
"github.com/stretchr/testify/require"
38+
"go.uber.org/atomic"
3639
"go.uber.org/zap"
3740
)
3841

@@ -357,3 +360,38 @@ func TestMaintainerSchedule(t *testing.T) {
357360
cancel()
358361
wg.Wait()
359362
}
363+
364+
func TestMaintainer_GetMaintainerStatusUsesCommittedCheckpoint(t *testing.T) {
365+
testutil.SetUpTestServices()
366+
367+
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
368+
tableTriggerEventDispatcherID := common.NewDispatcherID()
369+
ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID,
370+
common.DDLSpanSchemaID,
371+
common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
372+
ID: tableTriggerEventDispatcherID.ToPB(),
373+
ComponentStatus: heartbeatpb.ComponentState_Working,
374+
CheckpointTs: 10,
375+
Mode: common.DefaultMode,
376+
}, "node1", false)
377+
spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode)
378+
spanController.AdvanceMaintainerCommittedCheckpointTs(20)
379+
380+
m := &Maintainer{
381+
changefeedID: cfID,
382+
controller: &Controller{
383+
spanController: spanController,
384+
},
385+
statusChanged: atomic.NewBool(false),
386+
}
387+
m.watermark.Watermark = &heartbeatpb.Watermark{
388+
CheckpointTs: 30,
389+
ResolvedTs: 40,
390+
LastSyncedTs: 50,
391+
}
392+
m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError)
393+
394+
status := m.GetMaintainerStatus()
395+
require.Equal(t, uint64(20), status.CheckpointTs)
396+
require.Equal(t, uint64(50), status.LastSyncedTs)
397+
}

maintainer/replica/replication_span.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ type SpanReplication struct {
4747
groupID replica.GroupID
4848
status *atomic.Pointer[heartbeatpb.TableSpanStatus]
4949
blockState *atomic.Pointer[heartbeatpb.State]
50+
// committedCheckpointTs points to the controller owned committed checkpoint for this mode.
51+
// It provides a monotonic lower bound for any recreated dispatcher StartTs.
52+
committedCheckpointTs *atomic.Uint64
5053
}
5154

5255
func NewSpanReplication(cfID common.ChangeFeedID,
@@ -234,6 +237,18 @@ func (r *SpanReplication) GetGroupID() replica.GroupID {
234237
return r.groupID
235238
}
236239

240+
// BindCommittedCheckpointTs attaches the controller owned committed checkpoint to this span.
241+
func (r *SpanReplication) BindCommittedCheckpointTs(committedCheckpointTs *atomic.Uint64) {
242+
r.committedCheckpointTs = committedCheckpointTs
243+
}
244+
245+
func (r *SpanReplication) getCommittedCheckpointTs() uint64 {
246+
if r.committedCheckpointTs == nil {
247+
return 0
248+
}
249+
return r.committedCheckpointTs.Load()
250+
}
251+
237252
// NewAddDispatcherMessage creates a ScheduleDispatcherRequest(Create) for this span.
238253
//
239254
// The StartTs in the request is usually the span checkpoint. However, when a dispatcher is being
@@ -243,6 +258,7 @@ func (r *SpanReplication) GetGroupID() replica.GroupID {
243258
func (r *SpanReplication) NewAddDispatcherMessage(server node.ID, operatorType heartbeatpb.OperatorType) *messaging.TargetMessage {
244259
startTs := r.status.Load().CheckpointTs
245260
skipDMLAsStartTs := false
261+
ddlBarrierBlockTs := uint64(0)
246262
if state := r.blockState.Load(); state != nil && state.IsBlocked &&
247263
(state.Stage == heartbeatpb.BlockStage_WAITING || state.Stage == heartbeatpb.BlockStage_WRITING) && state.BlockTs > 0 {
248264
if state.IsSyncPoint {
@@ -256,6 +272,7 @@ func (r *SpanReplication) NewAddDispatcherMessage(server node.ID, operatorType h
256272
startTs = state.BlockTs
257273
}
258274
} else {
275+
ddlBarrierBlockTs = state.BlockTs
259276
// For an in-flight DDL barrier, a recreated dispatcher must start from (blockTs-1) so that it can
260277
// replay the DDL at blockTs. At the same time, it should skip DML events at blockTs to avoid potential
261278
// duplicate DML writes when the dispatcher is moved/recreated during the barrier.
@@ -268,6 +285,34 @@ func (r *SpanReplication) NewAddDispatcherMessage(server node.ID, operatorType h
268285
}
269286
}
270287
}
288+
committedCheckpointTs := r.getCommittedCheckpointTs()
289+
if committedCheckpointTs > startTs {
290+
originalStartTs := startTs
291+
if ddlBarrierBlockTs > 0 {
292+
// A stale DDL barrier state may survive maintainer failover or dispatcher recreation.
293+
// Once the controller committed checkpoint has advanced beyond the replay point, replaying the
294+
// DDL is no longer correct. Recreate the dispatcher from the committed checkpoint instead.
295+
log.Debug("use committed checkpoint for stale ddl barrier",
296+
zap.Stringer("changefeedID", r.ChangefeedID),
297+
zap.String("dispatcherID", r.ID.String()),
298+
zap.Int64("tableID", r.Span.TableID),
299+
zap.String("operatorType", operatorType.String()),
300+
zap.Uint64("ddlBarrierBlockTs", ddlBarrierBlockTs),
301+
zap.Uint64("originalStartTs", originalStartTs),
302+
zap.Uint64("committedCheckpointTs", committedCheckpointTs))
303+
skipDMLAsStartTs = false
304+
} else {
305+
log.Debug("clamp dispatcher start ts to committed checkpoint",
306+
zap.Stringer("changefeedID", r.ChangefeedID),
307+
zap.String("dispatcherID", r.ID.String()),
308+
zap.Int64("tableID", r.Span.TableID),
309+
zap.String("operatorType", operatorType.String()),
310+
zap.Uint64("originalStartTs", originalStartTs),
311+
zap.Uint64("committedCheckpointTs", committedCheckpointTs),
312+
zap.Uint64("finalStartTs", committedCheckpointTs))
313+
}
314+
startTs = committedCheckpointTs
315+
}
271316
return messaging.NewSingleTargetMessage(server,
272317
messaging.HeartbeatCollectorTopic,
273318
&heartbeatpb.ScheduleDispatcherRequest{

maintainer/replica/replication_span_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/pingcap/ticdc/heartbeatpb"
2020
"github.com/pingcap/ticdc/pkg/common"
2121
"github.com/stretchr/testify/require"
22+
"go.uber.org/atomic"
2223
)
2324

2425
func TestUpdateStatus(t *testing.T) {
@@ -58,6 +59,18 @@ func TestSpanReplication_NewAddDispatcherMessage(t *testing.T) {
5859
require.False(t, req.Config.SkipDMLAsStartTs)
5960
}
6061

62+
func TestSpanReplication_NewAddDispatcherMessage_ClampToCommittedCheckpoint(t *testing.T) {
63+
t.Parallel()
64+
65+
replicaSet := NewSpanReplication(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), common.NewDispatcherID(), 1, getTableSpanByID(4), 10, common.DefaultMode, false)
66+
replicaSet.BindCommittedCheckpointTs(atomic.NewUint64(20))
67+
68+
msg := replicaSet.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add)
69+
req := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
70+
require.Equal(t, uint64(20), req.Config.StartTs)
71+
require.False(t, req.Config.SkipDMLAsStartTs)
72+
}
73+
6174
func TestSpanReplication_NewAddDispatcherMessage_UseBlockTsForInFlightSyncPoint(t *testing.T) {
6275
t.Parallel()
6376

@@ -75,6 +88,24 @@ func TestSpanReplication_NewAddDispatcherMessage_UseBlockTsForInFlightSyncPoint(
7588
require.False(t, req.Config.SkipDMLAsStartTs)
7689
}
7790

91+
func TestSpanReplication_NewAddDispatcherMessage_UseSyncPointBlockTsWhenCommittedIsLower(t *testing.T) {
92+
t.Parallel()
93+
94+
replicaSet := NewSpanReplication(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), common.NewDispatcherID(), 1, getTableSpanByID(4), 10, common.DefaultMode, false)
95+
replicaSet.BindCommittedCheckpointTs(atomic.NewUint64(20))
96+
replicaSet.UpdateBlockState(heartbeatpb.State{
97+
IsBlocked: true,
98+
BlockTs: 30,
99+
IsSyncPoint: true,
100+
Stage: heartbeatpb.BlockStage_WAITING,
101+
})
102+
103+
msg := replicaSet.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add)
104+
req := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
105+
require.Equal(t, uint64(30), req.Config.StartTs)
106+
require.False(t, req.Config.SkipDMLAsStartTs)
107+
}
108+
78109
func TestSpanReplication_NewAddDispatcherMessage_DontUseBlockTsAfterSyncPointDone(t *testing.T) {
79110
t.Parallel()
80111

@@ -109,6 +140,42 @@ func TestSpanReplication_NewAddDispatcherMessage_UseBlockTsMinusOneForDDLInFligh
109140
require.True(t, req.Config.SkipDMLAsStartTs)
110141
}
111142

143+
func TestSpanReplication_NewAddDispatcherMessage_UseCommittedCheckpointForStaleDDLBarrier(t *testing.T) {
144+
t.Parallel()
145+
146+
replicaSet := NewSpanReplication(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), common.NewDispatcherID(), 1, getTableSpanByID(4), 9, common.DefaultMode, false)
147+
replicaSet.BindCommittedCheckpointTs(atomic.NewUint64(10))
148+
replicaSet.UpdateBlockState(heartbeatpb.State{
149+
IsBlocked: true,
150+
BlockTs: 10,
151+
IsSyncPoint: false,
152+
Stage: heartbeatpb.BlockStage_WAITING,
153+
})
154+
155+
msg := replicaSet.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add)
156+
req := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
157+
require.Equal(t, uint64(10), req.Config.StartTs)
158+
require.False(t, req.Config.SkipDMLAsStartTs)
159+
}
160+
161+
func TestSpanReplication_NewAddDispatcherMessage_UseCommittedCheckpointForStaleDDLBarrierWritingStage(t *testing.T) {
162+
t.Parallel()
163+
164+
replicaSet := NewSpanReplication(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), common.NewDispatcherID(), 1, getTableSpanByID(4), 9, common.DefaultMode, false)
165+
replicaSet.BindCommittedCheckpointTs(atomic.NewUint64(11))
166+
replicaSet.UpdateBlockState(heartbeatpb.State{
167+
IsBlocked: true,
168+
BlockTs: 10,
169+
IsSyncPoint: false,
170+
Stage: heartbeatpb.BlockStage_WRITING,
171+
})
172+
173+
msg := replicaSet.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add)
174+
req := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
175+
require.Equal(t, uint64(11), req.Config.StartTs)
176+
require.False(t, req.Config.SkipDMLAsStartTs)
177+
}
178+
112179
// getTableSpanByID returns a mock TableSpan for testing
113180
func getTableSpanByID(id common.TableID) *heartbeatpb.TableSpan {
114181
totalSpan := common.TableIDToComparableSpan(0, id)

maintainer/span/span_controller.go

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/pingcap/ticdc/pkg/util"
3131
"github.com/pingcap/ticdc/server/watcher"
3232
"github.com/pingcap/ticdc/utils"
33+
"go.uber.org/atomic"
3334
"go.uber.org/zap"
3435
)
3536

@@ -78,6 +79,10 @@ type Controller struct {
7879
enableSplittableCheck bool
7980

8081
keyspaceID uint32
82+
83+
// maintainerCommittedCheckpointTs is the controller owned monotonic scheduling baseline.
84+
// Any created dispatcher in this mode must start from at least this checkpoint.
85+
maintainerCommittedCheckpointTs *atomic.Uint64
8186
}
8287

8388
// NewController creates a new span controller
@@ -91,16 +96,17 @@ func NewController(
9196
mode int64,
9297
) *Controller {
9398
c := &Controller{
94-
changefeedID: changefeedID,
95-
ddlSpan: ddlSpan,
96-
newGroupChecker: replica.GetNewGroupChecker(changefeedID, schedulerCfg, refresher),
97-
nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName),
98-
splitter: splitter,
99-
ddlDispatcherID: ddlSpan.ID,
100-
mode: mode,
101-
enableTableAcrossNodes: schedulerCfg != nil && util.GetOrZero(schedulerCfg.EnableTableAcrossNodes),
102-
enableSplittableCheck: schedulerCfg != nil && util.GetOrZero(schedulerCfg.EnableSplittableCheck),
103-
keyspaceID: keyspaceID,
99+
changefeedID: changefeedID,
100+
ddlSpan: ddlSpan,
101+
newGroupChecker: replica.GetNewGroupChecker(changefeedID, schedulerCfg, refresher),
102+
nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName),
103+
splitter: splitter,
104+
ddlDispatcherID: ddlSpan.ID,
105+
mode: mode,
106+
enableTableAcrossNodes: schedulerCfg != nil && util.GetOrZero(schedulerCfg.EnableTableAcrossNodes),
107+
enableSplittableCheck: schedulerCfg != nil && util.GetOrZero(schedulerCfg.EnableSplittableCheck),
108+
keyspaceID: keyspaceID,
109+
maintainerCommittedCheckpointTs: atomic.NewUint64(ddlSpan.GetStatus().CheckpointTs),
104110

105111
schemaTasks: make(map[int64]map[common.DispatcherID]*replica.SpanReplication),
106112
tableTasks: make(map[int64]map[common.DispatcherID]*replica.SpanReplication),
@@ -127,6 +133,7 @@ func (c *Controller) ShouldEnableSplit(splitable bool) bool {
127133
}
128134

129135
func (c *Controller) initializeDDLSpan(ddlSpan *replica.SpanReplication) {
136+
c.bindCommittedCheckpointTs(ddlSpan)
130137
// we don't need to schedule the ddl span, but added it to the allTasks map, so we can access it by id
131138
c.allTasks[ddlSpan.ID] = ddlSpan
132139
// dispatcher will report a block event with table ID 0,
@@ -140,6 +147,33 @@ func (c *Controller) initializeDDLSpan(ddlSpan *replica.SpanReplication) {
140147
}
141148
}
142149

150+
func (c *Controller) bindCommittedCheckpointTs(span *replica.SpanReplication) {
151+
if span == nil {
152+
return
153+
}
154+
span.BindCommittedCheckpointTs(c.maintainerCommittedCheckpointTs)
155+
}
156+
157+
// GetMaintainerCommittedCheckpointTs returns the controller level monotonic committed checkpoint.
158+
func (c *Controller) GetMaintainerCommittedCheckpointTs() uint64 {
159+
return c.maintainerCommittedCheckpointTs.Load()
160+
}
161+
162+
// AdvanceMaintainerCommittedCheckpointTs advances the controller level committed checkpoint monotonically.
163+
func (c *Controller) AdvanceMaintainerCommittedCheckpointTs(ts uint64) {
164+
if ts == 0 {
165+
return
166+
}
167+
// The committed checkpoint for one controller is only advanced by the dedicated
168+
// calCheckpointTs goroutine of its owning maintainer. Readers may observe it
169+
// concurrently, but there is no competing writer for the same controller state.
170+
oldTs := c.maintainerCommittedCheckpointTs.Load()
171+
if oldTs >= ts {
172+
return
173+
}
174+
c.maintainerCommittedCheckpointTs.Store(ts)
175+
}
176+
143177
// AddNewTable adds a new table to the span controller
144178
// This is a complex business logic method that handles table splitting and span creation
145179
func (c *Controller) AddNewTable(table commonEvent.Table, startTs uint64) {
@@ -350,6 +384,7 @@ func (c *Controller) AddSchedulingReplicaSet(span *replica.SpanReplication, targ
350384
func (c *Controller) AddReplicatingSpan(span *replica.SpanReplication) {
351385
c.mu.Lock()
352386
defer c.mu.Unlock()
387+
c.bindCommittedCheckpointTs(span)
353388
c.allTasks[span.ID] = span
354389
c.addToSchemaAndTableMap(span)
355390
c.AddReplicatingWithoutLock(span)
@@ -393,6 +428,7 @@ func (c *Controller) RemoveReplicatingSpan(span *replica.SpanReplication) {
393428
// addAbsentReplicaSetWithoutLock adds spans to absent map
394429
func (c *Controller) addAbsentReplicaSetWithoutLock(spans ...*replica.SpanReplication) {
395430
for _, span := range spans {
431+
c.bindCommittedCheckpointTs(span)
396432
c.allTasks[span.ID] = span
397433
c.AddAbsentWithoutLock(span)
398434
c.addToSchemaAndTableMap(span)
@@ -401,6 +437,7 @@ func (c *Controller) addAbsentReplicaSetWithoutLock(spans ...*replica.SpanReplic
401437

402438
// addSchedulingReplicaSetWithoutLock adds scheduling replica set without lock
403439
func (c *Controller) addSchedulingReplicaSetWithoutLock(span *replica.SpanReplication, targetNodeID node.ID) {
440+
c.bindCommittedCheckpointTs(span)
404441
c.allTasks[span.ID] = span
405442
c.AddSchedulingReplicaWithoutLock(span, targetNodeID)
406443
c.addToSchemaAndTableMap(span)

maintainer/span/span_controller_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,33 @@ func TestController_Statistics(t *testing.T) {
299299
require.Equal(t, 0, controller.GetTaskSizeBySchemaID(3))
300300
}
301301

302+
func TestController_MaintainerCommittedCheckpointMonotonic(t *testing.T) {
303+
t.Parallel()
304+
305+
controller := newControllerWithCheckerForTest(t)
306+
require.Equal(t, uint64(1), controller.GetMaintainerCommittedCheckpointTs())
307+
308+
controller.AdvanceMaintainerCommittedCheckpointTs(10)
309+
require.Equal(t, uint64(10), controller.GetMaintainerCommittedCheckpointTs())
310+
311+
controller.AdvanceMaintainerCommittedCheckpointTs(5)
312+
require.Equal(t, uint64(10), controller.GetMaintainerCommittedCheckpointTs())
313+
}
314+
315+
func TestController_BindCommittedCheckpointToManagedSpan(t *testing.T) {
316+
t.Parallel()
317+
318+
controller := newControllerWithCheckerForTest(t)
319+
controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 100}, 5)
320+
321+
task := controller.GetTasksByTableID(100)[0]
322+
controller.AdvanceMaintainerCommittedCheckpointTs(20)
323+
324+
msg := task.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add)
325+
req := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
326+
require.Equal(t, uint64(20), req.Config.StartTs)
327+
}
328+
302329
// TestBasicFunction tests the basic functionality of the controller
303330
func TestBasicFunction(t *testing.T) {
304331
t.Parallel()

0 commit comments

Comments
 (0)