Skip to content

Commit 2c73e76

Browse files
[kv] TabletSever must handle inconsistency or broken snapshot scenarios. (#1482)
1 parent df77b5c commit 2c73e76

File tree

6 files changed

+162
-1
lines changed

6 files changed

+162
-1
lines changed

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,4 +155,10 @@ public FsPath getRemoteKvDir() {
155155
public int maxFetchLogSizeInRecoverKv() {
156156
return maxFetchLogSizeInRecoverKv;
157157
}
158+
159+
@Override
160+
public void handleSnapshotBroken(CompletedSnapshot snapshot) throws Exception {
161+
completedSnapshotHandleStore.remove(snapshot.getTableBucket(), snapshot.getSnapshotID());
162+
snapshot.discardAsync(asyncOperationsThreadPool);
163+
}
158164
}

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,19 @@ public interface SnapshotContext {
6464
FunctionWithException<TableBucket, CompletedSnapshot, Exception>
6565
getLatestCompletedSnapshotProvider();
6666

67+
/**
68+
* Handles broken snapshots.
69+
*
70+
* <p>In the current implementation, broken snapshots may already have occurred in production
71+
* environments due to issues such as https://github.com/apache/fluss/issues/1304. While we must
72+
* prevent inconsistent or broken snapshots from being committed, we also need to provide
73+
* mechanisms to help the server recover from such snapshots rather than failing permanently.
74+
*
75+
* @param snapshot The broken snapshot to handle
76+
* @throws Exception if recovery handling fails
77+
*/
78+
void handleSnapshotBroken(CompletedSnapshot snapshot) throws Exception;
79+
6780
/**
6881
* Get the max fetch size for fetching log to apply kv during recovering kv. The kv may apply
6982
* log during recovering.

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ public final class Replica {
176176
private final ReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();
177177
private final Clock clock;
178178

179+
private static final int INIT_KV_TABLET_MAX_RETRY_TIMES = 5;
179180
/**
180181
* storing the remote follower replicas' state, used to update leader's highWatermark and
181182
* replica ISR.
@@ -543,7 +544,15 @@ private void createKv() {
543544
}
544545

545546
// init kv tablet and get the snapshot it uses to init if have any
546-
Optional<CompletedSnapshot> snapshotUsed = initKvTablet();
547+
Optional<CompletedSnapshot> snapshotUsed = Optional.empty();
548+
for (int i = 1; i <= INIT_KV_TABLET_MAX_RETRY_TIMES; i++) {
549+
try {
550+
snapshotUsed = initKvTablet();
551+
break;
552+
} catch (Exception e) {
553+
LOG.warn("Fail to init kv tablet, retrying for {} times", i, e);
554+
}
555+
}
547556
// start periodic kv snapshot
548557
startPeriodicKvSnapshot(snapshotUsed.orElse(null));
549558
}
@@ -660,6 +669,13 @@ private void downloadKvSnapshots(CompletedSnapshot completedSnapshot, Path kvTab
660669
try {
661670
kvSnapshotDataDownloader.transferAllDataToDirectory(downloadSpec, closeableRegistry);
662671
} catch (Exception e) {
672+
if (e.getMessage().contains(CompletedSnapshot.SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE)) {
673+
try {
674+
snapshotContext.handleSnapshotBroken(completedSnapshot);
675+
} catch (Exception t) {
676+
LOG.error("Handle broken snapshot {} failed.", completedSnapshot, t);
677+
}
678+
}
663679
throw new IOException("Fail to download kv snapshot.", e);
664680
}
665681
long end = clock.milliseconds();

fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,22 @@ public int getSnapshotLeaderEpoch(TableBucket tableBucket, long snapshotId) {
8585
}
8686
return -1;
8787
}
88+
89+
/**
90+
* Remove a snapshot with the given snapshot ID from the store. This simulates the cleanup of
91+
* broken snapshot (metadata from ZooKeeper exists, but data was corrupted).
92+
*/
93+
public void removeSnapshot(TableBucket tableBucket, long snapshotId) {
94+
Deque<CompletedSnapshot> bucketSnapshots = snapshots.get(tableBucket);
95+
if (bucketSnapshots != null) {
96+
// Remove the snapshot with matching ID
97+
bucketSnapshots.removeIf(snapshot -> snapshot.getSnapshotID() == snapshotId);
98+
}
99+
100+
Map<Long, Integer> bucketSnapshotLeaderEpochMap =
101+
bucketSnapshotLeaderEpoch.get(tableBucket);
102+
if (bucketSnapshotLeaderEpochMap != null) {
103+
bucketSnapshotLeaderEpochMap.remove(snapshotId);
104+
}
105+
}
88106
}

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,106 @@ void testSnapshotUseLatestLeaderEpoch(@TempDir File snapshotKvTabletDir) throws
439439
.isEqualTo(latestLeaderEpoch);
440440
}
441441

