Conversation
Signed-off-by: dongmen <414110582@qq.com>
|
Skipping CI for Draft Pull Request. |
|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Important: Review skipped — draft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration. Configuration used: Organization UI. Review profile: CHILL. Plan: Pro. Run ID: — You can disable this status message in the settings. Use the checkbox below for a quick retry:
✨ Finishing Touches · 🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces comprehensive logging across the syncpoint lifecycle, including dispatcher resets, preparation, and commit phases, to improve system observability. The review feedback highlights opportunities to optimize these changes by reducing log redundancy and preventing unnecessary memory allocations in performance-critical paths. Specifically, the reviewer suggests wrapping debug-level logs in conditional checks to avoid slice allocations when logging is disabled and consolidating overlapping info-level messages.
| if d.target.EnableSyncPoint() { | ||
| req := resetRequest.DispatcherRequest | ||
| log.Info("send reset dispatcher syncpoint request", | ||
| zap.Stringer("changefeedID", d.target.GetChangefeedID()), | ||
| zap.Stringer("dispatcher", d.getDispatcherID()), | ||
| zap.Stringer("eventServiceID", serverID), | ||
| zap.Uint64("epoch", epoch), | ||
| zap.Uint64("resetTs", resetTs), | ||
| zap.Uint64("checkpointTs", d.target.GetCheckpointTs()), | ||
| zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()), | ||
| zap.Uint64("syncPointTs", req.SyncPointTs), | ||
| zap.Uint64("syncPointIntervalSeconds", req.SyncPointInterval), | ||
| zap.Bool("skipSyncpointAtStartTs", d.target.GetSkipSyncpointAtStartTs()), | ||
| ) | ||
| } |
There was a problem hiding this comment.
The new Info log added here is redundant with the existing Info log on line 244. Both logs are triggered during a dispatcher reset when syncpoint is enabled, and they share several fields (changefeedID, dispatcher, eventServiceID, epoch, resetTs). Consider merging them into a single Info log with optional fields, or demoting this new log to Debug level, to reduce log noise.
| if d.target.EnableSyncPoint() { | |
| req := resetRequest.DispatcherRequest | |
| log.Info("send reset dispatcher syncpoint request", | |
| zap.Stringer("changefeedID", d.target.GetChangefeedID()), | |
| zap.Stringer("dispatcher", d.getDispatcherID()), | |
| zap.Stringer("eventServiceID", serverID), | |
| zap.Uint64("epoch", epoch), | |
| zap.Uint64("resetTs", resetTs), | |
| zap.Uint64("checkpointTs", d.target.GetCheckpointTs()), | |
| zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()), | |
| zap.Uint64("syncPointTs", req.SyncPointTs), | |
| zap.Uint64("syncPointIntervalSeconds", req.SyncPointInterval), | |
| zap.Bool("skipSyncpointAtStartTs", d.target.GetSkipSyncpointAtStartTs()), | |
| ) | |
| } | |
| if d.target.EnableSyncPoint() { | |
| req := resetRequest.DispatcherRequest | |
| log.Debug("send reset dispatcher syncpoint request", | |
| zap.Stringer("changefeedID", d.target.GetChangefeedID()), | |
| zap.Stringer("dispatcher", d.getDispatcherID()), | |
| zap.Stringer("eventServiceID", serverID), | |
| zap.Uint64("epoch", epoch), | |
| zap.Uint64("resetTs", resetTs), | |
| zap.Uint64("checkpointTs", d.target.GetCheckpointTs()), | |
| zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()), | |
| zap.Uint64("syncPointTs", req.SyncPointTs), | |
| zap.Uint64("syncPointIntervalSeconds", req.SyncPointInterval), | |
| zap.Bool("skipSyncpointAtStartTs", d.target.GetSkipSyncpointAtStartTs()), | |
| ) | |
| } |
| if !ready { | ||
| fields := []zap.Field{ | ||
| zap.Stringer("changefeedID", c.changefeedID), | ||
| zap.Uint64("preparingTs", preparingTs), | ||
| } | ||
| if blockingFound { | ||
| fields = append(fields, | ||
| zap.Stringer("blockingDispatcherID", blockingDispatcherID), | ||
| zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs), | ||
| zap.Uint64("blockingCheckpointTs", blockingCheckpointTs), | ||
| zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPoint), | ||
| zap.Uint64("blockingSeq", blockingSeq), | ||
| zap.Uint64("blockingEpoch", blockingEpoch), | ||
| ) | ||
| } | ||
| log.Debug("syncpoint prepare stage blocked by dispatcher", fields...) | ||
| return | ||
| } |
There was a problem hiding this comment.
The construction of the fields slice and the subsequent append calls happen on every call to tryPromoteSyncPointToCommitIfReady when the syncpoint is blocked, even if the log level is higher than Debug. Since this function is called periodically (every 1 second) for every changefeed, this can lead to unnecessary allocations and GC pressure. Consider wrapping the logging logic in a check for the Debug level.
| if !ready { | |
| fields := []zap.Field{ | |
| zap.Stringer("changefeedID", c.changefeedID), | |
| zap.Uint64("preparingTs", preparingTs), | |
| } | |
| if blockingFound { | |
| fields = append(fields, | |
| zap.Stringer("blockingDispatcherID", blockingDispatcherID), | |
| zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs), | |
| zap.Uint64("blockingCheckpointTs", blockingCheckpointTs), | |
| zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPoint), | |
| zap.Uint64("blockingSeq", blockingSeq), | |
| zap.Uint64("blockingEpoch", blockingEpoch), | |
| ) | |
| } | |
| log.Debug("syncpoint prepare stage blocked by dispatcher", fields...) | |
| return | |
| } | |
| if !ready { | |
| if log.GetLevel() <= zap.DebugLevel { | |
| fields := []zap.Field{ | |
| zap.Stringer("changefeedID", c.changefeedID), | |
| zap.Uint64("preparingTs", preparingTs), | |
| } | |
| if blockingFound { | |
| fields = append(fields, | |
| zap.Stringer("blockingDispatcherID", blockingDispatcherID), | |
| zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs), | |
| zap.Uint64("blockingCheckpointTs", blockingCheckpointTs), | |
| zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPoint), | |
| zap.Uint64("blockingSeq", blockingSeq), | |
| zap.Uint64("blockingEpoch", blockingEpoch), | |
| ) | |
| } | |
| log.Debug("syncpoint prepare stage blocked by dispatcher", fields...) | |
| } | |
| return | |
| } |
| if !canAdvance { | ||
| fields := []zap.Field{ | ||
| zap.Stringer("changefeedID", c.changefeedID), | ||
| zap.Uint64("inFlightTs", inFlightTs), | ||
| zap.Uint64("preparingTs", c.syncPointPreparingTs.Load()), | ||
| } | ||
| if blockingFound { | ||
| fields = append(fields, | ||
| zap.String("blockingReason", blockingReason), | ||
| zap.Stringer("blockingDispatcherID", blockingDispatcherID), | ||
| zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPointTs), | ||
| zap.Uint64("blockingCheckpointTs", blockingCheckpointTs), | ||
| zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs), | ||
| zap.Uint64("blockingSeq", blockingDispatcherSeq), | ||
| zap.Uint64("blockingEpoch", blockingDispatcherEpoch), | ||
| ) | ||
| } | ||
| log.Debug("syncpoint commit stage blocked by dispatcher", fields...) | ||
| return | ||
| } |
There was a problem hiding this comment.
Similar to the previous comment, the fields slice is allocated and populated on every call when the commit stage is blocked, regardless of the active log level. This should be optimized to avoid unnecessary allocations in this periodic task.
if !canAdvance {
if log.GetLevel() <= zap.DebugLevel {
fields := []zap.Field{
zap.Stringer("changefeedID", c.changefeedID),
zap.Uint64("inFlightTs", inFlightTs),
zap.Uint64("preparingTs", c.syncPointPreparingTs.Load()),
}
if blockingFound {
fields = append(fields,
zap.String("blockingReason", blockingReason),
zap.Stringer("blockingDispatcherID", blockingDispatcherID),
zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPointTs),
zap.Uint64("blockingCheckpointTs", blockingCheckpointTs),
zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs),
zap.Uint64("blockingSeq", blockingDispatcherSeq),
zap.Uint64("blockingEpoch", blockingDispatcherEpoch),
)
}
log.Debug("syncpoint commit stage blocked by dispatcher", fields...)
}
return
}

