feat: configure workqueue's internal queue to support event de-duplication#1003
feat: configure workqueue's internal queue to support event de-duplication#1003chetan-rns wants to merge 5 commits into
Conversation
Assisted-by: Cursor Signed-off-by: Chetan Banavikalmutt <chetanrns1997@gmail.com>
Assisted-by: Cursor Signed-off-by: Chetan Banavikalmutt <chetanrns1997@gmail.com>
Signed-off-by: Chetan Banavikalmutt <chetanrns1997@gmail.com>
Signed-off-by: Chetan Banavikalmutt <chetanrns1997@gmail.com>
Assisted-by: Cursor Signed-off-by: Chetan Banavikalmutt <chetanrns1997@gmail.com>
📝 WalkthroughWalkthroughIntroduces a new ChangesDedupeQueue and WorkQueue migration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@coderabbitai full review |
✅ Action performedFull review finished. |
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/queue/mocks/QueuePair.go (1)
257-301:⚠️ Potential issue | 🟠 Major
QueuePairmock is only partially migrated and violates the interface contract.The interface defines both
SendQandRecvQto returnqueue.WorkQueue(internal/queue/queue.go:46-47), but the mock hasRecvQcorrectly updated toqueue.WorkQueuewhileSendQstill returnsworkqueue.TypedRateLimitingInterface[*event.Event]. This inconsistency breaks the mock's compliance with thequeue.QueuePairinterface. Regenerate this mock from the updated interface definition or updateSendQand its helper method signatures toqueue.WorkQueueto match.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/queue/mocks/QueuePair.go` around lines 257 - 301, The QueuePair mock in the file has an inconsistency where RecvQ correctly returns queue.WorkQueue but SendQ still returns workqueue.TypedRateLimitingInterface[*event.Event]. To fix this, locate the SendQ method and all its helper methods (QueuePair_SendQ_Call struct and its associated Run and Return methods in the QueuePair_Expecter type) and update their return type signatures from workqueue.TypedRateLimitingInterface[*event.Event] to queue.WorkQueue to match the actual interface definition. Alternatively, regenerate the entire mock from the updated interface definition in internal/queue/queue.go to ensure complete consistency.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/queue/dedupe_queue_test.go`:
- Around line 223-237: The TestDedupeQueue_Done test is missing the critical
regression test scenario where a duplicate event with the same dedupe key
arrives while the first item is being processed (between Get and Done). Modify
the test to add a second event with the same dedupe key (app1_uid1) after
calling Get on the first event but before calling Done, then add a second Get
call after Done to verify it returns the new event with the updated payload (v2)
rather than nil. This ensures the queue correctly handles the case where new
items arrive for the same resource while an existing item is still being
processed.
In `@internal/queue/dedupe_queue.go`:
- Around line 160-162: The issue is that the Add() method uses a blocking
q.queue.Get() call inside the eviction path after checking q.queue.Len() ==
q.maxSize, which creates a race condition where another goroutine could drain
the queue between the length check and the Get() call, causing the producer
thread to block indefinitely. Replace the blocking Get() approach with a
non-blocking alternative for item eviction, such as implementing a separate
lock-protected data structure (like a map or ordered list) that tracks key
insertion order for determining which item to evict, or switch to a non-blocking
peek/remove operation that won't wait if the queue becomes empty.
- Around line 99-123: Add validation in the NewDedupeQueue function to check if
the maxSize parameter is less than or equal to zero before proceeding with queue
initialization. If maxSize is invalid, either panic with a descriptive error
message or log a warning and clamp it to a sensible minimum value (such as 1) to
prevent the queue from entering eviction immediately on the first Add() call and
causing blocking issues on Get().
- Around line 147-153: The issue is that when Done() is called for an older
event that has been superseded by a newer event with the same key, the code
unconditionally deletes latestEvents[key], losing the newer pending event. In
the Done() method (around lines 188-201), before deleting latestEvents[key], you
need to verify that the event being completed is actually the current event
stored in latestEvents for that key by checking if q.eventKeys[item] still
points to that key and if latestEvents[key] still equals item. Only delete
latestEvents[key] if the item matches the current event; otherwise, preserve the
newer event that has arrived.
In `@internal/queue/queue.go`:
- Around line 213-217: The GetWithContext function accepts a WorkQueue interface
parameter but only works for *dedupeQueue implementations, silently returning
nil, false for other types. Either change the parameter type from WorkQueue to
*dedupeQueue to match the actual implementation, or add an explicit failure
mechanism (such as a panic or error return) for unsupported queue types instead
of silently returning nil, false. This will prevent nil events from leaking into
callers when using non-dedupeQueue implementations.
---
Outside diff comments:
In `@internal/queue/mocks/QueuePair.go`:
- Around line 257-301: The QueuePair mock in the file has an inconsistency where
RecvQ correctly returns queue.WorkQueue but SendQ still returns
workqueue.TypedRateLimitingInterface[*event.Event]. To fix this, locate the
SendQ method and all its helper methods (QueuePair_SendQ_Call struct and its
associated Run and Return methods in the QueuePair_Expecter type) and update
their return type signatures from
workqueue.TypedRateLimitingInterface[*event.Event] to queue.WorkQueue to match
the actual interface definition. Alternatively, regenerate the entire mock from
the updated interface definition in internal/queue/queue.go to ensure complete
consistency.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b0f933d5-ffa3-4545-b610-b0277a960794
📒 Files selected for processing (10)
agent/outbound_test.gointernal/queue/dedupe_queue.gointernal/queue/dedupe_queue_test.gointernal/queue/mocks/QueuePair.gointernal/queue/queue.gointernal/queue/queue_test.gointernal/resync/resync.goprincipal/callbacks_test.goprincipal/event.goprincipal/event_test.go
| func TestDedupeQueue_Done(t *testing.T) { | ||
| q := NewDedupeQueue("test", 100) | ||
|
|
||
| ev := newDedupableEvent("app1_uid1", "v1") | ||
| q.Add(ev) | ||
|
|
||
| got, _ := q.Get() | ||
| assert.Equal(t, 0, q.Len()) | ||
| q.Done(got) | ||
|
|
||
| // After Done, adding a new event for the same resource should work fresh | ||
| ev2 := newDedupableEvent("app1_uid1", "v2") | ||
| q.Add(ev2) | ||
| assert.Equal(t, 1, q.Len()) | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Add a regression test for “duplicate arrives while first item is processing”.
Current tests don't exercise Add(new) between Get(old) and Done(old) for the same dedupe key. That path is critical for this queue and should assert that the second Get() returns the latest payload (not nil).
Suggested test
+func TestDedupeQueue_DuplicateWhileProcessingKeepsLatest(t *testing.T) {
+ q := NewDedupeQueue("test", 100)
+
+ ev1 := newDedupableEvent("app1_uid1", "v1")
+ q.Add(ev1)
+
+ inFlight, shutdown := q.Get()
+ require.False(t, shutdown)
+ require.NotNil(t, inFlight)
+
+ ev2 := newDedupableEvent("app1_uid1", "v2")
+ q.Add(ev2)
+ q.Done(inFlight)
+
+ got, shutdown := q.Get()
+ require.False(t, shutdown)
+ require.NotNil(t, got)
+ var data string
+ _ = got.DataAs(&data)
+ assert.Equal(t, "v2", data)
+ q.Done(got)
+}Also applies to: 310-328
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/queue/dedupe_queue_test.go` around lines 223 - 237, The
TestDedupeQueue_Done test is missing the critical regression test scenario where
a duplicate event with the same dedupe key arrives while the first item is being
processed (between Get and Done). Modify the test to add a second event with the
same dedupe key (app1_uid1) after calling Get on the first event but before
calling Done, then add a second Get call after Done to verify it returns the new
event with the updated payload (v2) rather than nil. This ensures the queue
correctly handles the case where new items arrive for the same resource while an
existing item is still being processed.
| func NewDedupeQueue(name string, maxSize int) WorkQueue { | ||
| baseQueue := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[EventKey]{ | ||
| Name: name, | ||
| Queue: newReorderQueue[EventKey](), | ||
| }) | ||
|
|
||
| delayingQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[EventKey]{ | ||
| Queue: baseQueue, | ||
| }) | ||
|
|
||
| queue := workqueue.NewTypedRateLimitingQueueWithConfig( | ||
| workqueue.DefaultTypedControllerRateLimiter[EventKey](), | ||
| workqueue.TypedRateLimitingQueueConfig[EventKey]{ | ||
| DelayingQueue: delayingQueue, | ||
| }, | ||
| ) | ||
|
|
||
| return &dedupeQueue{ | ||
| queue: queue, | ||
| maxSize: maxSize, | ||
| latestEvents: make(map[EventKey]*event.Event), | ||
| eventKeys: make(map[*event.Event]EventKey), | ||
| notify: make(chan struct{}, 10), | ||
| } | ||
| } |
There was a problem hiding this comment.
Validate maxSize at construction time.
maxSize <= 0 causes the first Add() to enter eviction immediately and can block on Get(). Add an explicit constructor guard (panic or clamp with logged warning) to prevent invalid queue instances.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/queue/dedupe_queue.go` around lines 99 - 123, Add validation in the
NewDedupeQueue function to check if the maxSize parameter is less than or equal
to zero before proceeding with queue initialization. If maxSize is invalid,
either panic with a descriptive error message or log a warning and clamp it to a
sensible minimum value (such as 1) to prevent the queue from entering eviction
immediately on the first Add() call and causing blocking issues on Get().
| q.mu.Lock() | ||
| oldEvent := q.latestEvents[key] | ||
| _, exists := q.latestEvents[key] | ||
| q.latestEvents[key] = item | ||
| q.eventKeys[item] = key | ||
| if exists && oldEvent != nil { | ||
| delete(q.eventKeys, oldEvent) |
There was a problem hiding this comment.
In-flight duplicate handling drops the newest event for the same key.
When a key is re-added after Get() but before Done(), the new payload is put back into latestEvents, then Done() for the older payload deletes latestEvents[key] unconditionally. That can lose the pending update and produce nil from a subsequent Get().
Proposed fix
type dedupeQueue struct {
queue workqueue.TypedRateLimitingInterface[EventKey]
maxSize int
mu sync.Mutex
latestEvents map[EventKey]*event.Event
eventKeys map[*event.Event]EventKey
+ processing map[EventKey]*event.Event
notify chan struct{}
}
@@
return &dedupeQueue{
queue: queue,
maxSize: maxSize,
latestEvents: make(map[EventKey]*event.Event),
eventKeys: make(map[*event.Event]EventKey),
+ processing: make(map[EventKey]*event.Event),
notify: make(chan struct{}, 10),
}
}
@@
func (q *dedupeQueue) Add(item *event.Event) {
key := getKey(item)
q.mu.Lock()
oldEvent := q.latestEvents[key]
- _, exists := q.latestEvents[key]
+ _, exists := q.latestEvents[key]
+ _, inFlight := q.processing[key]
q.latestEvents[key] = item
q.eventKeys[item] = key
if exists && oldEvent != nil {
delete(q.eventKeys, oldEvent)
}
+ isNewKey := !exists && !inFlight
q.mu.Unlock()
@@
- if !exists && q.queue.Len() == q.maxSize {
+ if isNewKey && q.queue.Len() == q.maxSize {
@@
func (q *dedupeQueue) Get() (*event.Event, bool) {
@@
q.mu.Lock()
ev := q.latestEvents[key]
+ if ev != nil {
+ q.processing[key] = ev
+ }
delete(q.latestEvents, key)
q.mu.Unlock()
return ev, shutdown
}
func (q *dedupeQueue) Done(item *event.Event) {
q.mu.Lock()
key, ok := q.eventKeys[item]
if ok {
delete(q.eventKeys, item)
- delete(q.latestEvents, key)
+ delete(q.processing, key)
+ if cur, exists := q.latestEvents[key]; exists && cur == item {
+ delete(q.latestEvents, key)
+ }
}
q.mu.Unlock()Also applies to: 188-201
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/queue/dedupe_queue.go` around lines 147 - 153, The issue is that
when Done() is called for an older event that has been superseded by a newer
event with the same key, the code unconditionally deletes latestEvents[key],
losing the newer pending event. In the Done() method (around lines 188-201),
before deleting latestEvents[key], you need to verify that the event being
completed is actually the current event stored in latestEvents for that key by
checking if q.eventKeys[item] still points to that key and if latestEvents[key]
still equals item. Only delete latestEvents[key] if the item matches the current
event; otherwise, preserve the newer event that has arrived.
| if !exists && q.queue.Len() == q.maxSize { | ||
| oldest, shutdown := q.queue.Get() | ||
| if !shutdown { |
There was a problem hiding this comment.
Eviction path can block producer threads indefinitely.
Using blocking q.queue.Get() inside Add() is unsafe. The queue can be drained by another goroutine after the Len()==maxSize check, causing this producer to block before enqueueing the new item.
Please switch eviction to a non-blocking internal structure (or lock-protected key-order structure) rather than calling blocking Get() from the producer path.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/queue/dedupe_queue.go` around lines 160 - 162, The issue is that the
Add() method uses a blocking q.queue.Get() call inside the eviction path after
checking q.queue.Len() == q.maxSize, which creates a race condition where
another goroutine could drain the queue between the length check and the Get()
call, causing the producer thread to block indefinitely. Replace the blocking
Get() approach with a non-blocking alternative for item eviction, such as
implementing a separate lock-protected data structure (like a map or ordered
list) that tracks key insertion order for determining which item to evict, or
switch to a non-blocking peek/remove operation that won't wait if the queue
becomes empty.
| func GetWithContext(q WorkQueue, ctx context.Context) (*event.Event, bool) { | ||
| bq, ok := q.(*dedupeQueue) | ||
| if !ok { | ||
| return nil, false | ||
| } |
There was a problem hiding this comment.
GetWithContext advertises WorkQueue but only works for *dedupeQueue.
This silently returns (nil, false) for any non-*dedupeQueue implementation, which can leak nil events into callers under a “not shutdown” state. Please either narrow the parameter type or make unsupported queue types fail explicitly.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/queue/queue.go` around lines 213 - 217, The GetWithContext function
accepts a WorkQueue interface parameter but only works for *dedupeQueue
implementations, silently returning nil, false for other types. Either change
the parameter type from WorkQueue to *dedupeQueue to match the actual
implementation, or add an explicit failure mechanism (such as a panic or error
return) for unsupported queue types instead of silently returning nil, false.
This will prevent nil events from leaking into callers when using
non-dedupeQueue implementations.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1003 +/- ##
==========================================
- Coverage 48.40% 48.26% -0.15%
==========================================
Files 122 126 +4
Lines 18341 18806 +465
==========================================
+ Hits 8878 9076 +198
- Misses 8664 8914 +250
- Partials 799 816 +17
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
What does this PR do / why we need it:
client-go's workqueue allows us to customize the internal queue data structure. It also provides a hook called
Touchthat is invoked when we try to add an item that already exists in the queue but not yet processed. In this Touch method, we can remove the duplicate item and append the new item to the end of the queue. Instead of storing the entire resource event in the queue, we store the keys to events and maintain a map of keys to the latest events. We differentiate between de-duplicable and non-deduplicable items based on the key type:The EventID is unique for each event and thereby prevents de-duplication.
Which issue(s) this PR fixes:
Fixes #?
How to test changes / Special notes to the reviewer:
Checklist
Summary by CodeRabbit