Skip to content

Commit 3889069

Browse files
committed
[lake] Only record lake snapshot file path in lake snapshots
1 parent de2d9a0 commit 3889069

File tree

30 files changed

+732
-480
lines changed

30 files changed

+732
-480
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
public class CommittedLakeSnapshot {
2828

2929
private final long lakeSnapshotId;
30-
3130
private final Map<String, String> snapshotProperties;
3231

3332
public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String> snapshotProperties) {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java

Lines changed: 0 additions & 73 deletions
This file was deleted.

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 171 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,45 @@
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
2323
import org.apache.fluss.metadata.TableBucket;
24+
import org.apache.fluss.metadata.TablePath;
2425
import org.apache.fluss.metrics.registry.MetricRegistry;
2526
import org.apache.fluss.rpc.GatewayClientProxy;
2627
import org.apache.fluss.rpc.RpcClient;
2728
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
2829
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
30+
import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
2931
import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket;
3032
import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo;
33+
import org.apache.fluss.rpc.messages.PbLakeTableSnapshotMetadata;
34+
import org.apache.fluss.rpc.messages.PbPrepareCommitLakeTableRespForTable;
35+
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotRequest;
36+
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotResponse;
3137
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
38+
import org.apache.fluss.rpc.protocol.ApiError;
3239
import org.apache.fluss.utils.ExceptionUtils;
3340

3441
import java.io.IOException;
42+
import java.util.List;
3543
import java.util.Map;
3644

37-
/** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
45+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
46+
import static org.apache.fluss.utils.Preconditions.checkState;
47+
48+
/**
49+
* Committer to commit lake table snapshots to Fluss cluster.
50+
*
51+
* <p>This committer implements a two-phase commit protocol to record lake table snapshot
52+
* information in Fluss:
53+
*
54+
* <ul>
55+
* <li><b>Prepare phase</b> ({@link #prepareCommit}): Sends log end offsets to the Fluss cluster,
56+
* which merges them with the previous log end offsets and stores the merged snapshot data in
57+
* a file. Returns the file path where the snapshot metadata is stored.
58+
* <li><b>Commit phase</b> ({@link #commit}): Sends the lake snapshot metadata (including snapshot
59+
* ID and file paths) to the coordinator to finalize the commit. Also includes log end offsets
60+
* and max tiered timestamps for metrics reporting to tablet servers.
61+
* </ul>
62+
*/
3863
public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
3964

4065
private final Configuration flussConf;
@@ -59,49 +84,170 @@ public void open() {
5984
metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
6085
}
6186

62-
void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
87+
String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
88+
throws IOException {
89+
PbPrepareCommitLakeTableRespForTable prepareCommitResp = null;
90+
Exception exception = null;
6391
try {
64-
CommitLakeTableSnapshotRequest request =
65-
toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
66-
coordinatorGateway.commitLakeTableSnapshot(request).get();
92+
PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
93+
toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
94+
PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
95+
coordinatorGateway
96+
.prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
97+
.get();
98+
List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =
99+
prepareCommitLakeTableSnapshotResponse.getPrepareCommitLakeTableRespsList();
100+
checkState(pbPrepareCommitLakeTableRespForTables.size() == 1);
101+
prepareCommitResp = pbPrepareCommitLakeTableRespForTables.get(0);
102+
if (prepareCommitResp.hasErrorCode()) {
103+
exception = ApiError.fromErrorMessage(prepareCommitResp).exception();
104+
}
67105
} catch (Exception e) {
106+
exception = e;
107+
}
108+
109+
if (exception != null) {
68110
throw new IOException(
69111
String.format(
70-
"Fail to commit table lake snapshot %s to Fluss.",
71-
flussTableLakeSnapshot),
72-
ExceptionUtils.stripExecutionException(e));
112+
"Fail to prepare commit table lake snapshot for %s to Fluss.",
113+
tablePath),
114+
ExceptionUtils.stripExecutionException(exception));
73115
}
116+
return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
74117
}
75118

76-
public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndOffsets)
119+
void commit(
120+
long tableId,
121+
long lakeSnapshotId,
122+
String lakeSnapshotPath,
123+
Map<TableBucket, Long> logEndOffsets,
124+
Map<TableBucket, Long> logMaxTieredTimestamps)
77125
throws IOException {
78-
// construct lake snapshot to commit to Fluss
79-
FlussTableLakeSnapshot flussTableLakeSnapshot =
80-
new FlussTableLakeSnapshot(tableId, snapshotId);
81-
for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
82-
flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
126+
Exception exception = null;
127+
try {
128+
CommitLakeTableSnapshotRequest request =
129+
toCommitLakeTableSnapshotRequest(
130+
tableId,
131+
lakeSnapshotId,
132+
lakeSnapshotPath,
133+
logEndOffsets,
134+
logMaxTieredTimestamps);
135+
List<PbCommitLakeTableSnapshotRespForTable> commitLakeTableSnapshotRespForTables =
136+
coordinatorGateway.commitLakeTableSnapshot(request).get().getTableRespsList();
137+
checkState(commitLakeTableSnapshotRespForTables.size() == 1);
138+
PbCommitLakeTableSnapshotRespForTable commitLakeTableSnapshotRes =
139+
commitLakeTableSnapshotRespForTables.get(0);
140+
if (commitLakeTableSnapshotRes.hasErrorCode()) {
141+
exception = ApiError.fromErrorMessage(commitLakeTableSnapshotRes).exception();
142+
}
143+
} catch (Exception e) {
144+
exception = e;
145+
}
146+
147+
if (exception != null) {
148+
throw new IOException(
149+
String.format(
150+
"Fail to commit table lake snapshot id %d of table %d to Fluss.",
151+
lakeSnapshotId, tableId),
152+
ExceptionUtils.stripExecutionException(exception));
83153
}
84-
commit(flussTableLakeSnapshot);
85154
}
86155