| nudged := c.nudgeSyncPointCommitIfNeeded(task) | ||
| log.Debug("scan range empty after capping", | ||
| zap.Stringer("changefeedID", task.changefeedStat.changefeedID), | ||
| zap.Stringer("dispatcherID", task.id), | ||
| zap.Uint64("startTs", dataRange.CommitTsStart), | ||
| zap.Uint64("endTs", dataRange.CommitTsEnd), | ||
| zap.Uint64("scanMaxTs", scanMaxTs), | ||
| zap.Uint64("ddlResolvedTs", ddlState.ResolvedTs), | ||
| zap.Uint64("ddlCommitTs", ddlState.MaxEventCommitTs), | ||
| zap.Bool("hasPendingDDLEventInCurrentRange", hasPendingDDLEventInCurrentRange), | ||
| zap.Uint64("nextSyncPointTs", nextSyncPointTs), | ||
| zap.Uint64("preparingTs", preparingTs), | ||
| zap.Uint64("inFlightTs", inFlightTs), | ||
| zap.Bool("nudgedSyncPointCommit", nudged), | ||
| ) |
There was a problem hiding this comment.
This log.Debug call is located in the scan path and includes many fields. While zap fields are relatively cheap to construct, the variadic arguments still cause a slice allocation on every call, even when Debug logging is disabled. Given this can be triggered frequently for caught-up or capped dispatchers, it should be wrapped in a level check to improve efficiency.
nudged := c.nudgeSyncPointCommitIfNeeded(task)
if log.GetLevel() <= zap.DebugLevel {
log.Debug("scan range empty after capping",
zap.Stringer("changefeedID", task.changefeedStat.changefeedID),
zap.Stringer("dispatcherID", task.id),
zap.Uint64("startTs", dataRange.CommitTsStart),
zap.Uint64("endTs", dataRange.CommitTsEnd),
zap.Uint64("scanMaxTs", scanMaxTs),
zap.Uint64("ddlResolvedTs", ddlState.ResolvedTs),
zap.Uint64("ddlCommitTs", ddlState.MaxEventCommitTs),
zap.Bool("hasPendingDDLEventInCurrentRange", hasPendingDDLEventInCurrentRange),
zap.Uint64("nextSyncPointTs", nextSyncPointTs),
zap.Uint64("preparingTs", preparingTs),
zap.Uint64("inFlightTs", inFlightTs),
zap.Bool("nudgedSyncPointCommit", nudged),
)
}
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note