maintainer: guard invalid global checkpoint#4709
maintainer: guard invalid global checkpoint#4709hongyunyan wants to merge 11 commits intopingcap:masterfrom
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughRefactors maintainer checkpoint/redo advancement: introduces Changes
Sequence Diagram(s)sequenceDiagram
participant Maintainer
participant Scheduler
participant Barrier
participant RedoController as RedoSpanController
Maintainer->>Scheduler: query redo scheduler minima
Maintainer->>Barrier: query redo barrier minima
Maintainer->>RedoController: read per-capture redo heartbeats
Maintainer->>Maintainer: calculateNewRedoCheckpointTs()
alt canUpdate == true
Maintainer->>RedoController: AdvanceMaintainerCommittedCheckpointTs(newRedoCheckpoint)
Maintainer->>Maintainer: update redoMetaTs.ResolvedTs if higher
else canUpdate == false
Maintainer->>Maintainer: skip redo advancement and log (redoCheckpointTs = 0)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a check in the calculateNewCheckpointTs function to prevent advancing the checkpoint when the global checkpoint is invalid (represented by math.MaxUint64). This change ensures that the committed checkpoint is not corrupted by uninitialized values. Corresponding unit tests were added to verify this behavior. Feedback suggests enhancing the warning log by including the current committed checkpoint timestamp to improve debuggability.
maintainer/maintainer.go
Outdated
| log.Warn("checkpointTs can not be advanced, since global checkpoint is invalid", | ||
| zap.Stringer("changefeedID", m.changefeedID), | ||
| zap.Uint64("resolvedTs", newWatermark.ResolvedTs), | ||
| zap.Uint64("minCheckpointTsForScheduler", minCheckpointTsForScheduler), | ||
| zap.Uint64("minCheckpointTsForBarrier", minCheckpointTsForBarrier)) |
There was a problem hiding this comment.
The warning log message would be more helpful for debugging if it included the current committed checkpoint timestamp. This allows operators to see the state of the changefeed when the calculated global checkpoint is invalid (e.g., during initialization or when all nodes report uninitialized watermarks), consistent with the logging pattern used earlier in this function.
| log.Warn("checkpointTs can not be advanced, since global checkpoint is invalid", | |
| zap.Stringer("changefeedID", m.changefeedID), | |
| zap.Uint64("resolvedTs", newWatermark.ResolvedTs), | |
| zap.Uint64("minCheckpointTsForScheduler", minCheckpointTsForScheduler), | |
| zap.Uint64("minCheckpointTsForBarrier", minCheckpointTsForBarrier)) | |
| log.Warn("checkpointTs can not be advanced, since global checkpoint is invalid", | |
| zap.Stringer("changefeedID", m.changefeedID), | |
| zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), | |
| zap.Uint64("resolvedTs", newWatermark.ResolvedTs), | |
| zap.Uint64("minCheckpointTsForScheduler", minCheckpointTsForScheduler), | |
| zap.Uint64("minCheckpointTsForBarrier", minCheckpointTsForBarrier)) |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
maintainer/maintainer_test.go (1)
402-430: Add a mixed sentinel regression case.The PR rationale depends on rejecting only the aggregated
math.MaxUint64result. These subtests cover the finite path and the all-max path, but not the mixed case where one reported watermark ismath.MaxUint64and another constraint still makes the global minimum finite. Locking that in here would catch a future regression that filters the sentinel too early.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@maintainer/maintainer_test.go` around lines 402 - 430, Add a new subtest "mixed sentinel" inside TestMaintainerCalculateNewCheckpointTs that inserts two reported watermarks into m.checkpointTsByCapture: one with CheckpointTs/ResolvedTs = math.MaxUint64 and another with a finite value (e.g., 200), then call m.calculateNewCheckpointTs() and assert that canUpdate is true and the returned watermark equals the finite minimum (not the sentinel). This exercise targets calculateNewCheckpointTs and checkpointTsByCapture to ensure the MaxUint64 sentinel is ignored unless all reports are MaxUint64.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@maintainer/maintainer_test.go`:
- Around line 402-430: Add a new subtest "mixed sentinel" inside
TestMaintainerCalculateNewCheckpointTs that inserts two reported watermarks into
m.checkpointTsByCapture: one with CheckpointTs/ResolvedTs = math.MaxUint64 and
another with a finite value (e.g., 200), then call m.calculateNewCheckpointTs()
and assert that canUpdate is true and the returned watermark equals the finite
minimum (not the sentinel). This exercise targets calculateNewCheckpointTs and
checkpointTsByCapture to ensure the MaxUint64 sentinel is ignored unless all
reports are MaxUint64.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8561c846-e370-4378-a8b2-be057bf65922
📒 Files selected for processing (2)
maintainer/maintainer.gomaintainer/maintainer_test.go
|
/test all |
|
/test all |
|
/retest |
|
/test all |
|
/test pull-cdc-mysql-integration-heavy |
|
/test all |
|
/retest |
1 similar comment
|
/retest |
| // zap.Stringer("dispatcher", d.id), | ||
| // zap.Any("action", dispatcherStatus.GetAction()), | ||
| // zap.Any("ack", dispatcherStatus.GetAck())) | ||
| log.Debug("dispatcher handle dispatcher status", |
There was a problem hiding this comment.
can we remove these log, looks useless.
There was a problem hiding this comment.
It's a debug level log, it will not print in prod scenario, and it's may very important when there is some ddl issues in test. So I think we should keep this log
maintainer/maintainer.go
Outdated
| } | ||
| newWatermark.UpdateMin(watermark) | ||
| redoWatermark, canUpdate := m.calculateNewRedoCheckpointTs() | ||
| if canUpdate && m.controller.redoSpanController != nil { |
There was a problem hiding this comment.
Is redoSpanController nil in this case? If it can be nil, looks the calculateNewRedoCheckpointTs should not be called.
| zap.Any("checkpointTs", m.getWatermark().CheckpointTs), | ||
| zap.Any("resolvedTs", newWatermark.CheckpointTs), | ||
| zap.Bool("canUpdateRedoCheckpointTs", canUpdate), | ||
| zap.Uint64("redoCheckpointTs", func() uint64 { |
There was a problem hiding this comment.
This looks weird, a closure in the log field.
| // MaxUint64 means this round still has no effective global checkpoint. | ||
| // Skipping the update keeps the committed checkpoint from being poisoned. | ||
| if newWatermark.CheckpointTs == math.MaxUint64 { | ||
| log.Debug("checkpointTs can not be advanced, since global checkpoint is invalid", |
There was a problem hiding this comment.
may be need the definition of the invalid. or remove this log.
There was a problem hiding this comment.
Add the invalid reason here
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: 3AceShowHand, lidezhu The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
|
/retest |
|
@hongyunyan: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
Background
The maintainer can aggregate a global checkpoint of
math.MaxUint64when every available watermark in a round is still effectively uninitialized. Once that value is promoted into the committed checkpoint, later redo dispatchers may be clamped to an invalid start ts and remain stuck during initialization.Issue Number: close #4703
Motivation
A capture-local
MaxUint64watermark is not always invalid by itself, so filtering it at heartbeat ingestion would change the meaning of node-level reports. The safer minimal fix is to reject only the final aggregated result when the computed global checkpoint is stillmath.MaxUint64.Summary of changes
calculateNewCheckpointTs()so a global checkpoint ofmath.MaxUint64is skipped instead of being committedMaxUint64caseTesting
make fmtgo test ./maintainerSummary by CodeRabbit
Bug Fixes
Refactor
Logging
Chores
Tests
Release note