6262import org .apache .fluss .server .coordinator .event .FencedCoordinatorEvent ;
6363import org .apache .fluss .server .coordinator .event .NewTabletServerEvent ;
6464import org .apache .fluss .server .coordinator .event .NotifyKvSnapshotOffsetEvent ;
65+ import org .apache .fluss .server .coordinator .event .NotifyLakeTableOffsetEvent ;
6566import org .apache .fluss .server .coordinator .event .NotifyLeaderAndIsrResponseReceivedEvent ;
6667import org .apache .fluss .server .coordinator .event .SchemaChangeEvent ;
6768import org .apache .fluss .server .coordinator .event .watcher .TableChangeWatcher ;
@@ -529,6 +530,8 @@ public void process(CoordinatorEvent event) {
529530 commitKvSnapshotEvent , commitKvSnapshotEvent .getRespCallback ());
530531 } else if (event instanceof NotifyKvSnapshotOffsetEvent ) {
531532 processNotifyKvSnapshotOffsetEvent ((NotifyKvSnapshotOffsetEvent ) event );
533+ } else if (event instanceof NotifyLakeTableOffsetEvent ) {
534+ processNotifyLakeTableOffsetEvent ((NotifyLakeTableOffsetEvent ) event );
532535 } else if (event instanceof CommitRemoteLogManifestEvent ) {
533536 CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
534537 (CommitRemoteLogManifestEvent ) event ;
@@ -538,9 +541,8 @@ public void process(CoordinatorEvent event) {
538541 } else if (event instanceof CommitLakeTableSnapshotEvent ) {
539542 CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent =
540543 (CommitLakeTableSnapshotEvent ) event ;
541- completeFromCallable (
542- commitLakeTableSnapshotEvent .getRespCallback (),
543- () -> tryProcessCommitLakeTableSnapshot (commitLakeTableSnapshotEvent ));
544+ tryProcessCommitLakeTableSnapshot (
545+ commitLakeTableSnapshotEvent , commitLakeTableSnapshotEvent .getRespCallback ());
544546 } else if (event instanceof ControlledShutdownEvent ) {
545547 ControlledShutdownEvent controlledShutdownEvent = (ControlledShutdownEvent ) event ;
546548 completeFromCallable (
@@ -1140,6 +1142,30 @@ private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent even
11401142 coordinatorContext .getCoordinatorEpoch ());
11411143 }
11421144
1145+ private void processNotifyLakeTableOffsetEvent (NotifyLakeTableOffsetEvent event ) {
1146+ Map <Long , LakeTableSnapshot > lakeTableSnapshots = event .getLakeTableSnapshots ();
1147+ coordinatorRequestBatch .newBatch ();
1148+ for (Map .Entry <Long , LakeTableSnapshot > lakeTableSnapshotEntry :
1149+ lakeTableSnapshots .entrySet ()) {
1150+ LakeTableSnapshot lakeTableSnapshot = lakeTableSnapshotEntry .getValue ();
1151+ for (Map .Entry <TableBucket , Long > bucketLogEndOffsetEntry :
1152+ lakeTableSnapshot .getBucketLogEndOffset ().entrySet ()) {
1153+ TableBucket tb = bucketLogEndOffsetEntry .getKey ();
1154+ coordinatorContext
1155+ .getBucketLeaderAndIsr (bucketLogEndOffsetEntry .getKey ())
1156+ .ifPresent (
1157+ leaderAndIsr ->
1158+ coordinatorRequestBatch
1159+ .addNotifyLakeTableOffsetRequestForTableServers (
1160+ coordinatorContext .getAssignment (tb ),
1161+ tb ,
1162+ lakeTableSnapshot ));
1163+ }
1164+ }
1165+ coordinatorRequestBatch .sendNotifyLakeTableOffsetRequest (
1166+ coordinatorContext .getCoordinatorEpoch ());
1167+ }
1168+
11431169 private CommitRemoteLogManifestResponse tryProcessCommitRemoteLogManifest (
11441170 CommitRemoteLogManifestEvent event ) {
11451171 CommitRemoteLogManifestData manifestData = event .getCommitRemoteLogManifestData ();
@@ -1189,56 +1215,52 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
11891215 }
11901216 }
11911217
1192- private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot (
1193- CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent ) {
1218+ private void tryProcessCommitLakeTableSnapshot (
1219+ CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent ,
1220+ CompletableFuture <CommitLakeTableSnapshotResponse > callback ) {
1221+ // commit the lake table snapshot asynchronously
11941222 CommitLakeTableSnapshotData commitLakeTableSnapshotData =
11951223 commitLakeTableSnapshotEvent .getCommitLakeTableSnapshotData ();
1196- CommitLakeTableSnapshotResponse response = new CommitLakeTableSnapshotResponse ();
11971224 Map <Long , LakeTableSnapshot > lakeTableSnapshots =
11981225 commitLakeTableSnapshotData .getLakeTableSnapshot ();
1199- for (Map .Entry <Long , LakeTableSnapshot > lakeTableSnapshotEntry :
1200- lakeTableSnapshots .entrySet ()) {
1201- Long tableId = lakeTableSnapshotEntry .getKey ();
1202-
1203- PbCommitLakeTableSnapshotRespForTable tableResp = response .addTableResp ();
1204- tableResp .setTableId (tableId );
1205-
1206- try {
1207- TablePath tablePath = coordinatorContext .getTablePathById (tableId );
1208- if (tablePath == null ) {
1209- throw new RuntimeException (
1210- String .format ("Failed to find table path for table id: %d" , tableId ));
1211- }
1212- zooKeeperClient .upsertLakeTableSnapshot (
1213- tableId , tablePath , lakeTableSnapshotEntry .getValue ());
1214- } catch (Exception e ) {
1215- ApiError error = ApiError .fromThrowable (e );
1216- tableResp .setError (error .error ().code (), error .message ());
1217- }
1218- }
1226+ ioExecutor .execute (
1227+ () -> {
1228+ try {
1229+ CommitLakeTableSnapshotResponse response =
1230+ new CommitLakeTableSnapshotResponse ();
1231+ for (Map .Entry <Long , LakeTableSnapshot > lakeTableSnapshotEntry :
1232+ lakeTableSnapshots .entrySet ()) {
1233+ Long tableId = lakeTableSnapshotEntry .getKey ();
1234+
1235+ PbCommitLakeTableSnapshotRespForTable tableResp =
1236+ response .addTableResp ();
1237+ tableResp .setTableId (tableId );
1238+
1239+ try {
1240+ TablePath tablePath = coordinatorContext .getTablePathById (tableId );
1241+ if (tablePath == null ) {
1242+ throw new RuntimeException (
1243+ String .format (
1244+ "Failed to find table path for table id: %d" ,
1245+ tableId ));
1246+ }
1247+ // this involves IO operation (ZK), so we do it in ioExecutor
1248+ zooKeeperClient .upsertLakeTableSnapshot (
1249+ tableId , tablePath , lakeTableSnapshotEntry .getValue ());
1250+ } catch (Exception e ) {
1251+ ApiError error = ApiError .fromThrowable (e );
1252+ tableResp .setError (error .error ().code (), error .message ());
1253+ }
1254+ }
12191255
1220- // send notify lakehouse data request to all replicas.
1221- coordinatorRequestBatch .newBatch ();
1222- for (Map .Entry <Long , LakeTableSnapshot > lakeTableSnapshotEntry :
1223- lakeTableSnapshots .entrySet ()) {
1224- LakeTableSnapshot lakeTableSnapshot = lakeTableSnapshotEntry .getValue ();
1225- for (Map .Entry <TableBucket , Long > bucketLogEndOffsetEntry :
1226- lakeTableSnapshot .getBucketLogEndOffset ().entrySet ()) {
1227- TableBucket tb = bucketLogEndOffsetEntry .getKey ();
1228- coordinatorContext
1229- .getBucketLeaderAndIsr (bucketLogEndOffsetEntry .getKey ())
1230- .ifPresent (
1231- leaderAndIsr ->
1232- coordinatorRequestBatch
1233- .addNotifyLakeTableOffsetRequestForTableServers (
1234- coordinatorContext .getAssignment (tb ),
1235- tb ,
1236- lakeTableSnapshot ));
1237- }
1238- }
1239- coordinatorRequestBatch .sendNotifyLakeTableOffsetRequest (
1240- coordinatorContext .getCoordinatorEpoch ());
1241- return response ;
1256+ // send notify lakehouse data request to all replicas via coordinator event
1257+ coordinatorEventManager .put (
1258+ new NotifyLakeTableOffsetEvent (lakeTableSnapshots ));
1259+ callback .complete (response );
1260+ } catch (Exception e ) {
1261+ callback .completeExceptionally (e );
1262+ }
1263+ });
12421264 }
12431265
12441266 private ControlledShutdownResponse tryProcessControlledShutdown (
0 commit comments