Skip to content

Commit 22a0e69

Browse files
committed
[lake] Support auto snapshot expiration for lake table
1 parent 03c8602 commit 22a0e69

File tree

8 files changed

+244
-93
lines changed

8 files changed

+244
-93
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1381,6 +1381,13 @@ public class ConfigOptions {
13811381
.withDescription(
13821382
"If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default.");
13831383

1384+
public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_SNAPSHOT_EXPIRATION =
1385+
key("table.datalake.auto-snapshot-expiration")
1386+
.booleanType()
1387+
.defaultValue(false)
1388+
.withDescription(
1389+
"If true, snapshot expiration will be triggered automatically when tiering service writes to the datalake. It is disabled by default.");
1390+
13841391
public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
13851392
key("table.merge-engine")
13861393
.enumType(MergeEngineType.class)
@@ -1732,6 +1739,10 @@ public class ConfigOptions {
17321739
"The datalake format used by of Fluss to be as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. "
17331740
+ "In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi.");
17341741

1742+
// ------------------------------------------------------------------------
1743+
// ConfigOptions for tiering service
1744+
// ------------------------------------------------------------------------
1745+
17351746
// ------------------------------------------------------------------------
17361747
// ConfigOptions for fluss kafka
17371748
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ public boolean isDataLakeAutoCompaction() {
100100
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
101101
}
102102

103+
/** Whether auto snapshot expiration is enabled. */
104+
public boolean isDataLakeAutoSnapshotExpiration() {
105+
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_SNAPSHOT_EXPIRATION);
106+
}
107+
103108
/** Gets the optional merge engine type of the table. */
104109
public Optional<MergeEngineType> getMergeEngineType() {
105110
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
package org.apache.fluss.lake.committer;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableInfo;
2122
import org.apache.fluss.metadata.TablePath;
2223

2324
/**
2425
* The CommitterInitContext interface provides the context needed to create a LakeCommitter. It
25-
* includes methods to obtain the table path.
26+
* includes methods to obtain the table path and table info.
2627
*
2728
* @since 0.7
2829
*/
@@ -35,4 +36,11 @@ public interface CommitterInitContext {
3536
* @return the table path
3637
*/
3738
TablePath tablePath();
39+
40+
/**
41+
* Returns the Fluss table info.
42+
*
43+
* @return the Fluss table info
44+
*/
45+
TableInfo tableInfo();
3846
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ private Committable commitWriteResults(
205205
}
206206
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
207207
lakeTieringFactory.createLakeCommitter(
208-
new TieringCommitterInitContext(tablePath))) {
208+
new TieringCommitterInitContext(
209+
tablePath, admin.getTableInfo(tablePath).get()))) {
209210
List<WriteResult> writeResults =
210211
committableWriteResults.stream()
211212
.map(TableBucketWriteResult::writeResult)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,27 @@
1919

2020
import org.apache.fluss.lake.committer.CommitterInitContext;
2121
import org.apache.fluss.lake.committer.LakeCommitter;
22+
import org.apache.fluss.metadata.TableInfo;
2223
import org.apache.fluss.metadata.TablePath;
2324

2425
/** The {@link CommitterInitContext} implementation for {@link LakeCommitter}. */
2526
public class TieringCommitterInitContext implements CommitterInitContext {
2627

2728
private final TablePath tablePath;
29+
private final TableInfo tableInfo;
2830

29-
public TieringCommitterInitContext(TablePath tablePath) {
31+
public TieringCommitterInitContext(TablePath tablePath, TableInfo tableInfo) {
3032
this.tablePath = tablePath;
33+
this.tableInfo = tableInfo;
3134
}
3235

3336
@Override
3437
public TablePath tablePath() {
3538
return tablePath;
3639
}
40+
41+
@Override
42+
public TableInfo tableInfo() {
43+
return tableInfo;
44+
}
3745
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.lake.committer.BucketOffset;
2121
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
22+
import org.apache.fluss.lake.committer.CommitterInitContext;
2223
import org.apache.fluss.lake.committer.LakeCommitter;
2324
import org.apache.fluss.metadata.TablePath;
2425
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -32,15 +33,15 @@
3233
import org.apache.paimon.manifest.ManifestCommittable;
3334
import org.apache.paimon.manifest.ManifestEntry;
3435
import org.apache.paimon.manifest.SimpleFileEntry;
35-
import org.apache.paimon.operation.FileStoreCommit;
3636
import org.apache.paimon.table.FileStoreTable;
3737
import org.apache.paimon.table.sink.CommitCallback;
38+
import org.apache.paimon.table.sink.TableCommitImpl;
3839
import org.apache.paimon.utils.SnapshotManager;
3940

4041
import javax.annotation.Nullable;
4142

4243
import java.io.IOException;
43-
import java.util.Collections;
44+
import java.util.HashMap;
4445
import java.util.List;
4546
import java.util.Map;
4647

@@ -55,14 +56,21 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
5556

5657
private final Catalog paimonCatalog;
5758
private final FileStoreTable fileStoreTable;
58-
private FileStoreCommit fileStoreCommit;
59+
private TableCommitImpl tableCommit;
5960
private static final ThreadLocal<Long> currentCommitSnapshotId = new ThreadLocal<>();
6061
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
6162

62-
public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider, TablePath tablePath)
63+
public PaimonLakeCommitter(
64+
PaimonCatalogProvider paimonCatalogProvider, CommitterInitContext committerInitContext)
6365
throws IOException {
6466
this.paimonCatalog = paimonCatalogProvider.get();
65-
this.fileStoreTable = getTable(tablePath);
67+
this.fileStoreTable =
68+
getTable(
69+
committerInitContext.tablePath(),
70+
committerInitContext
71+
.tableInfo()
72+
.getTableConfig()
73+
.isDataLakeAutoSnapshotExpiration());
6674
}
6775

6876
@Override
@@ -82,29 +90,26 @@ public long commit(PaimonCommittable committable, Map<String, String> snapshotPr
8290
snapshotProperties.forEach(manifestCommittable::addProperty);
8391

8492
try {
85-
fileStoreCommit =
86-
fileStoreTable
87-
.store()
88-
.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
89-
fileStoreCommit.commit(manifestCommittable, false);
93+
tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
94+
tableCommit.commit(manifestCommittable);
95+
9096
Long commitSnapshotId = currentCommitSnapshotId.get();
9197
currentCommitSnapshotId.remove();
9298

9399
return checkNotNull(commitSnapshotId, "Paimon committed snapshot id must be non-null.");
94100
} catch (Throwable t) {
95-
if (fileStoreCommit != null) {
101+
if (tableCommit != null) {
96102
// if any error happen while commit, abort the commit to clean committable
97-
fileStoreCommit.abort(manifestCommittable.fileCommittables());
103+
tableCommit.abort(manifestCommittable.fileCommittables());
98104
}
99105
throw new IOException(t);
100106
}
101107
}
102108

103109
@Override
104110
public void abort(PaimonCommittable committable) throws IOException {
105-
fileStoreCommit =
106-
fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
107-
fileStoreCommit.abort(committable.manifestCommittable().fileCommittables());
111+
tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
112+
tableCommit.abort(committable.manifestCommittable().fileCommittables());
108113
}
109114

110115
@Nullable
@@ -191,8 +196,8 @@ private Snapshot getCommittedLatestSnapshotOfLake(String commitUser) throws IOEx
191196
@Override
192197
public void close() throws Exception {
193198
try {
194-
if (fileStoreCommit != null) {
195-
fileStoreCommit.close();
199+
if (tableCommit != null) {
200+
tableCommit.close();
196201
}
197202
if (paimonCatalog != null) {
198203
paimonCatalog.close();
@@ -202,19 +207,20 @@ public void close() throws Exception {
202207
}
203208
}
204209

205-
private FileStoreTable getTable(TablePath tablePath) throws IOException {
210+
private FileStoreTable getTable(TablePath tablePath, boolean isAutoSnapshotExpiration)
211+
throws IOException {
206212
try {
207-
FileStoreTable table =
208-
(FileStoreTable)
209-
paimonCatalog
210-
.getTable(toPaimon(tablePath))
211-
.copy(
212-
Collections.singletonMap(
213-
CoreOptions.COMMIT_CALLBACKS.key(),
214-
PaimonLakeCommitter.PaimonCommitCallback.class
215-
.getName()));
216-
217-
return table;
213+
FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
214+
215+
Map<String, String> dynamicOptions = new HashMap<>();
216+
dynamicOptions.put(
217+
CoreOptions.COMMIT_CALLBACKS.key(),
218+
PaimonLakeCommitter.PaimonCommitCallback.class.getName());
219+
dynamicOptions.put(
220+
CoreOptions.WRITE_ONLY.key(),
221+
isAutoSnapshotExpiration ? Boolean.FALSE.toString() : Boolean.TRUE.toString());
222+
223+
return table.copy(dynamicOptions);
218224
} catch (Exception e) {
219225
throw new IOException("Failed to get table " + tablePath + " in Paimon.", e);
220226
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public SimpleVersionedSerializer<PaimonWriteResult> getWriteResultSerializer() {
5353
@Override
5454
public LakeCommitter<PaimonWriteResult, PaimonCommittable> createLakeCommitter(
5555
CommitterInitContext committerInitContext) throws IOException {
56-
return new PaimonLakeCommitter(paimonCatalogProvider, committerInitContext.tablePath());
56+
return new PaimonLakeCommitter(paimonCatalogProvider, committerInitContext);
5757
}
5858

5959
@Override

0 commit comments

Comments
 (0)