Skip to content

Commit 0451cbf

Browse files
committed
move prepare commit to coordinator gateway
1 parent c29d0f6 commit 0451cbf

File tree

9 files changed

+87
-118
lines changed

9 files changed

+87
-118
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.fluss.metrics.registry.MetricRegistry;
2727
import org.apache.fluss.rpc.GatewayClientProxy;
2828
import org.apache.fluss.rpc.RpcClient;
29-
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
3029
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
3130
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
3231
import org.apache.fluss.rpc.messages.PbBucketOffset;
@@ -69,7 +68,6 @@ public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
6968
private final Configuration flussConf;
7069

7170
private CoordinatorGateway coordinatorGateway;
72-
private AdminReadOnlyGateway readOnlyGateway;
7371
private RpcClient rpcClient;
7472

7573
public FlussTableLakeSnapshotCommitter(Configuration flussConf) {
@@ -87,12 +85,6 @@ public void open() {
8785
this.coordinatorGateway =
8886
GatewayClientProxy.createGatewayProxy(
8987
metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
90-
91-
this.readOnlyGateway =
92-
GatewayClientProxy.createGatewayProxy(
93-
metadataUpdater::getRandomTabletServer,
94-
rpcClient,
95-
AdminReadOnlyGateway.class);
9688
}
9789

9890
String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
@@ -102,7 +94,7 @@ String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> l
10294
PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
10395
toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
10496
PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
105-
readOnlyGateway
97+
coordinatorGateway
10698
.prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
10799
.get();
108100
List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =

fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@
4646
import org.apache.fluss.rpc.messages.ListTablesResponse;
4747
import org.apache.fluss.rpc.messages.MetadataRequest;
4848
import org.apache.fluss.rpc.messages.MetadataResponse;
49-
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotRequest;
50-
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotResponse;
5149
import org.apache.fluss.rpc.messages.TableExistsRequest;
5250
import org.apache.fluss.rpc.messages.TableExistsResponse;
5351
import org.apache.fluss.rpc.protocol.ApiKeys;
@@ -195,28 +193,4 @@ CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
195193
@RPC(api = ApiKeys.DESCRIBE_CLUSTER_CONFIGS)
196194
CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
197195
DescribeClusterConfigsRequest request);
198-
199-
/**
200-
* Prepares to commit lake table snapshots by merging them with existing snapshots and storing
201-
* them to the file system.
202-
*
203-
* <p>This method is called during the two-phase commit process for lake table snapshots. It
204-
* performs the following operations for each table in the request:
205-
*
206-
* <ul>
207-
* <li>Merges the new snapshot with the previous latest snapshot (if exists) to ensure
208-
* completeness
209-
* <li>Stores the merged snapshot to the remote file system. The stored file contains the log
210-
* end offset information for each bucket in the table
211-
* <li>Returns the file path where the snapshot is stored
212-
* </ul>
213-
*
214-
* @param request the request containing lake table snapshot information for one or more tables
215-
* @return a future that completes with a response containing the file paths where snapshots
216-
* (containing bucket log end offset information) are stored, or error information for
217-
* tables that failed to process
218-
*/
219-
@RPC(api = ApiKeys.PRECOMMIT_LAKE_TABLE_SNAPSHOT)
220-
CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
221-
PrepareCommitLakeTableSnapshotRequest request);
222196
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
3131
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
3232
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
33+
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotRequest;
34+
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotResponse;
3335
import org.apache.fluss.rpc.protocol.ApiKeys;
3436
import org.apache.fluss.rpc.protocol.RPC;
3537

@@ -85,4 +87,28 @@ CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
8587
@RPC(api = ApiKeys.CONTROLLED_SHUTDOWN)
8688
CompletableFuture<ControlledShutdownResponse> controlledShutdown(
8789
ControlledShutdownRequest request);
90+
91+
/**
92+
* Prepares to commit lake table snapshots by merging them with existing snapshots and storing
93+
* them to the file system.
94+
*
95+
* <p>This method is called during the two-phase commit process for lake table snapshots. It
96+
* performs the following operations for each table in the request:
97+
*
98+
* <ul>
99+
* <li>Merges the new snapshot with the previous latest snapshot (if exists) to ensure
100+
* completeness
101+
* <li>Stores the merged snapshot to the remote file system. The stored file contains the log
102+
* end offset information for each bucket in the table
103+
* <li>Returns the file path where the snapshot is stored
104+
* </ul>
105+
*
106+
* @param request the request containing lake table snapshot information for one or more tables
107+
* @return a future that completes with a response containing the file paths where snapshots
108+
* (containing bucket log end offset information) are stored, or error information for
109+
* tables that failed to process
110+
*/
111+
@RPC(api = ApiKeys.PRECOMMIT_LAKE_TABLE_SNAPSHOT)
112+
CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
113+
PrepareCommitLakeTableSnapshotRequest request);
88114
}

fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@
6767
import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse;
6868
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6969
import org.apache.fluss.rpc.messages.PrefixLookupResponse;
70-
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotRequest;
71-
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotResponse;
7270
import org.apache.fluss.rpc.messages.ProduceLogRequest;
7371
import org.apache.fluss.rpc.messages.ProduceLogResponse;
7472
import org.apache.fluss.rpc.messages.PutKvRequest;
@@ -251,10 +249,4 @@ public CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
251249
DescribeClusterConfigsRequest request) {
252250
return null;
253251
}
254-
255-
@Override
256-
public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
257-
PrepareCommitLakeTableSnapshotRequest request) {
258-
throw new UnsupportedOperationException();
259-
}
260252
}

fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
import org.apache.fluss.cluster.ServerNode;
2121
import org.apache.fluss.cluster.ServerType;
22-
import org.apache.fluss.config.ConfigOptions;
23-
import org.apache.fluss.config.Configuration;
2422
import org.apache.fluss.config.cluster.ConfigEntry;
2523
import org.apache.fluss.exception.FlussRuntimeException;
2624
import org.apache.fluss.exception.KvSnapshotNotExistException;
@@ -31,7 +29,6 @@
3129
import org.apache.fluss.exception.SecurityTokenException;
3230
import org.apache.fluss.exception.TableNotPartitionedException;
3331
import org.apache.fluss.fs.FileSystem;
34-
import org.apache.fluss.fs.FsPath;
3532
import org.apache.fluss.fs.token.ObtainedSecurityToken;
3633
import org.apache.fluss.metadata.DatabaseInfo;
3734
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -74,18 +71,12 @@
7471
import org.apache.fluss.rpc.messages.MetadataRequest;
7572
import org.apache.fluss.rpc.messages.MetadataResponse;
7673
import org.apache.fluss.rpc.messages.PbApiVersion;
77-
import org.apache.fluss.rpc.messages.PbPrepareCommitLakeTableRespForTable;
78-
import org.apache.fluss.rpc.messages.PbTableBucketOffsets;
7974
import org.apache.fluss.rpc.messages.PbTablePath;
80-
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotRequest;
81-
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotResponse;
8275
import org.apache.fluss.rpc.messages.TableExistsRequest;
8376
import org.apache.fluss.rpc.messages.TableExistsResponse;
8477
import org.apache.fluss.rpc.netty.server.Session;
85-
import org.apache.fluss.rpc.protocol.ApiError;
8678
import org.apache.fluss.rpc.protocol.ApiKeys;
8779
import org.apache.fluss.rpc.protocol.ApiManager;
88-
import org.apache.fluss.rpc.protocol.Errors;
8980
import org.apache.fluss.security.acl.AclBinding;
9081
import org.apache.fluss.security.acl.AclBindingFilter;
9182
import org.apache.fluss.security.acl.OperationType;
@@ -102,9 +93,7 @@
10293
import org.apache.fluss.server.utils.ServerRpcMessageUtils;
10394
import org.apache.fluss.server.zk.ZooKeeperClient;
10495
import org.apache.fluss.server.zk.data.BucketSnapshot;
105-
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
10696
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
107-
import org.apache.fluss.utils.json.TableBucketOffsets;
10897

10998
import org.slf4j.Logger;
11099
import org.slf4j.LoggerFactory;
@@ -133,7 +122,6 @@
133122
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toGetFileSystemSecurityTokenResponse;
134123
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toListPartitionInfosResponse;
135124
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toPbConfigEntries;
136-
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucketOffsets;
137125
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
138126
import static org.apache.fluss.utils.Preconditions.checkState;
139127

@@ -153,7 +141,6 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR
153141
protected final MetadataManager metadataManager;
154142
protected final @Nullable Authorizer authorizer;
155143
protected final DynamicConfigManager dynamicConfigManager;
156-
private final LakeTableHelper lakeTableHelper;
157144

158145
private long tokenLastUpdateTimeMs = 0;
159146
private ObtainedSecurityToken securityToken = null;
@@ -167,8 +154,7 @@ public RpcServiceBase(
167154
MetadataManager metadataManager,
168155
@Nullable Authorizer authorizer,
169156
DynamicConfigManager dynamicConfigManager,
170-
ExecutorService ioExecutor,
171-
Configuration config) {
157+
ExecutorService ioExecutor) {
172158
this.remoteFileSystem = remoteFileSystem;
173159
this.provider = provider;
174160
this.apiManager = new ApiManager(provider);
@@ -177,8 +163,6 @@ public RpcServiceBase(
177163
this.authorizer = authorizer;
178164
this.dynamicConfigManager = dynamicConfigManager;
179165
this.ioExecutor = ioExecutor;
180-
this.lakeTableHelper =
181-
new LakeTableHelper(zkClient, config.get(ConfigOptions.REMOTE_DATA_DIR));
182166
}
183167

184168
@Override
@@ -607,45 +591,4 @@ protected MetadataResponse processMetadataRequest(
607591
return buildMetadataResponse(
608592
coordinatorServer, aliveTabletServers, tablesMetadata, partitionsMetadata);
609593
}
610-
611-
@Override
612-
public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
613-
PrepareCommitLakeTableSnapshotRequest request) {
614-
CompletableFuture<PrepareCommitLakeTableSnapshotResponse> future =
615-
new CompletableFuture<>();
616-
ioExecutor.submit(
617-
() -> {
618-
PrepareCommitLakeTableSnapshotResponse response =
619-
new PrepareCommitLakeTableSnapshotResponse();
620-
try {
621-
for (PbTableBucketOffsets bucketOffsets : request.getBucketOffsetsList()) {
622-
PbPrepareCommitLakeTableRespForTable
623-
pbPrepareCommitLakeTableRespForTable =
624-
response.addPrepareCommitLakeTableResp();
625-
try {
626-
// upsert lake table snapshot, need to merge the snapshot with
627-
// previous latest snapshot
628-
TableBucketOffsets tableBucketOffsets =
629-
lakeTableHelper.upsertTableBucketOffsets(
630-
bucketOffsets.getTableId(),
631-
toTableBucketOffsets(bucketOffsets));
632-
TablePath tablePath = toTablePath(bucketOffsets.getTablePath());
633-
FsPath fsPath =
634-
lakeTableHelper.storeLakeTableBucketOffsets(
635-
tablePath, tableBucketOffsets);
636-
pbPrepareCommitLakeTableRespForTable.setLakeTableBucketOffsetsPath(
637-
fsPath.toString());
638-
} catch (Exception e) {
639-
Errors error = ApiError.fromThrowable(e).error();
640-
pbPrepareCommitLakeTableRespForTable.setError(
641-
error.code(), error.message());
642-
}
643-
}
644-
future.complete(response);
645-
} catch (Exception e) {
646-
future.completeExceptionally(e);
647-
}
648-
});
649-
return future;
650-
}
651594
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.exception.TableAlreadyExistException;
3434
import org.apache.fluss.exception.TableNotPartitionedException;
3535
import org.apache.fluss.fs.FileSystem;
36+
import org.apache.fluss.fs.FsPath;
3637
import org.apache.fluss.lake.lakestorage.LakeCatalog;
3738
import org.apache.fluss.metadata.DataLakeFormat;
3839
import org.apache.fluss.metadata.DatabaseDescriptor;
@@ -81,8 +82,13 @@
8182
import org.apache.fluss.rpc.messages.PbAlterConfig;
8283
import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable;
8384
import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable;
85+
import org.apache.fluss.rpc.messages.PbPrepareCommitLakeTableRespForTable;
86+
import org.apache.fluss.rpc.messages.PbTableBucketOffsets;
87+
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotRequest;
88+
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotResponse;
8489
import org.apache.fluss.rpc.netty.server.Session;
8590
import org.apache.fluss.rpc.protocol.ApiError;
91+
import org.apache.fluss.rpc.protocol.Errors;
8692
import org.apache.fluss.security.acl.AclBinding;
8793
import org.apache.fluss.security.acl.AclBindingFilter;
8894
import org.apache.fluss.security.acl.FlussPrincipal;
@@ -112,8 +118,10 @@
112118
import org.apache.fluss.server.zk.data.PartitionAssignment;
113119
import org.apache.fluss.server.zk.data.TableAssignment;
114120
import org.apache.fluss.server.zk.data.TableRegistration;
121+
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
115122
import org.apache.fluss.utils.IOUtils;
116123
import org.apache.fluss.utils.concurrent.FutureUtils;
124+
import org.apache.fluss.utils.json.TableBucketOffsets;
117125

