Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2686e4a
fix(scheduler): ensure span consistency for operator add, remove, mov…
wlwilliamx Dec 18, 2025
7288257
fix a typo
wlwilliamx Dec 18, 2025
b9637e2
fix a log typo
wlwilliamx Dec 18, 2025
ce070cc
handle err for restoreCurrentWorkingOperators
wlwilliamx Dec 18, 2025
b0929db
modify a comment
wlwilliamx Dec 18, 2025
d8732bc
Merge remote-tracking branch 'upstream/master' into fix/dispatcher-lo…
wlwilliamx Dec 18, 2025
050e13f
resolve conflicts
wlwilliamx Dec 18, 2025
a7be949
fix newRemoveDispatcherOperator node id
wlwilliamx Dec 18, 2025
c9891d8
handle return value for AddOperator and add some logs for it
wlwilliamx Dec 19, 2025
8c4e099
add some logs for dispatcher manager store current working operators
wlwilliamx Dec 19, 2025
75638b2
fix remove operator nil span
wlwilliamx Dec 22, 2025
82637b9
Merge remote-tracking branch 'upstream/master' into fix/dispatcher-lo…
wlwilliamx Dec 22, 2025
26f5a83
remove enabled split
wlwilliamx Dec 22, 2025
d1d8752
fix(*): ensure merge operator consistency when maintainer restart
wlwilliamx Dec 23, 2025
53fb47d
Merge remote-tracking branch 'upstream/master' into fix/merge-operato…
wlwilliamx Dec 23, 2025
b8473b1
fix maintainer uts
wlwilliamx Dec 23, 2025
ba7dd6d
Merge remote-tracking branch 'upstream/master' into fix/merge-operato…
wlwilliamx Mar 10, 2026
8e3b14a
add test case
wlwilliamx Mar 11, 2026
927581d
Merge remote-tracking branch 'upstream/master' into fix/merge-operato…
wlwilliamx Apr 8, 2026
e124c26
maintainer: tolerate bootstrap overlap during merge
wlwilliamx Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type DispatcherManager struct {
// Entries must be deleted on completion (create -> after creation; remove -> on cleanup), otherwise
// future maintainer requests for the same dispatcherID will be ignored.
currentOperatorMap sync.Map // map[common.DispatcherID]SchedulerDispatcherRequest (in dispatcher manager, not heartbeatpb)
// mergeOperatorMap keeps in-flight merge requests so bootstrap can reconstruct merge operators after maintainer failover.
mergeOperatorMap sync.Map // map[mergedDispatcherID.String()]*heartbeatpb.MergeDispatcherRequest
// schemaIDToDispatchers is shared in the DispatcherManager,
// it store all the infos about schemaID->Dispatchers
// Dispatchers may change the schemaID when meets some special events, such as rename ddl
Expand Down Expand Up @@ -191,6 +193,7 @@ func NewDispatcherManager(
ctx: ctx,
dispatcherMap: newDispatcherMap[*dispatcher.EventDispatcher](),
currentOperatorMap: sync.Map{},
mergeOperatorMap: sync.Map{},
changefeedID: changefeedID,
keyspaceID: keyspaceID,
pdClock: pdClock,
Expand Down
105 changes: 105 additions & 0 deletions downstreamadapter/dispatchermanager/dispatcher_manager_merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatchermanager

import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
"go.uber.org/zap"
)

// TrackMergeOperator records an in-flight merge request so bootstrap can restore it after maintainer failover.
func (e *DispatcherManager) TrackMergeOperator(req *heartbeatpb.MergeDispatcherRequest) {
if req == nil || req.MergedDispatcherID == nil {
return
}
mergedID := common.NewDispatcherIDFromPB(req.MergedDispatcherID)
e.mergeOperatorMap.Store(mergedID.String(), cloneMergeDispatcherRequest(req))
Comment on lines +24 to +29
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject zero-valued merged dispatcher IDs before touching mergeOperatorMap.

A non-nil protobuf ID can still decode to DispatcherID{}. Right now those requests all collapse to the same key, and the bootstrap side later treats that zero ID as a real merged dispatcher, so one malformed request can poison recovery state.

🔒 Suggested guard
 func (e *DispatcherManager) TrackMergeOperator(req *heartbeatpb.MergeDispatcherRequest) {
 	if req == nil || req.MergedDispatcherID == nil {
 		return
 	}
 	mergedID := common.NewDispatcherIDFromPB(req.MergedDispatcherID)
+	if mergedID.IsZero() {
+		log.Warn("merge operator has invalid merged dispatcher ID",
+			zap.Stringer("changefeedID", e.changefeedID))
+		return
+	}
 	e.mergeOperatorMap.Store(mergedID.String(), cloneMergeDispatcherRequest(req))
 }

Apply the same mergedID.IsZero() guard in MaybeCleanupMergeOperator.

Also applies to: 38-43

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatchermanager/dispatcher_manager_merge.go` around lines
24 - 29, The TrackMergeOperator method currently stores requests whose protobuf
ID decodes to a zero-valued DispatcherID, causing all such requests to collide;
before storing into e.mergeOperatorMap call mergedID.IsZero() and return if true
(mirror the guard used in MaybeCleanupMergeOperator), and do the same check
wherever mergeOperatorMap is written (e.g., in the second occurrence around
MaybeCleanupMergeOperator handling) so that cloneMergeDispatcherRequest(req) and
the mergeOperatorMap.Store only run for non-zero mergedID values.

}

// RemoveMergeOperator drops the persisted merge request once the merged dispatcher has converged.
func (e *DispatcherManager) RemoveMergeOperator(mergedDispatcherID common.DispatcherID) {
e.mergeOperatorMap.Delete(mergedDispatcherID.String())
}

// MaybeCleanupMergeOperator removes a persisted merge request when the merged dispatcher is already complete or gone.
func (e *DispatcherManager) MaybeCleanupMergeOperator(req *heartbeatpb.MergeDispatcherRequest) {
if req == nil || req.MergedDispatcherID == nil {
return
}
mergedID := common.NewDispatcherIDFromPB(req.MergedDispatcherID)
if common.IsRedoMode(req.Mode) {
if dispatcherItem, ok := e.redoDispatcherMap.Get(mergedID); ok {
if dispatcherItem.GetComponentStatus() == heartbeatpb.ComponentState_Working {
e.RemoveMergeOperator(mergedID)
}
return
}
} else {
if dispatcherItem, ok := e.dispatcherMap.Get(mergedID); ok {
if dispatcherItem.GetComponentStatus() == heartbeatpb.ComponentState_Working {
e.RemoveMergeOperator(mergedID)
}
return
}
}
log.Info("cleanup merge operator because merged dispatcher not found",
zap.Stringer("changefeedID", e.changefeedID),
zap.String("dispatcherID", mergedID.String()),
zap.Int64("mode", req.Mode))
e.RemoveMergeOperator(mergedID)
}

// GetMergeOperators returns cloned in-flight merge requests for maintainer bootstrap.
func (e *DispatcherManager) GetMergeOperators() []*heartbeatpb.MergeDispatcherRequest {
operators := make([]*heartbeatpb.MergeDispatcherRequest, 0)
e.mergeOperatorMap.Range(func(_, value any) bool {
req, ok := value.(*heartbeatpb.MergeDispatcherRequest)
if !ok || req == nil {
return true
}
operators = append(operators, cloneMergeDispatcherRequest(req))
return true
})
return operators
}

func cloneMergeDispatcherRequest(req *heartbeatpb.MergeDispatcherRequest) *heartbeatpb.MergeDispatcherRequest {
if req == nil {
return nil
}
clone := &heartbeatpb.MergeDispatcherRequest{
Mode: req.Mode,
}
if req.ChangefeedID != nil {
id := *req.ChangefeedID
clone.ChangefeedID = &id
}
if req.MergedDispatcherID != nil {
mergedID := *req.MergedDispatcherID
clone.MergedDispatcherID = &mergedID
}
if len(req.DispatcherIDs) > 0 {
clone.DispatcherIDs = make([]*heartbeatpb.DispatcherID, 0, len(req.DispatcherIDs))
for _, dispatcherID := range req.DispatcherIDs {
if dispatcherID == nil {
continue
}
id := *dispatcherID
clone.DispatcherIDs = append(clone.DispatcherIDs, &id)
}
}
return clone
}
7 changes: 7 additions & 0 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type HeartBeatCollector struct {
mergeDispatcherRequestDynamicStream dynstream.DynamicStream[int, common.GID, MergeDispatcherRequest, *DispatcherManager, *MergeDispatcherRequestHandler]
mc messaging.MessageCenter

dispatcherManagers sync.Map // map[common.GID]*DispatcherManager

wg sync.WaitGroup
cancel context.CancelFunc
isClosed atomic.Bool
Expand Down Expand Up @@ -128,6 +130,7 @@ func (c *HeartBeatCollector) RegisterDispatcherManager(m *DispatcherManager) err
if err != nil {
return errors.Trace(err)
}
c.dispatcherManagers.Store(m.changefeedID.Id, m)
return nil
}

Expand Down Expand Up @@ -170,6 +173,7 @@ func (c *HeartBeatCollector) RemoveDispatcherManager(id common.ChangeFeedID) err
if err != nil {
return errors.Trace(err)
}
c.dispatcherManagers.Delete(id.Id)
return nil
}

Expand Down Expand Up @@ -287,6 +291,9 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ
NewRedoMetaMessage(redoMessage))
case messaging.TypeMergeDispatcherRequest:
mergeDispatcherRequest := msg.Message[0].(*heartbeatpb.MergeDispatcherRequest)
if manager, ok := c.dispatcherManagers.Load(common.NewChangefeedGIDFromPB(mergeDispatcherRequest.ChangefeedID)); ok {
manager.(*DispatcherManager).TrackMergeOperator(mergeDispatcherRequest)
}
c.mergeDispatcherRequestDynamicStream.Push(
common.NewChangefeedGIDFromPB(mergeDispatcherRequest.ChangefeedID),
NewMergeDispatcherRequest(mergeDispatcherRequest))
Expand Down
10 changes: 9 additions & 1 deletion downstreamadapter/dispatchermanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,11 +785,19 @@ func (h *MergeDispatcherRequestHandler) Handle(dispatcherManager *DispatcherMana
}

mergeDispatcherRequest := reqs[0]
dispatcherManager.TrackMergeOperator(mergeDispatcherRequest.MergeDispatcherRequest)
dispatcherIDs := make([]common.DispatcherID, 0, len(mergeDispatcherRequest.DispatcherIDs))
for _, id := range mergeDispatcherRequest.DispatcherIDs {
dispatcherIDs = append(dispatcherIDs, common.NewDispatcherIDFromPB(id))
}
dispatcherManager.MergeDispatcher(dispatcherIDs, common.NewDispatcherIDFromPB(mergeDispatcherRequest.MergedDispatcherID), mergeDispatcherRequest.Mode)
task := dispatcherManager.MergeDispatcher(
dispatcherIDs,
common.NewDispatcherIDFromPB(mergeDispatcherRequest.MergedDispatcherID),
mergeDispatcherRequest.Mode,
)
if task == nil {
dispatcherManager.MaybeCleanupMergeOperator(mergeDispatcherRequest.MergeDispatcherRequest)
}
return false
}

Expand Down
2 changes: 2 additions & 0 deletions downstreamadapter/dispatchermanager/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func abortMerge[T dispatcher.Dispatcher](t *MergeCheckTask, dispatcherMap *Dispa
}

removeDispatcher(t.manager, t.mergedDispatcher.GetId(), dispatcherMap, sinkType)
t.manager.RemoveMergeOperator(t.mergedDispatcher.GetId())
}

func doMerge[T dispatcher.Dispatcher](t *MergeCheckTask, dispatcherMap *DispatcherMap[T]) {
Expand Down Expand Up @@ -330,6 +331,7 @@ func doMerge[T dispatcher.Dispatcher](t *MergeCheckTask, dispatcherMap *Dispatch
}
dispatcher.Remove()
}
t.manager.RemoveMergeOperator(t.mergedDispatcher.GetId())
}

type mergedStartTsCandidate struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,4 +526,6 @@ func retrieveOperatorsForBootstrapResponse(
})
return true
})

response.MergeOperators = append(response.MergeOperators, manager.GetMergeOperators()...)
}
Loading
Loading