diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 7eb2039999..43c2445544 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1405,6 +1405,13 @@ public class ConfigOptions {
                     .withDescription(
                             "If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default.");
 
+    public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT =
+            key("table.datalake.auto-expire-snapshot")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, snapshot expiration will be triggered automatically when the tiering service commits to the datalake. It is disabled by default.");
+
     public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
             key("table.merge-engine")
                     .enumType(MergeEngineType.class)
@@ -1777,6 +1784,20 @@ public class ConfigOptions {
                             "The datalake format used by Fluss as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. "
                                     + "In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi.");
 
+    // ------------------------------------------------------------------------
+    // ConfigOptions for tiering service
+    // ------------------------------------------------------------------------
+
+    public static final ConfigOption<Boolean> LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT =
+            key("lake.tiering.auto-expire-snapshot")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, snapshot expiration will be triggered automatically when the tiering service commits to the datalake, "
+                                    + "even if "
+                                    + ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT.key()
+                                    + " is false.");
+
     // ------------------------------------------------------------------------
     // ConfigOptions for fluss kafka
     // ------------------------------------------------------------------------
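For orientation, a hedged sketch of enabling the new table-level option when declaring a table. The schema and values are illustrative; the `TableDescriptor` and `Schema` usage mirrors the tests later in this diff:

```java
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.types.DataTypes;

public class AutoExpireSnapshotExample {
    // Hypothetical descriptor: a datalake-enabled table that also opts in to
    // automatic snapshot expiration at the table level.
    public static TableDescriptor datalakeTableWithAutoExpire() {
        return TableDescriptor.builder()
                .schema(
                        Schema.newBuilder()
                                .column("c1", DataTypes.INT())
                                .column("c2", DataTypes.STRING())
                                .build())
                .distributedBy(3)
                .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                .property(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT, true)
                .build();
    }
}
```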
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index fc7966ab0d..80d2ee8f78 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -101,6 +101,11 @@ public boolean isDataLakeAutoCompaction() {
         return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
     }
 
+    /** Whether auto snapshot expiration is enabled. */
+    public boolean isDataLakeAutoExpireSnapshot() {
+        return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
+    }
+
     /** Gets the optional merge engine type of the table. */
     public Optional<MergeEngineType> getMergeEngineType() {
         return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
index 263d3a2fd2..dab6b14852 100644
--- a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
+++ b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
@@ -18,11 +18,13 @@
 package org.apache.fluss.lake.committer;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
 /**
  * The CommitterInitContext interface provides the context needed to create a LakeCommitter. It
- * includes methods to obtain the table path.
+ * includes methods to obtain the table path, the table info, and the lake tiering config.
  *
  * @since 0.7
  */
@@ -35,4 +37,18 @@ public interface CommitterInitContext {
      * @return the table path
      */
     TablePath tablePath();
+
+    /**
+     * Returns the table info.
+     *
+     * @return the table info
+     */
+    TableInfo tableInfo();
+
+    /**
+     * Returns the lake tiering config.
+     *
+     * @return the lake tiering config
+     */
+    Configuration lakeTieringConfig();
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
index 85eed128c7..74cf91e0e0 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
@@ -46,16 +46,19 @@ public class LakeTieringJobBuilder {
     private final StreamExecutionEnvironment env;
     private final Configuration flussConfig;
     private final Configuration dataLakeConfig;
+    private final Configuration lakeTieringConfig;
     private final String dataLakeFormat;
 
     private LakeTieringJobBuilder(
             StreamExecutionEnvironment env,
             Configuration flussConfig,
             Configuration dataLakeConfig,
+            Configuration lakeTieringConfig,
             String dataLakeFormat) {
         this.env = checkNotNull(env);
         this.flussConfig = checkNotNull(flussConfig);
         this.dataLakeConfig = checkNotNull(dataLakeConfig);
+        this.lakeTieringConfig = checkNotNull(lakeTieringConfig);
         this.dataLakeFormat = checkNotNull(dataLakeFormat);
     }
 
@@ -63,8 +66,10 @@ public static LakeTieringJobBuilder newBuilder(
             StreamExecutionEnvironment env,
             Configuration flussConfig,
             Configuration dataLakeConfig,
+            Configuration lakeTieringConfig,
             String dataLakeFormat) {
-        return new LakeTieringJobBuilder(env, flussConfig, dataLakeConfig, dataLakeFormat);
+        return new LakeTieringJobBuilder(
+                env, flussConfig, dataLakeConfig, lakeTieringConfig, dataLakeFormat);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -99,7 +104,8 @@ public JobClient build() throws Exception {
                         "TieringCommitter",
                         CommittableMessageTypeInfo.of(
                                 () -> lakeTieringFactory.getCommittableSerializer()),
-                        new TieringCommitOperatorFactory(flussConfig, lakeTieringFactory))
+                        new TieringCommitOperatorFactory(
+                                flussConfig, lakeTieringConfig, lakeTieringFactory))
                 .setParallelism(1)
                 .setMaxParallelism(1)
                 .sinkTo(new DiscardingSink())
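A minimal sketch of the updated five-argument builder call, assuming a Paimon setup. The catalog config and environment wiring are illustrative, mirroring `buildTieringJob` in the test bases later in this diff:

```java
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.tiering.LakeTieringJobBuilder;

public class TieringJobExample {
    public static JobClient submit(
            StreamExecutionEnvironment execEnv,
            Configuration flussConfig,
            Configuration paimonCatalogConfig) throws Exception {
        // Service-wide tiering options now travel in their own Configuration.
        Configuration lakeTieringConfig = new Configuration();
        lakeTieringConfig.set(ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT, true);

        return LakeTieringJobBuilder.newBuilder(
                        execEnv,
                        flussConfig,
                        paimonCatalogConfig,
                        lakeTieringConfig, // new fourth argument
                        "paimon")
                .build();
    }
}
```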
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index ebe4a1ae68..a678a4b237 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -89,6 +89,7 @@ public class TieringCommitOperator<WriteResult, Committable>
     private static final long serialVersionUID = 1L;
 
     private final Configuration flussConfig;
+    private final Configuration lakeTieringConfig;
     private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;
     private final FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter;
     private Connection connection;
@@ -105,11 +106,13 @@ public class TieringCommitOperator<WriteResult, Committable>
     public TieringCommitOperator(
             StreamOperatorParameters<CommittableMessage<Committable>> parameters,
             Configuration flussConf,
+            Configuration lakeTieringConfig,
             LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
         this.lakeTieringFactory = lakeTieringFactory;
         this.flussTableLakeSnapshotCommitter = new FlussTableLakeSnapshotCommitter(flussConf);
         this.collectedTableBucketWriteResults = new HashMap<>();
         this.flussConfig = flussConf;
+        this.lakeTieringConfig = lakeTieringConfig;
         this.operatorEventGateway =
                 parameters
                         .getOperatorEventDispatcher()
@@ -204,7 +207,10 @@ private Committable commitWriteResults(
         }
         try (LakeCommitter<WriteResult, Committable> lakeCommitter =
                 lakeTieringFactory.createLakeCommitter(
-                        new TieringCommitterInitContext(tablePath))) {
+                        new TieringCommitterInitContext(
+                                tablePath,
+                                admin.getTableInfo(tablePath).get(),
+                                lakeTieringConfig))) {
             List<WriteResult> writeResults =
                     committableWriteResults.stream()
                             .map(TableBucketWriteResult::writeResult)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
index 65d6f377e8..efced7aeab 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
@@ -33,12 +33,15 @@ public class TieringCommitOperatorFactory<WriteResult, Committable>
                 TableBucketWriteResult<WriteResult>, CommittableMessage<Committable>> {
 
     private final Configuration flussConfig;
+    private final Configuration lakeTieringConfig;
     private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;
 
     public TieringCommitOperatorFactory(
             Configuration flussConfig,
+            Configuration lakeTieringConfig,
             LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
         this.flussConfig = flussConfig;
+        this.lakeTieringConfig = lakeTieringConfig;
         this.lakeTieringFactory = lakeTieringFactory;
     }
 
@@ -47,7 +50,8 @@ public <T extends StreamOperator<CommittableMessage<Committable>>> T createStreamOperator(
             StreamOperatorParameters<CommittableMessage<Committable>> parameters) {
         TieringCommitOperator<WriteResult, Committable> commitOperator =
-                new TieringCommitOperator<>(parameters, flussConfig, lakeTieringFactory);
+                new TieringCommitOperator<>(
+                        parameters, flussConfig, lakeTieringConfig, lakeTieringFactory);
 
         @SuppressWarnings("unchecked")
         final T castedOperator = (T) commitOperator;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
index 69c5aff619..3cae145f06 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
@@ -17,21 +17,38 @@
 
 package org.apache.fluss.flink.tiering.committer;
 
+import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
 /** The {@link CommitterInitContext} implementation for {@link LakeCommitter}. */
 public class TieringCommitterInitContext implements CommitterInitContext {
 
     private final TablePath tablePath;
+    private final TableInfo tableInfo;
+    private final Configuration lakeTieringConfig;
 
-    public TieringCommitterInitContext(TablePath tablePath) {
+    public TieringCommitterInitContext(
+            TablePath tablePath, TableInfo tableInfo, Configuration lakeTieringConfig) {
         this.tablePath = tablePath;
+        this.tableInfo = tableInfo;
+        this.lakeTieringConfig = lakeTieringConfig;
     }
 
     @Override
     public TablePath tablePath() {
         return tablePath;
     }
+
+    @Override
+    public TableInfo tableInfo() {
+        return tableInfo;
+    }
+
+    @Override
+    public Configuration lakeTieringConfig() {
+        return lakeTieringConfig;
+    }
 }
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 593dcadd18..230460395a 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -85,6 +85,7 @@ void beforeEach() throws Exception {
                 new TieringCommitOperator<>(
                         parameters,
                         FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+                        new org.apache.fluss.config.Configuration(),
                         new TestingLakeTieringFactory());
         committerOperator.open();
     }
@@ -261,6 +262,7 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
                 new TieringCommitOperator<>(
                         parameters,
                         FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+                        new org.apache.fluss.config.Configuration(),
                         new TestingLakeTieringFactory(testingLakeCommitter));
         committerOperator.open();
 
@@ -321,6 +323,7 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
                 new TieringCommitOperator<>(
                         parameters,
                         FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+                        new org.apache.fluss.config.Configuration(),
                         new TestingLakeTieringFactory(testingLakeCommitter));
         committerOperator.open();
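The intended precedence, as implemented for Paimon later in this diff: expiration runs when either the table-level or the service-level flag is set. A hedged sketch of deriving the effective flag from the new context methods (the helper class and method name are hypothetical):

```java
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.lake.committer.CommitterInitContext;

public class AutoExpireFlag {
    /** True if snapshot expiration should run for this commit (table-level OR service-level). */
    public static boolean isAutoExpireEnabled(CommitterInitContext context) {
        boolean tableLevel =
                context.tableInfo().getTableConfig().isDataLakeAutoExpireSnapshot();
        boolean serviceLevel =
                context.lakeTieringConfig().get(ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT);
        return tableLevel || serviceLevel;
    }
}
```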
diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
index 74236c91a7..a2ce6ce1af 100644
--- a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
+++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
@@ -30,11 +30,13 @@
 import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
 import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
 import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
+import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
 
 /** The entrypoint for Flink to tier fluss data to lake format like paimon. */
 public class FlussLakeTieringEntrypoint {
 
     private static final String FLUSS_CONF_PREFIX = "fluss.";
+    private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";
 
     public static void main(String[] args) throws Exception {
@@ -65,6 +67,10 @@ public static void main(String[] args) throws Exception {
                 extractAndRemovePrefix(
                         paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));
 
+        // extract tiering service config
+        Map<String, String> lakeTieringConfigMap =
+                extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
+
         // now, we must use full restart strategy if any task is failed,
         // since committer is stateless, if tiering committer is failover, committer
         // will lost the collected committable, and will never collect all committable to do commit
@@ -83,6 +89,7 @@ public static void main(String[] args) throws Exception {
                         execEnv,
                         Configuration.fromMap(flussConfigMap),
                         Configuration.fromMap(lakeConfigMap),
+                        Configuration.fromMap(lakeTieringConfigMap),
                         dataLake)
                 .build();
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index a2997f98c0..a7c922ba5a 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -161,6 +161,7 @@ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
                         execEnv,
                         flussConfig,
                         Configuration.fromMap(getIcebergCatalogConf()),
+                        new Configuration(),
                         DataLakeFormat.ICEBERG.toString())
                 .build();
     }
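Relating to the entrypoint change above: a sketch of how `lake.tiering.*` keys are expected to flow from job parameters into the tiering config. Note that `extractPrefix` (unlike `extractAndRemovePrefix`) appears to keep the prefix in the keys, so the full key `lake.tiering.auto-expire-snapshot` survives into the resulting `Configuration`; the parameter map below is illustrative:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.fluss.config.Configuration;
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;

public class TieringParamsExample {
    public static Configuration lakeTieringConfig() {
        // Illustrative parameters, e.g. from
        // "flink run ... --lake.tiering.auto-expire-snapshot true".
        Map<String, String> paramsMap = new HashMap<>();
        paramsMap.put("lake.tiering.auto-expire-snapshot", "true");

        // Keys keep their prefix, matching LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT's full key.
        Map<String, String> lakeTieringConfigMap = extractPrefix(paramsMap, "lake.tiering.");
        return Configuration.fromMap(lakeTieringConfigMap);
    }
}
```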
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index c81c331f19..ba59b1b742 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -19,6 +19,7 @@
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.writer.LakeWriter;
@@ -185,7 +186,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable)
 
         // second, commit data
         try (LakeCommitter<IcebergWriteResult, IcebergCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo)) {
             // serialize/deserialize committable
             IcebergCommittable icebergCommittable =
                     lakeCommitter.toCommittable(icebergWriteResults);
@@ -246,8 +247,24 @@ public TableInfo tableInfo() {
     }
 
     private LakeCommitter<IcebergWriteResult, IcebergCommittable> createLakeCommitter(
-            TablePath tablePath) throws IOException {
-        return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
+            TablePath tablePath, TableInfo tableInfo) throws IOException {
+        return icebergLakeTieringFactory.createLakeCommitter(
+                new CommitterInitContext() {
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableInfo tableInfo() {
+                        return tableInfo;
+                    }
+
+                    @Override
+                    public Configuration lakeTieringConfig() {
+                        return new Configuration();
+                    }
+                });
     }
 
     private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
index a548e3558e..0b94cedb5a 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -206,6 +206,7 @@ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
                         execEnv,
                         flussConfig,
                         Configuration.fromMap(getLanceCatalogConf()),
+                        new Configuration(),
                         DataLakeFormat.LANCE.toString())
                 .build();
     }
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
index 642bd7afd1..1157daf767 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -20,6 +20,7 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.lance.LanceConfig;
 import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
@@ -114,7 +115,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
                 lanceLakeTieringFactory.getCommittableSerializer();
 
         try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo)) {
             // should no any missing snapshot
             assertThat(lakeCommitter.getMissingLakeSnapshot(2L)).isNull();
         }
@@ -159,7 +160,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
 
         // second, commit data
         try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo)) {
             // serialize/deserialize committable
             LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
             byte[] serialized = committableSerializer.serialize(lanceCommittable);
@@ -196,7 +197,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
 
         // then, let's verify getMissingLakeSnapshot works
         try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo)) {
             // use snapshot id 1 as the known snapshot id
             CommittedLakeSnapshot committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(1L);
             assertThat(committedLakeSnapshot).isNotNull();
@@ -242,8 +243,24 @@ private void verifyLogTableRecords(
     }
 
     private LakeCommitter<LanceWriteResult, LanceCommittable> createLakeCommitter(
-            TablePath tablePath) throws IOException {
-        return lanceLakeTieringFactory.createLakeCommitter(() -> tablePath);
+            TablePath tablePath, TableInfo tableInfo) throws IOException {
+        return lanceLakeTieringFactory.createLakeCommitter(
+                new CommitterInitContext() {
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableInfo tableInfo() {
+                        return tableInfo;
+                    }
+
+                    @Override
+                    public Configuration lakeTieringConfig() {
+                        return new Configuration();
+                    }
+                });
     }
 
     private LakeWriter<LanceWriteResult> createLakeWriter(
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 8351a377fe..ee04193125 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -17,8 +17,10 @@
 
 package org.apache.fluss.lake.paimon.tiering;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.lake.committer.BucketOffset;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -32,15 +34,15 @@
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -55,14 +57,25 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, PaimonCommittable> {
 
-    private FileStoreCommit fileStoreCommit;
+    private TableCommitImpl tableCommit;
     private static final ThreadLocal<Long> currentCommitSnapshotId = new ThreadLocal<>();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider, TablePath tablePath)
+    public PaimonLakeCommitter(
+            PaimonCatalogProvider paimonCatalogProvider, CommitterInitContext committerInitContext)
             throws IOException {
         this.paimonCatalog = paimonCatalogProvider.get();
-        this.fileStoreTable = getTable(tablePath);
+        this.fileStoreTable =
+                getTable(
+                        committerInitContext.tablePath(),
+                        committerInitContext
+                                        .tableInfo()
+                                        .getTableConfig()
+                                        .isDataLakeAutoExpireSnapshot()
+                                || committerInitContext
+                                        .lakeTieringConfig()
+                                        .get(ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT));
     }
 
     @Override
@@ -82,19 +95,17 @@ public long commit(PaimonCommittable committable, Map<String, String> snapshotProperties)
         snapshotProperties.forEach(manifestCommittable::addProperty);
 
         try {
-            fileStoreCommit =
-                    fileStoreTable
-                            .store()
-                            .newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
-            fileStoreCommit.commit(manifestCommittable, false);
+            tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+            tableCommit.commit(manifestCommittable);
+
             Long commitSnapshotId = currentCommitSnapshotId.get();
             currentCommitSnapshotId.remove();
             return checkNotNull(commitSnapshotId, "Paimon committed snapshot id must be non-null.");
         } catch (Throwable t) {
-            if (fileStoreCommit != null) {
+            if (tableCommit != null) {
                 // if any error happen while commit, abort the commit to clean committable
-                fileStoreCommit.abort(manifestCommittable.fileCommittables());
+                tableCommit.abort(manifestCommittable.fileCommittables());
             }
             throw new IOException(t);
         }
@@ -102,9 +113,8 @@ public long commit(PaimonCommittable committable, Map<String, String> snapshotProperties)
 
     @Override
     public void abort(PaimonCommittable committable) throws IOException {
-        fileStoreCommit =
-                fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
-        fileStoreCommit.abort(committable.manifestCommittable().fileCommittables());
+        tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+        tableCommit.abort(committable.manifestCommittable().fileCommittables());
     }
 
     @Nullable
@@ -190,8 +200,8 @@ private Snapshot getCommittedLatestSnapshotOfLake(String commitUser) throws IOException {
 
     @Override
     public void close() throws Exception {
         try {
-            if (fileStoreCommit != null) {
-                fileStoreCommit.close();
+            if (tableCommit != null) {
+                tableCommit.close();
             }
             if (paimonCatalog != null) {
                 paimonCatalog.close();
@@ -201,19 +211,20 @@ public void close() throws Exception {
         }
     }
 
-    private FileStoreTable getTable(TablePath tablePath) throws IOException {
+    private FileStoreTable getTable(TablePath tablePath, boolean isAutoSnapshotExpiration)
+            throws IOException {
         try {
-            FileStoreTable table =
-                    (FileStoreTable)
-                            paimonCatalog
-                                    .getTable(toPaimon(tablePath))
-                                    .copy(
-                                            Collections.singletonMap(
-                                                    CoreOptions.COMMIT_CALLBACKS.key(),
-                                                    PaimonLakeCommitter.PaimonCommitCallback.class
-                                                            .getName()));
-
-            return table;
+            FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+
+            Map<String, String> dynamicOptions = new HashMap<>();
+            dynamicOptions.put(
+                    CoreOptions.COMMIT_CALLBACKS.key(),
+                    PaimonLakeCommitter.PaimonCommitCallback.class.getName());
+            dynamicOptions.put(
+                    CoreOptions.WRITE_ONLY.key(),
+                    isAutoSnapshotExpiration ? Boolean.FALSE.toString() : Boolean.TRUE.toString());
+
+            return table.copy(dynamicOptions);
         } catch (Exception e) {
             throw new IOException("Failed to get table " + tablePath + " in Paimon.", e);
         }
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
index c69a997900..432620efb3 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
@@ -53,7 +53,7 @@ public SimpleVersionedSerializer<PaimonWriteResult> getWriteResultSerializer() {
     @Override
     public LakeCommitter<PaimonWriteResult, PaimonCommittable> createLakeCommitter(
             CommitterInitContext committerInitContext) throws IOException {
-        return new PaimonLakeCommitter(paimonCatalogProvider, committerInitContext.tablePath());
+        return new PaimonLakeCommitter(paimonCatalogProvider, committerInitContext);
     }
 
     @Override
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 5b86e6a8f3..74cd75b7d1 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -128,6 +128,7 @@ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
                         execEnv,
                         flussConfig,
                         Configuration.fromMap(getPaimonCatalogConf()),
+                        new Configuration(),
                         DataLakeFormat.PAIMON.toString())
                 .build();
     }
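Why toggling `CoreOptions.WRITE_ONLY` works: a write-only Paimon table skips maintenance such as compaction and snapshot expiration at commit time, so setting it to `false` lets the tiering commit also expire old snapshots according to the table's retention options. A hedged sketch of the dynamic-options pattern used above (the catalog lookup and helper are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;

public class WriteOnlyToggleExample {
    public static FileStoreTable withAutoExpire(Catalog catalog, Identifier id, boolean autoExpire)
            throws Exception {
        FileStoreTable table = (FileStoreTable) catalog.getTable(id);
        Map<String, String> dynamicOptions = new HashMap<>();
        // write-only = true disables compaction and snapshot expiration during commit;
        // write-only = false re-enables them, so expiration piggybacks on the tiering commit.
        dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), String.valueOf(!autoExpire));
        return table.copy(dynamicOptions);
    }
}
```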
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index afc43b86c1..49f80d9991 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -20,6 +20,7 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.writer.LakeWriter;
@@ -49,6 +50,7 @@
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -106,6 +108,18 @@ private static Stream<Arguments> tieringWriteArgs() {
                 Arguments.of(false, false));
     }
 
+    private static Stream<Arguments> snapshotExpireArgs() {
+        return Stream.of(
+                Arguments.of(true, true, true),
+                Arguments.of(true, true, false),
+                Arguments.of(true, false, true),
+                Arguments.of(true, false, false),
+                Arguments.of(false, true, true),
+                Arguments.of(false, true, false),
+                Arguments.of(false, false, true),
+                Arguments.of(false, false, false));
+    }
+
     @ParameterizedTest
     @MethodSource("tieringWriteArgs")
     void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) throws Exception {
@@ -118,7 +132,11 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) throws Exception {
                                 isPrimaryKeyTable ? "primary_key" : "log",
                                 isPartitioned ? "partitioned" : "non_partitioned"));
         createTable(
-                tablePath, isPrimaryKeyTable, isPartitioned, isPrimaryKeyTable ? bucketNum : null);
+                tablePath,
+                isPrimaryKeyTable,
+                isPartitioned,
+                isPrimaryKeyTable ? bucketNum : null,
+                Collections.emptyMap());
         TableDescriptor descriptor =
                 TableDescriptor.builder()
                         .schema(
@@ -132,14 +150,8 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) throws Exception {
                         .build();
         TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
 
-        List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
-        SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
-                paimonLakeTieringFactory.getWriteResultSerializer();
-        SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
-                paimonLakeTieringFactory.getCommittableSerializer();
-
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo, new Configuration())) {
             // should no any missing snapshot
             assertThat(lakeCommitter.getMissingLakeSnapshot(1L)).isNull();
         }
@@ -155,49 +167,9 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) throws Exception {
                         }
                     }
                     : Collections.singletonMap(null, null);
-        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
-        // first, write data
-        for (int bucket = 0; bucket < bucketNum; bucket++) {
-            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
-                String partition = entry.getValue();
-                try (LakeWriter<PaimonWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
-                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
-                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
-                            isPrimaryKeyTable
-                                    ? genPrimaryKeyTableRecords(partition, bucket)
-                                    : genLogTableRecords(partition, bucket, 10);
-                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
-                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
-                    recordsByBucket.put(partitionBucket, expectRecords);
-                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
-                    for (LogRecord logRecord : writtenRecords) {
-                        lakeWriter.write(logRecord);
-                    }
-                    // serialize/deserialize writeResult
-                    PaimonWriteResult paimonWriteResult = lakeWriter.complete();
-                    byte[] serialized = writeResultSerializer.serialize(paimonWriteResult);
-                    paimonWriteResults.add(
-                            writeResultSerializer.deserialize(
-                                    writeResultSerializer.getVersion(), serialized));
-                }
-            }
-        }
-
-        // second, commit data
-        try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
-            // serialize/deserialize committable
-            PaimonCommittable paimonCommittable = lakeCommitter.toCommittable(paimonWriteResults);
-            byte[] serialized = committableSerializer.serialize(paimonCommittable);
-            paimonCommittable =
-                    committableSerializer.deserialize(
-                            committableSerializer.getVersion(), serialized);
-            long snapshot =
-                    lakeCommitter.commit(
-                            paimonCommittable, toBucketOffsetsProperty(tableBucketOffsets));
-            assertThat(snapshot).isEqualTo(1);
-        }
+        // first, write some data
+        writeData(tableInfo, recordsByBucket, partitionIdAndName);
 
         // then, check data
         for (int bucket = 0; bucket < 3; bucket++) {
@@ -212,7 +184,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) throws Exception {
 
         // then, let's verify getMissingLakeSnapshot works
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo, new Configuration())) {
             // use snapshot id 0 as the known snapshot id
             CommittedLakeSnapshot committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(0L);
             assertThat(committedLakeSnapshot).isNotNull();
@@ -292,7 +264,7 @@ void testMultiPartitionTiering() throws Exception {
 
         // Commit all data
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo, new Configuration())) {
             PaimonCommittable committable = lakeCommitter.toCommittable(paimonWriteResults);
             long snapshot =
                     lakeCommitter.commit(committable, toBucketOffsetsProperty(tableBucketOffsets));
@@ -364,7 +336,7 @@ void testThreePartitionTiering() throws Exception {
 
         // Commit all data
         long snapshot;
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo, new Configuration())) {
             PaimonCommittable committable = lakeCommitter.toCommittable(paimonWriteResults);
             snapshot =
                     lakeCommitter.commit(committable, toBucketOffsetsProperty(tableBucketOffsets));
@@ -387,6 +359,77 @@ void testThreePartitionTiering() throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("snapshotExpireArgs")
+    void testSnapshotExpiration(
+            boolean isPartitioned,
+            boolean isTableAutoExpireSnapshot,
+            boolean isLakeTieringExpireSnapshot)
+            throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath =
+                TablePath.of(
+                        "paimon",
+                        String.format(
+                                "test_tiering_snapshot_expiration_table_%s",
+                                isPartitioned ? "partitioned" : "non_partitioned"));
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "2");
+        createTable(tablePath, false, isPartitioned, null, options);
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("c3", org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .property(
+                                ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT,
+                                isTableAutoExpireSnapshot)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+        // Get the FileStoreTable to verify snapshots
+        FileStoreTable fileStoreTable =
+                (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+
+        Configuration lakeTieringConfig = new Configuration();
+        lakeTieringConfig.set(
+                ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT, isLakeTieringExpireSnapshot);
+
+        // Write some data to generate 2 snapshots
+        writeData(tableInfo, lakeTieringConfig, new HashMap<>(), partitionIdAndName);
+        writeData(tableInfo, lakeTieringConfig, new HashMap<>(), partitionIdAndName);
+        assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
+
+        // write more data
+        for (int i = 0; i < 5; i++) {
+            writeData(tableInfo, lakeTieringConfig, new HashMap<>(), partitionIdAndName);
+            if (isTableAutoExpireSnapshot || isLakeTieringExpireSnapshot) {
+                // if auto snapshot expiration is enabled, snapshots should be expired
+                assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
+            } else {
+                // if auto snapshot expiration is disabled, snapshots should never be expired
+                assertThat(snapshotManager.snapshotCount()).isGreaterThan(2);
+            }
+        }
+    }
+
     private void verifyLogTableRecordsMultiPartition(
             CloseableIterator<InternalRow> actualRecords,
             List<LogRecord> expectRecords,
@@ -704,15 +747,33 @@ public TableInfo tableInfo() {
     }
 
     private LakeCommitter<PaimonWriteResult, PaimonCommittable> createLakeCommitter(
-            TablePath tablePath) throws IOException {
-        return paimonLakeTieringFactory.createLakeCommitter(() -> tablePath);
+            TablePath tablePath, TableInfo tableInfo, Configuration lakeTieringConfig)
+            throws IOException {
+        return paimonLakeTieringFactory.createLakeCommitter(
+                new CommitterInitContext() {
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableInfo tableInfo() {
+                        return tableInfo;
+                    }
+
+                    @Override
+                    public Configuration lakeTieringConfig() {
+                        return lakeTieringConfig;
+                    }
+                });
     }
 
     private void createTable(
             TablePath tablePath,
             boolean isPrimaryTable,
             boolean isPartitioned,
-            @Nullable Integer numBuckets)
+            @Nullable Integer numBuckets,
+            Map<String, String> options)
             throws Exception {
         Schema.Builder builder =
                 Schema.newBuilder()
@@ -735,6 +796,7 @@ private void createTable(
         if (numBuckets != null) {
             builder.option(CoreOptions.BUCKET.key(), String.valueOf(numBuckets));
         }
+        builder.options(options);
 
         doCreatePaimonTable(tablePath, builder);
     }
@@ -784,4 +846,70 @@ private String getSnapshotLogOffsetProperty(TablePath tablePath, long snapshotId)
                 .properties()
                 .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
     }
+
+    private void writeData(
+            TableInfo tableInfo,
+            Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket,
+            Map<Long, String> partitionIdAndName)
+            throws Exception {
+        writeData(tableInfo, new Configuration(), recordsByBucket, partitionIdAndName);
+    }
+
+    private void writeData(
+            TableInfo tableInfo,
+            Configuration lakeTieringConfig,
+            Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket,
+            Map<Long, String> partitionIdAndName)
+            throws Exception {
+        TablePath tablePath = tableInfo.getTablePath();
+        int bucketNum = tableInfo.getNumBuckets();
+        boolean isPrimaryKeyTable = tableInfo.hasPrimaryKey();
+
+        List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
+                paimonLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
+                paimonLakeTieringFactory.getCommittableSerializer();
+
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        // first, write data
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<PaimonWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            isPrimaryKeyTable
+                                    ? genPrimaryKeyTableRecords(partition, bucket)
+                                    : genLogTableRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    PaimonWriteResult paimonWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(paimonWriteResult);
+                    paimonWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // second, commit data
+        try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
+                createLakeCommitter(tablePath, tableInfo, lakeTieringConfig)) {
+            // serialize/deserialize committable
+            PaimonCommittable paimonCommittable = lakeCommitter.toCommittable(paimonWriteResults);
+            byte[] serialized = committableSerializer.serialize(paimonCommittable);
+            paimonCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            lakeCommitter.commit(paimonCommittable, toBucketOffsetsProperty(tableBucketOffsets));
+        }
+    }
 }
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 0c6f54cd5c..46c34da57a 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -414,6 +414,9 @@
                             <exclude>
                                 org.apache.fluss.flink.tiering.committer.CommittableMessageTypeInfo*
                            </exclude>
+                            <exclude>
+                                org.apache.fluss.flink.tiering.committer.TieringCommitterInitContext
+                            </exclude>
                             <exclude>
                                 org.apache.fluss.flink.tiering.LakeTieringJobBuilder
                             </exclude>
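A short sketch of how the expiration effect can be observed, as the test above does: with `snapshot.num-retained.max` set to 2, the snapshot count stays bounded once expiration kicks in (the bound and helper are illustrative):

```java
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

import static org.assertj.core.api.Assertions.assertThat;

public class SnapshotCountCheck {
    public static void verifyBounded(FileStoreTable fileStoreTable) {
        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
        // With snapshot.num-retained.min=1 / max=2 and auto expiration enabled,
        // repeated tiering commits should never accumulate more than 2 snapshots.
        assertThat(snapshotManager.snapshotCount()).isLessThanOrEqualTo(2);
    }
}
```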
diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md
index 6c637240e1..d6462fbfa0 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -61,31 +61,32 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties)
 
 ## Storage Options
 
-| Option | Type | Default | Description |
-|--------|------|---------|-------------|
-| bucket.num | int | The bucket number of Fluss cluster. | The number of buckets of a Fluss table. |
-| bucket.key | String | (None) | Specific the distribution policy of the Fluss table. Data will be distributed to each bucket according to the hash value of bucket-key (It must be a subset of the primary keys excluding partition keys of the primary key table). If you specify multiple fields, delimiter is `,`. If the table has a primary key and a bucket key is not specified, the bucket key will be used as primary key(excluding the partition key). If the table has no primary key and the bucket key is not specified, the data will be distributed to each bucket randomly. |
-| table.log.ttl | Duration | 7 days | The time to live for log segments. The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. |
-| table.auto-partition.enabled | Boolean | false | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. |
-| table.auto-partition.key | String | (None) | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions. And If the table has only one partition key, this config is not necessary. Otherwise, it must be specified. |
-| table.auto-partition.time-unit | ENUM | DAY | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the partition format for auto created is yyyyQ. If the value is `YEAR`, the partition format for auto created is yyyy. |
-| table.auto-partition.num-precreate | Integer | 2 | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the `table.auto-partition.time-unit` is `DAY`(default), one precreated partition is for today and another one is for tomorrow. For a partition table with multiple partition keys, pre-create is unsupported and will be set to 0 automatically when creating table if it is not explicitly specified. |
-| table.auto-partition.num-retention | Integer | 7 | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7, which means that 7 partitions will be retained. |
-| table.auto-partition.time-zone | String | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. |
-| table.replication.factor | Integer | (None) | The replication factor for the log of the new table. When it's not set, Fluss will use the cluster's default replication factor configured by default.replication.factor. It should be a positive number and not larger than the number of tablet servers in the Fluss cluster. A value larger than the number of tablet servers in Fluss cluster will result in an error when the new table is created. |
-| table.log.format | Enum | ARROW | The format of the log records in log store. The default value is `ARROW`. The supported formats are `ARROW` and `INDEXED`. |
-| table.log.arrow.compression.type | Enum | ZSTD | The compression type of the log records if the log format is set to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The default value is `ZSTD`. |
-| table.log.arrow.compression.zstd.level | Integer | 3 | The compression level of the log records if the log format is set to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 22. The default value is 3. |
-| table.kv.format | Enum | COMPACTED | The format of the kv records in kv store. The default value is `COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`. |
-| table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain in local for each table when log tiered storage is enabled. It must be greater that 0. The default is 2. |
-| table.datalake.enabled | Boolean | false | Whether enable lakehouse storage for the table. Disabled by default. When this option is set to ture and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. |
-| table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format. Currently, supported formats are `paimon`, `iceberg`, and `lance`. In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless **Union Read** functionality across Fluss and Lakehouse. The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster. |
-| table.datalake.freshness | Duration | 3min | It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs. |
-| table.datalake.auto-compaction | Boolean | false | If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default. |
-| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table uses the [default merge engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md). It also supports two merge engines are `first_row` and `versioned`. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](table-design/table-types/pk-table/merge-engines/versioned.md) will keep the row with the largest version of the same primary key. |
-| table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the `versioned` merge engine. If the merge engine is set to `versioned`, the version column must be set. |
-| table.delete.behavior | Enum | ALLOW | Controls the behavior of delete operations on primary key tables. Three modes are supported: `ALLOW` (default) - allows normal delete operations; `IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects delete requests and throws explicit errors. This configuration provides system-level guarantees for some downstream pipelines (e.g., Flink Delta Join) that must not receive any delete events in the changelog of the table. For tables with `first_row` or `versioned` merge engines, this option is automatically set to `IGNORE` and cannot be overridden. Only applicable to primary key tables. |
-| table.changelog.image | Enum | FULL | Defines the changelog image mode for primary key tables. This configuration is inspired by similar settings in database systems like MySQL's `binlog_row_image` and PostgreSQL's `replica identity`. Two modes are supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values; `WAL` - does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER events. This mode reduces storage and transmission costs but loses the ability to track previous values. Only applicable to primary key tables. |
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| bucket.num | int | The bucket number of Fluss cluster. | The number of buckets of a Fluss table. |
+| bucket.key | String | (None) | Specifies the distribution policy of the Fluss table. Data will be distributed to each bucket according to the hash value of bucket-key (It must be a subset of the primary keys excluding partition keys of the primary key table). If you specify multiple fields, the delimiter is `,`. If the table has a primary key and a bucket key is not specified, the primary key (excluding the partition key) will be used as the bucket key. If the table has no primary key and the bucket key is not specified, the data will be distributed to each bucket randomly. |
+| table.log.ttl | Duration | 7 days | The time to live for log segments. The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. |
+| table.auto-partition.enabled | Boolean | false | Whether to enable auto partition for the table. Disabled by default. When auto partition is enabled, the partitions of the table will be created automatically. |
+| table.auto-partition.key | String | (None) | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions. If the table has only one partition key, this config is not necessary. Otherwise, it must be specified. |
+| table.auto-partition.time-unit | ENUM | DAY | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the partition format for auto created is yyyyQ. If the value is `YEAR`, the partition format for auto created is yyyy. |
+| table.auto-partition.num-precreate | Integer | 2 | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the `table.auto-partition.time-unit` is `DAY`(default), one precreated partition is for today and another one is for tomorrow. For a partition table with multiple partition keys, pre-create is unsupported and will be set to 0 automatically when creating table if it is not explicitly specified. |
+| table.auto-partition.num-retention | Integer | 7 | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7, which means that 7 partitions will be retained. |
+| table.auto-partition.time-zone | String | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. |
+| table.replication.factor | Integer | (None) | The replication factor for the log of the new table. When it's not set, Fluss will use the cluster's default replication factor configured by default.replication.factor. It should be a positive number and not larger than the number of tablet servers in the Fluss cluster. A value larger than the number of tablet servers in Fluss cluster will result in an error when the new table is created. |
+| table.log.format | Enum | ARROW | The format of the log records in log store. The default value is `ARROW`. The supported formats are `ARROW` and `INDEXED`. |
+| table.log.arrow.compression.type | Enum | ZSTD | The compression type of the log records if the log format is set to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The default value is `ZSTD`. |
+| table.log.arrow.compression.zstd.level | Integer | 3 | The compression level of the log records if the log format is set to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 22. The default value is 3. |
+| table.kv.format | Enum | COMPACTED | The format of the kv records in kv store. The default value is `COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`. |
+| table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain in local for each table when log tiered storage is enabled. It must be greater than 0. The default is 2. |
+| table.datalake.enabled | Boolean | false | Whether to enable lakehouse storage for the table. Disabled by default. When this option is set to true and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. |
+| table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format. Currently, supported formats are `paimon`, `iceberg`, and `lance`. In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless **Union Read** functionality across Fluss and Lakehouse. The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster. |
+| table.datalake.freshness | Duration | 3min | It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs. |
+| table.datalake.auto-compaction | Boolean | false | If true, compaction will be triggered automatically when the tiering service writes to the datalake. It is disabled by default. |
+| table.datalake.auto-expire-snapshot | Boolean | false | If true, snapshot expiration will be triggered automatically when the tiering service commits to the datalake. It is disabled by default. |
+| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table uses the [default merge engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md). It also supports two other merge engines: `first_row` and `versioned`. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](table-design/table-types/pk-table/merge-engines/versioned.md) will keep the row with the largest version of the same primary key. |
+| table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the `versioned` merge engine. If the merge engine is set to `versioned`, the version column must be set. |
+| table.delete.behavior | Enum | ALLOW | Controls the behavior of delete operations on primary key tables. Three modes are supported: `ALLOW` (default) - allows normal delete operations; `IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects delete requests and throws explicit errors. This configuration provides system-level guarantees for some downstream pipelines (e.g., Flink Delta Join) that must not receive any delete events in the changelog of the table. For tables with `first_row` or `versioned` merge engines, this option is automatically set to `IGNORE` and cannot be overridden. Only applicable to primary key tables. |
+| table.changelog.image | Enum | FULL | Defines the changelog image mode for primary key tables. This configuration is inspired by similar settings in database systems like MySQL's `binlog_row_image` and PostgreSQL's `replica identity`. Two modes are supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values; `WAL` - does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER events. This mode reduces storage and transmission costs but loses the ability to track previous values. Only applicable to primary key tables. |
 
 ## Read Options
diff --git a/website/docs/maintenance/tiered-storage/lakehouse-storage.md b/website/docs/maintenance/tiered-storage/lakehouse-storage.md
index bd7b646d02..35b2394d74 100644
--- a/website/docs/maintenance/tiered-storage/lakehouse-storage.md
+++ b/website/docs/maintenance/tiered-storage/lakehouse-storage.md
@@ -102,4 +102,12 @@ To enable lakehouse storage for a table, the table must be created with the option
 Another option, `table.datalake.freshness`, allows per-table configuration of data freshness in the datalake. It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. Based on this target freshness, the Fluss tiering service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target.
 
-The default is `3min`, if the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs.
\ No newline at end of file
+The default is `3min`; if the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs.
+
+## Datalake Tiering Service Options
+
+The following table lists the options that can be used to configure the datalake tiering service.
+
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| lake.tiering.auto-expire-snapshot | Boolean | false | If true, snapshot expiration will be triggered automatically when the tiering service commits to the datalake, even if `table.datalake.auto-expire-snapshot` is false. |
\ No newline at end of file