Skip to content

Commit c140886

Browse files
fix: prevent lost iterations in CircularKey concurrent worker race
Two workers sharing a DocumentGenerator could both detect end-of-pass (updateItr >= update_e) before either called resetUpdate(), causing double-decrements of the plain-int iterations counter in CircularKey. Each race event silently dropped one full pass, producing fewer total mutations than expected (observed: 54204 vs 60000 ep_dcp_items_sent in CDC history retention tests). Add double-checked locking (synchronized on this.keys) in has_next_read(), has_next_update(), and has_next_expiry() so only one worker triggers checkIterations() + reset per pass-end event. The fast unsynchronized path is preserved for normal in-pass execution, keeping full worker concurrency for deduplication-sensitive tests.
1 parent 5ca14fe commit c140886

1 file changed

Lines changed: 36 additions & 24 deletions

File tree

src/main/java/utils/docgen/DocumentGenerator.java

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -211,13 +211,17 @@ public boolean has_next_read() {
211211
if (this.ws.dr.readItr.get() < this.ws.dr.read_e)
212212
return true;
213213
if (this.keyInstance.getSimpleName().equals(CircularKey.class.getSimpleName())) {
214-
try {
215-
if ((boolean)this.iterationsMethod.invoke(this.keys)) {
216-
this.resetRead();
214+
synchronized (this.keys) {
215+
if (this.ws.dr.readItr.get() < this.ws.dr.read_e)
217216
return true;
217+
try {
218+
if ((boolean)this.iterationsMethod.invoke(this.keys)) {
219+
this.resetRead();
220+
return true;
221+
}
222+
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
223+
e1.printStackTrace();
218224
}
219-
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
220-
e1.printStackTrace();
221225
}
222226
}
223227
return false;
@@ -226,21 +230,25 @@ public boolean has_next_read() {
226230
public boolean has_next_update() {
227231
if (this.ws.dr.updateItr.get() < this.ws.dr.update_e)
228232
return true;
229-
if (this.keyInstance.getSimpleName().equals(CircularKey.class.getSimpleName())) {
230-
try {
231-
if ((boolean)this.iterationsMethod.invoke(this.keys)) {
232-
this.resetUpdate();
233-
this.ws.mutated += 1;
234-
return true;
233+
synchronized (this.keys) {
234+
if (this.ws.dr.updateItr.get() < this.ws.dr.update_e)
235+
return true;
236+
if (this.keyInstance.getSimpleName().equals(CircularKey.class.getSimpleName())) {
237+
try {
238+
if ((boolean)this.iterationsMethod.invoke(this.keys)) {
239+
this.resetUpdate();
240+
this.ws.mutated += 1;
241+
return true;
242+
}
243+
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
244+
e1.printStackTrace();
235245
}
236-
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
237-
e1.printStackTrace();
238246
}
239-
}
240-
if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())-startTime<ws.mutation_timeout) {
241-
this.resetUpdate();
242-
this.ws.mutated += 1;
243-
return true;
247+
if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())-startTime<ws.mutation_timeout) {
248+
this.resetUpdate();
249+
this.ws.mutated += 1;
250+
return true;
251+
}
244252
}
245253
return false;
246254
}
@@ -249,13 +257,17 @@ public boolean has_next_expiry() {
249257
if (this.ws.dr.expiryItr.get() < this.ws.dr.expiry_e)
250258
return true;
251259
if (this.keyInstance.getSimpleName().equals(CircularKey.class.getSimpleName())) {
252-
try {
253-
if ((boolean)this.iterationsMethod.invoke(this.keys, null)) {
254-
this.resetExpiry();
260+
synchronized (this.keys) {
261+
if (this.ws.dr.expiryItr.get() < this.ws.dr.expiry_e)
255262
return true;
263+
try {
264+
if ((boolean)this.iterationsMethod.invoke(this.keys)) {
265+
this.resetExpiry();
266+
return true;
267+
}
268+
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
269+
e1.printStackTrace();
256270
}
257-
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
258-
e1.printStackTrace();
259271
}
260272
}
261273
return false;
@@ -311,7 +323,7 @@ public boolean has_next_subdoc_lookup() {
311323
abstract Tuple2<String, Object> next();
312324

313325
void resetRead() {
314-
this.ws.dr.readItr = new AtomicLong(this.ws.dr.read_s);
326+
this.ws.dr.readItr.set(this.ws.dr.read_s);
315327
}
316328

317329
void resetExpiry() {

0 commit comments

Comments
 (0)