2121import org .apache .fluss .config .ConfigOptions ;
2222import org .apache .fluss .config .Configuration ;
2323import org .apache .fluss .metadata .TableBucket ;
24+ import org .apache .fluss .metadata .TablePath ;
2425import org .apache .fluss .metrics .registry .MetricRegistry ;
2526import org .apache .fluss .rpc .GatewayClientProxy ;
2627import org .apache .fluss .rpc .RpcClient ;
2728import org .apache .fluss .rpc .gateway .CoordinatorGateway ;
2829import org .apache .fluss .rpc .messages .CommitLakeTableSnapshotRequest ;
30+ import org .apache .fluss .rpc .messages .PbCommitLakeTableSnapshotRespForTable ;
2931import org .apache .fluss .rpc .messages .PbLakeTableOffsetForBucket ;
3032import org .apache .fluss .rpc .messages .PbLakeTableSnapshotInfo ;
33+ import org .apache .fluss .rpc .messages .PbLakeTableSnapshotMetadata ;
34+ import org .apache .fluss .rpc .messages .PbPrepareCommitLakeTableRespForTable ;
35+ import org .apache .fluss .rpc .messages .PrepareCommitLakeTableSnapshotRequest ;
36+ import org .apache .fluss .rpc .messages .PrepareCommitLakeTableSnapshotResponse ;
3137import org .apache .fluss .rpc .metrics .ClientMetricGroup ;
38+ import org .apache .fluss .rpc .protocol .ApiError ;
3239import org .apache .fluss .utils .ExceptionUtils ;
3340
3441import java .io .IOException ;
42+ import java .util .List ;
3543import java .util .Map ;
3644
37- /** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
45+ import static org .apache .fluss .utils .Preconditions .checkNotNull ;
46+ import static org .apache .fluss .utils .Preconditions .checkState ;
47+
48+ /** Committer to commit lake snapshot to Fluss to record the info of lake snapshot. */
3849public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
3950
4051 private final Configuration flussConf ;
@@ -59,49 +70,149 @@ public void open() {
5970 metadataUpdater ::getCoordinatorServer , rpcClient , CoordinatorGateway .class );
6071 }
6172
62- void commit (FlussTableLakeSnapshot flussTableLakeSnapshot ) throws IOException {
73+ String prepareCommit (long tableId , TablePath tablePath , Map <TableBucket , Long > logEndOffsets )
74+ throws IOException {
75+ PbPrepareCommitLakeTableRespForTable prepareCommitResp = null ;
76+ Exception exception = null ;
6377 try {
64- CommitLakeTableSnapshotRequest request =
65- toCommitLakeTableSnapshotRequest (flussTableLakeSnapshot );
66- coordinatorGateway .commitLakeTableSnapshot (request ).get ();
78+ PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
79+ toPrepareCommitLakeTableSnapshotRequest (tableId , tablePath , logEndOffsets );
80+ PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
81+ coordinatorGateway
82+ .prepareCommitLakeTableSnapshot (prepareCommitLakeTableSnapshotRequest )
83+ .get ();
84+ List <PbPrepareCommitLakeTableRespForTable > pbPrepareCommitLakeTableRespForTables =
85+ prepareCommitLakeTableSnapshotResponse .getPrepareCommitLakeTableRespsList ();
86+ checkState (pbPrepareCommitLakeTableRespForTables .size () == 1 );
87+ prepareCommitResp = pbPrepareCommitLakeTableRespForTables .get (0 );
88+ if (prepareCommitResp .hasErrorCode ()) {
89+ exception = ApiError .fromErrorMessage (prepareCommitResp ).exception ();
90+ }
6791 } catch (Exception e ) {
92+ exception = e ;
93+ }
94+
95+ if (exception != null ) {
6896 throw new IOException (
6997 String .format (
70- "Fail to commit table lake snapshot %s to Fluss." ,
71- flussTableLakeSnapshot ),
72- ExceptionUtils .stripExecutionException (e ));
98+ "Fail to prepare commit table lake snapshot for %s to Fluss." ,
99+ tablePath ),
100+ ExceptionUtils .stripExecutionException (exception ));
73101 }
102+ return checkNotNull (prepareCommitResp ).getLakeTableSnapshotFilePath ();
74103 }
75104
76- public void commit (long tableId , long snapshotId , Map <TableBucket , Long > logEndOffsets )
105+ void commit (
106+ long tableId ,
107+ long lakeSnapshotId ,
108+ String lakeSnapshotPath ,
109+ Map <TableBucket , Long > logEndOffsets ,
110+ Map <TableBucket , Long > logMaxTieredTimestamps )
77111 throws IOException {
78- // construct lake snapshot to commit to Fluss
79- FlussTableLakeSnapshot flussTableLakeSnapshot =
80- new FlussTableLakeSnapshot (tableId , snapshotId );
81- for (Map .Entry <TableBucket , Long > entry : logEndOffsets .entrySet ()) {
82- flussTableLakeSnapshot .addBucketOffset (entry .getKey (), entry .getValue ());
112+ Exception exception = null ;
113+ try {
114+ CommitLakeTableSnapshotRequest request =
115+ toCommitLakeTableSnapshotRequest (
116+ tableId ,
117+ lakeSnapshotId ,
118+ lakeSnapshotPath ,
119+ logEndOffsets ,
120+ logMaxTieredTimestamps );
121+ List <PbCommitLakeTableSnapshotRespForTable > commitLakeTableSnapshotRespForTables =
122+ coordinatorGateway .commitLakeTableSnapshot (request ).get ().getTableRespsList ();
123+ checkState (commitLakeTableSnapshotRespForTables .size () == 1 );
124+ PbCommitLakeTableSnapshotRespForTable commitLakeTableSnapshotRes =
125+ commitLakeTableSnapshotRespForTables .get (0 );
126+ if (commitLakeTableSnapshotRes .hasErrorCode ()) {
127+ exception = ApiError .fromErrorMessage (commitLakeTableSnapshotRes ).exception ();
128+ }
129+ } catch (Exception e ) {
130+ exception = e ;
131+ }
132+
133+ if (exception != null ) {
134+ throw new IOException (
135+ String .format (
136+ "Fail to commit table lake snapshot id %d of table %d to Fluss." ,
137+ lakeSnapshotId , tableId ),
138+ ExceptionUtils .stripExecutionException (exception ));
83139 }
84- commit (flussTableLakeSnapshot );
85140 }
86141
87- private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest (
88- FlussTableLakeSnapshot flussTableLakeSnapshot ) {
89- CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
90- new CommitLakeTableSnapshotRequest ();
142+ private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRequest (
143+ long tableId , TablePath tablePath , Map < TableBucket , Long > logEndOffsets ) {
144+ PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
145+ new PrepareCommitLakeTableSnapshotRequest ();
91146 PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
92- commitLakeTableSnapshotRequest .addTablesReq ();
147+ prepareCommitLakeTableSnapshotRequest .addTablesReq ();
148+ pbLakeTableSnapshotInfo .setTableId (tableId );
93149
94- pbLakeTableSnapshotInfo .setTableId (flussTableLakeSnapshot .tableId ());
95- pbLakeTableSnapshotInfo .setSnapshotId (flussTableLakeSnapshot .lakeSnapshotId ());
96- for (TableBucket tableBucket : flussTableLakeSnapshot .tableBuckets ()) {
150+ // in prepare phase, we don't know the snapshot id,
151+ // set -1 since the field is required
152+ pbLakeTableSnapshotInfo .setSnapshotId (-1L );
153+ for (Map .Entry <TableBucket , Long > logEndOffsetEntry : logEndOffsets .entrySet ()) {
97154 PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
98155 pbLakeTableSnapshotInfo .addBucketsReq ();
99- long endOffset = flussTableLakeSnapshot .getLogEndOffset (tableBucket );
156+ TableBucket tableBucket = logEndOffsetEntry .getKey ();
157+ pbLakeTableSnapshotInfo
158+ .setTablePath ()
159+ .setDatabaseName (tablePath .getDatabaseName ())
160+ .setTableName (tablePath .getTableName ());
100161 if (tableBucket .getPartitionId () != null ) {
101162 pbLakeTableOffsetForBucket .setPartitionId (tableBucket .getPartitionId ());
102163 }
103164 pbLakeTableOffsetForBucket .setBucketId (tableBucket .getBucket ());
104- pbLakeTableOffsetForBucket .setLogEndOffset (endOffset );
165+ pbLakeTableOffsetForBucket .setLogEndOffset (logEndOffsetEntry .getValue ());
166+ }
167+ return prepareCommitLakeTableSnapshotRequest ;
168+ }
169+
170+ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest (
171+ long tableId ,
172+ long snapshotId ,
173+ String lakeSnapshotPath ,
174+ Map <TableBucket , Long > logEndOffsets ,
175+ Map <TableBucket , Long > logMaxTieredTimestamps ) {
176+ CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
177+ new CommitLakeTableSnapshotRequest ();
178+
179+ // Add lake table snapshot metadata
180+ PbLakeTableSnapshotMetadata pbLakeTableSnapshotMetadata =
181+ commitLakeTableSnapshotRequest .addLakeTableSnapshotMetadata ();
182+ pbLakeTableSnapshotMetadata .setSnapshotId (snapshotId );
183+ pbLakeTableSnapshotMetadata .setTableId (tableId );
184+ // tiered snapshot file path is equal to readable snapshot currently
185+ pbLakeTableSnapshotMetadata .setTieredSnapshotFilePath (lakeSnapshotPath );
186+ pbLakeTableSnapshotMetadata .setReadableSnapshotFilePath (lakeSnapshotPath );
187+
188+ // Add PbLakeTableSnapshotInfo for metrics reporting (to notify tablet servers about
189+ // synchronized log end offsets and max timestamps)
190+ if (!logEndOffsets .isEmpty ()) {
191+ PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
192+ commitLakeTableSnapshotRequest .addTablesReq ();
193+ for (Map .Entry <TableBucket , Long > logEndOffsetEntry : logEndOffsets .entrySet ()) {
194+
195+ pbLakeTableSnapshotInfo .setTableId (tableId );
196+ pbLakeTableSnapshotInfo .setSnapshotId (snapshotId );
197+ TableBucket tableBucket = logEndOffsetEntry .getKey ();
198+ PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
199+ pbLakeTableSnapshotInfo .addBucketsReq ();
200+
201+ if (tableBucket .getPartitionId () != null ) {
202+ pbLakeTableOffsetForBucket .setPartitionId (tableBucket .getPartitionId ());
203+ }
204+ pbLakeTableOffsetForBucket .setBucketId (tableBucket .getBucket ());
205+
206+ Long logEndOffset = logEndOffsets .get (tableBucket );
207+ if (logEndOffset != null ) {
208+ pbLakeTableOffsetForBucket .setLogEndOffset (logEndOffset );
209+ }
210+
211+ Long maxTimestamp = logMaxTieredTimestamps .get (tableBucket );
212+ if (maxTimestamp != null ) {
213+ pbLakeTableOffsetForBucket .setMaxTimestamp (maxTimestamp );
214+ }
215+ }
105216 }
106217 return commitLakeTableSnapshotRequest ;
107218 }
0 commit comments