2121import org .apache .fluss .config .ConfigOptions ;
2222import org .apache .fluss .config .Configuration ;
2323import org .apache .fluss .metadata .TableBucket ;
24+ import org .apache .fluss .metadata .TablePath ;
2425import org .apache .fluss .metrics .registry .MetricRegistry ;
2526import org .apache .fluss .rpc .GatewayClientProxy ;
2627import org .apache .fluss .rpc .RpcClient ;
2728import org .apache .fluss .rpc .gateway .CoordinatorGateway ;
2829import org .apache .fluss .rpc .messages .CommitLakeTableSnapshotRequest ;
30+ import org .apache .fluss .rpc .messages .PbCommitLakeTableSnapshotRespForTable ;
2931import org .apache .fluss .rpc .messages .PbLakeTableOffsetForBucket ;
3032import org .apache .fluss .rpc .messages .PbLakeTableSnapshotInfo ;
33+ import org .apache .fluss .rpc .messages .PbLakeTableSnapshotMetadata ;
34+ import org .apache .fluss .rpc .messages .PbPrepareCommitLakeTableRespForTable ;
35+ import org .apache .fluss .rpc .messages .PrepareCommitLakeTableSnapshotRequest ;
36+ import org .apache .fluss .rpc .messages .PrepareCommitLakeTableSnapshotResponse ;
3137import org .apache .fluss .rpc .metrics .ClientMetricGroup ;
38+ import org .apache .fluss .rpc .protocol .ApiError ;
3239import org .apache .fluss .utils .ExceptionUtils ;
3340
3441import java .io .IOException ;
42+ import java .util .List ;
3543import java .util .Map ;
3644
37- /** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
45+ import static org .apache .fluss .utils .Preconditions .checkNotNull ;
46+ import static org .apache .fluss .utils .Preconditions .checkState ;
47+
48+ /**
49+ * Committer to commit lake table snapshots to Fluss cluster.
50+ *
51+ * <p>This committer implements a two-phase commit protocol to record lake table snapshot
52+ * information in Fluss:
53+ *
54+ * <ul>
55+ * <li><b>Prepare phase</b> ({@link #prepareCommit}): Sends log end offsets to the Fluss cluster,
56+ * which merges them with the previous log end offsets and stores the merged snapshot data in
57+ * a file. Returns the file path where the snapshot metadata is stored.
58+ * <li><b>Commit phase</b> ({@link #commit}): Sends the lake snapshot metadata (including snapshot
59+ * ID and file paths) to the coordinator to finalize the commit. Also includes log end offsets
60+ * and max tiered timestamps for metrics reporting to tablet servers.
61+ * </ul>
62+ */
3863public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
3964
4065 private final Configuration flussConf ;
@@ -59,49 +84,170 @@ public void open() {
5984 metadataUpdater ::getCoordinatorServer , rpcClient , CoordinatorGateway .class );
6085 }
6186
62- void commit (FlussTableLakeSnapshot flussTableLakeSnapshot ) throws IOException {
87+ String prepareCommit (long tableId , TablePath tablePath , Map <TableBucket , Long > logEndOffsets )
88+ throws IOException {
89+ PbPrepareCommitLakeTableRespForTable prepareCommitResp = null ;
90+ Exception exception = null ;
6391 try {
64- CommitLakeTableSnapshotRequest request =
65- toCommitLakeTableSnapshotRequest (flussTableLakeSnapshot );
66- coordinatorGateway .commitLakeTableSnapshot (request ).get ();
92+ PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
93+ toPrepareCommitLakeTableSnapshotRequest (tableId , tablePath , logEndOffsets );
94+ PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
95+ coordinatorGateway
96+ .prepareCommitLakeTableSnapshot (prepareCommitLakeTableSnapshotRequest )
97+ .get ();
98+ List <PbPrepareCommitLakeTableRespForTable > pbPrepareCommitLakeTableRespForTables =
99+ prepareCommitLakeTableSnapshotResponse .getPrepareCommitLakeTableRespsList ();
100+ checkState (pbPrepareCommitLakeTableRespForTables .size () == 1 );
101+ prepareCommitResp = pbPrepareCommitLakeTableRespForTables .get (0 );
102+ if (prepareCommitResp .hasErrorCode ()) {
103+ exception = ApiError .fromErrorMessage (prepareCommitResp ).exception ();
104+ }
67105 } catch (Exception e ) {
106+ exception = e ;
107+ }
108+
109+ if (exception != null ) {
68110 throw new IOException (
69111 String .format (
70- "Fail to commit table lake snapshot %s to Fluss." ,
71- flussTableLakeSnapshot ),
72- ExceptionUtils .stripExecutionException (e ));
112+ "Fail to prepare commit table lake snapshot for %s to Fluss." ,
113+ tablePath ),
114+ ExceptionUtils .stripExecutionException (exception ));
73115 }
116+ return checkNotNull (prepareCommitResp ).getLakeTableSnapshotFilePath ();
74117 }
75118
76- public void commit (long tableId , long snapshotId , Map <TableBucket , Long > logEndOffsets )
119+ void commit (
120+ long tableId ,
121+ long lakeSnapshotId ,
122+ String lakeSnapshotPath ,
123+ Map <TableBucket , Long > logEndOffsets ,
124+ Map <TableBucket , Long > logMaxTieredTimestamps )
77125 throws IOException {
78- // construct lake snapshot to commit to Fluss
79- FlussTableLakeSnapshot flussTableLakeSnapshot =
80- new FlussTableLakeSnapshot (tableId , snapshotId );
81- for (Map .Entry <TableBucket , Long > entry : logEndOffsets .entrySet ()) {
82- flussTableLakeSnapshot .addBucketOffset (entry .getKey (), entry .getValue ());
126+ Exception exception = null ;
127+ try {
128+ CommitLakeTableSnapshotRequest request =
129+ toCommitLakeTableSnapshotRequest (
130+ tableId ,
131+ lakeSnapshotId ,
132+ lakeSnapshotPath ,
133+ logEndOffsets ,
134+ logMaxTieredTimestamps );
135+ List <PbCommitLakeTableSnapshotRespForTable > commitLakeTableSnapshotRespForTables =
136+ coordinatorGateway .commitLakeTableSnapshot (request ).get ().getTableRespsList ();
137+ checkState (commitLakeTableSnapshotRespForTables .size () == 1 );
138+ PbCommitLakeTableSnapshotRespForTable commitLakeTableSnapshotRes =
139+ commitLakeTableSnapshotRespForTables .get (0 );
140+ if (commitLakeTableSnapshotRes .hasErrorCode ()) {
141+ exception = ApiError .fromErrorMessage (commitLakeTableSnapshotRes ).exception ();
142+ }
143+ } catch (Exception e ) {
144+ exception = e ;
145+ }
146+
147+ if (exception != null ) {
148+ throw new IOException (
149+ String .format (
150+ "Fail to commit table lake snapshot id %d of table %d to Fluss." ,
151+ lakeSnapshotId , tableId ),
152+ ExceptionUtils .stripExecutionException (exception ));
83153 }
84- commit (flussTableLakeSnapshot );
85154 }
86155
87- private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest (
88- FlussTableLakeSnapshot flussTableLakeSnapshot ) {
89- CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
90- new CommitLakeTableSnapshotRequest ();
156+ /**
157+ * Converts the prepare commit parameters to a {@link PrepareCommitLakeTableSnapshotRequest}.
158+ *
159+ * @param tableId the table ID
160+ * @param tablePath the table path
161+ * @param logEndOffsets the log end offsets for each bucket
162+ * @return the prepared commit request
163+ */
164+ private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRequest (
165+ long tableId , TablePath tablePath , Map <TableBucket , Long > logEndOffsets ) {
166+ PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
167+ new PrepareCommitLakeTableSnapshotRequest ();
91168 PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
92- commitLakeTableSnapshotRequest .addTablesReq ();
169+ prepareCommitLakeTableSnapshotRequest .addTablesReq ();
170+ pbLakeTableSnapshotInfo .setTableId (tableId );
93171
94- pbLakeTableSnapshotInfo .setTableId (flussTableLakeSnapshot .tableId ());
95- pbLakeTableSnapshotInfo .setSnapshotId (flussTableLakeSnapshot .lakeSnapshotId ());
96- for (TableBucket tableBucket : flussTableLakeSnapshot .tableBuckets ()) {
172+ // in prepare phase, we don't know the snapshot id,
173+ // set -1 since the field is required
174+ pbLakeTableSnapshotInfo .setSnapshotId (-1L );
175+ for (Map .Entry <TableBucket , Long > logEndOffsetEntry : logEndOffsets .entrySet ()) {
97176 PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
98177 pbLakeTableSnapshotInfo .addBucketsReq ();
99- long endOffset = flussTableLakeSnapshot .getLogEndOffset (tableBucket );
178+ TableBucket tableBucket = logEndOffsetEntry .getKey ();
179+ pbLakeTableSnapshotInfo
180+ .setTablePath ()
181+ .setDatabaseName (tablePath .getDatabaseName ())
182+ .setTableName (tablePath .getTableName ());
100183 if (tableBucket .getPartitionId () != null ) {
101184 pbLakeTableOffsetForBucket .setPartitionId (tableBucket .getPartitionId ());
102185 }
103186 pbLakeTableOffsetForBucket .setBucketId (tableBucket .getBucket ());
104- pbLakeTableOffsetForBucket .setLogEndOffset (endOffset );
187+ pbLakeTableOffsetForBucket .setLogEndOffset (logEndOffsetEntry .getValue ());
188+ }
189+ return prepareCommitLakeTableSnapshotRequest ;
190+ }
191+
192+ /**
193+ * Converts the commit parameters to a {@link CommitLakeTableSnapshotRequest}.
194+ *
195+ * <p>This method creates a request that includes:
196+ *
197+ * <ul>
198+ * <li>Lake table snapshot metadata (snapshot ID, table ID, file paths)
199+ * <li>PbLakeTableSnapshotInfo for metrics reporting (log end offsets and max tiered
200+ * timestamps)
201+ * </ul>
202+ *
203+ * @param tableId the table ID
204+ * @param snapshotId the lake snapshot ID
205+ * @param lakeSnapshotPath the file path where the snapshot metadata is stored
206+ * @param logEndOffsets the log end offsets for each bucket
207+ * @param logMaxTieredTimestamps the max tiered timestamps for each bucket
208+ * @return the commit request
209+ */
210+ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest (
211+ long tableId ,
212+ long snapshotId ,
213+ String lakeSnapshotPath ,
214+ Map <TableBucket , Long > logEndOffsets ,
215+ Map <TableBucket , Long > logMaxTieredTimestamps ) {
216+ CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
217+ new CommitLakeTableSnapshotRequest ();
218+
219+ // Add lake table snapshot metadata
220+ PbLakeTableSnapshotMetadata pbLakeTableSnapshotMetadata =
221+ commitLakeTableSnapshotRequest .addLakeTableSnapshotMetadata ();
222+ pbLakeTableSnapshotMetadata .setSnapshotId (snapshotId );
223+ pbLakeTableSnapshotMetadata .setTableId (tableId );
224+ // tiered snapshot file path is equal to readable snapshot currently
225+ pbLakeTableSnapshotMetadata .setTieredSnapshotFilePath (lakeSnapshotPath );
226+ pbLakeTableSnapshotMetadata .setReadableSnapshotFilePath (lakeSnapshotPath );
227+
228+ // Add PbLakeTableSnapshotInfo for metrics reporting (to notify tablet servers about
229+ // synchronized log end offsets and max timestamps)
230+ if (!logEndOffsets .isEmpty ()) {
231+ PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
232+ commitLakeTableSnapshotRequest .addTablesReq ();
233+ for (Map .Entry <TableBucket , Long > logEndOffsetEntry : logEndOffsets .entrySet ()) {
234+ pbLakeTableSnapshotInfo .setTableId (tableId );
235+ pbLakeTableSnapshotInfo .setSnapshotId (snapshotId );
236+ TableBucket tableBucket = logEndOffsetEntry .getKey ();
237+ PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
238+ pbLakeTableSnapshotInfo .addBucketsReq ();
239+
240+ if (tableBucket .getPartitionId () != null ) {
241+ pbLakeTableOffsetForBucket .setPartitionId (tableBucket .getPartitionId ());
242+ }
243+ pbLakeTableOffsetForBucket .setBucketId (tableBucket .getBucket ());
244+ pbLakeTableOffsetForBucket .setLogEndOffset (logEndOffsetEntry .getValue ());
245+
246+ Long maxTimestamp = logMaxTieredTimestamps .get (tableBucket );
247+ if (maxTimestamp != null ) {
248+ pbLakeTableOffsetForBucket .setMaxTimestamp (maxTimestamp );
249+ }
250+ }
105251 }
106252 return commitLakeTableSnapshotRequest ;
107253 }
0 commit comments