5656import org .apache .fluss .server .coordinator .event .EventProcessor ;
5757import org .apache .fluss .server .coordinator .event .FencedCoordinatorEvent ;
5858import org .apache .fluss .server .coordinator .event .NewTabletServerEvent ;
59+ import org .apache .fluss .server .coordinator .event .NotifyKvSnapshotOffsetEvent ;
5960import org .apache .fluss .server .coordinator .event .NotifyLeaderAndIsrResponseReceivedEvent ;
6061import org .apache .fluss .server .coordinator .event .watcher .TableChangeWatcher ;
6162import org .apache .fluss .server .coordinator .event .watcher .TabletServerChangeWatcher ;
@@ -117,6 +118,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
117118 private static final Logger LOG = LoggerFactory .getLogger (CoordinatorEventProcessor .class );
118119
119120 private final ZooKeeperClient zooKeeperClient ;
121+ private final ExecutorService ioExecutor ;
120122 private final CoordinatorContext coordinatorContext ;
121123 private final ReplicaStateMachine replicaStateMachine ;
122124 private final TableBucketStateMachine tableBucketStateMachine ;
@@ -190,6 +192,7 @@ public CoordinatorEventProcessor(
190192 this .lakeTableTieringManager = lakeTableTieringManager ;
191193 this .coordinatorMetricGroup = coordinatorMetricGroup ;
192194 this .internalListenerName = conf .getString (ConfigOptions .INTERNAL_LISTENER_NAME );
195+ this .ioExecutor = ioExecutor ;
193196 }
194197
195198 public CoordinatorEventManager getCoordinatorEventManager () {
@@ -455,9 +458,10 @@ public void process(CoordinatorEvent event) {
455458 adjustIsrReceivedEvent .getLeaderAndIsrMap ())));
456459 } else if (event instanceof CommitKvSnapshotEvent ) {
457460 CommitKvSnapshotEvent commitKvSnapshotEvent = (CommitKvSnapshotEvent ) event ;
458- CompletableFuture <CommitKvSnapshotResponse > callback =
459- commitKvSnapshotEvent .getRespCallback ();
460- completeFromCallable (callback , () -> tryProcessCommitKvSnapshot (commitKvSnapshotEvent ));
461+ tryProcessCommitKvSnapshot (
462+ commitKvSnapshotEvent , commitKvSnapshotEvent .getRespCallback ());
463+ } else if (event instanceof NotifyKvSnapshotOffsetEvent ) {
464+ processNotifyKvSnapshotOffsetEvent ((NotifyKvSnapshotOffsetEvent ) event );
461465 } else if (event instanceof CommitRemoteLogManifestEvent ) {
462466 CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
463467 (CommitRemoteLogManifestEvent ) event ;
@@ -936,21 +940,40 @@ private void validateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr newLeade
936940 }
937941 }
938942
939- private CommitKvSnapshotResponse tryProcessCommitKvSnapshot (CommitKvSnapshotEvent event )
940- throws Exception {
943+ private void tryProcessCommitKvSnapshot (
944+ CommitKvSnapshotEvent event , CompletableFuture < CommitKvSnapshotResponse > callback ) {
941945 // validate
942- validateFencedEvent (event );
946+ try {
947+ validateFencedEvent (event );
948+ } catch (Exception e ) {
949+ callback .completeExceptionally (e );
950+ return ;
951+ }
952+ // commit the kv snapshot asynchronously
953+ ioExecutor .execute (
954+ () -> {
955+ try {
956+ TableBucket tb = event .getTableBucket ();
957+ CompletedSnapshot completedSnapshot =
958+ event .getAddCompletedSnapshotData ().getCompletedSnapshot ();
959+ // add completed snapshot
960+ CompletedSnapshotStore completedSnapshotStore =
961+ completedSnapshotStoreManager .getOrCreateCompletedSnapshotStore (tb );
962+ // this involves IO operation (ZK), so we do it in ioExecutor
963+ completedSnapshotStore .add (completedSnapshot );
964+ coordinatorEventManager .put (
965+ new NotifyKvSnapshotOffsetEvent (
966+ tb , completedSnapshot .getLogOffset ()));
967+ callback .complete (new CommitKvSnapshotResponse ());
968+ } catch (Exception e ) {
969+ callback .completeExceptionally (e );
970+ }
971+ });
972+ }
943973
974+ private void processNotifyKvSnapshotOffsetEvent (NotifyKvSnapshotOffsetEvent event ) {
944975 TableBucket tb = event .getTableBucket ();
945- CompletedSnapshot completedSnapshot =
946- event .getAddCompletedSnapshotData ().getCompletedSnapshot ();
947- // add completed snapshot
948- CompletedSnapshotStore completedSnapshotStore =
949- completedSnapshotStoreManager .getOrCreateCompletedSnapshotStore (tb );
950- completedSnapshotStore .add (completedSnapshot );
951-
952- // send notify snapshot request to all replicas.
953- // TODO: this should be moved after sending AddCompletedSnapshotResponse
976+ long logOffset = event .getLogOffset ();
954977 coordinatorRequestBatch .newBatch ();
955978 coordinatorContext
956979 .getBucketLeaderAndIsr (tb )
@@ -961,10 +984,9 @@ private CommitKvSnapshotResponse tryProcessCommitKvSnapshot(CommitKvSnapshotEven
961984 coordinatorContext .getFollowers (
962985 tb , leaderAndIsr .leader ()),
963986 tb ,
964- completedSnapshot . getLogOffset () ));
987+ logOffset ));
965988 coordinatorRequestBatch .sendNotifyKvSnapshotOffsetRequest (
966989 coordinatorContext .getCoordinatorEpoch ());
967- return new CommitKvSnapshotResponse ();
968990 }
969991
970992 private CommitRemoteLogManifestResponse tryProcessCommitRemoteLogManifest (
0 commit comments