Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED.

Important: Review skipped. Draft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration.
📝 Walkthrough

This PR implements comprehensive rate-limiting for repeated warning and error logs across 14 files using atomic timestamps and compare-and-swap operations. The changes suppress duplicate log emissions within configured intervals to prevent disk space exhaustion and system stalls.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: 3 passed, 2 failed (1 warning, 1 inconclusive).
Code Review
This pull request implements log rate-limiting across several components (including the dispatcher, maintainer, and messaging modules) to reduce log noise on hot paths, typically using a 10-second interval. The review feedback highlights that the rate-limiting logic is duplicated and inconsistent across the codebase, and suggests refactoring it into a common utility. Additionally, there are concerns regarding the robustness of the duration checks against system clock adjustments and suggestions to optimize slice allocations in the maintainer's heartbeat monitoring loops.
```go
func shouldLogDispatcherWarning(lastLogTime *atomic.Int64, interval time.Duration) bool {
	now := time.Now().UnixNano()
	for {
		last := lastLogTime.Load()
		if last != 0 && now-last < interval.Nanoseconds() {
			return false
		}
		if lastLogTime.CAS(last, now) {
			return true
		}
	}
}
```
The logic for rate-limiting log messages is duplicated across multiple files in this PR (e.g., `basic_dispatcher.go`, `dispatcher_manager.go`, `dispatcher_stat.go`, `subscription_client.go`, `maintainer.go`, `remote_target.go`, and `module_node_manager.go`).

Furthermore, the implementation is inconsistent: some files use `go.uber.org/atomic` (calling `.CAS()`), while others use `sync/atomic` (calling `.CompareAndSwap()`).
Please refactor this into a single utility function in a common package (e.g., `pkg/util`) to improve maintainability and ensure consistency across the codebase.
```go
if last != 0 && now-last < interval.Nanoseconds() {
	return false
}
```
The duration check `now-last < interval.Nanoseconds()` using Unix nanoseconds is not robust against system clock adjustments. If the system clock is set backwards, the result of the subtraction can be negative, which will be less than the interval, causing logging to be suppressed until the wall clock catches up to the previously stored timestamp.

Consider using `time.Since()` or explicitly handling negative deltas to avoid silencing logs for extended periods during clock jumps.
```go
}
needUpdate := false
updateCheckpointTs := true
missingNodes := make([]node.ID, 0)
```
```go
// Step 2: Apply heartbeat constraints from all nodes
updateCheckpointTs := true
missingNodes := make([]node.ID, 0)
```
Actionable comments posted: 2
🧹 Nitpick comments (3)
pkg/messaging/remote_target.go (1)
519-536: Consider extracting the stream-ready wait loop.

The wait-for-stream-ready logic (polling with rate-limited logging) is duplicated between `runSendMessages` and `runReceiveMessages`. Consider extracting it to a helper method like `waitForStreamReady(ctx context.Context) error` to reduce duplication.

♻️ Suggested helper extraction

```go
// waitForStreamReady blocks until the stream is ready or the context is cancelled.
func (s *remoteMessageTarget) waitForStreamReady(ctx context.Context) error {
	for {
		if s.isReadyToSend() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(500 * time.Millisecond):
			if s.shouldLogConnectionIssue(time.Now(), connectionIssueLogInterval) {
				log.Warn("remote target stream is not ready, wait and check again",
					zap.Stringer("localID", s.messageCenterID),
					zap.String("localAddr", s.localAddr),
					zap.Stringer("remoteID", s.targetId),
					zap.String("remoteAddr", s.targetAddr))
			}
		}
	}
}
```

Then in both `runSendMessages` and `runReceiveMessages`:

```diff
-	// wait stream ready
-	for {
-		if s.isReadyToSend() {
-			break
-		}
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-time.After(500 * time.Millisecond):
-			if s.shouldLogConnectionIssue(time.Now(), connectionIssueLogInterval) {
-				log.Warn("remote target stream is not ready, wait and check again",
-					...)
-			}
-			continue
-		}
-	}
+	if err := s.waitForStreamReady(ctx); err != nil {
+		return err
+	}
```

