fix: prevent lost iterations in CircularKey concurrent worker race#49
Open
ShivamPaliwal1 wants to merge 1 commit into
Open
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses a concurrency race when multiple workers share a single DocumentGenerator using CircularKey, ensuring only one worker performs the end-of-pass iteration check and counter reset.
Changes:
- Added double-checked locking (
synchronized (this.keys)) around end-of-pass handling inhas_next_read(),has_next_update(), andhas_next_expiry(). - Ensured these methods return
trueimmediately after a successful iteration rollover + reset, preserving progress across passes.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (this.ws.dr.expiryItr.get() < this.ws.dr.expiry_e) | ||
| return true; | ||
| try { | ||
| if ((boolean)this.iterationsMethod.invoke(this.keys, null)) { |
| return true; | ||
| try { | ||
| if ((boolean)this.iterationsMethod.invoke(this.keys)) { | ||
| this.resetRead(); |
Comment on lines
+238
to
+241
| if ((boolean)this.iterationsMethod.invoke(this.keys)) { | ||
| this.resetUpdate(); | ||
| this.ws.mutated += 1; | ||
| return true; |
Two workers sharing a DocumentGenerator could both detect end-of-pass (updateItr >= update_e) before either called resetUpdate(), causing double-decrements of the plain-int iterations counter in CircularKey. Each race event silently dropped one full pass, producing fewer total mutations than expected (observed: 54204 vs 60000 ep_dcp_items_sent in CDC history retention tests). Add double-checked locking (synchronized on this.keys) in has_next_read(), has_next_update(), and has_next_expiry() so only one worker triggers checkIterations() + reset per pass-end event. The fast unsynchronized path is preserved for normal in-pass execution, keeping full worker concurrency for deduplication-sensitive tests.
020aa86 to
c140886
Compare
Comment on lines
+233
to
252
| synchronized (this.keys) { | ||
| if (this.ws.dr.updateItr.get() < this.ws.dr.update_e) | ||
| return true; | ||
| if (this.keyInstance.getSimpleName().equals(CircularKey.class.getSimpleName())) { | ||
| try { | ||
| if ((boolean)this.iterationsMethod.invoke(this.keys)) { | ||
| this.resetUpdate(); | ||
| this.ws.mutated += 1; | ||
| return true; | ||
| } | ||
| } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) { | ||
| e1.printStackTrace(); | ||
| } | ||
| } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) { | ||
| e1.printStackTrace(); | ||
| } | ||
| } | ||
| if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())-startTime<ws.mutation_timeout) { | ||
| this.resetUpdate(); | ||
| this.ws.mutated += 1; | ||
| return true; | ||
| if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())-startTime<ws.mutation_timeout) { | ||
| this.resetUpdate(); | ||
| this.ws.mutated += 1; | ||
| return true; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Two workers sharing a DocumentGenerator could both detect end-of-pass (updateItr >= update_e) before either called resetUpdate(), causing double-decrements of the plain-int iterations counter in CircularKey.
Each race event silently dropped one full pass, producing fewer total mutations than expected (observed: 54204 vs 60000 ep_dcp_items_sent in CDC history retention tests).
Add double-checked locking (synchronized on this.keys) in has_next_read(), has_next_update(), and has_next_expiry() so only one worker triggers checkIterations() + reset per pass-end event.
The fast unsynchronized path is preserved for normal in-pass execution, keeping full worker concurrency for deduplication-sensitive tests.