Skip to content

Commit 50fca1d

Browse files
committed
enable lake table stores multiple lake snapshot && lake snapshot should contain readable offset and tiered offset
1 parent 5db8f4b commit 50fca1d

File tree

21 files changed

+422
-251
lines changed

21 files changed

+422
-251
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/committer/BucketOffset.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public Long getPartitionId() {
5353

5454
@Override
5555
public boolean equals(Object o) {
56+
if (this == o) {
57+
return true;
58+
}
5659
if (o == null || getClass() != o.getClass()) {
5760
return false;
5861
}

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
5656

5757
@Override
5858
public boolean equals(Object o) {
59+
if (this == o) {
60+
return true;
61+
}
5962
if (o == null || getClass() != o.getClass()) {
6063
return false;
6164
}

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -689,20 +689,37 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
689689
* <p>The path contract:
690690
*
691691
* <pre>
692-
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/snapshot/{snapshotId}.snapshot
692+
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}
693693
* </pre>
694694
*/
695-
public static FsPath remoteLakeTableSnapshotPath(
696-
String remoteDataDir, TablePath tablePath, long tableId, long snapshotId) {
695+
public static FsPath remoteLakeTableSnapshotMetadataDir(
696+
String remoteDataDir, TablePath tablePath, long tableId) {
697697
return new FsPath(
698698
String.format(
699-
"%s/%s/%s/%s-%d/snapshot/%d.snapshot",
699+
"%s/%s/%s/%s-%d",
700700
remoteDataDir,
701701
REMOTE_LAKE_DIR_NAME,
702702
tablePath.getDatabaseName(),
703703
tablePath.getTableName(),
704-
tableId,
705-
snapshotId));
704+
tableId));
705+
}
706+
707+
/**
708+
* Returns a remote path for storing lake snapshot metadata required by Fluss for a table.
709+
*
710+
* <p>The path contract:
711+
*
712+
* <pre>
713+
* {$remoteLakeTableSnapshotMetadataDir}/manifest/{uuid}.manifest
714+
* </pre>
715+
*/
716+
public static FsPath remoteLakeTableSnapshotManifestPath(
717+
String remoteDataDir, TablePath tablePath, long tableId) {
718+
return new FsPath(
719+
String.format(
720+
"%s/manifest/%s.manifest",
721+
remoteLakeTableSnapshotMetadataDir(remoteDataDir, tablePath, tableId),
722+
UUID.randomUUID()));
706723
}
707724

708725
/**

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -102,21 +102,15 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable {
102102

103103
// mock finished tiered this round, check second round
104104
context.getSplitsAssignmentSequence().clear();
105-
final Map<Integer, Long> bucketOffsetOfEarliest = new HashMap<>();
106105
final Map<Integer, Long> bucketOffsetOfInitialWrite = new HashMap<>();
107106
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
108-
bucketOffsetOfEarliest.put(tableBucket, EARLIEST_OFFSET);
109107
bucketOffsetOfInitialWrite.put(tableBucket, 0L);
110108
}
111109
// commit and notify this table tiering task finished
112110
coordinatorGateway
113111
.commitLakeTableSnapshot(
114112
genCommitLakeTableSnapshotRequest(
115-
tableId,
116-
null,
117-
0,
118-
bucketOffsetOfEarliest,
119-
bucketOffsetOfInitialWrite))
113+
tableId, null, 0, bucketOffsetOfInitialWrite))
120114
.get();
121115

122116
enumerator.handleSourceEvent(1, new FinishedTieringEvent(tableId));
@@ -211,11 +205,7 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable {
211205
coordinatorGateway
212206
.commitLakeTableSnapshot(
213207
genCommitLakeTableSnapshotRequest(
214-
tableId,
215-
null,
216-
1,
217-
initialBucketOffsets,
218-
bucketOffsetOfInitialWrite))
208+
tableId, null, 1, bucketOffsetOfInitialWrite))
219209
.get();
220210

221211
enumerator.handleSourceEvent(1, new FinishedTieringEvent(tableId));
@@ -312,11 +302,7 @@ void testLogTableSplits() throws Throwable {
312302
coordinatorGateway
313303
.commitLakeTableSnapshot(
314304
genCommitLakeTableSnapshotRequest(
315-
tableId,
316-
null,
317-
0,
318-
bucketOffsetOfEarliest,
319-
bucketOffsetOfInitialWrite))
305+
tableId, null, 0, bucketOffsetOfInitialWrite))
320306
.get();
321307
enumerator.handleSourceEvent(1, new FinishedTieringEvent(tableId));
322308

@@ -411,7 +397,6 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
411397
tableId,
412398
partitionNameById.getValue(),
413399
snapshotId++,
414-
partitionBucketOffsetOfEarliest,
415400
bucketOffsetOfInitialWrite.get(
416401
partitionNameById.getValue())))
417402
.get();
@@ -539,7 +524,6 @@ void testPartitionedLogTableSplits() throws Throwable {
539524
tableId,
540525
partitionId,
541526
snapshot++,
542-
partitionInitialBucketOffsets,
543527
bucketOffsetOfInitialWrite.get(partitionId)))
544528
.get();
545529
}
@@ -693,22 +677,20 @@ private static CommitLakeTableSnapshotRequest genCommitLakeTableSnapshotRequest(
693677
long tableId,
694678
@Nullable Long partitionId,
695679
long snapshotId,
696-
Map<Integer, Long> bucketLogStartOffsets,
697680
Map<Integer, Long> bucketLogEndOffsets) {
698681
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
699682
new CommitLakeTableSnapshotRequest();
700683
PbLakeTableSnapshotInfo reqForTable = commitLakeTableSnapshotRequest.addTablesReq();
701684
reqForTable.setTableId(tableId);
702685
reqForTable.setSnapshotId(snapshotId);
703-
for (Map.Entry<Integer, Long> bucketLogStartOffset : bucketLogStartOffsets.entrySet()) {
704-
int bucketId = bucketLogStartOffset.getKey();
686+
for (Map.Entry<Integer, Long> bucketLogEndOffset : bucketLogEndOffsets.entrySet()) {
687+
int bucketId = bucketLogEndOffset.getKey();
705688
TableBucket tb = new TableBucket(tableId, partitionId, bucketId);
706689
PbLakeTableOffsetForBucket lakeTableOffsetForBucket = reqForTable.addBucketsReq();
707690
if (tb.getPartitionId() != null) {
708691
lakeTableOffsetForBucket.setPartitionId(tb.getPartitionId());
709692
}
710693
lakeTableOffsetForBucket.setBucketId(tb.getBucket());
711-
lakeTableOffsetForBucket.setLogStartOffset(bucketLogStartOffset.getValue());
712694
lakeTableOffsetForBucket.setLogEndOffset(bucketLogEndOffsets.get(bucketId));
713695
}
714696
return commitLakeTableSnapshotRequest;

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,8 @@ private void testPartitionedTableTiering() throws Exception {
402402
put(
403403
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
404404
"["
405-
+ "{\"partition_id\":0,\"bucket\":0,\"partition_name\":\"date=2025\",\"offset\":3},"
406-
+ "{\"partition_id\":1,\"bucket\":0,\"partition_name\":\"date=2026\",\"offset\":3}"
405+
+ "{\"partition_id\":0,\"bucket\":0,\"offset\":3},"
406+
+ "{\"partition_id\":1,\"bucket\":0,\"offset\":3}"
407407
+ "]");
408408
}
409409
};

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ void testTieringForAllTypes(boolean isPrimaryKeyTable) throws Exception {
335335
put(
336336
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
337337
String.format(
338-
"[{\"partition_id\":%d,\"bucket\":0,\"partition_name\":\"c1=true/c2=1/c3=2/c4=3/c5=4/c6=5_0/c7=6_0/c9=v1/c10=v2/c11=7633/c12=0102030405/c13=2025-10-16/c14=10-10-10_123/c15=2025-10-16-10-10-10_123/c16=2025-10-16-10-10-10_123\",\"offset\":1}]",
338+
"[{\"partition_id\":%d,\"bucket\":0,\"offset\":1}]",
339339
partitionId));
340340
}
341341
};
@@ -475,8 +475,7 @@ void testTieringForAlterTable() throws Exception {
475475
}
476476

477477
private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
478-
String raw =
479-
"{\"partition_id\":%s,\"bucket\":0,\"partition_name\":\"date=%s\",\"offset\":3}";
478+
String raw = "{\"partition_id\":%s,\"bucket\":0,\"offset\":3}";
480479
List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
481480
Collections.sort(partitionIds);
482481
List<String> partitionOffsetStrs = new ArrayList<>();

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,9 +462,7 @@ message PbLakeTableSnapshotInfo {
462462
message PbLakeTableOffsetForBucket {
463463
optional int64 partition_id = 1;
464464
required int32 bucket_id = 2;
465-
optional int64 log_start_offset = 3;
466465
optional int64 log_end_offset = 4;
467-
optional string partition_name = 5;
468466
optional int64 max_timestamp = 6;
469467
}
470468

fluss-server/src/main/java/org/apache/fluss/server/coordinator/RemoteStorageCleaner.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.coordinator;
1919

20+
import org.apache.fluss.config.ConfigOptions;
2021
import org.apache.fluss.config.Configuration;
2122
import org.apache.fluss.exception.FlussRuntimeException;
2223
import org.apache.fluss.fs.FileSystem;
@@ -41,13 +42,16 @@ public class RemoteStorageCleaner {
4142

4243
private final FsPath remoteLogDir;
4344

45+
private final String remoteDataDir;
46+
4447
private final FileSystem remoteFileSystem;
4548

4649
private final ExecutorService ioExecutor;
4750

4851
public RemoteStorageCleaner(Configuration configuration, ExecutorService ioExecutor) {
4952
this.remoteKvDir = FlussPaths.remoteKvDir(configuration);
5053
this.remoteLogDir = FlussPaths.remoteLogDir(configuration);
54+
this.remoteDataDir = configuration.getString(ConfigOptions.REMOTE_DATA_DIR);
5155
this.ioExecutor = ioExecutor;
5256
try {
5357
this.remoteFileSystem = remoteKvDir.getFileSystem();
@@ -57,10 +61,16 @@ public RemoteStorageCleaner(Configuration configuration, ExecutorService ioExecu
5761
}
5862
}
5963

60-
public void asyncDeleteTableRemoteDir(TablePath tablePath, boolean isKvTable, long tableId) {
64+
public void asyncDeleteTableRemoteDir(
65+
TablePath tablePath, boolean isKvTable, boolean isLakeEnabled, long tableId) {
6166
if (isKvTable) {
6267
asyncDeleteDir(FlussPaths.remoteTableDir(remoteKvDir, tablePath, tableId));
6368
}
69+
if (isLakeEnabled) {
70+
asyncDeleteDir(
71+
FlussPaths.remoteLakeTableSnapshotMetadataDir(
72+
remoteDataDir, tablePath, tableId));
73+
}
6474
asyncDeleteDir(FlussPaths.remoteTableDir(remoteLogDir, tablePath, tableId));
6575
}
6676

fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,10 @@ private void asyncDeleteRemoteDirectory(long tableId) {
275275
TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId);
276276
if (tableInfo != null) {
277277
remoteStorageCleaner.asyncDeleteTableRemoteDir(
278-
tableInfo.getTablePath(), tableInfo.hasPrimaryKey(), tableId);
278+
tableInfo.getTablePath(),
279+
tableInfo.hasPrimaryKey(),
280+
tableInfo.getTableConfig().isDataLakeEnabled(),
281+
tableId);
279282
}
280283
}
281284

fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public Map<TableBucket, Long> getTableBucketsMaxTieredTimestamp() {
4747

4848
@Override
4949
public boolean equals(Object o) {
50+
if (this == o) {
51+
return true;
52+
}
5053
if (o == null || getClass() != o.getClass()) {
5154
return false;
5255
}

0 commit comments

Comments
 (0)