Skip to content

Commit 58957fd

Browse files
authored
[lake] Support auto snapshot expiration for paimon lake table (#2184)
1 parent 459c763 commit 58957fd

File tree

20 files changed

+399
-126
lines changed

20 files changed

+399
-126
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,13 @@ public class ConfigOptions {
14051405
.withDescription(
14061406
"If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default.");
14071407

1408+
public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT =
1409+
key("table.datalake.auto-expire-snapshot")
1410+
.booleanType()
1411+
.defaultValue(false)
1412+
.withDescription(
1413+
"If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default.");
1414+
14081415
public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
14091416
key("table.merge-engine")
14101417
.enumType(MergeEngineType.class)
@@ -1777,6 +1784,20 @@ public class ConfigOptions {
17771784
"The datalake format used by of Fluss to be as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. "
17781785
+ "In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi.");
17791786

1787+
// ------------------------------------------------------------------------
1788+
// ConfigOptions for tiering service
1789+
// ------------------------------------------------------------------------
1790+
1791+
public static final ConfigOption<Boolean> LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT =
1792+
key("lake.tiering.auto-expire-snapshot")
1793+
.booleanType()
1794+
.defaultValue(false)
1795+
.withDescription(
1796+
"If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake, "
1797+
+ "even if "
1798+
+ ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT
1799+
+ " is false.");
1800+
17801801
// ------------------------------------------------------------------------
17811802
// ConfigOptions for fluss kafka
17821803
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public boolean isDataLakeAutoCompaction() {
101101
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
102102
}
103103

104+
/** Whether auto expire snapshot is enabled. */
105+
public boolean isDataLakeAutoExpireSnapshot() {
106+
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
107+
}
108+
104109
/** Gets the optional merge engine type of the table. */
105110
public Optional<MergeEngineType> getMergeEngineType() {
106111
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
package org.apache.fluss.lake.committer;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.metadata.TableInfo;
2123
import org.apache.fluss.metadata.TablePath;
2224

2325
/**
2426
* The CommitterInitContext interface provides the context needed to create a LakeCommitter. It
25-
* includes methods to obtain the table path.
27+
* includes methods to obtain the table path, table info and lake tiering configs.
2628
*
2729
* @since 0.7
2830
*/
@@ -35,4 +37,18 @@ public interface CommitterInitContext {
3537
* @return the table path
3638
*/
3739
TablePath tablePath();
40+
41+
/**
42+
* Returns the table info.
43+
*
44+
* @return the table info
45+
*/
46+
TableInfo tableInfo();
47+
48+
/**
49+
* Returns the lake tiering config.
50+
*
51+
* @return the lake tiering config
52+
*/
53+
Configuration lakeTieringConfig();
3854
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,25 +46,30 @@ public class LakeTieringJobBuilder {
4646
private final StreamExecutionEnvironment env;
4747
private final Configuration flussConfig;
4848
private final Configuration dataLakeConfig;
49+
private final Configuration lakeTieringConfig;
4950
private final String dataLakeFormat;
5051

5152
private LakeTieringJobBuilder(
5253
StreamExecutionEnvironment env,
5354
Configuration flussConfig,
5455
Configuration dataLakeConfig,
56+
Configuration lakeTieringConfig,
5557
String dataLakeFormat) {
5658
this.env = checkNotNull(env);
5759
this.flussConfig = checkNotNull(flussConfig);
5860
this.dataLakeConfig = checkNotNull(dataLakeConfig);
61+
this.lakeTieringConfig = checkNotNull(lakeTieringConfig);
5962
this.dataLakeFormat = checkNotNull(dataLakeFormat);
6063
}
6164

6265
public static LakeTieringJobBuilder newBuilder(
6366
StreamExecutionEnvironment env,
6467
Configuration flussConfig,
6568
Configuration dataLakeConfig,
69+
Configuration lakeTieringConfig,
6670
String dataLakeFormat) {
67-
return new LakeTieringJobBuilder(env, flussConfig, dataLakeConfig, dataLakeFormat);
71+
return new LakeTieringJobBuilder(
72+
env, flussConfig, dataLakeConfig, lakeTieringConfig, dataLakeFormat);
6873
}
6974

7075
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -99,7 +104,8 @@ public JobClient build() throws Exception {
99104
"TieringCommitter",
100105
CommittableMessageTypeInfo.of(
101106
() -> lakeTieringFactory.getCommittableSerializer()),
102-
new TieringCommitOperatorFactory(flussConfig, lakeTieringFactory))
107+
new TieringCommitOperatorFactory(
108+
flussConfig, lakeTieringConfig, lakeTieringFactory))
103109
.setParallelism(1)
104110
.setMaxParallelism(1)
105111
.sinkTo(new DiscardingSink())

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public class TieringCommitOperator<WriteResult, Committable>
8989
private static final long serialVersionUID = 1L;
9090

9191
private final Configuration flussConfig;
92+
private final Configuration lakeTieringConfig;
9293
private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;
9394
private final FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter;
9495
private Connection connection;
@@ -105,11 +106,13 @@ public class TieringCommitOperator<WriteResult, Committable>
105106
public TieringCommitOperator(
106107
StreamOperatorParameters<CommittableMessage<Committable>> parameters,
107108
Configuration flussConf,
109+
Configuration lakeTieringConfig,
108110
LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
109111
this.lakeTieringFactory = lakeTieringFactory;
110112
this.flussTableLakeSnapshotCommitter = new FlussTableLakeSnapshotCommitter(flussConf);
111113
this.collectedTableBucketWriteResults = new HashMap<>();
112114
this.flussConfig = flussConf;
115+
this.lakeTieringConfig = lakeTieringConfig;
113116
this.operatorEventGateway =
114117
parameters
115118
.getOperatorEventDispatcher()
@@ -204,7 +207,10 @@ private Committable commitWriteResults(
204207
}
205208
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
206209
lakeTieringFactory.createLakeCommitter(
207-
new TieringCommitterInitContext(tablePath))) {
210+
new TieringCommitterInitContext(
211+
tablePath,
212+
admin.getTableInfo(tablePath).get(),
213+
lakeTieringConfig))) {
208214
List<WriteResult> writeResults =
209215
committableWriteResults.stream()
210216
.map(TableBucketWriteResult::writeResult)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,15 @@ public class TieringCommitOperatorFactory<WriteResult, Committable>
3333
TableBucketWriteResult<WriteResult>, CommittableMessage<Committable>> {
3434

3535
private final Configuration flussConfig;
36+
private final Configuration lakeTieringConfig;
3637
private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;
3738

3839
public TieringCommitOperatorFactory(
3940
Configuration flussConfig,
41+
Configuration lakeTieringConfig,
4042
LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
4143
this.flussConfig = flussConfig;
44+
this.lakeTieringConfig = lakeTieringConfig;
4245
this.lakeTieringFactory = lakeTieringFactory;
4346
}
4447

@@ -47,7 +50,8 @@ public <T extends StreamOperator<CommittableMessage<Committable>>> T createStrea
4750
StreamOperatorParameters<CommittableMessage<Committable>> parameters) {
4851

4952
TieringCommitOperator<WriteResult, Committable> commitOperator =
50-
new TieringCommitOperator<>(parameters, flussConfig, lakeTieringFactory);
53+
new TieringCommitOperator<>(
54+
parameters, flussConfig, lakeTieringConfig, lakeTieringFactory);
5155

5256
@SuppressWarnings("unchecked")
5357
final T castedOperator = (T) commitOperator;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,38 @@
1717

1818
package org.apache.fluss.flink.tiering.committer;
1919

20+
import org.apache.fluss.config.Configuration;
2021
import org.apache.fluss.lake.committer.CommitterInitContext;
2122
import org.apache.fluss.lake.committer.LakeCommitter;
23+
import org.apache.fluss.metadata.TableInfo;
2224
import org.apache.fluss.metadata.TablePath;
2325

2426
/** The {@link CommitterInitContext} implementation for {@link LakeCommitter}. */
2527
public class TieringCommitterInitContext implements CommitterInitContext {
2628

2729
private final TablePath tablePath;
30+
private final TableInfo tableInfo;
31+
private final Configuration lakeTieringConfig;
2832

29-
public TieringCommitterInitContext(TablePath tablePath) {
33+
public TieringCommitterInitContext(
34+
TablePath tablePath, TableInfo tableInfo, Configuration lakeTieringConfig) {
3035
this.tablePath = tablePath;
36+
this.tableInfo = tableInfo;
37+
this.lakeTieringConfig = lakeTieringConfig;
3138
}
3239

3340
@Override
3441
public TablePath tablePath() {
3542
return tablePath;
3643
}
44+
45+
@Override
46+
public TableInfo tableInfo() {
47+
return tableInfo;
48+
}
49+
50+
@Override
51+
public Configuration lakeTieringConfig() {
52+
return lakeTieringConfig;
53+
}
3754
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ void beforeEach() throws Exception {
8585
new TieringCommitOperator<>(
8686
parameters,
8787
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
88+
new org.apache.fluss.config.Configuration(),
8889
new TestingLakeTieringFactory());
8990
committerOperator.open();
9091
}
@@ -261,6 +262,7 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
261262
new TieringCommitOperator<>(
262263
parameters,
263264
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
265+
new org.apache.fluss.config.Configuration(),
264266
new TestingLakeTieringFactory(testingLakeCommitter));
265267
committerOperator.open();
266268

@@ -321,6 +323,7 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
321323
new TieringCommitOperator<>(
322324
parameters,
323325
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
326+
new org.apache.fluss.config.Configuration(),
324327
new TestingLakeTieringFactory(testingLakeCommitter));
325328
committerOperator.open();
326329

fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@
3030
import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
3131
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
3232
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
33+
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
3334

3435
/** The entrypoint for Flink to tier fluss data to lake format like paimon. */
3536
public class FlussLakeTieringEntrypoint {
3637

3738
private static final String FLUSS_CONF_PREFIX = "fluss.";
39+
private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";
3840

3941
public static void main(String[] args) throws Exception {
4042

@@ -65,6 +67,10 @@ public static void main(String[] args) throws Exception {
6567
extractAndRemovePrefix(
6668
paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));
6769

70+
// extract tiering service config
71+
Map<String, String> lakeTieringConfigMap =
72+
extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
73+
6874
// now, we must use full restart strategy if any task is failed,
6975
// since committer is stateless, if tiering committer is failover, committer
7076
// will lost the collected committable, and will never collect all committable to do commit
@@ -83,6 +89,7 @@ public static void main(String[] args) throws Exception {
8389
execEnv,
8490
Configuration.fromMap(flussConfigMap),
8591
Configuration.fromMap(lakeConfigMap),
92+
Configuration.fromMap(lakeTieringConfigMap),
8693
dataLake)
8794
.build();
8895

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws E
161161
execEnv,
162162
flussConfig,
163163
Configuration.fromMap(getIcebergCatalogConf()),
164+
new Configuration(),
164165
DataLakeFormat.ICEBERG.toString())
165166
.build();
166167
}

0 commit comments

Comments
 (0)