feat: Allow buffering of Application events at informer to reduce event volume #937
jgwest wants to merge 1 commit into
📝 Walkthrough
This PR adds a configurable event buffer for Kubernetes Application informer callbacks. It introduces a generic per-resource throttle mechanism that buffers update events while serializing add and delete operations, integrates it into Agent initialization, and exposes configuration via CLI flag.
Changes
Application Informer Event Buffering
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
cd4aa79 to 77e7ed8
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #937      +/-   ##
==========================================
- Coverage   48.44%   47.93%   -0.52%
==========================================
  Files         121      122       +1
  Lines       18298    18491     +193
==========================================
- Hits         8865     8864       -1
- Misses       8638     8830     +192
- Partials      795      797       +2
77e7ed8 to f0a5a3d
…nt volume Signed-off-by: Jonathan West <jgwest@gmail.com>
f0a5a3d to 16dcb4b
@coderabbitai review
✅ Action performed
Review finished.
Actionable comments posted: 3
🧹 Nitpick comments (4)
agent/informer_event_buffer.go (4)
291-303: ⚡ Quick win
Remove commented-out code and TODO.
Lines 291-301 contain commented-out code and line 303 has a stale TODO comment. Both should be removed.
🧹 Proposed cleanup
 	buffer.lock.Lock()
 	delete(buffer.instanceMap, resourceID)
 	buffer.lock.Unlock()
-
-	// {
-	// 	// Acquire the lock to ensure there is not another running:
-	// 	resourceState.fieldLock.Lock()
-
-	// 	// We have received a delete event for the resource, so any previous update events are now stale: we clear them here.
-	// 	resourceState.oldObjectForUpdateFunc = *new(T) // reset to nil
-	// 	resourceState.newObjectForUpdateFunc = *new(T) // reset to nil
-	// 	resourceState.nextOpportunityToCallUpdateFn = time.Time{}
-
-	// 	resourceState.fieldLock.Unlock()
-	// }
-
-	logEntry.Debugf("calling delete function from go-routine") // JGW-TODO
 	// Call the function with the object
-	logEntry.Debug("calling deleteHandlerFnToCall")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/informer_event_buffer.go` around lines 291 - 303, Remove the dead/commented-out block that acquires resourceState.fieldLock and resets resourceState.oldObjectForUpdateFunc, resourceState.newObjectForUpdateFunc, and resourceState.nextOpportunityToCallUpdateFn, as well as the stale "JGW-TODO" on the logEntry.Debugf call; clean up by deleting the entire commented section and leaving only the active logEntry.Debugf("calling delete function from go-routine") (or remove that log line if it's no longer needed) so the informer_event_buffer.go code no longer contains commented code and TODO noise.
447-447: ⚡ Quick win
Remove commented code.
Line 447 contains commented-out code that should be removed.
🧹 Proposed cleanup
 	nn := types.NamespacedName{Namespace: acc.GetNamespace(), Name: acc.GetName()}
 	return nn.String(), nil
-	// fmt.Sprintf("%s", nn.String()), nil
-	// return fmt.Sprintf("%s_%s", nn.String(), string(acc.GetUID())), nil
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/informer_event_buffer.go` at line 447, Remove the leftover commented-out code fragment "fmt.Sprintf("%s", nn.String()), nil" from agent/informer_event_buffer.go (the inactive commented line at the former return/format area) so the file contains no dead/commented code; simply delete that line and ensure surrounding code and formatting remain valid (no stray commas or blank-line artifacts) and run `gofmt`/lint to confirm clean formatting.
16-18: 💤 Low value
Address or remove TODO comments.
Lines 16-18 and 32 contain TODO comments that should be addressed before merging this PR, or removed if they're no longer relevant.
Also applies to: 32-32
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/informer_event_buffer.go` around lines 16 - 18, Remove or resolve the two JGW-TODOs in informer_event_buffer.go: either implement the suggested logger integration by importing cmd/cmdutil/log and using ParseLogLevels to create a logger and replace any ad-hoc logging calls (reference ParseLogLevels and the logger variable you add), and add a short package-level comment at the top of informer_event_buffer.go that documents the buffer algorithm (describe inputs/outputs, buffering policy, eviction/flush rules and concurrency assumptions) so the TODO "Document the algorithm" is satisfied; if you decide the logger change is no longer required, delete the logger TODO and still add the algorithm doc comment.
223-237: ⚡ Quick win
Remove commented-out code.
Lines 223-237 contain commented-out code that should be removed to improve readability.
🧹 Proposed cleanup
 	resourceState.oldObjectForUpdateFunc = old
 	resourceState.newObjectForUpdateFunc = new
-
-	// isOpportunityWindowOpen := time.Now().After(resourceState.nextOpportunityToCallUpdateFn)
 	resourceState.fieldLock.Unlock()
-
-	// if isOpportunityWindowOpen {
-
-	// If the opportunity window has opened (more than X amount of time has elapsed since we sent the last message, where X is minTimeBetweenSends)
-	// logEntry.Debug("opportunity window is open")
-
-	// logging.GetDefaultLogger().
+	buffer.attemptToCallFuncAndResetOpportunityWindow(resourceState, logEntry)
-
-	// } else {
-	// 	// The opportunity window has not yet opened, so we merely return until it is time: the go-routine will ensure it is scheduled appropriately.
-	// 	logEntry.Debugf("opportunity window is not yet open: %v", resourceState.nextOpportunityToCallUpdateFn)
-	// }
-
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/informer_event_buffer.go` around lines 223 - 237, Remove the dead commented-out code around the opportunity-window check to improve readability: delete the commented lines that reference isOpportunityWindowOpen, the if/else block, and the logging comments so only the active logic remains (unlock of resourceState.fieldLock and the call to buffer.attemptToCallFuncAndResetOpportunityWindow(resourceState, logEntry)). Ensure you keep the surrounding live symbols (resourceState, resourceState.fieldLock.Unlock(), resourceState.nextOpportunityToCallUpdateFn reference if used elsewhere, buffer.attemptToCallFuncAndResetOpportunityWindow, and logEntry) intact and run tests/lint to confirm no stray comments remain.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@agent/agent.go`:
- Line 310: The log uses "%d" with a time.Duration which prints nanoseconds;
update the Infof call that references a.applicationInformerEventBufferInterval
(the log().Infof invocation) to format the duration correctly by either using
int(a.applicationInformerEventBufferInterval.Seconds()) with "%d" or switch the
format specifier to "%v" and pass a.applicationInformerEventBufferInterval so
the duration is displayed in a human-readable form.
In `@agent/informer_event_buffer.go`:
- Around line 325-333: The loop that reads resourceState.oldObjectForUpdateFunc
(in the code that builds resourceStates from buffer.instanceMap) is a data race
because receiveUpdateInformerEvent modifies that field under
resourceState.fieldLock; fix it by reading oldObjectForUpdateFunc while holding
the same fieldLock (or by adding a thread-safe accessor on the ResourceState
that returns whether the slot is empty) before calling objectSlotEmpty—i.e.,
acquire resourceState.fieldLock, copy/check oldObjectForUpdateFunc, release the
lock, and use that value for the conditional so reads and writes are
synchronized with receiveUpdateInformerEvent.
In `@agent/options.go`:
- Around line 117-125: The validation in
WithApplicationInformerEventBufferInterval currently allows zero but returns an
error message saying "must be positive"; update the error text to reflect the
actual check by changing the returned error in
WithApplicationInformerEventBufferInterval to say "application informer event
buffer interval must be non-negative" so it matches the validation logic that
permits interval == 0 while still rejecting negative durations; locate the error
return inside the AgentOption closure that sets
applicationInformerEventBufferInterval.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8407859f-32b9-49e4-ba65-3421718a7163
📒 Files selected for processing (4)
agent/agent.go
agent/informer_event_buffer.go
agent/options.go
cmd/argocd-agent/agent.go
	// If buffer is enabled, add a buffered layer between handlers and k8s informer callbacks
	if a.applicationInformerEventBufferInterval != 0 {
		log().Infof("Buffering of Application events is enabled at '%d' seconds", a.applicationInformerEventBufferInterval)
Fix duration format in log message.
The format string uses %d with a time.Duration, which will print the raw nanosecond value instead of seconds. Use int(a.applicationInformerEventBufferInterval.Seconds()) or change to %v to display the duration properly.
🔧 Proposed fix
-	log().Infof("Buffering of Application events is enabled at '%d' seconds", a.applicationInformerEventBufferInterval)
+	log().Infof("Buffering of Application events is enabled at '%v'", a.applicationInformerEventBufferInterval)
	for _, resourceState := range buffer.instanceMap {

		if objectSlotEmpty(resourceState.oldObjectForUpdateFunc) {
			// No objects waiting? No work to do.
			continue
		}

		resourceStates = append(resourceStates, resourceState)
	}
Data race on oldObjectForUpdateFunc.
Line 327 reads resourceState.oldObjectForUpdateFunc without holding resourceState.fieldLock, while receiveUpdateInformerEvent (lines 214-224) writes to this field under the lock. This is a data race that can cause crashes or incorrect behavior.
🔒 Proposed fix
 	for _, resourceState := range buffer.instanceMap {
-
-		if objectSlotEmpty(resourceState.oldObjectForUpdateFunc) {
+		resourceState.fieldLock.Lock()
+		isEmpty := objectSlotEmpty(resourceState.oldObjectForUpdateFunc)
+		resourceState.fieldLock.Unlock()
+
+		if isEmpty {
 			// No objects waiting? No work to do.
 			continue
 		}
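The "thread-safe accessor" alternative mentioned in this comment could look roughly like the sketch below. The `resourceState` type here is a simplified, hypothetical stand-in for the PR's per-resource state, and comparing against the zero value is only an assumption about what `objectSlotEmpty` does.

```go
package main

import (
	"fmt"
	"sync"
)

// resourceState is a simplified stand-in (assumption) for the per-resource
// state in the PR; only the field relevant to the race is modelled.
type resourceState[T comparable] struct {
	fieldLock              sync.Mutex
	oldObjectForUpdateFunc T
}

// setPending is the writer side: it stores a pending object under the lock.
func (r *resourceState[T]) setPending(old T) {
	r.fieldLock.Lock()
	defer r.fieldLock.Unlock()
	r.oldObjectForUpdateFunc = old
}

// hasPending is the reader side: it takes the same lock before inspecting
// the field, so reads and writes are synchronized.
func (r *resourceState[T]) hasPending() bool {
	r.fieldLock.Lock()
	defer r.fieldLock.Unlock()
	var zero T
	return r.oldObjectForUpdateFunc != zero
}

func main() {
	state := &resourceState[string]{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			state.setPending(fmt.Sprintf("app-%d", n))
			_ = state.hasPending() // concurrent read, safe under fieldLock
		}(i)
	}
	wg.Wait()
	fmt.Println(state.hasPending())
}
```

Running a sketch like this with `go run -race` is one way to check that the accessor removes the unsynchronized read.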
func WithApplicationInformerEventBufferInterval(interval time.Duration) AgentOption {
	return func(o *Agent) error {
		if interval < 0 {
			return fmt.Errorf("application informer event buffer interval must be positive")
		}
		o.applicationInformerEventBufferInterval = interval
		return nil
	}
}
Clarify validation error message.
The validation check permits interval == 0 (which disables buffering), but the error message says "must be positive." Update the message to "must be non-negative" to match the validation logic.
📝 Proposed fix
 	return func(o *Agent) error {
 		if interval < 0 {
-			return fmt.Errorf("application informer event buffer interval must be positive")
+			return fmt.Errorf("application informer event buffer interval must be non-negative")
 		}
 		o.applicationInformerEventBufferInterval = interval
 		return nil
What does this PR do / why we need it:
This PR introduces a new, optional, configurable 'informer event buffer' layer that sits between the K8s informer and the Argo CD agent's `Application` resource informer event handlers.
This new layer ensures that the agent's 'update' event handlers are called at most once every X seconds (for example, every 10 seconds).
This is an example of how the algorithm works, with X = 10 seconds:
@0 seconds) The Application informer informs us of an Application event (for example, a .status change)
@1 second) The Application informer informs us of another Application event
@6 seconds) The Application informer informs us of another Application event
@10 seconds) We detect that the 10 seconds have elapsed, and send the message from @6 seconds.
@20 seconds) The Application informer informs us of an Application event (for example, a .status change)
This ensures that the principal (downstream of Application .status updates) is not overloaded by a large number of Application .status changes at once. This can occur, for example, on the initial deploy of an Application or, less commonly, when Argo CD has been misconfigured such that it is battling another controller for ownership of a resource.
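The example timeline can be sketched as a small, deterministic simulation. This is not the PR's implementation: the `throttle` type and its methods are hypothetical, time is passed in as whole seconds instead of using a real clock, and the sketch assumes that an event arriving while the window is open is forwarded immediately (the timeline does not say what happens to the events at @0 and @20).

```go
package main

import "fmt"

// throttle coalesces update events for one resource so that the handler is
// invoked at most once per interval, always with the latest pending event.
type throttle struct {
	interval   int      // minimum seconds between handler calls
	nextWindow int      // earliest second at which the handler may run again
	pending    string   // latest buffered event ("" means nothing is waiting)
	sent       []string // record of handler invocations, for illustration
}

// receive buffers an update, replacing any older pending update.
func (th *throttle) receive(event string) {
	th.pending = event
}

// tick runs periodically and flushes the pending event if the window is open.
func (th *throttle) tick(now int) {
	if th.pending == "" || now < th.nextWindow {
		return
	}
	th.sent = append(th.sent, fmt.Sprintf("t=%d: %s", now, th.pending))
	th.pending = ""
	th.nextWindow = now + th.interval
}

func main() {
	th := &throttle{interval: 10}
	arrivals := map[int]string{0: "event@0", 1: "event@1", 6: "event@6", 20: "event@20"}
	for now := 0; now <= 20; now++ {
		if ev, ok := arrivals[now]; ok {
			th.receive(ev)
		}
		th.tick(now)
	}
	for _, s := range th.sent {
		fmt.Println(s)
	}
}
```

Under these assumptions the event from @1 is never forwarded on its own: it is overwritten by the event from @6, which is the one sent at @10.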
To test the efficacy, I configured the layer with X=10 seconds (send Application update events at most once every 10 seconds), and then ran the standard E2E tests.
Other details:
SharedInformer:
- BUT: If we see a 'delete', it will short-circuit any remaining updates that are still to be processed.
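A rough sketch of the delete short-circuit, assuming pending updates are held in a map keyed by resource ID (the review excerpts show the PR deleting from `buffer.instanceMap` under `buffer.lock`). The `eventBuffer` type, its fields, and the resource ID used below are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// eventBuffer is a simplified stand-in (assumption) for the PR's buffer:
// it keeps only the latest pending update per resource ID.
type eventBuffer struct {
	lock     sync.Mutex
	pending  map[string]string // resource ID -> latest buffered update
	onDelete func(id string)   // downstream delete handler
}

// receiveUpdate buffers an update, replacing any older one for the resource.
func (b *eventBuffer) receiveUpdate(id, update string) {
	b.lock.Lock()
	defer b.lock.Unlock()
	b.pending[id] = update
}

// receiveDelete drops any buffered update for the resource, since it is now
// stale, and then forwards the delete to the handler.
func (b *eventBuffer) receiveDelete(id string) {
	b.lock.Lock()
	delete(b.pending, id)
	b.lock.Unlock()
	b.onDelete(id)
}

// pendingCount reports how many resources still have a buffered update.
func (b *eventBuffer) pendingCount() int {
	b.lock.Lock()
	defer b.lock.Unlock()
	return len(b.pending)
}

func main() {
	var deleted []string
	buf := &eventBuffer{
		pending:  map[string]string{},
		onDelete: func(id string) { deleted = append(deleted, id) },
	}
	buf.receiveUpdate("argocd/guestbook", "status change")
	buf.receiveDelete("argocd/guestbook")
	fmt.Println(buf.pendingCount(), deleted)
}
```

The lock is released before the handler is called so that a slow handler does not block other resources from being buffered.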
Which issue(s) this PR fixes:
Fixes N/A
Checklist
Summary by CodeRabbit
`--buffer-informer-events` CLI flag or `ARGOCD_AGENT_BUFFER_INFORMER_EVENTS` environment variable (defaults to no buffering for backward compatibility).