87-
private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
88-
FlussTableLakeSnapshot flussTableLakeSnapshot) {
89-
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
90-
new CommitLakeTableSnapshotRequest();
156+
/**
157+
* Converts the prepare commit parameters to a {@link PrepareCommitLakeTableSnapshotRequest}.
158+
*
159+
* @param tableId the table ID
160+
* @param tablePath the table path
161+
* @param logEndOffsets the log end offsets for each bucket
162+
* @return the prepared commit request
163+
*/
164+
private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRequest(
165+
long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets) {
166+
PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
167+
new PrepareCommitLakeTableSnapshotRequest();
91168
PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
92-
commitLakeTableSnapshotRequest.addTablesReq();
169+
prepareCommitLakeTableSnapshotRequest.addTablesReq();
170+
pbLakeTableSnapshotInfo.setTableId(tableId);
93171

94-
pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
95-
pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
96-
for (TableBucket tableBucket : flussTableLakeSnapshot.tableBuckets()) {
172+
// in prepare phase, we don't know the snapshot id,
173+
// set -1 since the field is required
174+
pbLakeTableSnapshotInfo.setSnapshotId(-1L);
175+
for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
97176
PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
98177
pbLakeTableSnapshotInfo.addBucketsReq();
99-
long endOffset = flussTableLakeSnapshot.getLogEndOffset(tableBucket);
178+
TableBucket tableBucket = logEndOffsetEntry.getKey();
179+
pbLakeTableSnapshotInfo
180+
.setTablePath()
181+
.setDatabaseName(tablePath.getDatabaseName())
182+
.setTableName(tablePath.getTableName());
100183
if (tableBucket.getPartitionId() != null) {
101184
pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
102185
}
103186
pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
104-
pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
187+
pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
188+
}
189+
return prepareCommitLakeTableSnapshotRequest;
190+
}
191+
192+
/**
193+
* Converts the commit parameters to a {@link CommitLakeTableSnapshotRequest}.
194+
*
195+
* <p>This method creates a request that includes:
196+
*
197+
* <ul>
198+
* <li>Lake table snapshot metadata (snapshot ID, table ID, file paths)
199+
* <li>PbLakeTableSnapshotInfo for metrics reporting (log end offsets and max tiered
200+
* timestamps)
201+
* </ul>
202+
*
203+
* @param tableId the table ID
204+
* @param snapshotId the lake snapshot ID
205+
* @param lakeSnapshotPath the file path where the snapshot metadata is stored
206+
* @param logEndOffsets the log end offsets for each bucket
207+
* @param logMaxTieredTimestamps the max tiered timestamps for each bucket
208+
* @return the commit request
209+
*/
210+
private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
211+
long tableId,
212+
long snapshotId,
213+
String lakeSnapshotPath,
214+
Map<TableBucket, Long> logEndOffsets,
215+
Map<TableBucket, Long> logMaxTieredTimestamps) {
216+
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
217+
new CommitLakeTableSnapshotRequest();
218+
219+
// Add lake table snapshot metadata
220+
PbLakeTableSnapshotMetadata pbLakeTableSnapshotMetadata =
221+
commitLakeTableSnapshotRequest.addLakeTableSnapshotMetadata();
222+
pbLakeTableSnapshotMetadata.setSnapshotId(snapshotId);
223+
pbLakeTableSnapshotMetadata.setTableId(tableId);
224+
// the tiered snapshot file path is currently the same as the readable snapshot file path
225+
pbLakeTableSnapshotMetadata.setTieredSnapshotFilePath(lakeSnapshotPath);
226+
pbLakeTableSnapshotMetadata.setReadableSnapshotFilePath(lakeSnapshotPath);
227+
228+
// Add PbLakeTableSnapshotInfo for metrics reporting (to notify tablet servers about
229+
// synchronized log end offsets and max timestamps)
230+
if (!logEndOffsets.isEmpty()) {
231+
PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
232+
commitLakeTableSnapshotRequest.addTablesReq();
233+
pbLakeTableSnapshotInfo.setTableId(tableId);
234+
pbLakeTableSnapshotInfo.setSnapshotId(snapshotId);
235+
for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
236+
TableBucket tableBucket = logEndOffsetEntry.getKey();
237+
PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
238+
pbLakeTableSnapshotInfo.addBucketsReq();
239+
240+
if (tableBucket.getPartitionId() != null) {
241+
pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
242+
}
243+
pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
244+
pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
245+
246+
Long maxTimestamp = logMaxTieredTimestamps.get(tableBucket);
247+
if (maxTimestamp != null) {
248+
pbLakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp);
249+
}
250+
}
105251
}
106252
return commitLakeTableSnapshotRequest;
107253
}

0 commit comments

Comments
 (0)