 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.server.SequenceIDCounter;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketSnapshot;
 import org.apache.fluss.utils.CloseableRegistry;
 import org.apache.fluss.utils.ExceptionUtils;
 import org.apache.fluss.utils.FlussPaths;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
@@ -55,6 +58,8 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
 
     private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter;
 
+    private final ZooKeeperClient zooKeeperClient;
+
     private final RocksIncrementalSnapshot rocksIncrementalSnapshot;
     private final FsPath remoteKvTabletDir;
     private final FsPath remoteSnapshotSharedDir;
@@ -82,6 +87,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
     KvTabletSnapshotTarget(
             TableBucket tableBucket,
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
+            ZooKeeperClient zooKeeperClient,
             RocksIncrementalSnapshot rocksIncrementalSnapshot,
             FsPath remoteKvTabletDir,
             Executor ioExecutor,
@@ -97,6 +103,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
         this(
                 tableBucket,
                 completedKvSnapshotCommitter,
+                zooKeeperClient,
                 rocksIncrementalSnapshot,
                 remoteKvTabletDir,
                 (int) ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE.defaultValue().getBytes(),
@@ -114,6 +121,7 @@ public class KvTabletSnapshotTarget implements PeriodicSnapshotManager.SnapshotT
     public KvTabletSnapshotTarget(
             TableBucket tableBucket,
             CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
+            @Nonnull ZooKeeperClient zooKeeperClient,
             RocksIncrementalSnapshot rocksIncrementalSnapshot,
             FsPath remoteKvTabletDir,
             int snapshotWriteBufferSize,
@@ -129,6 +137,7 @@ public KvTabletSnapshotTarget(
             throws IOException {
         this.tableBucket = tableBucket;
         this.completedKvSnapshotCommitter = completedKvSnapshotCommitter;
+        this.zooKeeperClient = zooKeeperClient;
         this.rocksIncrementalSnapshot = rocksIncrementalSnapshot;
         this.remoteKvTabletDir = remoteKvTabletDir;
         this.remoteSnapshotSharedDir = FlussPaths.remoteKvSharedDir(remoteKvTabletDir);
@@ -211,18 +220,13 @@ public void handleSnapshotResult(
             // commit the completed snapshot
             completedKvSnapshotCommitter.commitKvSnapshot(
                     completedSnapshot, coordinatorEpoch, bucketLeaderEpoch);
-            // notify the snapshot complete
-            rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
-            logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
-            snapshotSize = snapshotResult.getSnapshotSize();
-            // update LogTablet to notify the lowest offset that should be retained
-            updateMinRetainOffset.accept(snapshotResult.getLogOffset());
+            // update local state after a successful commit
+            updateStateOnCommitSuccess(snapshotId, snapshotResult);
         } catch (Exception e) {
             Throwable t = ExceptionUtils.stripExecutionException(e);
-            snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor);
-            handleSnapshotFailure(snapshotId, snapshotLocation, t);
-            // throw the exception to make PeriodicSnapshotManager can catch the exception
-            throw t;
+            // handle the exception with an idempotent check
+            handleSnapshotCommitException(
+                    snapshotId, snapshotResult, completedSnapshot, snapshotLocation, t);
         }
     }
 
@@ -249,6 +253,87 @@ protected RocksIncrementalSnapshot getRocksIncrementalSnapshot() {
         return rocksIncrementalSnapshot;
     }
 
+    /**
+     * Updates local state after a successful snapshot commit. This includes notifying RocksDB
+     * that the snapshot is complete, updating the latest snapshot offset/size, and notifying
+     * the LogTablet about the minimum offset that must be retained.
+     */
+    private void updateStateOnCommitSuccess(long snapshotId, SnapshotResult snapshotResult) {
+        // notify RocksDB that the snapshot is complete
+        rocksIncrementalSnapshot.notifySnapshotComplete(snapshotId);
+        logOffsetOfLatestSnapshot = snapshotResult.getLogOffset();
+        snapshotSize = snapshotResult.getSnapshotSize();
+        // notify the LogTablet of the lowest offset that should be retained
+        updateMinRetainOffset.accept(snapshotResult.getLogOffset());
+    }
+
+    /**
+     * Handles a snapshot commit exception with an idempotent check. This method implements the
+     * fix for issue #1304 by double-checking ZooKeeper to verify whether the snapshot actually
+     * exists before cleaning it up.
+     */
+    private void handleSnapshotCommitException(
+            long snapshotId,
+            SnapshotResult snapshotResult,
+            CompletedSnapshot completedSnapshot,
+            SnapshotLocation snapshotLocation,
+            Throwable t)
+            throws Throwable {
+
+        // Fix for issue: https://github.com/apache/fluss/issues/1304
+        // The tablet server tries to commit a kv snapshot to the coordinator server, the
+        // coordinator server commits the kv snapshot to ZK and then fails over. The tablet
+        // server then gets an exception from the coordinator server and mistakes it for a
+        // failed commit, even though the coordinator server has already committed to ZK,
+        // and so discards the committed kv snapshot.
+        //
+        // Idempotent check: double-check ZK to verify whether the snapshot actually exists
+        // before cleaning up.
+        try {
+            Optional<BucketSnapshot> zkSnapshot =
+                    zooKeeperClient.getTableBucketSnapshot(tableBucket, snapshotId);
+            if (zkSnapshot.isPresent()) {
+                // The snapshot exists in ZK: the commit actually succeeded and only the
+                // response was lost
+                LOG.warn(
+                        "Snapshot {} for TableBucket {} already exists in ZK. "
+                                + "The commit was successful but the response was lost due to coordinator failover. "
+                                + "Skipping cleanup and treating as successful.",
+                        snapshotId,
+                        tableBucket);
+
+                // update local state as if the commit had been acknowledged
+                updateStateOnCommitSuccess(snapshotId, snapshotResult);
+                return; // the snapshot commit succeeded, return directly
+            } else {
+                // The snapshot does not exist in ZK: the commit truly failed
+                LOG.warn(
+                        "Snapshot {} for TableBucket {} does not exist in ZK. "
+                                + "The commit truly failed, proceeding with cleanup.",
+                        snapshotId,
+                        tableBucket);
+                snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor);
+                handleSnapshotFailure(snapshotId, snapshotLocation, t);
+            }
+        } catch (Exception zkException) {
+            LOG.warn(
+                    "Failed to query ZK for snapshot {} of TableBucket {}. "
+                            + "Cannot determine the actual snapshot status, keeping the snapshot "
+                            + "in its current state to avoid potential data loss.",
+                    snapshotId,
+                    tableBucket,
+                    zkException);
+            // When the ZK query fails, we cannot determine the actual status: the snapshot
+            // might have succeeded or failed on the ZK side. Therefore, we must neither clean
+            // up the snapshot files nor update local state. This avoids the risk of discarding
+            // a successfully committed snapshot that connectors may already be reading, which
+            // would cause data loss or job failures.
+        }
+
+        // rethrow the exception so that PeriodicSnapshotManager can catch it
+        throw t;
+    }
+
     private SnapshotRunner createSnapshotRunner(CloseableRegistry cancelStreamRegistry) {
         return new SnapshotRunner(rocksIncrementalSnapshot, cancelStreamRegistry);
     }
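
The recovery logic in `handleSnapshotCommitException` boils down to a three-way decision after a failed commit RPC: treat it as success if the authoritative store already holds the snapshot, as a true failure if it does not, and as unknown if the store cannot be queried. The following self-contained sketch illustrates that decision in isolation; every name in it (`SnapshotStore`, `CommitOutcome`, `resolveCommitFailure`) is hypothetical and only models the pattern, none of it is a Fluss API.

```java
import java.util.Optional;

/** Minimal sketch of the idempotent-commit check; all names are hypothetical, not Fluss APIs. */
public class IdempotentCommitSketch {

    /** Hypothetical stand-in for the authoritative metadata store (ZooKeeper in Fluss). */
    interface SnapshotStore {
        Optional<Long> getCommittedSnapshot(long snapshotId) throws Exception;
    }

    enum CommitOutcome {
        SUCCEEDED_RESPONSE_LOST, // commit landed in the store; only the RPC response was lost
        TRULY_FAILED, // commit is absent from the store; safe to clean up snapshot files
        UNKNOWN // store unreachable; keep the files and local state untouched
    }

    static CommitOutcome resolveCommitFailure(SnapshotStore store, long snapshotId) {
        try {
            return store.getCommittedSnapshot(snapshotId).isPresent()
                    ? CommitOutcome.SUCCEEDED_RESPONSE_LOST
                    : CommitOutcome.TRULY_FAILED;
        } catch (Exception e) {
            // Cannot tell success from failure: deleting files now could destroy a snapshot
            // that readers already depend on, so the only safe answer is "unknown".
            return CommitOutcome.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        // fake store that only knows about snapshot 42
        SnapshotStore store = id -> id == 42 ? Optional.of(42L) : Optional.empty();
        System.out.println(resolveCommitFailure(store, 42)); // SUCCEEDED_RESPONSE_LOST
        System.out.println(resolveCommitFailure(store, 7)); // TRULY_FAILED
    }
}
```

Keeping "unknown" as its own outcome, rather than folding it into failure, is the crux of the fix: deleting files on an indeterminate answer would reintroduce exactly the data-loss window that issue #1304 describes.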