Skip to content

Commit 37f46dd

Browse files
authored
[server] Added Retry Handler to Address Snapshotting Test Asynchrony (apache#1881)
[server] Added Retry Handler to Address Snapshotting Test Asynchrony
1 parent 23058ad commit 37f46dd

File tree

1 file changed

+21
-3
lines changed

1 file changed

+21
-3
lines changed

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except
503503
putRecordsToLeader(kvReplica, kvRecords);
504504

505505
// trigger first snapshot
506-
scheduledExecutorService.triggerNonPeriodicScheduledTask();
506+
triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
507507
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
508508

509509
// put more data and create second snapshot
@@ -514,7 +514,7 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except
514514
putRecordsToLeader(kvReplica, kvRecords);
515515

516516
// trigger second snapshot
517-
scheduledExecutorService.triggerNonPeriodicScheduledTask();
517+
triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
518518
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
519519

520520
// put more data and create third snapshot (this will be the broken one)
@@ -525,7 +525,7 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except
525525
putRecordsToLeader(kvReplica, kvRecords);
526526

527527
// trigger third snapshot
528-
scheduledExecutorService.triggerNonPeriodicScheduledTask();
528+
triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
529529
CompletedSnapshot snapshot2 = kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
530530

531531
// verify that snapshot2 is the latest one before we break it
@@ -766,4 +766,22 @@ public void reset() {
766766
isScheduled = false;
767767
}
768768
}
769+
770+
/** A helper function with support for retries for flaky triggering operations. */
771+
private static void triggerSnapshotTaskWithRetry(
772+
ManuallyTriggeredScheduledExecutorService scheduledExecutorService, int maxRetries)
773+
throws Exception {
774+
for (int i = 0; i < maxRetries; i++) {
775+
try {
776+
scheduledExecutorService.triggerNonPeriodicScheduledTask();
777+
return;
778+
} catch (java.util.NoSuchElementException e) {
779+
if (i == maxRetries - 1) {
780+
throw e;
781+
}
782+
783+
Thread.sleep(50);
784+
}
785+
}
786+
}
769787
}

0 commit comments

Comments
 (0)