feat: deduplicate unsent event writer events for spec/statusupdate#938
feat: deduplicate unsent event writer events for spec/statusupdate#938jgwest wants to merge 1 commit into
Conversation
📝 WalkthroughWalkthroughEvent writer configuration updated to use unbounded retries and adjusted backoff timing. Event queue storage refactored to deduplicate stale StatusUpdate and SpecUpdate events. Access patterns changed to peek without removal, resource iteration randomized per cycle, and deduplication test coverage added. ChangesEvent Queue and Retry Configuration
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #938 +/- ##
==========================================
- Coverage 48.44% 48.32% -0.12%
==========================================
Files 121 121
Lines 18298 18304 +6
==========================================
- Hits 8865 8846 -19
- Misses 8638 8659 +21
- Partials 795 799 +4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
| // - Items at the end of the list are 'fresher', items are the beginning of the list are more stale | ||
| // - We thus remove items early in the list in favour of those that are later in the list | ||
| for idx := len(*items) - 1; idx >= 0; idx-- { | ||
| item := (*items)[idx] |
There was a problem hiding this comment.
I like this new approach. I wonder if there is a way to avoid scanning all the queue items for each incoming event. The old approach avoided this by only de-duplicating events that were next to each other. The downside was that older events could still remain in the queue if events of a different type appeared in between. For example:
A,B,C: different incoming event types ordered from left(oldest) to right(latest)
Incoming events: [A1 A2 B1 B2 A3 A4 B3 B4 C1 C2 A5 A6]
Old logic: [A2 B2 A4 B4 C2 A6] (de-duplicate items of the same type next to each other)
New logic: [B4 C2 A6] (de-duplicate items of the same type within the array)
The new logic definitely helps reduce the queue memory by removing all duplicate events. But it trades additional CPU time by scanning the queue for each incoming item. If there are M incoming events and N items in the queue, in the worst case, we take O(N) to scan the queue and O(N) to delete (Go will shift the remaining items), so I think the overall complexity is O(M * N * N).
This may still be completely fine in our case, since our main goal is to reduce queue memory, and the queue size remains relatively smaller. I'm just thinking out loud about a possible future improvement.
There was a problem hiding this comment.
I came across this while working on #994. We could optimize and avoid the linear scan by using a doubly linked list and tracking the node pointers in a map. So, for an incoming event, we use the map to locate the old event in the list and remove the node O(1). But it adds further complexity. I'm using the linear scan to keep it simple for now, and added a TODO to revisit if it becomes an issue.
Signed-off-by: Jonathan West <jgwest@gmail.com>
f246506 to
22bde17
Compare
|
@coderabbitai review |
✅ Action performedReview finished.
|
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/event/event_writer.go (1)
212-237:⚠️ Potential issue | 🟠 Major | ⚡ Quick winPrevent ACK cleanup from mutating
unsentEventson sent-ID mismatch.
EventWriter.Removeis ACK-driven inprincipal/apis/eventstream/eventstream.go(Lines 313-328). If sent-event ID doesn’t match, falling through to unsent-front removal can drop queued unsent work on stale/out-of-order ACK paths.Suggested fix
func (ew *EventWriter) Remove(ev *cloudevents.Event) { ew.mu.Lock() defer ew.mu.Unlock() // Remove the event only if it matches both the resourceID and eventID. resourceID := ResourceID(ev) incomingEventID := EventID(ev) // First, check and remove from sent events if sent, exists := ew.sentEvents[resourceID]; exists { sent.mu.RLock() sentEventID := EventID(sent.event) sent.mu.RUnlock() if sentEventID == incomingEventID { delete(ew.sentEvents, resourceID) - return + return } } + + // ACKs only confirm already-sent events. Do not mutate unsent queue here. + if Target(ev) == TargetEventAck { + return + } // If not in sent events, check unsent queue eq, exists := ew.unsentEvents[resourceID] if !exists { return }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/event/event_writer.go` around lines 212 - 237, The ACK cleanup currently falls through to the unsent queue when a resource exists in ew.sentEvents but the sent-event ID doesn't match, which can drop queued unsent work; in EventWriter.Remove ensure that if the resourceID exists in ew.sentEvents and the stored sent EventID does not equal incomingEventID you return immediately (do not inspect or mutate ew.unsentEvents). Locate the sent-events check in EventWriter.Remove (the code that reads ew.sentEvents and compares to incomingEventID) and add an early return on mismatch so only a matching sent-event triggers any removals; leave the unsent handling (eq.peek, frontEventID, eq.pop, delete(ew.unsentEvents, resourceID)) unchanged so it only runs when the resource was not present in sentEvents.
🧹 Nitpick comments (2)
internal/event/event_writer.go (1)
512-538: ⚡ Quick winUpdate
eventQueue.addcomments to match actual dedupe behavior.The function comment and retained commented-out block describe the old tail-only coalescing path, which now conflicts with current append-then-global-dedupe logic.
Suggested cleanup
-// add an item to the tail of the queue. -// If the item is the same type as the tail, replace the tail with the new item. +// add appends an item to the queue, then removes stale queued +// SpecUpdate/StatusUpdate entries while keeping the freshest of each. func (eq *eventQueue) add(ev *eventMessage) { eq.mu.Lock() defer eq.mu.Unlock() eq.items = append(eq.items, ev) deduplicateEventMessageItems(&eq.items) - - // if len(eq.items) > 0 { - // tail := eq.items[len(eq.items)-1] - // tail.mu.Lock() - // - // // Replace an older event with a newer one of the same type - // if ev.event.Type() == tail.event.Type() { - // tail.event = ev.event - // tail.backoff = ev.backoff - // tail.retryAfter = ev.retryAfter - // tail.mu.Unlock() - // return - // } - // tail.mu.Unlock() - // } - - // eq.items = append(eq.items, ev) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/event/event_writer.go` around lines 512 - 538, The comment above eventQueue.add is outdated — the function no longer only coalesces with the tail but appends then runs global deduplication via deduplicateEventMessageItems(&eq.items); update the top comment for eventQueue.add to state that the item is appended and a global dedupe pass is performed, and remove the obsolete commented-out tail-only coalescing block (or convert it into a short note about historical behavior) so the code and comment accurately describe the current append-then-dedupe behavior.internal/event/event_writer_test.go (1)
289-325: ⚡ Quick winReplace commented-out retry test with an explicit skipped test (or remove).
Keeping disabled test logic as comments makes intent hard to track and easy to rot. Prefer
t.Skip(...)with rationale, or delete and track in an issue.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/event/event_writer_test.go` around lines 289 - 325, The commented-out t.Run("should give up after max retries", ...) block should not remain as commented code; either convert it into an explicit skipped test using t.Run with t.Skip("reason") (preserving the original test title) or delete it entirely and track it in an issue. Locate the disabled block around the test name "should give up after max retries" that references NewEventWriter, SetOnDiscard, sendEvent, retrySentEvent and sentEvents, then replace the commented block with a real test function that immediately calls t.Skip with a clear rationale (or remove the block) so intent is explicit and the codebase no longer contains large commented tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@internal/event/event_writer.go`:
- Around line 212-237: The ACK cleanup currently falls through to the unsent
queue when a resource exists in ew.sentEvents but the sent-event ID doesn't
match, which can drop queued unsent work; in EventWriter.Remove ensure that if
the resourceID exists in ew.sentEvents and the stored sent EventID does not
equal incomingEventID you return immediately (do not inspect or mutate
ew.unsentEvents). Locate the sent-events check in EventWriter.Remove (the code
that reads ew.sentEvents and compares to incomingEventID) and add an early
return on mismatch so only a matching sent-event triggers any removals; leave
the unsent handling (eq.peek, frontEventID, eq.pop, delete(ew.unsentEvents,
resourceID)) unchanged so it only runs when the resource was not present in
sentEvents.
---
Nitpick comments:
In `@internal/event/event_writer_test.go`:
- Around line 289-325: The commented-out t.Run("should give up after max
retries", ...) block should not remain as commented code; either convert it into
an explicit skipped test using t.Run with t.Skip("reason") (preserving the
original test title) or delete it entirely and track it in an issue. Locate the
disabled block around the test name "should give up after max retries" that
references NewEventWriter, SetOnDiscard, sendEvent, retrySentEvent and
sentEvents, then replace the commented block with a real test function that
immediately calls t.Skip with a clear rationale (or remove the block) so intent
is explicit and the codebase no longer contains large commented tests.
In `@internal/event/event_writer.go`:
- Around line 512-538: The comment above eventQueue.add is outdated — the
function no longer only coalesces with the tail but appends then runs global
deduplication via deduplicateEventMessageItems(&eq.items); update the top
comment for eventQueue.add to state that the item is appended and a global
dedupe pass is performed, and remove the obsolete commented-out tail-only
coalescing block (or convert it into a short note about historical behavior) so
the code and comment accurately describe the current append-then-dedupe
behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c7fb4728-dac4-4d24-aaa6-c2f65f95dedd
📒 Files selected for processing (2)
internal/event/event_writer.gointernal/event/event_writer_test.go
What does this PR do / why we need it:
Which issue(s) this PR fixes:
Fixes N/A
Checklist
Summary by CodeRabbit
Release Notes
Bug Fixes
Tests