Also applies to: 597-614
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/messaging/remote_target.go` around lines 519-536: Extract the duplicated wait loop into a method on remoteMessageTarget named waitForStreamReady(ctx context.Context) error that returns nil when s.isReadyToSend() becomes true and ctx.Err() if the context is canceled; move the polling interval (500ms) and the rate-limited logging using s.shouldLogConnectionIssue(time.Now(), connectionIssueLogInterval) and the existing log.Warn call into that helper; then replace the loop in runSendMessages and runReceiveMessages with a call to s.waitForStreamReady(ctx) and propagate the returned error.

server/watcher/module_node_manager.go (1)
86-92: CAS without retry loop may cause missed log opportunities.

Unlike the loop-based CAS pattern used in other files (`dispatcher_stat.go`, `dispatcher_manager.go`, `basic_dispatcher.go`), this implementation doesn't retry on CAS failure. If two goroutines race and one fails the CAS, it returns `false` even though it might be the first one to actually check after the interval elapsed.

This is a minor inconsistency but not a critical issue since it only affects logging frequency slightly.
🔧 Suggested fix for consistency with other files

```diff
 func (c *NodeManager) shouldLogGetCoordinatorErr(now time.Time) bool {
-	last := c.lastGetCoordinatorErrLogTime.Load()
-	if last != 0 && now.UnixNano()-last < getCoordinatorErrLogInterval.Nanoseconds() {
-		return false
+	nowNano := now.UnixNano()
+	for {
+		last := c.lastGetCoordinatorErrLogTime.Load()
+		if last != 0 && nowNano-last < getCoordinatorErrLogInterval.Nanoseconds() {
+			return false
+		}
+		if c.lastGetCoordinatorErrLogTime.CompareAndSwap(last, nowNano) {
+			return true
+		}
 	}
-	return c.lastGetCoordinatorErrLogTime.CompareAndSwap(last, now.UnixNano())
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/watcher/module_node_manager.go` around lines 86-92: The shouldLogGetCoordinatorErr method in NodeManager uses a one-shot CompareAndSwap which can lose a logging opportunity under contention; update shouldLogGetCoordinatorErr to use the same CAS-retry loop pattern found in dispatcher_stat.go/dispatcher_manager.go/basic_dispatcher.go: repeatedly Load lastGetCoordinatorErrLogTime, return false if now-last < getCoordinatorErrLogInterval, otherwise attempt CompareAndSwap(last, now.UnixNano()) in a loop until the CAS succeeds (return true) or another goroutine advances the timestamp (in which case re-evaluate and return false if the interval has not elapsed); reference NodeManager.shouldLogGetCoordinatorErr, lastGetCoordinatorErrLogTime, and getCoordinatorErrLogInterval.

downstreamadapter/dispatcher/basic_dispatcher.go (1)
40-51: Use sync/atomic instead of go.uber.org/atomic for consistency.

This file uses `go.uber.org/atomic` while most other changed files use the standard library `sync/atomic` (`dispatcher_stat.go`, `dispatcher_manager.go`, `event_collector.go`, and `module_node_manager.go`). Both approaches work correctly, but aligning on a single package across the codebase would improve maintainability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatcher/basic_dispatcher.go` around lines 40 - 51, The function shouldLogDispatcherWarning currently accepts lastLogTime *atomic.Int64 and uses methods Load and CAS from go.uber.org/atomic; change it to use the standard library by making lastLogTime a pointer to int64 (e.g., *int64) and replace lastLogTime.Load()/lastLogTime.CAS(...) with sync/atomic calls: atomic.LoadInt64(lastLogTime), atomic.CompareAndSwapInt64(lastLogTime, last, now), and compute now as int64 using time.Now().UnixNano(); update any callers that pass an *atomic.Int64 to instead pass the underlying *int64 (or adapt initialization) so the function signature and usages consistently use sync/atomic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@logservice/logpuller/region_request_worker.go`:
- Around line 114-119: The Recv error logging in receiveAndDispatchChangeEvents
is unthrottled and logs before handling EOF; update the Recv error handling so
that if err == io.EOF (or the stream closed fast-path) you return silently, and
for other errors wrap the log call with shouldLogLogPullerWarning using the
worker/s receiver's lastConnectionIssueLogTime (e.g.
worker.lastConnectionIssueLogTime) and logPullerWarnLogInterval; apply the same
throttling guard to the other Recv logging sites mentioned (the blocks around
the Recv call at/near receiveAndDispatchChangeEvents and the other occurrences)
so flapping stores don’t emit a log every reconnect.
In `@pkg/messaging/remote_target.go`:
- Around line 40-44: Remove the unused constant reconnectInterval from the
top-level constants in remote_target.go (currently defined as reconnectInterval
= 2 * time.Second); locate the const block that also declares
connectionIssueLogInterval, streamTypeEvent, and streamTypeCommand, and delete
the reconnectInterval entry to avoid dead code, or alternatively replace it with
a brief comment explaining its intended purpose if you expect to use it later.
---
Nitpick comments:
In `@downstreamadapter/dispatcher/basic_dispatcher.go`:
- Around line 40-51: The function shouldLogDispatcherWarning currently accepts
lastLogTime *atomic.Int64 and uses methods Load and CAS from go.uber.org/atomic;
change it to use the standard library by making lastLogTime a pointer to int64
(e.g., *int64) and replace lastLogTime.Load()/lastLogTime.CAS(...) with
sync/atomic calls: atomic.LoadInt64(lastLogTime),
atomic.CompareAndSwapInt64(lastLogTime, last, now), and compute now as int64
using time.Now().UnixNano(); update any callers that pass an *atomic.Int64 to
instead pass the underlying *int64 (or adapt initialization) so the function
signature and usages consistently use sync/atomic.
In `@pkg/messaging/remote_target.go`:
- Around line 519-536: Extract the duplicated wait loop into a method on
remoteMessageTarget named waitForStreamReady(ctx context.Context) error that
returns nil when s.isReadyToSend() becomes true and ctx.Err() if the context is
canceled; move the polling interval (500ms) and the rate-limited logging using
s.shouldLogConnectionIssue(time.Now(), connectionIssueLogInterval) and the
existing log.Warn call into that helper; then replace the loop in
runSendMessages and runReceiveMessages with a call to s.waitForStreamReady(ctx)
and propagate the returned error.
In `@server/watcher/module_node_manager.go`:
- Around line 86-92: The shouldLogGetCoordinatorErr method in NodeManager uses a
one-shot CompareAndSwap which can lose a logging opportunity under contention;
update shouldLogGetCoordinatorErr to use the same CAS-retry loop pattern found
in dispatcher_stat.go/dispatcher_manager.go/basic_dispatcher.go: repeatedly Load
lastGetCoordinatorErrLogTime, return false if now-last <
getCoordinatorErrLogInterval, otherwise attempt CompareAndSwap(last,
now.UnixNano()) in a loop until the CAS succeeds (return true) or another
goroutine advances the timestamp (in which case re-evaluate and return false if
interval not elapsed); reference NodeManager.shouldLogGetCoordinatorErr,
lastGetCoordinatorErrLogTime, and getCoordinatorErrLogInterval.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 20a72332-0801-4e3c-b9f5-51f512ee72ad
📒 Files selected for processing (14)
- downstreamadapter/dispatcher/basic_dispatcher.go
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/eventcollector/dispatcher_stat.go
- downstreamadapter/eventcollector/event_collector.go
- logservice/logpuller/region_req_cache.go
- logservice/logpuller/region_request_worker.go
- logservice/logpuller/subscription_client.go
- logservice/schemastore/disk_format.go
- logservice/schemastore/persist_storage.go
- logservice/schemastore/schema_store.go
- maintainer/maintainer.go
- pkg/messaging/message_center.go
- pkg/messaging/remote_target.go
- server/watcher/module_node_manager.go
```go
if shouldLogLogPullerWarning(&worker.lastConnectionIssueLogTime, logPullerWarnLogInterval) {
	log.Error("event feed check store version fails",
		zap.Uint64("workerID", worker.workerID),
		zap.String("addr", worker.store.storeAddr),
		zap.Error(err))
}
```
Don't leave Recv failures unthrottled on the reconnect loop.

Line 221 in `receiveAndDispatchChangeEvents` still logs every stream `Recv` failure, and it does so before the EOF fast-path. On a flapping store each worker can still emit one info log per reconnect cycle, so the disk-growth problem behind issue #4691 is not fully addressed. Please either return silently on EOF and throttle the remaining log, or guard that site with `shouldLogLogPullerWarning(&s.lastConnectionIssueLogTime, logPullerWarnLogInterval)`.
Also applies to: 181-186, 354-361
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@logservice/logpuller/region_request_worker.go` around lines 114 - 119, The
Recv error logging in receiveAndDispatchChangeEvents is unthrottled and logs
before handling EOF; update the Recv error handling so that if err == io.EOF (or
the stream closed fast-path) you return silently, and for other errors wrap the
log call with shouldLogLogPullerWarning using the worker receiver's
lastConnectionIssueLogTime (e.g. worker.lastConnectionIssueLogTime) and
logPullerWarnLogInterval; apply the same throttling guard to the other Recv
logging sites mentioned (the blocks around the Recv call at/near
receiveAndDispatchChangeEvents and the other occurrences) so flapping stores
don’t emit a log every reconnect.
```go
reconnectInterval          = 2 * time.Second
connectionIssueLogInterval = 10 * time.Second
streamTypeEvent            = "event"
streamTypeCommand          = "command"
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Search for usages of reconnectInterval across the codebase
rg -n '\breconnectInterval\b' --type go
```

Repository: pingcap/ticdc
Length of output: 136
Remove the unused `reconnectInterval` constant at line 40.

It's defined but never referenced in the codebase. Either remove it or add a comment explaining its intended purpose.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/messaging/remote_target.go` around lines 40 - 44, Remove the unused
constant reconnectInterval from the top-level constants in remote_target.go
(currently defined as reconnectInterval = 2 * time.Second); locate the const
block that also declares connectionIssueLogInterval, streamTypeEvent, and
streamTypeCommand, and delete the reconnectInterval entry to avoid dead code, or
alternatively replace it with a brief comment explaining its intended purpose if
you expect to use it later.
What problem does this PR solve?
Issue Number: close #4734, ref #4691
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit