Skip to content

Commit edbec92

Browse files
committed
[server] Added Retry Handler to Address Snapshotting Test Asynchrony
1 parent 640f40c commit edbec92

File tree

1 file changed

+29
-11
lines changed

1 file changed

+29
-11
lines changed

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -498,34 +498,34 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except
498498
// put initial data and create first snapshot
499499
KvRecordBatch kvRecords =
500500
genKvRecordBatch(
501-
Tuple2.of("k1", new Object[] {1, "a"}),
502-
Tuple2.of("k2", new Object[] {2, "b"}));
501+
Tuple2.of("k1", new Object[]{1, "a"}),
502+
Tuple2.of("k2", new Object[]{2, "b"}));
503503
putRecordsToLeader(kvReplica, kvRecords);
504504

505505
// trigger first snapshot
506-
scheduledExecutorService.triggerNonPeriodicScheduledTask();
506+
triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
507507
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
508508

509509
// put more data and create second snapshot
510510
kvRecords =
511511
genKvRecordBatch(
512-
Tuple2.of("k1", new Object[] {3, "c"}),
513-
Tuple2.of("k3", new Object[] {4, "d"}));
512+
Tuple2.of("k1", new Object[]{3, "c"}),
513+
Tuple2.of("k3", new Object[]{4, "d"}));
514514
putRecordsToLeader(kvReplica, kvRecords);
515515

516516
// trigger second snapshot
517-
scheduledExecutorService.triggerNonPeriodicScheduledTask();
517+
triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
518518
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
519519

520520
// put more data and create third snapshot (this will be the broken one)
521521
kvRecords =
522522
genKvRecordBatch(
523-
Tuple2.of("k4", new Object[] {5, "e"}),
524-
Tuple2.of("k5", new Object[] {6, "f"}));
523+
Tuple2.of("k4", new Object[]{5, "e"}),
524+
Tuple2.of("k5", new Object[]{6, "f"}));
525525
putRecordsToLeader(kvReplica, kvRecords);
526526

527527
// trigger third snapshot
528-
scheduledExecutorService.triggerNonPeriodicScheduledTask();
528+
triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
529529
CompletedSnapshot snapshot2 = kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
530530

531531
// verify that snapshot2 is the latest one before we break it
@@ -566,8 +566,8 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except
566566
List<Tuple2<byte[], byte[]>> expectedKeyValues =
567567
getKeyValuePairs(
568568
genKvRecords(
569-
Tuple2.of("k1", new Object[] {3, "c"}),
570-
Tuple2.of("k3", new Object[] {4, "d"})));
569+
Tuple2.of("k1", new Object[]{3, "c"}),
570+
Tuple2.of("k3", new Object[]{4, "d"})));
571571
verifyGetKeyValues(kvTablet, expectedKeyValues);
572572

573573
// Verify the core functionality: KvTablet successfully initialized despite broken snapshot
@@ -766,4 +766,22 @@ public void reset() {
766766
isScheduled = false;
767767
}
768768
}
769+
770+
/** A helper function with support for retries for flaky triggering operations. */
771+
private static void triggerSnapshotTaskWithRetry(
772+
ManuallyTriggeredScheduledExecutorService scheduledExecutorService,
773+
int maxRetries) throws Exception {
774+
for (int i = 0; i < maxRetries; i++) {
775+
try {
776+
scheduledExecutorService.triggerNonPeriodicScheduledTask();
777+
return;
778+
} catch (java.util.NoSuchElementException e) {
779+
if (i == maxRetries - 1) {
780+
throw e;
781+
}
782+
783+
Thread.sleep(50);
784+
}
785+
}
786+
}
769787
}

0 commit comments

Comments
 (0)