118126
import javax.annotation.Nullable;
119127

@@ -138,6 +146,7 @@
138146
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse;
139147
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toAlterTableConfigChanges;
140148
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toAlterTableSchemaChanges;
149+
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucketOffsets;
141150
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
142151
import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
143152
import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
@@ -156,6 +165,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
156165

157166
private final LakeTableTieringManager lakeTableTieringManager;
158167
private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader;
168+
private final ExecutorService ioExecutor;
169+
private final LakeTableHelper lakeTableHelper;
159170

160171
public CoordinatorService(
161172
Configuration conf,
@@ -176,8 +187,7 @@ public CoordinatorService(
176187
metadataManager,
177188
authorizer,
178189
dynamicConfigManager,
179-
ioExecutor,
180-
conf);
190+
ioExecutor);
181191
this.defaultBucketNumber = conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER);
182192
this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
183193
this.logTableAllowCreation = conf.getBoolean(ConfigOptions.LOG_TABLE_ALLOW_CREATION);
@@ -189,6 +199,9 @@ public CoordinatorService(
189199
this.lakeTableTieringManager = lakeTableTieringManager;
190200
this.metadataCache = metadataCache;
191201
this.lakeCatalogDynamicLoader = lakeCatalogDynamicLoader;
202+
this.ioExecutor = ioExecutor;
203+
this.lakeTableHelper =
204+
new LakeTableHelper(zkClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
192205
}
193206

194207
@Override
@@ -695,6 +708,47 @@ public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
695708
return response;
696709
}
697710

711+
@Override
712+
public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
713+
PrepareCommitLakeTableSnapshotRequest request) {
714+
CompletableFuture<PrepareCommitLakeTableSnapshotResponse> future =
715+
new CompletableFuture<>();
716+
ioExecutor.submit(
717+
() -> {
718+
PrepareCommitLakeTableSnapshotResponse response =
719+
new PrepareCommitLakeTableSnapshotResponse();
720+
try {
721+
for (PbTableBucketOffsets bucketOffsets : request.getBucketOffsetsList()) {
722+
PbPrepareCommitLakeTableRespForTable
723+
pbPrepareCommitLakeTableRespForTable =
724+
response.addPrepareCommitLakeTableResp();
725+
try {
726+
// upsert lake table snapshot, need to merge the snapshot with
727+
// previous latest snapshot
728+
TableBucketOffsets tableBucketOffsets =
729+
lakeTableHelper.upsertTableBucketOffsets(
730+
bucketOffsets.getTableId(),
731+
toTableBucketOffsets(bucketOffsets));
732+
TablePath tablePath = toTablePath(bucketOffsets.getTablePath());
733+
FsPath fsPath =
734+
lakeTableHelper.storeLakeTableBucketOffsets(
735+
tablePath, tableBucketOffsets);
736+
pbPrepareCommitLakeTableRespForTable.setLakeTableBucketOffsetsPath(
737+
fsPath.toString());
738+
} catch (Exception e) {
739+
Errors error = ApiError.fromThrowable(e).error();
740+
pbPrepareCommitLakeTableRespForTable.setError(
741+
error.code(), error.message());
742+
}
743+
}
744+
future.complete(response);
745+
} catch (Exception e) {
746+
future.completeExceptionally(e);
747+
}
748+
});
749+
return future;
750+
}
751+
698752
@Override
699753
public CompletableFuture<AlterClusterConfigsResponse> alterClusterConfigs(
700754
AlterClusterConfigsRequest request) {

fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,7 @@ protected void startServices() throws Exception {
286286
metadataManager,
287287
authorizer,
288288
dynamicConfigManager,
289-
ioExecutor,
290-
conf);
289+
ioExecutor);
291290

292291
RequestsMetrics requestsMetrics =
293292
RequestsMetrics.createTabletServerRequestMetrics(tabletServerMetricGroup);

0 commit comments

Comments
 (0)