Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,13 @@ public class ConfigOptions {
.withDescription(
"If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default.");

/**
 * Table-level switch: when {@code true}, snapshot expiration is triggered automatically each
 * time the tiering service commits this table to the datalake. Disabled by default.
 */
public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT =
key("table.datalake.auto-expire-snapshot")
.booleanType()
.defaultValue(false)
.withDescription(
"If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default.");

public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
key("table.merge-engine")
.enumType(MergeEngineType.class)
Expand Down Expand Up @@ -1777,6 +1784,20 @@ public class ConfigOptions {
"The datalake format used by of Fluss to be as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. "
+ "In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi.");

// ------------------------------------------------------------------------
// ConfigOptions for tiering service
// ------------------------------------------------------------------------

/**
 * Service-level override: when {@code true}, the tiering service expires snapshots on every
 * datalake commit regardless of the per-table {@code table.datalake.auto-expire-snapshot}
 * setting. Disabled by default.
 */
public static final ConfigOption<Boolean> LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT =
        key("lake.tiering.auto-expire-snapshot")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake, "
                                + "even if "
                                // Use key() so the description embeds the option key string
                                // ("table.datalake.auto-expire-snapshot") instead of the
                                // ConfigOption's verbose toString() representation.
                                + ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT.key()
                                + " is false.");

// ------------------------------------------------------------------------
// ConfigOptions for fluss kafka
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public boolean isDataLakeAutoCompaction() {
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
}

/**
 * Whether automatic snapshot expiration is enabled for this table, i.e. the value of the
 * {@code table.datalake.auto-expire-snapshot} option (defaults to {@code false}).
 */
public boolean isDataLakeAutoExpireSnapshot() {
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
}

