Skip to content

Commit ccec817

Browse files
committed
[lake] Only record lake snapshot file path in lake snapshots
1 parent 9ea798a commit ccec817

File tree

23 files changed

+442
-224
lines changed

23 files changed

+442
-224
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.lake.committer;
1919

20+
import java.util.HashMap;
2021
import java.util.Map;
2122
import java.util.Objects;
2223

@@ -27,20 +28,20 @@
2728
public class CommittedLakeSnapshot {
2829

2930
private final long lakeSnapshotId;
31+
// properties of this lake snapshot, stored as string key -> string value;
32+
// NOTE(review): which keys are expected is not visible here - confirm with the producer
33+
private final Map<String, String> lakeSnapshotProperties = new HashMap<>();
3034

31-
private final Map<String, String> snapshotProperties;
32-
33-
public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String> snapshotProperties) {
35+
/**
 * Creates a committed lake snapshot.
 *
 * @param lakeSnapshotId the lake snapshot id
 * @param lakeSnapshotProperties the properties of the lake snapshot; the entries are copied
 *     into this instance, a {@code null} map is treated as empty
 */
public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String> lakeSnapshotProperties) {
    this.lakeSnapshotId = lakeSnapshotId;
    // The field is a final, pre-initialised HashMap and cannot be reassigned, so copy the
    // entries instead. Without this the argument was silently dropped and the getter,
    // equals, hashCode and toString only ever saw an empty map.
    if (lakeSnapshotProperties != null) {
        this.lakeSnapshotProperties.putAll(lakeSnapshotProperties);
    }
}
3738

3839
public long getLakeSnapshotId() {
3940
return lakeSnapshotId;
4041
}
4142

42-
public Map<String, String> getSnapshotProperties() {
43-
return snapshotProperties;
43+
public Map<String, String> getLakeSnapshotProperties() {
44+
return lakeSnapshotProperties;
4445
}
4546

4647
@Override
@@ -53,21 +54,21 @@ public boolean equals(Object o) {
5354
}
5455
CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
5556
return lakeSnapshotId == that.lakeSnapshotId
56-
&& Objects.equals(snapshotProperties, that.snapshotProperties);
57+
&& Objects.equals(lakeSnapshotProperties, that.lakeSnapshotProperties);
5758
}
5859

5960
@Override
6061
public int hashCode() {
61-
return Objects.hash(lakeSnapshotId, snapshotProperties);
62+
return Objects.hash(lakeSnapshotId, lakeSnapshotProperties);
6263
}
6364

6465
@Override
6566
public String toString() {
6667
return "CommittedLakeSnapshot{"
6768
+ "lakeSnapshotId="
6869
+ lakeSnapshotId
69-
+ ", snapshotProperties="
70-
+ snapshotProperties
70+
+ ", lakeSnapshotProperties="
71+
+ lakeSnapshotProperties
7172
+ '}';
7273
}
7374
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,14 @@ class FlussTableLakeSnapshot {
3434
private final Map<TableBucket, Long> logEndOffsets;
3535

3636
FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
37+
this(tableId, lakeSnapshotId, new HashMap<>());
38+
}
39+
40+
FlussTableLakeSnapshot(
41+
long tableId, long lakeSnapshotId, Map<TableBucket, Long> logEndOffsets) {
3742
this.tableId = tableId;
3843
this.lakeSnapshotId = lakeSnapshotId;
39-
this.logEndOffsets = new HashMap<>();
44+
this.logEndOffsets = logEndOffsets;
4045
}
4146

4247
public long tableId() {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 71 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,28 @@
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
2323
import org.apache.fluss.metadata.TableBucket;
24+
import org.apache.fluss.metadata.TablePath;
2425
import org.apache.fluss.metrics.registry.MetricRegistry;
2526
import org.apache.fluss.rpc.GatewayClientProxy;
2627
import org.apache.fluss.rpc.RpcClient;
2728
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
2829
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
2930
import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket;
3031
import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo;
32+
import org.apache.fluss.rpc.messages.PbLakeTableSnapshotMetadata;
33+
import org.apache.fluss.rpc.messages.PbPrepareCommitLakeTableRespForTable;
34+
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotRequest;
35+
import org.apache.fluss.rpc.messages.PrepareCommitLakeTableSnapshotResponse;
3136
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
37+
import org.apache.fluss.rpc.protocol.ApiError;
3238
import org.apache.fluss.utils.ExceptionUtils;
3339

3440
import java.io.IOException;
41+
import java.util.List;
3542
import java.util.Map;
3643

44+
import static org.apache.fluss.utils.Preconditions.checkState;
45+
3746
/** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
3847
public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
3948

@@ -59,50 +68,87 @@ public void open() {
5968
metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
6069
}
6170

62-
void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
71+
String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
72+
throws IOException {
73+
PbPrepareCommitLakeTableRespForTable prepareCommitResp;
6374
try {
64-
CommitLakeTableSnapshotRequest request =
65-
toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
66-
coordinatorGateway.commitLakeTableSnapshot(request).get();
75+
PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
76+
toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
77+
PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
78+
coordinatorGateway
79+
.prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
80+
.get();
81+
List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =
82+
prepareCommitLakeTableSnapshotResponse.getPrepareCommitLakeTableRespsList();
83+
checkState(pbPrepareCommitLakeTableRespForTables.size() == 1);
84+
prepareCommitResp = pbPrepareCommitLakeTableRespForTables.get(0);
6785
} catch (Exception e) {
6886
throw new IOException(
6987
String.format(
70-
"Fail to commit table lake snapshot %s to Fluss.",
71-
flussTableLakeSnapshot),
88+
"Fail to prepare commit table lake snapshot for %s to Fluss.",
89+
tablePath),
7290
ExceptionUtils.stripExecutionException(e));
7391
}
92+
93+
// get the prepare commit lake resp for table
94+
if (prepareCommitResp.hasError()) {
95+
throw new IOException(
96+
"Fail to prepare commit table lake snapshot.",
97+
ApiError.fromErrorMessage(prepareCommitResp.getError()).exception());
98+
}
99+
return prepareCommitResp.getLakeTableSnapshotFilePath();
74100
}
75101

76-
public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndOffsets)
77-
throws IOException {
78-
// construct lake snapshot to commit to Fluss
79-
FlussTableLakeSnapshot flussTableLakeSnapshot =
80-
new FlussTableLakeSnapshot(tableId, snapshotId);
81-
for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
82-
flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
102+
void commit(long tableId, long lakeSnapshotId, String lakeSnapshotPath) throws IOException {
103+
try {
104+
CommitLakeTableSnapshotRequest request =
105+
toCommitLakeTableSnapshotRequest(tableId, lakeSnapshotId, lakeSnapshotPath);
106+
coordinatorGateway.commitLakeTableSnapshot(request).get();
107+
} catch (Exception e) {
108+
throw new IOException(
109+
String.format(
110+
"Fail to commit table lake snapshot id %d of table %d to Fluss.",
111+
lakeSnapshotId, tableId),
112+
ExceptionUtils.stripExecutionException(e));
83113
}
84-
commit(flussTableLakeSnapshot);
85114
}
86115

87-
private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
88-
FlussTableLakeSnapshot flussTableLakeSnapshot) {
89-
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
90-
new CommitLakeTableSnapshotRequest();
116+
private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRequest(
117+
long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets) {
118+
PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
119+
new PrepareCommitLakeTableSnapshotRequest();
91120
PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
92-
commitLakeTableSnapshotRequest.addTablesReq();
93-
94-
pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
95-
pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
96-
for (TableBucket tableBucket : flussTableLakeSnapshot.tableBuckets()) {
121+
prepareCommitLakeTableSnapshotRequest.addTablesReq();
122+
pbLakeTableSnapshotInfo.setTableId(tableId);
123+
pbLakeTableSnapshotInfo.setSnapshotId(-1L);
124+
for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
97125
PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
98126
pbLakeTableSnapshotInfo.addBucketsReq();
99-
long endOffset = flussTableLakeSnapshot.getLogEndOffset(tableBucket);
127+
TableBucket tableBucket = logEndOffsetEntry.getKey();
128+
pbLakeTableSnapshotInfo
129+
.setTablePath()
130+
.setDatabaseName(tablePath.getDatabaseName())
131+
.setTableName(tablePath.getTableName());
100132
if (tableBucket.getPartitionId() != null) {
101133
pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
102134
}
103135
pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
104-
pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
136+
pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
105137
}
138+
return prepareCommitLakeTableSnapshotRequest;
139+
}
140+
141+
private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
142+
long tableId, long snapshotId, String lakeSnapshotPath) {
143+
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
144+
new CommitLakeTableSnapshotRequest();
145+
PbLakeTableSnapshotMetadata pbLakeTableSnapshotMetadata =
146+
commitLakeTableSnapshotRequest.addLakeTableSnapshotMetadata();
147+
pbLakeTableSnapshotMetadata.setSnapshotId(snapshotId);
148+
pbLakeTableSnapshotMetadata.setTableId(tableId);
149+
// tiered snapshot file path is equal to readable snapshot currently
150+
pbLakeTableSnapshotMetadata.setTieredSnapshotFilePath(lakeSnapshotPath);
151+
pbLakeTableSnapshotMetadata.setReadableSnapshotFilePath(lakeSnapshotPath);
106152
return commitLakeTableSnapshotRequest;
107153
}
108154

0 commit comments

Comments
 (0)