Skip to content

Commit 64876d1

Browse files
committed
[lake] Support auto snapshot expiration for lake table
1 parent 03c8602 commit 64876d1

File tree

17 files changed

+373
-106
lines changed

17 files changed

+373
-106
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1381,6 +1381,13 @@ public class ConfigOptions {
13811381
.withDescription(
13821382
"If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default.");
13831383

1384+
public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT =
1385+
key("table.datalake.auto-expire-snapshot")
1386+
.booleanType()
1387+
.defaultValue(false)
1388+
.withDescription(
1389+
"If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default.");
1390+
13841391
public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
13851392
key("table.merge-engine")
13861393
.enumType(MergeEngineType.class)
@@ -1732,6 +1739,20 @@ public class ConfigOptions {
17321739
"The datalake format used by of Fluss to be as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. "
17331740
+ "In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi.");
17341741

1742+
// ------------------------------------------------------------------------
1743+
// ConfigOptions for tiering service
1744+
// ------------------------------------------------------------------------
1745+
1746+
public static final ConfigOption<Boolean> LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT =
1747+
key("lake.tiering.auto-expire-snapshot")
1748+
.booleanType()
1749+
.defaultValue(false)
1750+
.withDescription(
1751+
"If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake, "
1752+
+ "even if "
1753+
// Reference the option by its key so the description shows the option name
// rather than the ConfigOption object's toString() form.
+ ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT.key()
1754+
+ " is false.");
1755+
17351756
// ------------------------------------------------------------------------
17361757
// ConfigOptions for fluss kafka
17371758
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ public boolean isDataLakeAutoCompaction() {
100100
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
101101
}
102102

103+
/** Whether auto expire snapshot is enabled. */
104+
public boolean isDataLakeAutoExpireSnapshot() {
105+
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
106+
}
107+
103108
/** Gets the optional merge engine type of the table. */
104109
public Optional<MergeEngineType> getMergeEngineType() {
105110
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
package org.apache.fluss.lake.committer;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.metadata.TableInfo;
2123
import org.apache.fluss.metadata.TablePath;
2224

2325
/**
2426
* The CommitterInitContext interface provides the context needed to create a LakeCommitter. It
25-
* includes methods to obtain the table path.
27+
* includes methods to obtain the table path, table info and lake tiering configs.
2628
*
2729
* @since 0.7
2830
*/
@@ -35,4 +37,18 @@ public interface CommitterInitContext {
3537
* @return the table path
3638
*/
3739
TablePath tablePath();
40+
41+
/**
42+
* Returns the table info.
43+
*
44+
* @return the table info
45+
*/
46+
TableInfo tableInfo();
47+
48+
/**
49+
* Returns the lake tiering config.
50+
*
51+
* @return the lake tiering config
52+
*/
53+
Configuration lakeTieringConfig();
3854
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,25 +46,30 @@ public class LakeTieringJobBuilder {
4646
private final StreamExecutionEnvironment env;
4747
private final Configuration flussConfig;
4848
private final Configuration dataLakeConfig;
49+
private final Configuration lakeTieringConfig;
4950
private final String dataLakeFormat;
5051

5152
private LakeTieringJobBuilder(
5253
StreamExecutionEnvironment env,
5354
Configuration flussConfig,
5455
Configuration dataLakeConfig,
56+
Configuration lakeTieringConfig,
5557
String dataLakeFormat) {
5658
this.env = checkNotNull(env);
5759
this.flussConfig = checkNotNull(flussConfig);
5860
this.dataLakeConfig = checkNotNull(dataLakeConfig);
61+
this.lakeTieringConfig = checkNotNull(lakeTieringConfig);
5962
this.dataLakeFormat = checkNotNull(dataLakeFormat);
6063
}
6164

6265
public static LakeTieringJobBuilder newBuilder(
6366
StreamExecutionEnvironment env,
6467
Configuration flussConfig,
6568
Configuration dataLakeConfig,
69+
Configuration lakeTieringConfig,
6670
String dataLakeFormat) {
67-
return new LakeTieringJobBuilder(env, flussConfig, dataLakeConfig, dataLakeFormat);
71+
return new LakeTieringJobBuilder(
72+
env, flussConfig, dataLakeConfig, lakeTieringConfig, dataLakeFormat);
6873
}
6974

7075
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -99,7 +104,8 @@ public JobClient build() throws Exception {
99104
"TieringCommitter",
100105
CommittableMessageTypeInfo.of(
101106
() -> lakeTieringFactory.getCommittableSerializer()),
102-
new TieringCommitOperatorFactory(flussConfig, lakeTieringFactory))
107+
new TieringCommitOperatorFactory(
108+
flussConfig, lakeTieringConfig, lakeTieringFactory))
103109
.setParallelism(1)
104110
.setMaxParallelism(1)
105111
.sinkTo(new DiscardingSink())

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public class TieringCommitOperator<WriteResult, Committable>
9090
private static final long serialVersionUID = 1L;
9191

9292
private final Configuration flussConfig;
93+
private final Configuration lakeTieringConfig;
9394
private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;
9495
private final FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter;
9596
private Connection connection;
@@ -106,11 +107,13 @@ public class TieringCommitOperator<WriteResult, Committable>
106107
public TieringCommitOperator(
107108
StreamOperatorParameters<CommittableMessage<Committable>> parameters,
108109
Configuration flussConf,
110+
Configuration lakeTieringConfig,
109111
LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
110112
this.lakeTieringFactory = lakeTieringFactory;
111113
this.flussTableLakeSnapshotCommitter = new FlussTableLakeSnapshotCommitter(flussConf);
112114
this.collectedTableBucketWriteResults = new HashMap<>();
113115
this.flussConfig = flussConf;
116+
this.lakeTieringConfig = lakeTieringConfig;
114117
this.operatorEventGateway =
115118
parameters
116119
.getOperatorEventDispatcher()
@@ -205,7 +208,10 @@ private Committable commitWriteResults(
205208
}
206209
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
207210
lakeTieringFactory.createLakeCommitter(
208-
new TieringCommitterInitContext(tablePath))) {
211+
new TieringCommitterInitContext(
212+
tablePath,
213+
admin.getTableInfo(tablePath).get(),
214+
lakeTieringConfig))) {
209215
List<WriteResult> writeResults =
210216
committableWriteResults.stream()
211217
.map(TableBucketWriteResult::writeResult)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,15 @@ public class TieringCommitOperatorFactory<WriteResult, Committable>
3333
TableBucketWriteResult<WriteResult>, CommittableMessage<Committable>> {
3434

3535
private final Configuration flussConfig;
36+
private final Configuration lakeTieringConfig;
3637
private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;
3738

3839
public TieringCommitOperatorFactory(
3940
Configuration flussConfig,
41+
Configuration lakeTieringConfig,
4042
LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
4143
this.flussConfig = flussConfig;
44+
this.lakeTieringConfig = lakeTieringConfig;
4245
this.lakeTieringFactory = lakeTieringFactory;
4346
}
4447

@@ -47,7 +50,8 @@ public <T extends StreamOperator<CommittableMessage<Committable>>> T createStrea
4750
StreamOperatorParameters<CommittableMessage<Committable>> parameters) {
4851

4952
TieringCommitOperator<WriteResult, Committable> commitOperator =
50-
new TieringCommitOperator<>(parameters, flussConfig, lakeTieringFactory);
53+
new TieringCommitOperator<>(
54+
parameters, flussConfig, lakeTieringConfig, lakeTieringFactory);
5155

5256
@SuppressWarnings("unchecked")
5357
final T castedOperator = (T) commitOperator;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,38 @@
1717

1818
package org.apache.fluss.flink.tiering.committer;
1919

20+
import org.apache.fluss.config.Configuration;
2021
import org.apache.fluss.lake.committer.CommitterInitContext;
2122
import org.apache.fluss.lake.committer.LakeCommitter;
23+
import org.apache.fluss.metadata.TableInfo;
2224
import org.apache.fluss.metadata.TablePath;
2325

2426
/** The {@link CommitterInitContext} implementation for {@link LakeCommitter}. */
2527
public class TieringCommitterInitContext implements CommitterInitContext {
2628

2729
private final TablePath tablePath;
30+
private final TableInfo tableInfo;
31+
private final Configuration lakeTieringConfig;
2832

29-
public TieringCommitterInitContext(TablePath tablePath) {
33+
public TieringCommitterInitContext(
34+
TablePath tablePath, TableInfo tableInfo, Configuration lakeTieringConfig) {
3035
this.tablePath = tablePath;
36+
this.tableInfo = tableInfo;
37+
this.lakeTieringConfig = lakeTieringConfig;
3138
}
3239

3340
@Override
3441
public TablePath tablePath() {
3542
return tablePath;
3643
}
44+
45+
@Override
46+
public TableInfo tableInfo() {
47+
return tableInfo;
48+
}
49+
50+
@Override
51+
public Configuration lakeTieringConfig() {
52+
return lakeTieringConfig;
53+
}
3754
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ void beforeEach() throws Exception {
8989
new TieringCommitOperator<>(
9090
parameters,
9191
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
92+
new org.apache.fluss.config.Configuration(),
9293
new TestingLakeTieringFactory());
9394
committerOperator.open();
9495
}
@@ -279,6 +280,7 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
279280
new TieringCommitOperator<>(
280281
parameters,
281282
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
283+
new org.apache.fluss.config.Configuration(),
282284
new TestingLakeTieringFactory(testingLakeCommitter));
283285
committerOperator.open();
284286

@@ -343,6 +345,7 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
343345
new TieringCommitOperator<>(
344346
parameters,
345347
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
348+
new org.apache.fluss.config.Configuration(),
346349
new TestingLakeTieringFactory(testingLakeCommitter));
347350
committerOperator.open();
348351

fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@
3030
import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
3131
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
3232
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
33+
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
3334

3435
/** The entrypoint for Flink to tier fluss data to lake format like paimon. */
3536
public class FlussLakeTieringEntrypoint {
3637

3738
private static final String FLUSS_CONF_PREFIX = "fluss.";
39+
// Prefix of tiering-service options; must match keys such as "lake.tiering.auto-expire-snapshot".
private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";
3840

3941
public static void main(String[] args) throws Exception {
4042

@@ -65,6 +67,10 @@ public static void main(String[] args) throws Exception {
6567
extractAndRemovePrefix(
6668
paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));
6769

70+
// extract tiering service config
71+
Map<String, String> lakeTieringConfigMap =
72+
extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
73+
6874
// now, we must use full restart strategy if any task is failed,
6975
// since committer is stateless, if tiering committer is failover, committer
7076
// will lose the collected committables, and will never collect all committables to do the commit
@@ -83,6 +89,7 @@ public static void main(String[] args) throws Exception {
8389
execEnv,
8490
Configuration.fromMap(flussConfigMap),
8591
Configuration.fromMap(lakeConfigMap),
92+
Configuration.fromMap(lakeTieringConfigMap),
8693
dataLake)
8794
.build();
8895

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.config.ConfigOptions;
2121
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.lake.committer.CommitterInitContext;
2223
import org.apache.fluss.lake.committer.LakeCommitter;
2324
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
2425
import org.apache.fluss.lake.writer.LakeWriter;
@@ -185,7 +186,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable
185186

186187
// second, commit data
187188
try (LakeCommitter<IcebergWriteResult, IcebergCommittable> lakeCommitter =
188-
createLakeCommitter(tablePath)) {
189+
createLakeCommitter(tablePath, tableInfo)) {
189190
// serialize/deserialize committable
190191
IcebergCommittable icebergCommittable =
191192
lakeCommitter.toCommittable(icebergWriteResults);
@@ -246,8 +247,24 @@ public TableInfo tableInfo() {
246247
}
247248

248249
private LakeCommitter<IcebergWriteResult, IcebergCommittable> createLakeCommitter(
249-
TablePath tablePath) throws IOException {
250-
return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
250+
TablePath tablePath, TableInfo tableInfo) throws IOException {
251+
return icebergLakeTieringFactory.createLakeCommitter(
252+
new CommitterInitContext() {
253+
@Override
254+
public TablePath tablePath() {
255+
return tablePath;
256+
}
257+
258+
@Override
259+
public TableInfo tableInfo() {
260+
return tableInfo;
261+
}
262+
263+
@Override
264+
public Configuration lakeTieringConfig() {
265+
return new Configuration();
266+
}
267+
});
251268
}
252269

253270
private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(

0 commit comments

Comments
 (0)