eventBroker: remove two-stage syncpoint #4807
asddongmen wants to merge 3 commits into pingcap:master
Signed-off-by: dongmen <414110582@qq.com>
Signed-off-by: dongmen <414110582@qq.com>
- cap scan upper bound by checkpointTs + 2*syncPointInterval when syncpoint is enabled
- suppress syncpoint emission when dispatcher lag exceeds threshold, while still advancing nextSyncPoint
- resume syncpoint emission with hysteresis to avoid flapping
- apply checkpoint cap to normal scan path, pending-DDL local advance, and table-trigger DDL path
- add metrics for syncpoint lag, suppression count, and checkpoint-cap hits
- add unit tests for checkpoint cap and suppress/resume behavior

Signed-off-by: dongmen <414110582@qq.com>
Code Review
This pull request refactors the syncpoint handling logic by removing the two-stage prepare/commit state machine and introducing lag-based suppression and checkpoint-based scan capping. New configuration options and metrics are added to support these features. Review feedback identifies critical issues including a race condition in syncpoint emission, an incorrect timestamp comparison that delays syncpoints, the loss of event type validation in action matching, and a reversal of the required DDL-to-syncpoint emission order.
    pendingIsSyncPoint := b.blockPendingEvent.GetType() == commonEvent.TypeSyncPointEvent
    return b.blockCommitTs == action.CommitTs && pendingIsSyncPoint == action.IsSyncPoint
    return b.blockCommitTs == action.CommitTs
The check for action.IsSyncPoint was removed. If a DDL event and a SyncPoint event share the same CommitTs, the dispatcher might incorrectly match an action intended for one event to the other. This can lead to incorrect processing, such as passing a DDL that should have been written to the downstream.
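The collision described above can be reproduced with a small self-contained sketch. All types here (`pendingBlock`, `action`, `eventType`) are simplified stand-ins and not the dispatcher's real types.

```go
package main

import "fmt"

// eventType is a simplified stand-in for the event type enum.
type eventType int

const (
	typeDDLEvent eventType = iota
	typeSyncPointEvent
)

// pendingBlock is a hypothetical reduction of the dispatcher's pending
// block-event state to the two fields that matter for matching.
type pendingBlock struct {
	eventType eventType
	commitTs  uint64
}

// action is a hypothetical reduction of the maintainer's action message.
type action struct {
	CommitTs    uint64
	IsSyncPoint bool
}

// matches requires both the commit timestamp and the event kind to agree, so
// an action for a syncpoint cannot be applied to a DDL with the same CommitTs.
func (b pendingBlock) matches(a action) bool {
	pendingIsSyncPoint := b.eventType == typeSyncPointEvent
	return b.commitTs == a.CommitTs && pendingIsSyncPoint == a.IsSyncPoint
}

func main() {
	ddl := pendingBlock{eventType: typeDDLEvent, commitTs: 100}
	// Same CommitTs, but the action targets a syncpoint: must not match.
	fmt.Println(ddl.matches(action{CommitTs: 100, IsSyncPoint: true}))
	// Same CommitTs and same kind: matches.
	fmt.Println(ddl.matches(action{CommitTs: 100, IsSyncPoint: false}))
}
```

Comparing on CommitTs alone would return true for both calls, which is the mismatch the comment warns about.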
Suggested change:

    pendingIsSyncPoint := b.blockPendingEvent.GetType() == commonEvent.TypeSyncPointEvent
    return b.blockCommitTs == action.CommitTs && pendingIsSyncPoint == action.IsSyncPoint

    for d.enableSyncPoint && ts > d.nextSyncPoint.Load() {
        commitTs := d.nextSyncPoint.Load()
        if !d.changefeedStat.isSyncPointInCommitStage(commitTs) {
            if ts <= commitTs {
                return
            }
            d.changefeedStat.tryEnterSyncPointPrepare(commitTs)
            if !d.changefeedStat.isSyncPointInCommitStage(commitTs) {
                return
            }
        } else if ts < commitTs {
            return
        }

        nextSyncPoint := oracle.GoTimeToTS(oracle.GetTimeFromTS(commitTs).Add(d.syncPointInterval))
        // Advance nextSyncPoint with CAS so concurrent send paths cannot emit the same
        // syncpoint twice or move nextSyncPoint backward.
        if !d.nextSyncPoint.CompareAndSwap(commitTs, nextSyncPoint) {
        d.nextSyncPoint.Store(oracle.GoTimeToTS(oracle.GetTimeFromTS(commitTs).Add(d.syncPointInterval)))
There are two critical issues in this loop logic:
- The condition ts > d.nextSyncPoint.Load() prevents a syncpoint from being emitted when the current timestamp ts is exactly equal to the syncpoint timestamp. This causes syncpoints to be delayed until the next event with a strictly greater timestamp arrives, or missed entirely if the stream stops at the syncpoint timestamp. It should be ts >= d.nextSyncPoint.Load().
- Using d.nextSyncPoint.Store instead of CompareAndSwap introduces a race condition. Since emitSyncPointEventIfNeeded can be called concurrently (e.g., from a scan worker and the table trigger ticker), multiple goroutines might advance nextSyncPoint simultaneously, leading to duplicate syncpoint emissions.
Suggested change:

    for d.enableSyncPoint {
        commitTs := d.nextSyncPoint.Load()
        if ts < commitTs {
            return
        }
        nextSyncPoint := oracle.GoTimeToTS(oracle.GetTimeFromTS(commitTs).Add(d.syncPointInterval))
        if !d.nextSyncPoint.CompareAndSwap(commitTs, nextSyncPoint) {
            continue
        }
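The CAS pattern in the suggestion can be exercised in isolation. The sketch below is an assumption-laden reduction: timestamps are plain integers and the interval is added directly, `emitDue` and `runConcurrent` are hypothetical names, and `atomic.Uint64` requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// emitDue emits every syncpoint with commitTs <= ts. Only the goroutine whose
// CompareAndSwap succeeds emits a given commitTs, so concurrent callers cannot
// emit the same syncpoint twice or move next backward.
func emitDue(next *atomic.Uint64, ts, interval uint64, emit func(commitTs uint64)) {
	for {
		commitTs := next.Load()
		if ts < commitTs {
			return // nothing due; note that ts == commitTs is still emitted
		}
		if !next.CompareAndSwap(commitTs, commitTs+interval) {
			continue // another goroutine advanced next; reload and retry
		}
		emit(commitTs)
	}
}

// runConcurrent calls emitDue from several goroutines and counts how many
// times each commitTs was emitted.
func runConcurrent(workers int, ts, first, interval uint64) map[uint64]int {
	var next atomic.Uint64
	next.Store(first)

	var mu sync.Mutex
	counts := make(map[uint64]int)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			emitDue(&next, ts, interval, func(c uint64) {
				mu.Lock()
				counts[c]++
				mu.Unlock()
			})
		}()
	}
	wg.Wait()
	return counts
}

func main() {
	counts := runConcurrent(8, 100, 10, 10)
	fmt.Println(len(counts)) // number of distinct syncpoints emitted
}
```

The CAS guarantees uniqueness but not ordering across goroutines; if emission order matters, the callers need their own serialization.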
        return
    }
    c.emitSyncPointEventIfNeeded(e.FinishedTs, d, remoteID)
    c.emitSyncPointEventIfNeeded(e.FinishedTs, d, remoteID)
The order of DDL and SyncPoint emission has been reversed. Previously, DDL events were sent before SyncPoint events if they shared the same timestamp, which is the expected order for the maintainer and dispatcher. Now, emitSyncPointEventIfNeeded is called before sending the DDL event. If e.FinishedTs matches the next syncpoint, the syncpoint will be emitted first (assuming the loop condition is fixed to >=).
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
- Cap the scan upper bound when syncpoint is enabled: scanEnd = min(scanEnd, checkpointTs + multiplier*syncPointInterval). Default multiplier is 2.
- In emitSyncPointEventIfNeeded: suppress emission when lag(sentResolvedTs, checkpointTs) > 20m, resume when lag <= 15m (hysteresis), and keep advancing nextSyncPoint even when emission is suppressed.
- New configuration options:
  - sync-point-checkpoint-cap-multiplier (default 2)
  - sync-point-lag-suppress-threshold (default 20m)
  - sync-point-lag-resume-threshold (default 15m)
- New metrics:
  - syncpoint_lag_seconds
  - syncpoint_suppressed_count
  - scan_capped_by_checkpoint_count
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note