Skip to content

Commit c29d0f6

Browse files
committed
minor fix
1 parent c625004 commit c29d0f6

File tree

21 files changed

+71
-90
lines changed

21 files changed

+71
-90
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
public class CommittedLakeSnapshot {
2828

2929
private final long lakeSnapshotId;
30+
3031
private final Map<String, String> snapshotProperties;
3132

3233
public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String> snapshotProperties) {

fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@
3636
@PublicEvolving
3737
public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable {
3838

39+
/**
40+
* The property key used to store the file path of lake table bucket offsets in snapshot
41+
* properties.
42+
*/
43+
String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = "fluss-offsets";
44+
3945
/**
4046
* Converts a list of write results to a committable object.
4147
*

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
* information in Fluss:
5757
*
5858
* <ul>
59-
* <li><b>Prepare phase</b> ({@link #prepareCommit}): Sends log end offsets to the FLuss cluster,
59+
* <li><b>Prepare phase</b> ({@link #prepareCommit}): Sends log end offsets to the Fluss cluster,
6060
* which merges them with the previous log end offsets and stores the merged snapshot data in
6161
* a file. Returns the file path where the snapshot metadata is stored.
6262
* <li><b>Commit phase</b> ({@link #commit}): Sends the lake snapshot metadata (including snapshot
@@ -112,7 +112,7 @@ String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> l
112112
if (prepareCommitResp.hasErrorCode()) {
113113
throw ApiError.fromErrorMessage(prepareCommitResp).exception();
114114
} else {
115-
return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
115+
return checkNotNull(prepareCommitResp).getLakeTableBucketOffsetsPath();
116116
}
117117
} catch (Exception e) {
118118
throw new IOException(
@@ -126,7 +126,7 @@ String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> l
126126
void commit(
127127
long tableId,
128128
long lakeSnapshotId,
129-
String lakeSnapshotPath,
129+
String lakeBucketOffsetsPath,
130130
Map<TableBucket, Long> logEndOffsets,
131131
Map<TableBucket, Long> logMaxTieredTimestamps)
132132
throws IOException {
@@ -135,7 +135,7 @@ void commit(
135135
toCommitLakeTableSnapshotRequest(
136136
tableId,
137137
lakeSnapshotId,
138-
lakeSnapshotPath,
138+
lakeBucketOffsetsPath,
139139
logEndOffsets,
140140
logMaxTieredTimestamps);
141141
List<PbCommitLakeTableSnapshotRespForTable> commitLakeTableSnapshotRespForTables =
@@ -200,15 +200,15 @@ private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRe
200200
*
201201
* @param tableId the table ID
202202
* @param snapshotId the lake snapshot ID
203-
* @param lakeSnapshotPath the file path where the snapshot metadata is stored
203+
* @param bucketOffsetsPath the file path where the bucket offsets is stored
204204
* @param logEndOffsets the log end offsets for each bucket
205205
* @param logMaxTieredTimestamps the max tiered timestamps for each bucket
206206
* @return the commit request
207207
*/
208208
private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
209209
long tableId,
210210
long snapshotId,
211-
String lakeSnapshotPath,
211+
String bucketOffsetsPath,
212212
Map<TableBucket, Long> logEndOffsets,
213213
Map<TableBucket, Long> logMaxTieredTimestamps) {
214214
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
@@ -220,8 +220,8 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
220220
pbLakeTableSnapshotMetadata.setSnapshotId(snapshotId);
221221
pbLakeTableSnapshotMetadata.setTableId(tableId);
222222
// tiered snapshot file path is equal to readable snapshot currently
223-
pbLakeTableSnapshotMetadata.setTieredSnapshotFilePath(lakeSnapshotPath);
224-
pbLakeTableSnapshotMetadata.setReadableSnapshotFilePath(lakeSnapshotPath);
223+
pbLakeTableSnapshotMetadata.setTieredBucketOffsetsFilePath(bucketOffsetsPath);
224+
pbLakeTableSnapshotMetadata.setReadableBucketOffsetsFilePath(bucketOffsetsPath);
225225

226226
// Add PbLakeTableSnapshotInfo for metrics reporting (to notify tablet servers about
227227
// synchronized log end offsets and max timestamps)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import java.util.Set;
5858
import java.util.stream.Collectors;
5959

60-
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
60+
import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
6161
import static org.apache.fluss.utils.Preconditions.checkState;
6262

6363
/**
@@ -230,22 +230,21 @@ private Committable commitWriteResults(
230230
? null
231231
: flussCurrentLakeSnapshot.getSnapshotId());
232232

233-
// get the lake snapshot file storing the log end offsets
234-
String lakeSnapshotMetadataFile =
233+
// get the lake bucket offsets file storing the log end offsets
234+
String lakeBucketOffsetsFile =
235235
flussTableLakeSnapshotCommitter.prepareCommit(
236236
tableId, tablePath, logEndOffsets);
237237

238-
// record the lake snapshot metadata file to snapshot property
238+
// record the lake snapshot bucket offsets file to snapshot property
239239
long committedSnapshotId =
240240
lakeCommitter.commit(
241241
committable,
242242
Collections.singletonMap(
243-
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
244-
lakeSnapshotMetadataFile));
243+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, lakeBucketOffsetsFile));
245244
flussTableLakeSnapshotCommitter.commit(
246245
tableId,
247246
committedSnapshotId,
248-
lakeSnapshotMetadataFile,
247+
lakeBucketOffsetsFile,
249248
logEndOffsets,
250249
logMaxTieredTimestamps);
251250
return committable;
@@ -303,7 +302,7 @@ private void checkFlussNotMissingLakeSnapshot(
303302
// since this code path should be rare, we do not consider backward compatibility
304303
// and throw IllegalStateException directly
305304
String trimmedPath = lakeSnapshotOffsetPath.trim();
306-
if (trimmedPath.startsWith("{")) {
305+
if (trimmedPath.contains("{")) {
307306
throw new IllegalStateException(
308307
String.format(
309308
"The %s field in snapshot property is a JSON string (tiered by v0.8), "

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ void testPartitionsExpiredInFlussButExistInLake(
617617
new TableBucket(tableId, hybridPartitionId, 1), lakeEndOffset,
618618
new TableBucket(tableId, hybridPartitionId, 2), lakeEndOffset));
619619
LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString());
620-
lakeTableHelper.upsertLakeTableV1(tableId, lakeTableSnapshot);
620+
lakeTableHelper.upsertLakeTable(tableId, lakeTableSnapshot);
621621

622622
// Create PartitionInfo for lake partitions
623623
List<PartitionInfo> lakePartitionInfos = new ArrayList<>();

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
import java.util.List;
5656
import java.util.Map;
5757

58-
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
58+
import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
5959
import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
6060
import static org.assertj.core.api.Assertions.assertThat;
6161
import static org.assertj.core.api.Assertions.assertThatThrownBy;

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
import java.util.TreeSet;
7979

8080
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
81-
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
81+
import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
8282
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
8383
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
8484
import static org.apache.fluss.testutils.DataTestUtils.row;

fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
import java.util.List;
4646
import java.util.Map;
4747

48-
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
48+
import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
4949
import static org.apache.fluss.testutils.DataTestUtils.row;
5050
import static org.assertj.core.api.Assertions.assertThat;
5151

fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
import java.util.Map;
6565
import java.util.stream.Stream;
6666

67-
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
67+
import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
6868
import static org.assertj.core.api.Assertions.assertThat;
6969

7070
/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
import java.util.Set;
7272

7373
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
74-
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
74+
import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
7575
import static org.apache.fluss.testutils.DataTestUtils.row;
7676
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
7777
import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;

0 commit comments

Comments
 (0)