
maintainer,heartbeatpb: add drain target plumbing #4759

Open
hongyunyan wants to merge 3 commits into master from split/pr-4190-2a-drain-protocol

Conversation

@hongyunyan
Collaborator

@hongyunyan hongyunyan commented Apr 7, 2026

What problem does this PR solve?

The dispatcher-drain work split out of #4190 still mixes the drain target protocol, maintainer runtime behavior, coordinator scheduling, and public API orchestration in one review path. This PR extracts the drain target protocol and maintainer-manager plumbing into a dedicated review unit so reviewers can focus on how the target is propagated and acknowledged before looking at scheduler behavior.

Issue Number: ref #4190

What is changed and how it works?

Background:

  • coordinator,maintainer: add node liveness foundation #4522 already merged the node liveness foundation into master.
  • The remaining dispatcher-drain work from the old #4523 still mixed protocol, runtime scheduling, and API orchestration.
  • This PR starts a new stacked split from master and keeps only the drain target plumbing.

Motivation:

  • Separate protocol and manager-level state propagation from drain scheduling logic.
  • Let reviewers verify target monotonicity and acknowledgment rules before reading scheduler algorithms.
  • Keep protobuf and messaging changes in the same review unit as the maintainer-manager handlers that consume them.

Summary:

  • add SetDispatcherDrainTargetRequest
  • extend node heartbeat with dispatcher-drain target acknowledgment fields
  • add manager-level drain target state with monotonic update rules and clear-once behavior
  • fan out the latest drain target from maintainer manager to newly created and already running maintainers
  • add focused tests for target apply, stale-update rejection, and heartbeat acknowledgment

How it works:

  • coordinator can send a dispatcher-drain target to the maintainer manager
  • the maintainer manager applies it at node scope and reports the applied target back in node heartbeat
  • the latest target is also seeded into each local maintainer so later runtime layers can consume a consistent snapshot
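The monotonic target flow described above can be sketched as follows. This is an illustrative Go sketch, not the PR's actual code: the type and method names (drainTargetState, tryUpdate) are assumptions based on the behavior described, namely that strictly newer epochs win, stale or duplicate updates are rejected, and a newer epoch with an empty target clears the state.

```go
package main

import "fmt"

// drainTargetState is a hypothetical stand-in for the node-scoped drain
// target state; field and method names are illustrative.
type drainTargetState struct {
	targetNodeID string
	targetEpoch  uint64
}

// tryUpdate applies the monotonic rule: only strictly newer epochs are
// accepted, so stale or duplicate coordinator requests are rejected.
// An empty target with a newer epoch clears the drain target.
func (s *drainTargetState) tryUpdate(target string, epoch uint64) bool {
	if epoch <= s.targetEpoch {
		return false // stale or duplicate update
	}
	s.targetNodeID = target
	s.targetEpoch = epoch
	return true
}

func main() {
	s := &drainTargetState{}
	fmt.Println(s.tryUpdate("node-2", 3)) // newer epoch: applied
	fmt.Println(s.tryUpdate("node-3", 2)) // older epoch: rejected
	fmt.Println(s.tryUpdate("node-3", 3)) // same epoch: rejected
	fmt.Println(s.tryUpdate("", 4))       // newer epoch clears the target
}
```

Under this rule the coordinator can safely retry a SetDispatcherDrainTarget request: replays are no-ops, and the heartbeat echoes back whichever target/epoch pair was last applied.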

Check List

Tests

  • Unit test
    • go test ./maintainer -run 'TestSetDispatcherDrainTarget|TestSetNodeLiveness'
    • go test ./pkg/messaging ./heartbeatpb

Questions

Will it cause performance regression or break compatibility?

This PR does not add a user-facing API by itself. It only adds the internal protocol and manager-level drain target propagation needed by the later runtime and API layers.

Do you need to update user documentation, design documentation or monitoring documentation?

No additional user-facing documentation is needed for this split. It is an internal decomposition of the drain-capture implementation.

Release note

None

Summary by CodeRabbit

  • New Features

    • Nodes can acknowledge coordinator-set dispatcher drain targets (node+epoch); coordinators can set or clear targets.
    • Heartbeats now include dispatcher drain target and per-target drain progress for visibility.
  • Improvements

    • Scheduling avoids evacuated targets and pauses rebalancing during active drain windows to reduce disruption.
  • Tests

    • Added tests for drain-target apply/clear/ordering and heartbeat acknowledgement behavior.

@ti-chi-bot

ti-chi-bot bot commented Apr 7, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign wk989898 for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

  • Approvers can indicate their approval by writing /approve in a comment
  • Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai
Contributor

coderabbitai bot commented Apr 7, 2026

📝 Walkthrough

Walkthrough

Adds dispatcher drain-target protocol, manager-side coordination, maintainer and scheduler drain-state APIs, messaging support, and tests to propagate and acknowledge dispatcher drain targets and enforce monotonic epoch semantics.

Changes

  • Protobuf messages (heartbeatpb/heartbeat.proto): add the DrainProgress message, extend MaintainerStatus and NodeHeartbeat with drain fields, and add SetDispatcherDrainTargetRequest.
  • Manager node and message handling (maintainer/maintainer_manager_node.go, maintainer/maintainer_manager.go): track the coordinator-issued dispatcher drain target and epoch; add onSetDispatcherDrainTargetRequest with monotonic update rules; include the drain target in the node heartbeat; refactor the manager to use managerMaintainerSet and adjust the heartbeat/cleanup flow.
  • Maintainer set and lifecycle (maintainer/maintainer_manager_maintainers.go): new managerMaintainerSet owning the maintainer registry, lifecycle, message routing, bootstrap/status aggregation, drain-target fanout, and cleanup logic.
  • Maintainer controller and API (maintainer/maintainer_controller.go, maintainer/maintainer.go): introduce drainState *mscheduler.DrainState on Controller; add SetDispatcherDrainTarget on Controller and an exported Maintainer.SetDispatcherDrainTarget to forward the target and mark status changed.
  • Scheduler drain state and helpers (maintainer/scheduler/drain_common.go, maintainer/scheduler/drain_common_test.go): add an exported DrainState with monotonic epoch rules, snapshotting, pause/cooldown, and filtering helpers; include a unit test for the same-epoch reactivation rule.
  • Messaging support (pkg/messaging/message.go): add TypeSetDispatcherDrainTargetRequest IOType = 45; update the IOType string, decoder, and NewSingleTargetMessage mapping.
  • Tests for node liveness and maintainer validation (maintainer/node_liveness_test.go): add tests for apply/clear/reject semantics of drain targets, heartbeat ack behavior, and maintainer add validation cases.
  • Other small changes (maintainer/maintainer_manager.go refactor, maintainer/maintainer.go): refactor the manager to delegate the maintainer registry; add a Maintainer.SetDispatcherDrainTarget method forwarding to the controller.
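The messaging change listed above amounts to registering a new message type constant and its string mapping. Below is a minimal sketch assuming a simple integer-backed IOType; only the value 45 and the constant name come from the summary, while the neighboring constant's value and the method shapes are illustrative assumptions, not the repository's actual code.

```go
package main

import "fmt"

// IOType mirrors the kind of integer-backed message type enum described
// in pkg/messaging/message.go; the concrete shape here is an assumption.
type IOType int

const (
	// TypeNodeHeartbeatRequest's value is illustrative, chosen only to
	// show a neighboring constant; 45 is the value stated in the summary.
	TypeNodeHeartbeatRequest            IOType = 44
	TypeSetDispatcherDrainTargetRequest IOType = 45
)

// String gives each message type a stable name for logs and metrics;
// new constants must be added here as well as to the decoder mapping.
func (t IOType) String() string {
	switch t {
	case TypeNodeHeartbeatRequest:
		return "NodeHeartbeatRequest"
	case TypeSetDispatcherDrainTargetRequest:
		return "SetDispatcherDrainTargetRequest"
	default:
		return fmt.Sprintf("Unknown(%d)", int(t))
	}
}

func main() {
	fmt.Println(TypeSetDispatcherDrainTargetRequest)      // SetDispatcherDrainTargetRequest
	fmt.Println(int(TypeSetDispatcherDrainTargetRequest)) // 45
}
```

Keeping the constant, its String case, and the decoder entry in one review unit (as this PR does) avoids the common failure mode where a new IOType decodes but logs as an unknown number.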

Sequence Diagram(s)

sequenceDiagram
    rect rgba(200,200,255,0.5)
    participant Coordinator
    end
    rect rgba(200,255,200,0.5)
    participant Manager
    participant MaintainerSet
    participant Maintainer
    participant Scheduler
    end

    Coordinator->>Manager: TypeSetDispatcherDrainTargetRequest(target_node_id, target_epoch)
    Manager->>Manager: tryUpdateDispatcherDrainTarget(target, epoch)
    Manager->>MaintainerSet: applyDispatcherDrainTarget(target, epoch)
    MaintainerSet->>Maintainer: SetDispatcherDrainTarget(target, epoch)
    Maintainer->>Scheduler: controller.SetDispatcherDrainTarget(target, epoch)
    Scheduler->>Scheduler: Update DrainState snapshot & scheduling rules
    Manager->>Coordinator: TypeNodeHeartbeatRequest(dispatcher_drain_target_node_id, dispatcher_drain_target_epoch)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Suggested labels

lgtm, approved

Suggested reviewers

  • wk989898
  • asddongmen

Poem

🐰 I nibble a target, epoch stamped in a hop,
I tell the maintainer, then dance—don’t stop.
Heartbeats return with the drain in sight,
Schedulers step aside while nodes take flight.
Hooray — the cluster hums all through the night!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 58.82%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.

✅ Passed checks (2 passed)

  • Title check ✅ Passed: the title "maintainer,heartbeatpb: add drain target plumbing" accurately and concisely describes the main change, the addition of drain target propagation infrastructure across the maintainer and heartbeat protocol layers.
  • Description check ✅ Passed: the PR description covers the problem statement, detailed changes, motivation, test coverage, and impact assessment according to the template.




@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Apr 7, 2026

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a dispatcher drain mechanism, allowing the coordinator to instruct maintainer managers to drain specific nodes. It adds new protobuf definitions for drain progress and requests, updates the maintainer manager to handle these requests, and implements a node-scoped drain state. I have provided feedback regarding the use of log.Panic for input validation, which could lead to process crashes, and suggested ensuring that node heartbeats are sent to acknowledge drain target updates even when the update is a duplicate or stale.

Comment on lines +168 to +171
	err := json.Unmarshal(req.Config, info)
	if err != nil {
		log.Panic("decode changefeed fail", zap.Error(err))
	}


high

Using log.Panic when unmarshaling the changefeed configuration from a network message is risky. If the coordinator (or a malicious actor) sends a malformed configuration, it will crash the entire maintainer manager process. It is better to log an error and return a failure status or ignore the request.
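A hedged sketch of the suggested alternative: decode failures become returned errors that the caller can log and drop, instead of a process-crashing panic. The changefeedInfo type and decodeChangefeedConfig helper below are hypothetical stand-ins for the real config type and handler, used only to illustrate the error-return pattern.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// changefeedInfo stands in for the real changefeed config type; the
// field is illustrative.
type changefeedInfo struct {
	SinkURI string `json:"sink-uri"`
}

// decodeChangefeedConfig rejects malformed input with an error instead
// of panicking, so a bad coordinator message cannot crash the manager.
func decodeChangefeedConfig(raw []byte) (*changefeedInfo, error) {
	info := &changefeedInfo{}
	if err := json.Unmarshal(raw, info); err != nil {
		// The caller logs this and drops the request.
		return nil, fmt.Errorf("decode changefeed config: %w", err)
	}
	return info, nil
}

func main() {
	if _, err := decodeChangefeedConfig([]byte(`{bad json`)); err != nil {
		fmt.Println("rejected malformed config:", err)
	}
	info, err := decodeChangefeedConfig([]byte(`{"sink-uri":"mysql://127.0.0.1"}`))
	if err == nil {
		fmt.Println(info.SinkURI)
	}
}
```

The same shape applies to the CheckpointTs validation below: detect the invalid value, log it with structured fields, and reject the add-maintainer request rather than calling log.Panic.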

Comment on lines +172 to +177
	if req.CheckpointTs == 0 {
		log.Panic("add maintainer with invalid checkpointTs",
			zap.Stringer("changefeedID", changefeedID),
			zap.Uint64("checkpointTs", req.CheckpointTs),
			zap.Any("info", info))
	}


high

Similar to the JSON unmarshaling issue, using log.Panic for an invalid CheckpointTs can lead to process crashes on invalid input. Consider logging the error and rejecting the request instead of panicking.

Comment on lines +166 to +168
	if !m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
		return
	}

medium

If tryUpdateDispatcherDrainTarget returns false (e.g., because the update is a duplicate or stale), the manager currently returns immediately without sending a heartbeat. For better responsiveness and to ensure the coordinator receives an acknowledgement even for retries or stale messages, it is recommended to always send a node heartbeat if the request is from the authorized coordinator.

Suggested change
-	if !m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
-		return
-	}
+	if m.node.tryUpdateDispatcherDrainTarget(target, req.TargetEpoch) {
+		log.Info("dispatcher drain target updated",
+			zap.Stringer("targetNodeID", target),
+			zap.Uint64("targetEpoch", req.TargetEpoch))
+		m.maintainers.applyDispatcherDrainTarget(target, req.TargetEpoch)
+	}
+	// A manager-level heartbeat is the authoritative acknowledgement that this
+	// node has applied the latest drain target, even when no maintainers exist.
+	m.sendNodeHeartbeat(true)

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@maintainer/maintainer_manager_maintainers.go`:
- Around line 88-92: The log currently emits the whole rejected control message
(zap.Any("request", msg)) which can leak serialized changefeed config and
secrets; change the warning in the coordinator ID check (m.coordinatorID !=
msg.From) to avoid logging msg itself and instead log only safe metadata fields
from the AddMaintainerRequest such as msg.Type, msg.From (use zap.Stringer if
appropriate) and msg.ChangefeedID so the message body/config is never written to
logs.
- Around line 74-75: Register the maintainer in the registry atomically before
seeding it from a snapshot: instead of calling getDispatcherDrainTarget() and
then m.maintainers.handleAddMaintainer(...) which seeds the maintainer before it
is visible, first perform an atomic register (e.g., use a per-changefeed lock or
m.maintainers.LoadOrStore equivalent) to ensure only one add wins and the
maintainer becomes visible, then apply the snapshot/drain-target returned by
getDispatcherDrainTarget() to that already-registered maintainer; update the
same pattern at the other occurrences referenced (the blocks around lines
156-185 and 258-262) to eliminate the separate Load/Store window and prevent
duplicate-add and stale-target races.
- Around line 168-177: The code currently panics on json.Unmarshal(req.Config,
info) errors and when req.CheckpointTs == 0 (using log.Panic), which can crash
the node; instead, detect these two conditions (the json.Unmarshal failure for
req.Config into info, and req.CheckpointTs == 0 for the incoming
req/changefeedID), log a structured error (use log.Error or log.Warn with
zap.Error(err), zap.Stringer("changefeedID", changefeedID),
zap.Uint64("checkpointTs", req.CheckpointTs), zap.Any("info", info)) and reject
the add-maintainer request by returning an error (or otherwise signaling failure
to the caller) rather than calling log.Panic so the process does not exit and
the bad request is dropped/reported.
- Around line 277-279: Replace the bare return of ctx.Err() in the select block
with a wrapped error using errors.Trace to preserve stack information; i.e.,
when handling <-ctx.Done() capture the error (e := ctx.Err() or err :=
ctx.Err()) and return errors.Trace(err) instead of returning ctx.Err() directly
(update the select case in maintainer_manager_maintainers.go where the code
currently returns ctx.Err()).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 470fe784-acff-4162-88e8-31176a2ebaca

📥 Commits

Reviewing files that changed from the base of the PR and between 567506c and 2b2b7ec.

⛔ Files ignored due to path filters (1)
  • heartbeatpb/heartbeat.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (9)
  • heartbeatpb/heartbeat.proto
  • maintainer/maintainer.go
  • maintainer/maintainer_controller.go
  • maintainer/maintainer_manager.go
  • maintainer/maintainer_manager_maintainers.go
  • maintainer/maintainer_manager_node.go
  • maintainer/node_liveness_test.go
  • maintainer/scheduler/drain_common.go
  • pkg/messaging/message.go

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
maintainer/maintainer_manager_maintainers.go (1)

176-180: Wrap the decode error before logging it.

This reject path logs the raw json.Unmarshal error, so it loses the stack context the repo expects on library failures. Using errors.Trace(err) here keeps the current behavior but preserves the stack trace in the log.

🩹 Small fix
 		log.Error("ignore add maintainer request with invalid config",
 			zap.Stringer("changefeedID", changefeedID),
 			zap.Int("configBytes", len(req.Config)),
-			zap.Error(err))
+			zap.Error(errors.Trace(err)))

Based on learnings: Applies to **/*.go : When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace; upstream callers should propagate wrapped errors without wrapping again.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer_manager_maintainers.go` around lines 176 - 180, The
json.Unmarshal error is logged raw; wrap it with errors.Trace before logging to
preserve stack context — change the error argument in the reject path around the
json.Unmarshal in the block that currently calls log.Error("ignore add
maintainer request with invalid config", zap.Stringer("changefeedID",
changefeedID), zap.Int("configBytes", len(req.Config)), zap.Error(err)) to use a
traced error (e.g. traced := errors.Trace(err) and pass zap.Error(traced) or
inline zap.Error(errors.Trace(err))) so the log records the wrapped error with
stack info while keeping the existing log fields (changefeedID, configBytes).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@maintainer/maintainer_manager_maintainers.go`:
- Around line 88-100: The code is indexing msg.Message[0] and doing unchecked
type assertions in the coordinator-mismatch block (around the m.coordinatorID !=
msg.From handling and the cases for messaging.TypeAddMaintainerRequest and
messaging.TypeRemoveMaintainerRequest); guard against empty or unexpected
payloads by first checking len(msg.Message) > 0 and then performing type
assertions using the "ok" form (e.g., v, ok :=
msg.Message[0].(*heartbeatpb.AddMaintainerRequest)) and, on failure, append a
descriptive field (or log) indicating a missing/invalid payload and skip adding
the changefeedID instead of panicking; apply the same checks for the other cases
referenced (including the similar logic at lines ~105-109).


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 17428b83-c9d7-48e5-81d1-9797660fed85

📥 Commits

Reviewing files that changed from the base of the PR and between 2b2b7ec and 5cb3d57.

📒 Files selected for processing (5)
  • maintainer/maintainer_manager_maintainers.go
  • maintainer/maintainer_manager_node.go
  • maintainer/node_liveness_test.go
  • maintainer/scheduler/drain_common.go
  • maintainer/scheduler/drain_common_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • maintainer/maintainer_manager_node.go
  • maintainer/scheduler/drain_common.go
  • maintainer/node_liveness_test.go

Comment on lines +88 to +100
	if m.coordinatorID != msg.From {
		fields := []zap.Field{
			zap.String("type", msg.Type.String()),
			zap.Stringer("coordinatorID", m.coordinatorID),
			zap.Stringer("from", msg.From),
		}
		switch msg.Type {
		case messaging.TypeAddMaintainerRequest:
			changefeedID := common.NewChangefeedIDFromPB(msg.Message[0].(*heartbeatpb.AddMaintainerRequest).Id)
			fields = append(fields, zap.Stringer("changefeedID", changefeedID))
		case messaging.TypeRemoveMaintainerRequest:
			changefeedID := common.NewChangefeedIDFromPB(msg.Message[0].(*heartbeatpb.RemoveMaintainerRequest).Id)
			fields = append(fields, zap.Stringer("changefeedID", changefeedID))
Contributor


⚠️ Potential issue | 🟠 Major

Validate the control-message payload before indexing msg.Message[0].

Line 96, Line 99, and Line 107 all assume the first payload entry exists and has the expected protobuf type. An empty or mismatched internal message will still panic this manager goroutine before it can be ignored, including the coordinator-mismatch path.

🔧 Minimal hardening sketch
 	switch msg.Type {
 	case messaging.TypeAddMaintainerRequest:
-		changefeedID := common.NewChangefeedIDFromPB(msg.Message[0].(*heartbeatpb.AddMaintainerRequest).Id)
+		if len(msg.Message) != 1 {
+			log.Warn("ignore malformed add maintainer request",
+				zap.Int("messageCount", len(msg.Message)))
+			return nil
+		}
+		req, ok := msg.Message[0].(*heartbeatpb.AddMaintainerRequest)
+		if !ok || req == nil {
+			log.Warn("ignore malformed add maintainer request")
+			return nil
+		}
+		changefeedID := common.NewChangefeedIDFromPB(req.Id)
 		fields = append(fields, zap.Stringer("changefeedID", changefeedID))
 	case messaging.TypeRemoveMaintainerRequest:
-		changefeedID := common.NewChangefeedIDFromPB(msg.Message[0].(*heartbeatpb.RemoveMaintainerRequest).Id)
+		if len(msg.Message) != 1 {
+			log.Warn("ignore malformed remove maintainer request",
+				zap.Int("messageCount", len(msg.Message)))
+			return nil
+		}
+		req, ok := msg.Message[0].(*heartbeatpb.RemoveMaintainerRequest)
+		if !ok || req == nil {
+			log.Warn("ignore malformed remove maintainer request")
+			return nil
+		}
+		changefeedID := common.NewChangefeedIDFromPB(req.Id)
 		fields = append(fields, zap.Stringer("changefeedID", changefeedID))
 	}

Also applies to: 105-109

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer_manager_maintainers.go` around lines 88 - 100, The code
is indexing msg.Message[0] and doing unchecked type assertions in the
coordinator-mismatch block (around the m.coordinatorID != msg.From handling and
the cases for messaging.TypeAddMaintainerRequest and
messaging.TypeRemoveMaintainerRequest); guard against empty or unexpected
payloads by first checking len(msg.Message) > 0 and then performing type
assertions using the "ok" form (e.g., v, ok :=
msg.Message[0].(*heartbeatpb.AddMaintainerRequest)) and, on failure, append a
descriptive field (or log) indicating a missing/invalid payload and skip adding
the changefeedID instead of panicking; apply the same checks for the other cases
referenced (including the similar logic at lines ~105-109).

@ti-chi-bot

ti-chi-bot bot commented Apr 9, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
maintainer/maintainer_manager_maintainers.go (1)

99-115: ⚠️ Potential issue | 🟡 Minor

Validate control-message payload before indexing msg.Message[0].

Lines 107 and 110 perform unchecked type assertions that will panic if the message is malformed. This was flagged in a past review but the validation is still missing in the coordinator-mismatch logging path.

🛡️ Suggested hardening
 	if m.coordinatorID != msg.From {
 		fields := []zap.Field{
 			zap.String("type", msg.Type.String()),
 			zap.Stringer("coordinatorID", m.coordinatorID),
 			zap.Stringer("from", msg.From),
 		}
+		if len(msg.Message) == 0 {
+			log.Warn("ignore invalid coordinator id with empty payload", fields...)
+			return nil
+		}
 		switch msg.Type {
 		case messaging.TypeAddMaintainerRequest:
-			changefeedID := common.NewChangefeedIDFromPB(msg.Message[0].(*heartbeatpb.AddMaintainerRequest).Id)
+			req, ok := msg.Message[0].(*heartbeatpb.AddMaintainerRequest)
+			if ok && req != nil {
+				changefeedID := common.NewChangefeedIDFromPB(req.Id)
+				fields = append(fields, zap.Stringer("changefeedID", changefeedID))
+			}
-			fields = append(fields, zap.Stringer("changefeedID", changefeedID))
 		case messaging.TypeRemoveMaintainerRequest:
-			changefeedID := common.NewChangefeedIDFromPB(msg.Message[0].(*heartbeatpb.RemoveMaintainerRequest).Id)
+			req, ok := msg.Message[0].(*heartbeatpb.RemoveMaintainerRequest)
+			if ok && req != nil {
+				changefeedID := common.NewChangefeedIDFromPB(req.Id)
+				fields = append(fields, zap.Stringer("changefeedID", changefeedID))
+			}
-			fields = append(fields, zap.Stringer("changefeedID", changefeedID))
 		}
 		log.Warn("ignore invalid coordinator id", fields...)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer_manager_maintainers.go` around lines 99 - 115, The
coordinator-mismatch logging path does unchecked indexing and type assertions on
msg.Message which can panic; before accessing msg.Message[0] in the block that
handles messaging.TypeAddMaintainerRequest and
messaging.TypeRemoveMaintainerRequest, validate that msg.Message is non-nil and
has at least one element and perform safe type assertions (e.g. using a type
switch or the "v, ok := msg.Message[0].(*heartbeatpb.AddMaintainerRequest)"
pattern) and only append changefeedID when the cast succeeds; if validation
fails, log a safe fallback (e.g. include message length or raw type) instead of
panicking — apply these checks around the branches that reference
AddMaintainerRequest and RemoveMaintainerRequest and use m.coordinatorID,
msg.Type, and msg.From in the unchanged part of the log.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@maintainer/maintainer_manager_maintainers.go`:
- Around line 234-238: Race can occur when calling
p.registry.Store(changefeedID, maintainer) causing an existing maintainer to be
overwritten and leaked; replace this with LoadOrStore: create the new maintainer
via NewMaintainerForRemove(changefeedID, p.conf, p.nodeInfo, p.taskScheduler,
req.KeyspaceId), call actual, loaded := p.registry.LoadOrStore(changefeedID,
newMaintainer), and if loaded is true then close/stop the newly created
maintainer to avoid leaking goroutines and use actual.(MaintainerType) as the
maintainer; if loaded is false use the new maintainer as stored.

In `@maintainer/maintainer_manager_node.go`:
- Around line 156-180: The handler onSetDispatcherDrainTargetRequest currently
does an unchecked type assertion into req :=
msg.Message[0].(*heartbeatpb.SetDispatcherDrainTargetRequest) which can panic if
msg.Message is empty or contains the wrong type; add guard logic to validate
msg.Message has at least one element and perform the type assertion with the
comma-ok idiom (req, ok :=
msg.Message[0].(*heartbeatpb.SetDispatcherDrainTargetRequest)), log a
warning/error if validation fails (including msg.From and coordinatorID context)
and return early instead of panicking; keep the existing flow (target,
tryUpdateDispatcherDrainTarget, applyDispatcherDrainTarget, sendNodeHeartbeat)
unchanged when the assertion succeeds.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 26fd4fd4-f6af-4420-88e9-1e25be118028

📥 Commits

Reviewing files that changed from the base of the PR and between 5cb3d57 and 32e2273.

⛔ Files ignored due to path filters (1)
  • heartbeatpb/heartbeat.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (4)
  • heartbeatpb/heartbeat.proto
  • maintainer/maintainer_manager_maintainers.go
  • maintainer/maintainer_manager_node.go
  • maintainer/scheduler/drain_common.go

Comment on lines +234 to +238

	// It's cascade remove, we should remove the dispatcher from all node.
	// Here we create a maintainer to run the remove dispatcher logic.
	maintainer = NewMaintainerForRemove(changefeedID, p.conf, p.nodeInfo, p.taskScheduler, req.KeyspaceId)
	p.registry.Store(changefeedID, maintainer)
Contributor

@coderabbitai coderabbitai bot Apr 9, 2026


⚠️ Potential issue | 🟡 Minor

Cascade remove uses Store which can leak a maintainer on race.

If two cascade-remove requests race for the same changefeed, Store at line 238 can overwrite a previously stored maintainer without closing it, leaking its goroutines. Consider using LoadOrStore and handling the loaded case.

🛡️ Suggested fix
 		// It's cascade remove, we should remove the dispatcher from all node.
 		// Here we create a maintainer to run the remove dispatcher logic.
-		maintainer = NewMaintainerForRemove(changefeedID, p.conf, p.nodeInfo, p.taskScheduler, req.KeyspaceId)
-		p.registry.Store(changefeedID, maintainer)
+		newMaintainer := NewMaintainerForRemove(changefeedID, p.conf, p.nodeInfo, p.taskScheduler, req.KeyspaceId)
+		actual, loaded := p.registry.LoadOrStore(changefeedID, newMaintainer)
+		if loaded {
+			// Another request raced us; close our redundant instance and use the existing one.
+			newMaintainer.Close()
+		}
+		maintainer = actual
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `maintainer/maintainer_manager_maintainers.go` around lines 234-238, a race when calling p.registry.Store(changefeedID, maintainer) could overwrite an existing maintainer and leak it. Replace this with LoadOrStore: create the new maintainer via NewMaintainerForRemove(changefeedID, p.conf, p.nodeInfo, p.taskScheduler, req.KeyspaceId), then call actual, loaded := p.registry.LoadOrStore(changefeedID, newMaintainer). If loaded is true, close/stop the newly created maintainer to avoid leaking goroutines and use actual.(MaintainerType) as the maintainer; if loaded is false, use the new maintainer as stored.

Collaborator Author

This write path is serialized today, so I do not think the Store here can race with another cascade-remove Store in the current design.

All add/remove control messages first go through recvMessages() into m.msgCh, and Manager.Run() processes handleMessage() from that channel on a single goroutine. That means handleRemoveMaintainer() is not executed concurrently for two control messages of the same changefeed. The other concurrent paths touching registry (dispatchMaintainerMessage, heartbeat building, cleanup iteration) only Load/Range; they do not create another remove maintainer for the same key.

So while LoadOrStore would be a reasonable defensive hardening if we ever make the write path concurrent, I do not see a real goroutine leak/overwrite race here with the current manager event-loop model.

Contributor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check Manager.Run() and recvMessages() to verify serial processing
rg -n "func.*Manager.*Run\|func.*recvMessages\|m\.msgCh\|msgCh" maintainer/maintainer_manager.go -A5 | head -100

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Broader search for the message channel and run loop
rg -n "msgCh\|recvMessages\|handleMessage" maintainer/ -A3 | head -150

echo "---"
# Look at Manager.Run implementation
ast-grep --pattern 'func (m *Manager) Run($$$) {
  $$$
}'

Length of output: 3


🏁 Script executed:

#!/bin/bash
# List all files in maintainer directory
fd . maintainer/ --type f

echo "---"
# Try grep on all go files
grep -rn "msgCh\|recvMessages\|handleMessage\|func.*Run" maintainer/ 2>/dev/null | head -80

Length of output: 4799


@hongyunyan, you're right — the code confirms your explanation. Manager.Run() processes m.msgCh in a single select loop (lines 134–135 of maintainer_manager.go), and recvMessages only pushes incoming messages onto that channel. Since handleMessage (and therefore handleRemoveMaintainer) is always called from that one goroutine, two cascade-remove requests for the same changefeed cannot execute concurrently, and the Store at line 238 cannot race with another Store for the same key.

My original concern was valid only if the write path were concurrent. Since it's fully serialized in the current design, the LoadOrStore hardening is unnecessary today. Thanks for the clear explanation!


✏️ Learnings added
Learnt from: hongyunyan
Repo: pingcap/ticdc PR: 4759
File: maintainer/maintainer_manager_maintainers.go:234-238
Timestamp: 2026-04-09T01:20:43.816Z
Learning: In the TiCDC maintainer package (`maintainer/maintainer_manager.go`), the Manager's write path for add/remove maintainer requests is fully serialized: `recvMessages` pushes all coordinator control messages onto `m.msgCh` (buffered channel), and `Manager.Run()` processes them one-at-a-time via a single `select` loop calling `handleMessage`. Therefore, `handleRemoveMaintainer` (and `handleAddMaintainer`) are never executed concurrently, and `sync.Map.Store` in `handleRemoveMaintainer` cannot race with another concurrent `Store` for the same changefeed key in the current design. Race-safety concerns about `LoadOrStore` vs `Store` for the cascade-remove path are theoretical/future-proofing only.



Labels

do-not-merge/needs-linked-issue do-not-merge/needs-triage-completed release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