442+
@Test
443+
void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Exception {
444+
TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 1);
445+
446+
// create test context with custom snapshot store
447+
TestSnapshotContext testKvSnapshotContext =
448+
new TestSnapshotContext(snapshotKvTabletDir.getPath());
449+
ManuallyTriggeredScheduledExecutorService scheduledExecutorService =
450+
testKvSnapshotContext.scheduledExecutorService;
451+
TestingCompletedKvSnapshotCommitter kvSnapshotStore =
452+
testKvSnapshotContext.testKvSnapshotStore;
453+
454+
// create a replica and make it leader
455+
Replica kvReplica =
456+
makeKvReplica(DATA1_PHYSICAL_TABLE_PATH_PK, tableBucket, testKvSnapshotContext);
457+
makeKvReplicaAsLeader(kvReplica);
458+
459+
// put initial data and create first snapshot
460+
KvRecordBatch kvRecords =
461+
genKvRecordBatch(
462+
Tuple2.of("k1", new Object[] {1, "a"}),
463+
Tuple2.of("k2", new Object[] {2, "b"}));
464+
putRecordsToLeader(kvReplica, kvRecords);
465+
466+
// trigger first snapshot
467+
scheduledExecutorService.triggerNonPeriodicScheduledTask();
468+
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
469+
470+
// put more data and create second snapshot
471+
kvRecords =
472+
genKvRecordBatch(
473+
Tuple2.of("k1", new Object[] {3, "c"}),
474+
Tuple2.of("k3", new Object[] {4, "d"}));
475+
putRecordsToLeader(kvReplica, kvRecords);
476+
477+
// trigger second snapshot
478+
scheduledExecutorService.triggerNonPeriodicScheduledTask();
479+
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
480+
481+
// put more data and create third snapshot (this will be the broken one)
482+
kvRecords =
483+
genKvRecordBatch(
484+
Tuple2.of("k4", new Object[] {5, "e"}),
485+
Tuple2.of("k5", new Object[] {6, "f"}));
486+
putRecordsToLeader(kvReplica, kvRecords);
487+
488+
// trigger third snapshot
489+
scheduledExecutorService.triggerNonPeriodicScheduledTask();
490+
CompletedSnapshot snapshot2 = kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
491+
492+
// verify that snapshot2 is the latest one before we break it
493+
assertThat(kvSnapshotStore.getLatestCompletedSnapshot(tableBucket).getSnapshotID())
494+
.isEqualTo(2);
495+
496+
// now simulate the latest snapshot (snapshot2) being broken by
497+
// deleting its metadata files and unshared SST files
498+
// This simulates file corruption while ZK metadata remains intact
499+
snapshot2.getKvSnapshotHandle().discard();
500+
501+
// ZK metadata should still show snapshot2 as latest (file corruption hasn't been detected
502+
// yet)
503+
assertThat(kvSnapshotStore.getLatestCompletedSnapshot(tableBucket).getSnapshotID())
504+
.isEqualTo(2);
505+
506+
// make the replica follower to destroy the current kv tablet
507+
makeKvReplicaAsFollower(kvReplica, 1);
508+
509+
// create a new replica with the same snapshot context
510+
// During initialization, it will try to use snapshot2 but find it broken,
511+
// then handle the broken snapshot and fall back to snapshot1
512+
testKvSnapshotContext =
513+
new TestSnapshotContext(snapshotKvTabletDir.getPath(), kvSnapshotStore);
514+
kvReplica = makeKvReplica(DATA1_PHYSICAL_TABLE_PATH_PK, tableBucket, testKvSnapshotContext);
515+
516+
// make it leader again - this should trigger the broken snapshot recovery logic
517+
// The system should detect that snapshot2 files are missing, clean up its metadata,
518+
// and successfully recover using snapshot1
519+
makeKvReplicaAsLeader(kvReplica, 2);
520+
521+
// verify that KvTablet is successfully initialized despite the broken snapshot
522+
assertThat(kvReplica.getKvTablet()).isNotNull();
523+
KvTablet kvTablet = kvReplica.getKvTablet();
524+
525+
// verify that the data from snapshot1 is restored (snapshot2 was broken and cleaned up)
526+
// snapshot1 should contain: k1->3,c and k3->4,d
527+
List<Tuple2<byte[], byte[]>> expectedKeyValues =
528+
getKeyValuePairs(
529+
genKvRecords(
530+
Tuple2.of("k1", new Object[] {3, "c"}),
531+
Tuple2.of("k3", new Object[] {4, "d"})));
532+
verifyGetKeyValues(kvTablet, expectedKeyValues);
533+
534+
// Verify the core functionality: KvTablet successfully initialized despite broken snapshot
535+
// The key test is that the system can handle broken snapshots and recover correctly
536+
537+
// Verify that we successfully simulated the broken snapshot condition
538+
File metadataFile = new File(snapshot2.getMetadataFilePath().getPath());
539+
assertThat(metadataFile.exists()).isFalse();
540+
}
541+
442542
@Test
443543
void testRestore(@TempDir Path snapshotKvTabletDirPath) throws Exception {
444544
TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 1);

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,5 +640,13 @@ public int maxFetchLogSizeInRecoverKv() {
640640
private void unchecked(ThrowingRunnable<?> throwingRunnable) {
641641
ThrowingRunnable.unchecked(throwingRunnable).run();
642642
}
643+
644+
@Override
645+
public void handleSnapshotBroken(CompletedSnapshot snapshot) throws Exception {
646+
// Remove the broken snapshot from the snapshot store (simulating ZK metadata removal)
647+
testKvSnapshotStore.removeSnapshot(snapshot.getTableBucket(), snapshot.getSnapshotID());
648+
// Discard the snapshot files async (similar to DefaultSnapshotContext implementation)
649+
snapshot.discardAsync(executorService);
650+
}
643651
}
644652
}

0 commit comments

Comments
 (0)