Filter out-of-order/duplicate graph events #16
pallakartheekreddy wants to merge 3 commits into develop
Conversation
Add a lastUpdatedCache map and a shouldProcessEvent(...) helper to drop events that are older than or equal to the last-processed lastUpdatedOn for the same nodeUniqueId. The processor now calls shouldProcessEvent before sending events to sinks; accepted events update the cache, while dropped events are logged at debug level. This prevents duplicate or out-of-order event emissions when timestamps are present.
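A rough sketch of the mechanism described above (class name and method signatures are assumptions based on this description, not the actual processor source):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: drop events that are not strictly newer than the
// last processed timestamp for the same node. Names are hypothetical.
public class GraphEventFilter {
    // nodeUniqueId -> last processed lastUpdatedOn (epoch millis)
    private final Map<String, Long> lastUpdatedCache = new ConcurrentHashMap<>();

    // Accepted events advance the cache; duplicates and out-of-order
    // events (lastUpdatedOn <= cached value) are rejected.
    public boolean shouldProcessEvent(String nodeUniqueId, long lastUpdatedOn) {
        Long cached = lastUpdatedCache.get(nodeUniqueId);
        if (cached != null && lastUpdatedOn <= cached) {
            return false; // dropped; the real processor logs this at debug level
        }
        lastUpdatedCache.put(nodeUniqueId, lastUpdatedOn);
        return true;
    }
}
```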
Pull request overview
This PR adds in-memory filtering to prevent emitting out-of-order or duplicate JanusGraph CDC events by tracking the last processed lastUpdatedOn timestamp per nodeUniqueId. It addresses the issue of duplicate/out-of-order event emissions when timestamps are present in the converted event payload.
Changes:
- Introduces a lastUpdatedCache (nodeUniqueId → last processed lastUpdatedOn epoch ms).
- Adds shouldProcessEvent(...) to drop events whose lastUpdatedOn is <= the cached value (and logs dropped events at debug).
- Hooks the new filter into processVertexChange(...) so filtering happens before dispatch to sinks.
Make lastUpdatedCache a synchronized LRU (LinkedHashMap) with MAX_CACHE_SIZE=10_000 to prevent unbounded heap growth. Modify processing flow so events are only recorded in the cache after a confirmed successful send: sendEventToSinks now returns boolean, shouldProcessEvent no longer mutates the cache, and a new updateLastUpdatedCache helper advances the cache after success. Also clear the cache on stop. Add /src to .gitignore. These changes reduce lost retries on serialization/send failures and cap memory usage.
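A minimal sketch of the bounded cache and the check-send-record flow described in this commit (an access-ordered LinkedHashMap behind a synchronized wrapper; names mirror the commit message but are not the actual source):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: the check no longer mutates the cache, and the cache
// only advances after a confirmed successful send, so failed sends can retry.
public class BoundedEventFilter {
    private static final int MAX_CACHE_SIZE = 10_000;

    // Access-ordered LinkedHashMap evicts the least-recently-used entry once
    // the cap is exceeded; synchronizedMap guards concurrent access.
    private final Map<String, Long> lastUpdatedCache = Collections.synchronizedMap(
            new LinkedHashMap<String, Long>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                    return size() > MAX_CACHE_SIZE;
                }
            });

    // Pure check: no mutation, so a serialization/send failure loses nothing.
    public boolean shouldProcessEvent(String nodeUniqueId, long lastUpdatedOn) {
        Long cached = lastUpdatedCache.get(nodeUniqueId);
        return cached == null || lastUpdatedOn > cached;
    }

    // Called only after sendEventToSinks(...) reports success.
    public void updateLastUpdatedCache(String nodeUniqueId, long lastUpdatedOn) {
        lastUpdatedCache.put(nodeUniqueId, lastUpdatedOn);
    }

    // Invoked from stop() so a restarted processor begins with a clean slate.
    public void clear() {
        lastUpdatedCache.clear();
    }
}
```

Note that because shouldProcessEvent no longer mutates the cache, a dropped send leaves the cached timestamp untouched and the same event can pass the filter again on retry.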
- Remove dead hasStatusAttribute method (never called)
- Cache addedVertices/removedVertices sets before the UPDATE loop to avoid repeated getVertices() calls and re-consuming single-use iterables
- Downgrade per-edge relation logging from INFO to DEBUG to reduce noise
- Fix typo: INitialize -> Initialize in comment

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
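The single-use-iterable point can be illustrated generically (JanusGraph types omitted; the helper below is hypothetical):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: an iterable backed by a one-shot iterator yields its
// elements once, so it must be materialized into a Set before being
// consulted in more than one place (e.g. inside an UPDATE loop).
public class MaterializeOnce {
    public static Set<String> materialize(Iterable<String> vertices) {
        Set<String> cached = new HashSet<>();
        for (String v : vertices) {
            cached.add(v);
        }
        return cached;
    }

    public static void main(String[] args) {
        var iterator = List.of("v1", "v2").iterator();
        Iterable<String> singleUse = () -> iterator; // re-traversal yields nothing
        Set<String> addedVertices = materialize(singleUse);
        // addedVertices is now safe to consult repeatedly in the UPDATE loop.
        System.out.println(addedVertices.size());
    }
}
```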