/** Gets the optional merge engine type of the table. */
public Optional<MergeEngineType> getMergeEngineType() {
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package org.apache.fluss.lake.committer;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;

/**
* The CommitterInitContext interface provides the context needed to create a LakeCommitter. It
* includes methods to obtain the table path, the table info, and the lake tiering configuration.
*
* @since 0.7
*/
Expand All @@ -35,4 +37,18 @@ public interface CommitterInitContext {
* @return the table path
*/
TablePath tablePath();

/**
 * Returns the info of the table the committer is created for.
 *
 * @return the table info
 */
TableInfo tableInfo();

/**
 * Returns the configuration of the tiering service (options prefixed with
 * {@code lake.tiering.}).
 *
 * @return the lake tiering config
 */
Configuration lakeTieringConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,30 @@ public class LakeTieringJobBuilder {
private final StreamExecutionEnvironment env;
private final Configuration flussConfig;
private final Configuration dataLakeConfig;
private final Configuration lakeTieringConfig;
private final String dataLakeFormat;

/**
 * Private constructor; use {@link #newBuilder} to create instances.
 *
 * @param env the Flink execution environment the tiering job is built on
 * @param flussConfig configuration for connecting to the Fluss cluster
 * @param dataLakeConfig configuration for the datalake (catalog) backend
 * @param lakeTieringConfig configuration for the tiering service itself
 * @param dataLakeFormat the datalake format name (e.g. paimon, iceberg, lance)
 * @throws NullPointerException if any argument is null
 */
private LakeTieringJobBuilder(
StreamExecutionEnvironment env,
Configuration flussConfig,
Configuration dataLakeConfig,
Configuration lakeTieringConfig,
String dataLakeFormat) {
this.env = checkNotNull(env);
this.flussConfig = checkNotNull(flussConfig);
this.dataLakeConfig = checkNotNull(dataLakeConfig);
this.lakeTieringConfig = checkNotNull(lakeTieringConfig);
this.dataLakeFormat = checkNotNull(dataLakeFormat);
}

/**
 * Creates a new {@link LakeTieringJobBuilder}.
 *
 * @param env the Flink execution environment the tiering job is built on
 * @param flussConfig configuration for connecting to the Fluss cluster
 * @param dataLakeConfig configuration for the datalake (catalog) backend
 * @param lakeTieringConfig configuration for the tiering service itself
 * @param dataLakeFormat the datalake format name (e.g. paimon, iceberg, lance)
 * @return the builder
 */
public static LakeTieringJobBuilder newBuilder(
        StreamExecutionEnvironment env,
        Configuration flussConfig,
        Configuration dataLakeConfig,
        Configuration lakeTieringConfig,
        String dataLakeFormat) {
    // Note: the stale 4-argument delegation (which dropped lakeTieringConfig) is removed;
    // all five arguments must be forwarded to the constructor.
    return new LakeTieringJobBuilder(
            env, flussConfig, dataLakeConfig, lakeTieringConfig, dataLakeFormat);
}

@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down Expand Up @@ -99,7 +104,8 @@ public JobClient build() throws Exception {
"TieringCommitter",
CommittableMessageTypeInfo.of(
() -> lakeTieringFactory.getCommittableSerializer()),
new TieringCommitOperatorFactory(flussConfig, lakeTieringFactory))
new TieringCommitOperatorFactory(
flussConfig, lakeTieringConfig, lakeTieringFactory))
.setParallelism(1)
.setMaxParallelism(1)
.sinkTo(new DiscardingSink())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class TieringCommitOperator<WriteResult, Committable>
private static final long serialVersionUID = 1L;

private final Configuration flussConfig;
private final Configuration lakeTieringConfig;
private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;
private final FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter;
private Connection connection;
Expand All @@ -105,11 +106,13 @@ public class TieringCommitOperator<WriteResult, Committable>
public TieringCommitOperator(
StreamOperatorParameters<CommittableMessage<Committable>> parameters,
Configuration flussConf,
Configuration lakeTieringConfig,
LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
this.lakeTieringFactory = lakeTieringFactory;
this.flussTableLakeSnapshotCommitter = new FlussTableLakeSnapshotCommitter(flussConf);
this.collectedTableBucketWriteResults = new HashMap<>();
this.flussConfig = flussConf;
this.lakeTieringConfig = lakeTieringConfig;
this.operatorEventGateway =
parameters
.getOperatorEventDispatcher()
Expand Down Expand Up @@ -204,7 +207,10 @@ private Committable commitWriteResults(
}
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
lakeTieringFactory.createLakeCommitter(
new TieringCommitterInitContext(tablePath))) {
new TieringCommitterInitContext(
tablePath,
admin.getTableInfo(tablePath).get(),
lakeTieringConfig))) {
List<WriteResult> writeResults =
committableWriteResults.stream()
.map(TableBucketWriteResult::writeResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ public class TieringCommitOperatorFactory<WriteResult, Committable>
TableBucketWriteResult<WriteResult>, CommittableMessage<Committable>> {

private final Configuration flussConfig;
private final Configuration lakeTieringConfig;
private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;

/**
 * Creates the operator factory.
 *
 * @param flussConfig configuration for connecting to the Fluss cluster
 * @param lakeTieringConfig configuration for the tiering service, forwarded to the operator
 * @param lakeTieringFactory factory creating lake writers/committers for the target format
 */
public TieringCommitOperatorFactory(
Configuration flussConfig,
Configuration lakeTieringConfig,
LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
this.flussConfig = flussConfig;
this.lakeTieringConfig = lakeTieringConfig;
this.lakeTieringFactory = lakeTieringFactory;
}

Expand All @@ -47,7 +50,8 @@ public <T extends StreamOperator<CommittableMessage<Committable>>> T createStrea
StreamOperatorParameters<CommittableMessage<Committable>> parameters) {

TieringCommitOperator<WriteResult, Committable> commitOperator =
new TieringCommitOperator<>(parameters, flussConfig, lakeTieringFactory);
new TieringCommitOperator<>(
parameters, flussConfig, lakeTieringConfig, lakeTieringFactory);

@SuppressWarnings("unchecked")
final T castedOperator = (T) commitOperator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,38 @@

package org.apache.fluss.flink.tiering.committer;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;

/** The {@link CommitterInitContext} implementation for {@link LakeCommitter}. */
public class TieringCommitterInitContext implements CommitterInitContext {

private final TablePath tablePath;
private final TableInfo tableInfo;
private final Configuration lakeTieringConfig;

public TieringCommitterInitContext(TablePath tablePath) {
public TieringCommitterInitContext(
TablePath tablePath, TableInfo tableInfo, Configuration lakeTieringConfig) {
this.tablePath = tablePath;
this.tableInfo = tableInfo;
this.lakeTieringConfig = lakeTieringConfig;
}

@Override
public TablePath tablePath() {
return tablePath;
}

@Override
public TableInfo tableInfo() {
return tableInfo;
}

@Override
public Configuration lakeTieringConfig() {
return lakeTieringConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ void beforeEach() throws Exception {
new TieringCommitOperator<>(
parameters,
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
new org.apache.fluss.config.Configuration(),
new TestingLakeTieringFactory());
committerOperator.open();
}
Expand Down Expand Up @@ -261,6 +262,7 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
new TieringCommitOperator<>(
parameters,
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
new org.apache.fluss.config.Configuration(),
new TestingLakeTieringFactory(testingLakeCommitter));
committerOperator.open();

Expand Down Expand Up @@ -321,6 +323,7 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
new TieringCommitOperator<>(
parameters,
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
new org.apache.fluss.config.Configuration(),
new TestingLakeTieringFactory(testingLakeCommitter));
committerOperator.open();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;

/** The entrypoint for Flink to tier fluss data to lake format like paimon. */
public class FlussLakeTieringEntrypoint {

// Prefix for Fluss client options passed as program arguments (e.g. "fluss.bootstrap.servers").
private static final String FLUSS_CONF_PREFIX = "fluss.";
// Prefix for tiering-service options; extracted via extractPrefix, so the keys are kept
// (with the prefix) in the resulting configuration map.
private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";

public static void main(String[] args) throws Exception {

Expand Down Expand Up @@ -65,6 +67,10 @@ public static void main(String[] args) throws Exception {
extractAndRemovePrefix(
paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));

// extract tiering service config
Map<String, String> lakeTieringConfigMap =
extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);

// now, we must use full restart strategy if any task is failed,
// since committer is stateless, if tiering committer is failover, committer
// will lost the collected committable, and will never collect all committable to do commit
Expand All @@ -83,6 +89,7 @@ public static void main(String[] args) throws Exception {
execEnv,
Configuration.fromMap(flussConfigMap),
Configuration.fromMap(lakeConfigMap),
Configuration.fromMap(lakeTieringConfigMap),
dataLake)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws E
execEnv,
flussConfig,
Configuration.fromMap(getIcebergCatalogConf()),
new Configuration(),
DataLakeFormat.ICEBERG.toString())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.lake.writer.LakeWriter;
Expand Down Expand Up @@ -185,7 +186,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable

// second, commit data
try (LakeCommitter<IcebergWriteResult, IcebergCommittable> lakeCommitter =
createLakeCommitter(tablePath)) {
createLakeCommitter(tablePath, tableInfo)) {
// serialize/deserialize committable
IcebergCommittable icebergCommittable =
lakeCommitter.toCommittable(icebergWriteResults);
Expand Down Expand Up @@ -246,8 +247,24 @@ public TableInfo tableInfo() {
}

private LakeCommitter<IcebergWriteResult, IcebergCommittable> createLakeCommitter(
TablePath tablePath) throws IOException {
return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
TablePath tablePath, TableInfo tableInfo) throws IOException {
return icebergLakeTieringFactory.createLakeCommitter(
new CommitterInitContext() {
@Override
public TablePath tablePath() {
return tablePath;
}

@Override
public TableInfo tableInfo() {
return tableInfo;
}

@Override
public Configuration lakeTieringConfig() {
return new Configuration();
}
});
}

private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws E
execEnv,
flussConfig,
Configuration.fromMap(getLanceCatalogConf()),
new Configuration(),
DataLakeFormat.LANCE.toString())
.build();
}
Expand Down
Loading