2121import org .apache .fluss .config .ConfigOptions ;
2222import org .apache .fluss .config .Configuration ;
2323import org .apache .fluss .metadata .TableBucket ;
24+ import org .apache .fluss .metadata .TablePath ;
2425import org .apache .fluss .metrics .registry .MetricRegistry ;
2526import org .apache .fluss .rpc .GatewayClientProxy ;
2627import org .apache .fluss .rpc .RpcClient ;
2728import org .apache .fluss .rpc .gateway .CoordinatorGateway ;
2829import org .apache .fluss .rpc .messages .CommitLakeTableSnapshotRequest ;
2930import org .apache .fluss .rpc .messages .PbLakeTableOffsetForBucket ;
3031import org .apache .fluss .rpc .messages .PbLakeTableSnapshotInfo ;
32+ import org .apache .fluss .rpc .messages .PbLakeTableSnapshotMetadata ;
33+ import org .apache .fluss .rpc .messages .PbPrepareCommitLakeTableRespForTable ;
34+ import org .apache .fluss .rpc .messages .PrepareCommitLakeTableSnapshotRequest ;
35+ import org .apache .fluss .rpc .messages .PrepareCommitLakeTableSnapshotResponse ;
3136import org .apache .fluss .rpc .metrics .ClientMetricGroup ;
37+ import org .apache .fluss .rpc .protocol .ApiError ;
3238import org .apache .fluss .utils .ExceptionUtils ;
3339
3440import java .io .IOException ;
41+ import java .util .List ;
3542import java .util .Map ;
3643
44+ import static org .apache .fluss .utils .Preconditions .checkState ;
45+
3746/** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
3847public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
3948
@@ -59,50 +68,87 @@ public void open() {
5968 metadataUpdater ::getCoordinatorServer , rpcClient , CoordinatorGateway .class );
6069 }
6170
62- void commit (FlussTableLakeSnapshot flussTableLakeSnapshot ) throws IOException {
71+ String prepareCommit (long tableId , TablePath tablePath , Map <TableBucket , Long > logEndOffsets )
72+ throws IOException {
73+ PbPrepareCommitLakeTableRespForTable prepareCommitResp ;
6374 try {
64- CommitLakeTableSnapshotRequest request =
65- toCommitLakeTableSnapshotRequest (flussTableLakeSnapshot );
66- coordinatorGateway .commitLakeTableSnapshot (request ).get ();
75+ PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
76+ toPrepareCommitLakeTableSnapshotRequest (tableId , tablePath , logEndOffsets );
77+ PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
78+ coordinatorGateway
79+ .prepareCommitLakeTableSnapshot (prepareCommitLakeTableSnapshotRequest )
80+ .get ();
81+ List <PbPrepareCommitLakeTableRespForTable > pbPrepareCommitLakeTableRespForTables =
82+ prepareCommitLakeTableSnapshotResponse .getPrepareCommitLakeTableRespsList ();
83+ checkState (pbPrepareCommitLakeTableRespForTables .size () == 1 );
84+ prepareCommitResp = pbPrepareCommitLakeTableRespForTables .get (0 );
6785 } catch (Exception e ) {
6886 throw new IOException (
6987 String .format (
70- "Fail to commit table lake snapshot %s to Fluss." ,
71- flussTableLakeSnapshot ),
88+ "Fail to prepare commit table lake snapshot for %s to Fluss." ,
89+ tablePath ),
7290 ExceptionUtils .stripExecutionException (e ));
7391 }
92+
93+ // get the prepare commit lake resp for table
94+ if (prepareCommitResp .hasError ()) {
95+ throw new IOException (
96+ "Fail to prepare commit table lake snapshot." ,
97+ ApiError .fromErrorMessage (prepareCommitResp .getError ()).exception ());
98+ }
99+ return prepareCommitResp .getLakeTableSnapshotFilePath ();
74100 }
75101
76- public void commit (long tableId , long snapshotId , Map <TableBucket , Long > logEndOffsets )
77- throws IOException {
78- // construct lake snapshot to commit to Fluss
79- FlussTableLakeSnapshot flussTableLakeSnapshot =
80- new FlussTableLakeSnapshot (tableId , snapshotId );
81- for (Map .Entry <TableBucket , Long > entry : logEndOffsets .entrySet ()) {
82- flussTableLakeSnapshot .addBucketOffset (entry .getKey (), entry .getValue ());
102+ void commit (long tableId , long lakeSnapshotId , String lakeSnapshotPath ) throws IOException {
103+ try {
104+ CommitLakeTableSnapshotRequest request =
105+ toCommitLakeTableSnapshotRequest (tableId , lakeSnapshotId , lakeSnapshotPath );
106+ coordinatorGateway .commitLakeTableSnapshot (request ).get ();
107+ } catch (Exception e ) {
108+ throw new IOException (
109+ String .format (
110+ "Fail to commit table lake snapshot id %d of table %d to Fluss." ,
111+ lakeSnapshotId , tableId ),
112+ ExceptionUtils .stripExecutionException (e ));
83113 }
84- commit (flussTableLakeSnapshot );
85114 }
86115
87- private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest (
88- FlussTableLakeSnapshot flussTableLakeSnapshot ) {
89- CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
90- new CommitLakeTableSnapshotRequest ();
116+ private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRequest (
117+ long tableId , TablePath tablePath , Map < TableBucket , Long > logEndOffsets ) {
118+ PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
119+ new PrepareCommitLakeTableSnapshotRequest ();
91120 PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
92- commitLakeTableSnapshotRequest .addTablesReq ();
93-
94- pbLakeTableSnapshotInfo .setTableId (flussTableLakeSnapshot .tableId ());
95- pbLakeTableSnapshotInfo .setSnapshotId (flussTableLakeSnapshot .lakeSnapshotId ());
96- for (TableBucket tableBucket : flussTableLakeSnapshot .tableBuckets ()) {
121+ prepareCommitLakeTableSnapshotRequest .addTablesReq ();
122+ pbLakeTableSnapshotInfo .setTableId (tableId );
123+ pbLakeTableSnapshotInfo .setSnapshotId (-1L );
124+ for (Map .Entry <TableBucket , Long > logEndOffsetEntry : logEndOffsets .entrySet ()) {
97125 PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
98126 pbLakeTableSnapshotInfo .addBucketsReq ();
99- long endOffset = flussTableLakeSnapshot .getLogEndOffset (tableBucket );
127+ TableBucket tableBucket = logEndOffsetEntry .getKey ();
128+ pbLakeTableSnapshotInfo
129+ .setTablePath ()
130+ .setDatabaseName (tablePath .getDatabaseName ())
131+ .setTableName (tablePath .getTableName ());
100132 if (tableBucket .getPartitionId () != null ) {
101133 pbLakeTableOffsetForBucket .setPartitionId (tableBucket .getPartitionId ());
102134 }
103135 pbLakeTableOffsetForBucket .setBucketId (tableBucket .getBucket ());
104- pbLakeTableOffsetForBucket .setLogEndOffset (endOffset );
136+ pbLakeTableOffsetForBucket .setLogEndOffset (logEndOffsetEntry . getValue () );
105137 }
138+ return prepareCommitLakeTableSnapshotRequest ;
139+ }
140+
141+ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest (
142+ long tableId , long snapshotId , String lakeSnapshotPath ) {
143+ CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
144+ new CommitLakeTableSnapshotRequest ();
145+ PbLakeTableSnapshotMetadata pbLakeTableSnapshotMetadata =
146+ commitLakeTableSnapshotRequest .addLakeTableSnapshotMetadata ();
147+ pbLakeTableSnapshotMetadata .setSnapshotId (snapshotId );
148+ pbLakeTableSnapshotMetadata .setTableId (tableId );
149+ // tiered snapshot file path is equal to readable snapshot currently
150+ pbLakeTableSnapshotMetadata .setTieredSnapshotFilePath (lakeSnapshotPath );
151+ pbLakeTableSnapshotMetadata .setReadableSnapshotFilePath (lakeSnapshotPath );
106152 return commitLakeTableSnapshotRequest ;
107153 }
108154
0 commit comments