From 34c93df18f52dc8bcac78b64b4b089659ea23dfe Mon Sep 17 00:00:00 2001 From: maxcwang Date: Wed, 5 Nov 2025 18:05:08 +0800 Subject: [PATCH 01/22] pass catalog sensitive info when create fluss catalog --- .../org/apache/fluss/config/FlussConfigUtils.java | 11 +++++++++++ .../fluss/flink/catalog/Flink21Catalog.java | 11 +++++++++-- .../flink/catalog/Flink21CatalogFactory.java | 3 ++- .../fluss/flink/catalog/Flink21CatalogITCase.java | 3 ++- .../apache/fluss/flink/catalog/FlinkCatalog.java | 15 ++++++++++++--- .../fluss/flink/catalog/FlinkCatalogFactory.java | 7 +++++-- .../flink/catalog/FlinkCatalogFactoryTest.java | 7 +++++++ .../fluss/flink/catalog/FlinkCatalogITCase.java | 5 ++++- .../fluss/flink/catalog/FlinkCatalogTest.java | 6 +++++- .../lake/iceberg/flink/FlinkCatalogLakeTest.java | 1 + .../fluss/server/coordinator/MetadataManager.java | 12 ++---------- 11 files changed, 60 insertions(+), 21 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index fa9c4274c9..8e04de8f60 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -23,8 +23,10 @@ import java.lang.reflect.Field; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** Utilities of Fluss {@link ConfigOptions}. */ @Internal @@ -35,6 +37,7 @@ public class FlussConfigUtils { public static final String TABLE_PREFIX = "table."; public static final String CLIENT_PREFIX = "client."; public static final String CLIENT_SECURITY_PREFIX = "client.security."; + public static final String CATALOG_PREFIX = "catalog."; public static final List ALTERABLE_TABLE_OPTIONS; @@ -45,6 +48,14 @@ public class FlussConfigUtils { Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key()); } + public static final Set SENSITIVE_TABLE_OPTIONS = new HashSet<>(); + + static { + SENSITIVE_TABLE_OPTIONS.add("password"); + SENSITIVE_TABLE_OPTIONS.add("secret"); + SENSITIVE_TABLE_OPTIONS.add("key"); + } + public static boolean isTableStorageConfig(String key) { return key.startsWith(TABLE_PREFIX); } diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java index 4f567aae0a..52ffd02379 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java @@ -40,8 +40,15 @@ public Flink21Catalog( String defaultDatabase, String bootstrapServers, ClassLoader classLoader, - Map securityConfigs) { - super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs); + Map securityConfigs, + Map catalogSensitiveProperties) { + super( + name, + defaultDatabase, + bootstrapServers, + classLoader, + securityConfigs, + catalogSensitiveProperties); } @Override diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java index 8557a552f6..90111d7dcd 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java @@ -29,6 +29,7 @@ public FlinkCatalog createCatalog(Context context) { catalog.defaultDatabase, catalog.bootstrapServers, catalog.classLoader, - catalog.securityConfigs); + catalog.securityConfigs, + catalog.catalogSensitiveProperties); } } diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java index c0b9b91965..7a72fdcc35 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java +++ b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java @@ -42,7 +42,8 @@ static void beforeAll() { catalog.defaultDatabase, catalog.bootstrapServers, catalog.classLoader, - catalog.securityConfigs); + catalog.securityConfigs, + catalog.catalogSensitiveProperties); catalog.open(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 0627e3f688..15b7bff16a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -115,6 +115,7 @@ public class FlinkCatalog extends AbstractCatalog { protected final String bootstrapServers; protected final Map securityConfigs; protected final LakeFlinkCatalog lakeFlinkCatalog; + protected final Map catalogSensitiveProperties; protected Connection connection; protected Admin admin; @@ -123,13 +124,15 @@ public FlinkCatalog( String defaultDatabase, String bootstrapServers, ClassLoader classLoader, - Map securityConfigs) { + Map securityConfigs, + Map catalogSensitiveProperties) { super(name, defaultDatabase); this.catalogName = name; this.defaultDatabase = defaultDatabase; this.bootstrapServers = bootstrapServers; this.classLoader = classLoader; this.securityConfigs = securityConfigs; + this.catalogSensitiveProperties = catalogSensitiveProperties; this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader); } @@ -294,8 +297,9 @@ public CatalogBaseTable getTable(ObjectPath objectPath) objectPath.getDatabaseName(), tableName.split("\\" + LAKE_TABLE_SPLITTER)[0]))); } - return getLakeTable( - objectPath.getDatabaseName(), tableName, tableInfo.getProperties()); + Configuration tableProperties = tableInfo.getProperties(); + tableProperties.addAll(Configuration.fromMap(catalogSensitiveProperties)); + return getLakeTable(objectPath.getDatabaseName(), tableName, tableProperties); } else { tableInfo = admin.getTableInfo(tablePath).get(); } @@ -754,4 +758,9 @@ public Procedure getProcedure(ObjectPath procedurePath) public Map getSecurityConfigs() { return securityConfigs; } + + @VisibleForTesting + public Map getCatalogSensitiveConfigs() { + return catalogSensitiveProperties; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java index 83bd3cd21f..60f122e97e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; +import static org.apache.fluss.config.FlussConfigUtils.CATALOG_PREFIX; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_SECURITY_PREFIX; import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; @@ -54,15 +55,17 @@ public Set> optionalOptions() { public FlinkCatalog createCatalog(Context context) { final FactoryUtil.CatalogFactoryHelper helper = FactoryUtil.createCatalogFactoryHelper(this, context); - helper.validateExcept(CLIENT_SECURITY_PREFIX); + helper.validateExcept(CLIENT_SECURITY_PREFIX, CATALOG_PREFIX); Map options = context.getOptions(); Map securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX); + Map catalogSensitiveProperties = extractPrefix(options, CATALOG_PREFIX); return new FlinkCatalog( context.getName(), helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE), helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS), context.getClassLoader(), - securityConfigs); + securityConfigs, + catalogSensitiveProperties); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java index 4a17cc3c87..76fea65013 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java @@ -64,6 +64,7 @@ public void testCreateCatalog() { DB_NAME, BOOTSTRAP_SERVERS_NAME, Thread.currentThread().getContextClassLoader(), + Collections.emptyMap(), Collections.emptyMap()); checkEquals(flinkCatalog, actualCatalog); @@ -75,7 +76,12 @@ public void testCreateCatalog() { securityMap.put("client.security.sasl.username", "root"); securityMap.put("client.security.sasl.password", "password"); + Map catalogSensitiveMap = new HashMap<>(); + catalogSensitiveMap.put("catalog.paimon.jdbc.user", "admin"); + catalogSensitiveMap.put("catalog.paimon.jdbc.password", "pass"); + options.putAll(securityMap); + options.putAll(catalogSensitiveMap); FlinkCatalog actualCatalog2 = (FlinkCatalog) FactoryUtil.createCatalog( @@ -85,6 +91,7 @@ public void testCreateCatalog() { Thread.currentThread().getContextClassLoader()); assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap); + assertThat(actualCatalog2.getCatalogSensitiveConfigs()).isEqualTo(catalogSensitiveMap); } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index 88e16e9967..37e7a8c853 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -115,6 +115,7 @@ static void beforeAll() { DEFAULT_DB, bootstrapServers, Thread.currentThread().getContextClassLoader(), + Collections.emptyMap(), Collections.emptyMap()); catalog.open(); } @@ -750,6 +751,7 @@ void testAuthentication() throws Exception { DEFAULT_DB, bootstrapServers, Thread.currentThread().getContextClassLoader(), + Collections.emptyMap(), Collections.emptyMap()); Catalog finalAuthenticateCatalog = authenticateCatalog; assertThatThrownBy(finalAuthenticateCatalog::open) @@ -768,7 +770,8 @@ void testAuthentication() throws Exception { DEFAULT_DB, bootstrapServers, Thread.currentThread().getContextClassLoader(), - clientConfig); + clientConfig, + Collections.emptyMap()); authenticateCatalog.open(); assertThat(authenticateCatalog.listDatabases()) .containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB)); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index 84a85deade..98870a7a9f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -168,6 +168,7 @@ static void beforeAll() { DEFAULT_DB, String.join(",", flussConf.get(BOOTSTRAP_SERVERS)), Thread.currentThread().getContextClassLoader(), + Collections.emptyMap(), Collections.emptyMap()); catalog.open(); } @@ -627,6 +628,7 @@ void testDatabase() throws Exception { ",", flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)), Thread.currentThread().getContextClassLoader(), + Collections.emptyMap(), Collections.emptyMap())) .hasMessageContaining("defaultDatabase cannot be null or empty"); } @@ -815,6 +817,7 @@ void testConnectionFailureHandling() { "default", "invalid-bootstrap-server:9092", Thread.currentThread().getContextClassLoader(), + Collections.emptyMap(), Collections.emptyMap()); // Test open() throws proper exception @@ -945,7 +948,8 @@ void testSecurityConfigsIntegration() throws Exception { DEFAULT_DB, String.join(",", flussConf.get(BOOTSTRAP_SERVERS)), Thread.currentThread().getContextClassLoader(), - securityConfigs); + securityConfigs, + Collections.emptyMap()); securedCatalog.open(); try { diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java index f1305b99d2..3e0cac400f 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java @@ -102,6 +102,7 @@ public void buildCatalog() { DEFAULT_DB, bootstrapServers, Thread.currentThread().getContextClassLoader(), + Collections.emptyMap(), Collections.emptyMap()); catalog.open(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 074cb04f2b..655b705602 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -62,7 +62,6 @@ import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -70,6 +69,7 @@ import java.util.Set; import java.util.concurrent.Callable; +import static org.apache.fluss.config.FlussConfigUtils.SENSITIVE_TABLE_OPTIONS; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor; @@ -83,14 +83,6 @@ public class MetadataManager { private final int maxBucketNum; private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader; - public static final Set SENSITIVE_TABLE_OPTIOINS = new HashSet<>(); - - static { - SENSITIVE_TABLE_OPTIOINS.add("password"); - SENSITIVE_TABLE_OPTIOINS.add("secret"); - SENSITIVE_TABLE_OPTIOINS.add("key"); - } - /** * Creates a new metadata manager. * @@ -525,7 +517,7 @@ public void removeSensitiveTableOptions(Map tableLakeOptions) { Iterator> iterator = tableLakeOptions.entrySet().iterator(); while (iterator.hasNext()) { String key = iterator.next().getKey().toLowerCase(); - if (SENSITIVE_TABLE_OPTIOINS.stream().anyMatch(key::contains)) { + if (SENSITIVE_TABLE_OPTIONS.stream().anyMatch(key::contains)) { iterator.remove(); } } From 75a034cd23e18bf3638839b17295086641e24450 Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Mon, 10 Nov 2025 23:26:37 +0800 Subject: [PATCH 02/22] nit --- .../apache/fluss/flink/catalog/Flink21Catalog.java | 4 ++-- .../fluss/flink/catalog/Flink21CatalogFactory.java | 2 +- .../fluss/flink/catalog/Flink21CatalogITCase.java | 2 +- .../org/apache/fluss/flink/catalog/FlinkCatalog.java | 12 ++++++------ .../fluss/flink/catalog/FlinkCatalogFactory.java | 4 ++-- .../fluss/flink/catalog/FlinkCatalogFactoryTest.java | 10 +++++----- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java index 52ffd02379..3dcc8e5ea9 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java @@ -41,14 +41,14 @@ public Flink21Catalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Map catalogSensitiveProperties) { + Map catalogProperties) { super( name, defaultDatabase, bootstrapServers, classLoader, securityConfigs, - catalogSensitiveProperties); + catalogProperties); } @Override diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java index 90111d7dcd..23adfa9a1a 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java @@ -30,6 +30,6 @@ public FlinkCatalog createCatalog(Context context) { catalog.bootstrapServers, catalog.classLoader, catalog.securityConfigs, - catalog.catalogSensitiveProperties); + catalog.catalogProperties); } } diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java index 7a72fdcc35..99530a4ce2 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java +++ b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java @@ -43,7 +43,7 @@ static void beforeAll() { catalog.bootstrapServers, catalog.classLoader, catalog.securityConfigs, - catalog.catalogSensitiveProperties); + catalog.catalogProperties); catalog.open(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 15b7bff16a..3ef6f0ca37 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -115,7 +115,7 @@ public class FlinkCatalog extends AbstractCatalog { protected final String bootstrapServers; protected final Map securityConfigs; protected final LakeFlinkCatalog lakeFlinkCatalog; - protected final Map catalogSensitiveProperties; + protected final Map catalogProperties; protected Connection connection; protected Admin admin; @@ -125,14 +125,14 @@ public FlinkCatalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Map catalogSensitiveProperties) { + Map catalogProperties) { super(name, defaultDatabase); this.catalogName = name; this.defaultDatabase = defaultDatabase; this.bootstrapServers = bootstrapServers; this.classLoader = classLoader; this.securityConfigs = securityConfigs; - this.catalogSensitiveProperties = catalogSensitiveProperties; + this.catalogProperties = catalogProperties; this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader); } @@ -298,7 +298,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) tableName.split("\\" + LAKE_TABLE_SPLITTER)[0]))); } Configuration tableProperties = tableInfo.getProperties(); - tableProperties.addAll(Configuration.fromMap(catalogSensitiveProperties)); + tableProperties.addAll(Configuration.fromMap(catalogProperties)); return getLakeTable(objectPath.getDatabaseName(), tableName, tableProperties); } else { tableInfo = admin.getTableInfo(tablePath).get(); @@ -760,7 +760,7 @@ public Map getSecurityConfigs() { } @VisibleForTesting - public Map getCatalogSensitiveConfigs() { - return catalogSensitiveProperties; + public Map getCatalogProperties() { + return catalogProperties; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java index 60f122e97e..ed94d46d1e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java @@ -58,7 +58,7 @@ public FlinkCatalog createCatalog(Context context) { helper.validateExcept(CLIENT_SECURITY_PREFIX, CATALOG_PREFIX); Map options = context.getOptions(); Map securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX); - Map catalogSensitiveProperties = extractPrefix(options, CATALOG_PREFIX); + Map catalogProperties = extractPrefix(options, CATALOG_PREFIX); return new FlinkCatalog( context.getName(), @@ -66,6 +66,6 @@ public FlinkCatalog createCatalog(Context context) { helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS), context.getClassLoader(), securityConfigs, - catalogSensitiveProperties); + catalogProperties); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java index 76fea65013..cac5afb241 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java @@ -76,12 +76,12 @@ public void testCreateCatalog() { securityMap.put("client.security.sasl.username", "root"); securityMap.put("client.security.sasl.password", "password"); - Map catalogSensitiveMap = new HashMap<>(); - catalogSensitiveMap.put("catalog.paimon.jdbc.user", "admin"); - catalogSensitiveMap.put("catalog.paimon.jdbc.password", "pass"); + Map catalogMap = new HashMap<>(); + catalogMap.put("catalog.paimon.jdbc.user", "admin"); + catalogMap.put("catalog.paimon.jdbc.password", "pass"); options.putAll(securityMap); - options.putAll(catalogSensitiveMap); + options.putAll(catalogMap); FlinkCatalog actualCatalog2 = (FlinkCatalog) FactoryUtil.createCatalog( @@ -91,7 +91,7 @@ public void testCreateCatalog() { Thread.currentThread().getContextClassLoader()); assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap); - assertThat(actualCatalog2.getCatalogSensitiveConfigs()).isEqualTo(catalogSensitiveMap); + assertThat(actualCatalog2.getCatalogProperties()).isEqualTo(catalogMap); } @Test From 6e43bd839f281577711f644f6d963b3a6600126e Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Tue, 11 Nov 2025 15:09:35 +0800 Subject: [PATCH 03/22] rename --- .../org/apache/fluss/config/FlussConfigUtils.java | 1 - .../org/apache/fluss/flink/catalog/FlinkCatalog.java | 12 ++++++------ .../fluss/flink/catalog/FlinkCatalogFactory.java | 8 ++++---- .../fluss/flink/catalog/FlinkCatalogFactoryTest.java | 10 +++++----- 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index 8e04de8f60..5eb3164f4f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -37,7 +37,6 @@ public class FlussConfigUtils { public static final String TABLE_PREFIX = "table."; public static final String CLIENT_PREFIX = "client."; public static final String CLIENT_SECURITY_PREFIX = "client.security."; - public static final String CATALOG_PREFIX = "catalog."; public static final List ALTERABLE_TABLE_OPTIONS; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 3ef6f0ca37..dd7dfb3d5c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -115,7 +115,7 @@ public class FlinkCatalog extends AbstractCatalog { protected final String bootstrapServers; protected final Map securityConfigs; protected final LakeFlinkCatalog lakeFlinkCatalog; - protected final Map catalogProperties; + protected final Map lakeCatalogProperties; protected Connection connection; protected Admin admin; @@ -125,14 +125,14 @@ public FlinkCatalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Map catalogProperties) { + Map lakeCatalogProperties) { super(name, defaultDatabase); this.catalogName = name; this.defaultDatabase = defaultDatabase; this.bootstrapServers = bootstrapServers; this.classLoader = classLoader; this.securityConfigs = securityConfigs; - this.catalogProperties = catalogProperties; + this.lakeCatalogProperties = lakeCatalogProperties; this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader); } @@ -298,7 +298,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) tableName.split("\\" + LAKE_TABLE_SPLITTER)[0]))); } Configuration tableProperties = tableInfo.getProperties(); - tableProperties.addAll(Configuration.fromMap(catalogProperties)); + tableProperties.addAll(Configuration.fromMap(lakeCatalogProperties)); return getLakeTable(objectPath.getDatabaseName(), tableName, tableProperties); } else { tableInfo = admin.getTableInfo(tablePath).get(); @@ -760,7 +760,7 @@ public Map getSecurityConfigs() { } @VisibleForTesting - public Map getCatalogProperties() { - return catalogProperties; + public Map getLakeCatalogProperties() { + return lakeCatalogProperties; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java index ed94d46d1e..66cdb43e9f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java @@ -27,7 +27,7 @@ import java.util.Map; import java.util.Set; -import static org.apache.fluss.config.FlussConfigUtils.CATALOG_PREFIX; +import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_SECURITY_PREFIX; import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; @@ -55,10 +55,10 @@ public Set> optionalOptions() { public FlinkCatalog createCatalog(Context context) { final FactoryUtil.CatalogFactoryHelper helper = FactoryUtil.createCatalogFactoryHelper(this, context); - helper.validateExcept(CLIENT_SECURITY_PREFIX, CATALOG_PREFIX); + helper.validateExcept(CLIENT_SECURITY_PREFIX); Map options = context.getOptions(); Map securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX); - Map catalogProperties = extractPrefix(options, CATALOG_PREFIX); + Map lakeCatalogProperties = extractPrefix(options, TABLE_PREFIX); return new FlinkCatalog( context.getName(), @@ -66,6 +66,6 @@ public FlinkCatalog createCatalog(Context context) { helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS), context.getClassLoader(), securityConfigs, - catalogProperties); + lakeCatalogProperties); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java index cac5afb241..0df5a520b1 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java @@ -76,12 +76,12 @@ public void testCreateCatalog() { securityMap.put("client.security.sasl.username", "root"); securityMap.put("client.security.sasl.password", "password"); - Map catalogMap = new HashMap<>(); - catalogMap.put("catalog.paimon.jdbc.user", "admin"); - catalogMap.put("catalog.paimon.jdbc.password", "pass"); + Map lakeCatalogMap = new HashMap<>(); + lakeCatalogMap.put("catalog.paimon.jdbc.user", "admin"); + lakeCatalogMap.put("catalog.paimon.jdbc.password", "pass"); options.putAll(securityMap); - options.putAll(catalogMap); + options.putAll(lakeCatalogMap); FlinkCatalog actualCatalog2 = (FlinkCatalog) FactoryUtil.createCatalog( @@ -91,7 +91,7 @@ public void testCreateCatalog() { Thread.currentThread().getContextClassLoader()); assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap); - assertThat(actualCatalog2.getCatalogProperties()).isEqualTo(catalogMap); + assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap); } @Test From db330cba4808d108cce7caf30c833fdf0e11c71d Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Tue, 11 Nov 2025 15:21:14 +0800 Subject: [PATCH 04/22] rename --- .../org/apache/fluss/flink/catalog/Flink21CatalogFactory.java | 2 +- .../org/apache/fluss/flink/catalog/Flink21CatalogITCase.java | 2 +- .../org/apache/fluss/flink/catalog/FlinkCatalogFactory.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java index 23adfa9a1a..3b28afed7e 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java @@ -30,6 +30,6 @@ public FlinkCatalog createCatalog(Context context) { catalog.bootstrapServers, catalog.classLoader, catalog.securityConfigs, - catalog.catalogProperties); + catalog.lakeCatalogProperties); } } diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java index 99530a4ce2..303fd1dce8 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java +++ b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java @@ -43,7 +43,7 @@ static void beforeAll() { catalog.bootstrapServers, catalog.classLoader, catalog.securityConfigs, - catalog.catalogProperties); + catalog.lakeCatalogProperties); catalog.open(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java index 66cdb43e9f..7d37b4c4f4 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java @@ -27,8 +27,8 @@ import java.util.Map; import java.util.Set; -import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_SECURITY_PREFIX; +import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX; import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; /** Factory for {@link FlinkCatalog}. */ From 038e69e72e219beeacd09160f556559113736780 Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Tue, 11 Nov 2025 21:44:11 +0800 Subject: [PATCH 05/22] wip: pass all types of lake formats --- .../apache/fluss/utils/PropertiesUtils.java | 18 ++++++++++++++++++ .../fluss/flink/catalog/Flink21Catalog.java | 4 ++-- .../flink/catalog/FlinkCatalogFactory.java | 13 ++++++++++--- .../flink/catalog/FlinkCatalogFactoryTest.java | 4 ++-- 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java index a05a5a841c..7df84829fd 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java @@ -17,7 +17,9 @@ package org.apache.fluss.utils; +import java.util.Arrays; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** Utility class for properties related helper functions. */ @@ -57,6 +59,22 @@ public static Map extractPrefix(Map originalMap, Strin .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } + public static Map extractPrefix( + Map originalMap, Class> enumClass) { + Set prefixes = + Arrays.stream(enumClass.getEnumConstants()) + .map(Enum::name) + .map(String::toLowerCase) + .collect(Collectors.toSet()); + + return originalMap.entrySet().stream() + .filter( + entry -> + prefixes.stream() + .anyMatch(prefix -> entry.getKey().startsWith(prefix))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + /** Filter out keys that start with the given prefix from the original map. */ public static Map excludeByPrefix(Map originalMap, String prefix) { return originalMap.entrySet().stream() diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java index 3dcc8e5ea9..5340de17d3 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java @@ -41,14 +41,14 @@ public Flink21Catalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Map catalogProperties) { + Map lakeCatalogProperties) { super( name, defaultDatabase, bootstrapServers, classLoader, securityConfigs, - catalogProperties); + lakeCatalogProperties); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java index 7d37b4c4f4..d76fd658f4 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java @@ -18,17 +18,19 @@ package org.apache.fluss.flink.catalog; import org.apache.fluss.flink.FlinkConnectorOptions; +import org.apache.fluss.metadata.DataLakeFormat; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_SECURITY_PREFIX; -import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX; import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; /** Factory for {@link FlinkCatalog}. */ @@ -55,10 +57,15 @@ public Set> optionalOptions() { public FlinkCatalog createCatalog(Context context) { final FactoryUtil.CatalogFactoryHelper helper = FactoryUtil.createCatalogFactoryHelper(this, context); - helper.validateExcept(CLIENT_SECURITY_PREFIX); + helper.validateExcept( + Stream.concat( + Stream.of(CLIENT_SECURITY_PREFIX), + Arrays.stream(DataLakeFormat.values()) + .map(DataLakeFormat::toString)) + .toArray(String[]::new)); Map options = context.getOptions(); Map securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX); - Map lakeCatalogProperties = extractPrefix(options, TABLE_PREFIX); + Map lakeCatalogProperties = extractPrefix(options, DataLakeFormat.class); return new FlinkCatalog( context.getName(), diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java index 0df5a520b1..3bc8079424 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java @@ -77,8 +77,8 @@ public void testCreateCatalog() { securityMap.put("client.security.sasl.password", "password"); Map lakeCatalogMap = new HashMap<>(); - lakeCatalogMap.put("catalog.paimon.jdbc.user", "admin"); - lakeCatalogMap.put("catalog.paimon.jdbc.password", "pass"); + lakeCatalogMap.put("paimon.jdbc.user", "admin"); + lakeCatalogMap.put("paimon.jdbc.password", "pass"); options.putAll(securityMap); options.putAll(lakeCatalogMap); From 2531fd85b90699593571d726b1aff279d7cfe50b Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Tue, 11 Nov 2025 22:30:32 +0800 Subject: [PATCH 06/22] refactor --- .../apache/fluss/utils/PropertiesUtils.java | 3 +- .../fluss/flink/catalog/FlinkCatalog.java | 16 +++++--- .../fluss/flink/lake/LakeFlinkCatalog.java | 39 ++++++++++++++++--- .../fluss/flink/lake/LakeTableFactory.java | 4 +- 4 files changed, 49 insertions(+), 13 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java index 7df84829fd..af8041deb9 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java @@ -63,8 +63,7 @@ public static Map extractPrefix( Map originalMap, Class> enumClass) { Set prefixes = Arrays.stream(enumClass.getEnumConstants()) - .map(Enum::name) - .map(String::toLowerCase) + .map(Enum::toString) .collect(Collectors.toSet()); return originalMap.entrySet().stream() diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index dd7dfb3d5c..bb2c0e691d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -297,9 +297,12 @@ public CatalogBaseTable getTable(ObjectPath objectPath) objectPath.getDatabaseName(), tableName.split("\\" + LAKE_TABLE_SPLITTER)[0]))); } - Configuration tableProperties = tableInfo.getProperties(); - tableProperties.addAll(Configuration.fromMap(lakeCatalogProperties)); - return getLakeTable(objectPath.getDatabaseName(), tableName, tableProperties); + + return getLakeTable( + objectPath.getDatabaseName(), + tableName, + tableInfo.getProperties(), + lakeCatalogProperties); } else { tableInfo = admin.getTableInfo(tablePath).get(); } @@ -333,7 +336,10 @@ public CatalogBaseTable getTable(ObjectPath objectPath) } protected CatalogBaseTable getLakeTable( - String databaseName, String tableName, Configuration properties) + String databaseName, + String tableName, + Configuration properties, + Map lakeCatalogProperties) throws TableNotExistException, CatalogException { String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER); if (tableComponents.length == 1) { @@ -345,7 +351,7 @@ protected CatalogBaseTable getLakeTable( tableName = String.join("", tableComponents); } return lakeFlinkCatalog - .getLakeCatalog(properties) + .getLakeCatalog(properties, lakeCatalogProperties) .getTable(new ObjectPath(databaseName, tableName)); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index 3eb0db5d9a..2f19ae138d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -30,6 +30,7 @@ import java.lang.reflect.Method; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG; import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; @@ -49,7 +50,8 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) { this.classLoader = classLoader; } - public Catalog getLakeCatalog(Configuration tableOptions) { + public Catalog getLakeCatalog( + Configuration tableOptions, Map lakeCatalogProperties) { // TODO: Currently, a Fluss cluster only supports a single DataLake storage. // However, in the // future, it may support multiple DataLakes. The following code assumes @@ -69,12 +71,30 @@ public Catalog getLakeCatalog(Configuration tableOptions) { + ConfigOptions.TABLE_DATALAKE_FORMAT.key() + "' is set."); } + String dataLakePrefix = lakeFormat.toString() + "."; + Map prefixRemovedLakeProperties = + lakeCatalogProperties.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(dataLakePrefix)) + .collect( + Collectors.toMap( + entry -> + entry.getKey() + .substring( + dataLakePrefix + .length()), + Map.Entry::getValue)); if (lakeFormat == PAIMON) { catalog = - PaimonCatalogFactory.create(catalogName, tableOptions, classLoader); + PaimonCatalogFactory.create( + catalogName, + tableOptions, + prefixRemovedLakeProperties, + classLoader); this.lakeFormat = PAIMON; } else if (lakeFormat == ICEBERG) { - catalog = IcebergCatalogFactory.create(catalogName, tableOptions); + catalog = + IcebergCatalogFactory.create( + catalogName, prefixRemovedLakeProperties, tableOptions); this.lakeFormat = ICEBERG; } else { throw new UnsupportedOperationException( @@ -104,9 +124,13 @@ public static class PaimonCatalogFactory { private PaimonCatalogFactory() {} public static Catalog create( - String catalogName, Configuration tableOptions, ClassLoader classLoader) { + String catalogName, + Configuration tableOptions, + Map lakeCatalogProperties, + ClassLoader classLoader) { Map catalogProperties = DataLakeUtils.extractLakeCatalogProperties(tableOptions); + catalogProperties.putAll(lakeCatalogProperties); return FlinkCatalogFactory.createCatalog( catalogName, CatalogContext.create( @@ -124,9 +148,14 @@ private IcebergCatalogFactory() {} // requires Iceberg 1.5.0+. // Using reflection to maintain Java 8 compatibility. // Once Fluss drops Java 8, we can remove the reflection code - public static Catalog create(String catalogName, Configuration tableOptions) { + public static Catalog create( + String catalogName, + Map lakeCatalogProperties, + Configuration tableOptions) { Map catalogProperties = DataLakeUtils.extractLakeCatalogProperties(tableOptions); + catalogProperties.putAll(lakeCatalogProperties); + // Map "type" to "catalog-type" (equivalent) // Required: either "catalog-type" (standard type) or "catalog-impl" // (fully-qualified custom class, mandatory if "catalog-type" is missing) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java index 3f0ff88c67..ad9918f389 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java @@ -25,6 +25,8 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import java.util.Collections; + /** A factory to create {@link DynamicTableSource} for lake table. */ public class LakeTableFactory { private final LakeFlinkCatalog lakeFlinkCatalog; @@ -83,7 +85,7 @@ private DynamicTableSourceFactory getIcebergFactory() { lakeFlinkCatalog.getLakeCatalog( // we can pass empty configuration to get catalog // since the catalog should already be initialized - new Configuration()); + new Configuration(), Collections.emptyMap()); // Create FlinkDynamicTableFactory with the catalog Class icebergFactoryClass = From e8f3cf6191646fb7ce85ca603b564283c07b03d6 Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Tue, 11 Nov 2025 22:40:48 +0800 Subject: [PATCH 07/22] rename --- .../fluss/flink/lake/LakeFlinkCatalog.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index 2f19ae138d..22fc371f37 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -72,7 +72,7 @@ public Catalog getLakeCatalog( + "' is set."); } String dataLakePrefix = lakeFormat.toString() + "."; - Map prefixRemovedLakeProperties = + Map catalogProperties = lakeCatalogProperties.entrySet().stream() .filter(entry -> entry.getKey().startsWith(dataLakePrefix)) .collect( @@ -86,15 +86,12 @@ public Catalog getLakeCatalog( if (lakeFormat == PAIMON) { catalog = PaimonCatalogFactory.create( - catalogName, - tableOptions, - prefixRemovedLakeProperties, - classLoader); + catalogName, tableOptions, catalogProperties, classLoader); this.lakeFormat = PAIMON; } else if (lakeFormat == ICEBERG) { catalog = IcebergCatalogFactory.create( - catalogName, prefixRemovedLakeProperties, tableOptions); + catalogName, catalogProperties, tableOptions); this.lakeFormat = ICEBERG; } else { throw new UnsupportedOperationException( @@ -126,11 +123,11 @@ private PaimonCatalogFactory() {} public static Catalog create( String catalogName, Configuration tableOptions, - Map lakeCatalogProperties, + Map additionalLakeCatalogProperties, ClassLoader classLoader) { Map catalogProperties = DataLakeUtils.extractLakeCatalogProperties(tableOptions); - catalogProperties.putAll(lakeCatalogProperties); + catalogProperties.putAll(additionalLakeCatalogProperties); return FlinkCatalogFactory.createCatalog( catalogName, CatalogContext.create( @@ -150,11 +147,11 @@ private IcebergCatalogFactory() {} // Once Fluss drops Java 8, we can remove the reflection code public static Catalog create( String catalogName, - Map lakeCatalogProperties, + Map additionalLakeCatalogProperties, Configuration tableOptions) { Map catalogProperties = DataLakeUtils.extractLakeCatalogProperties(tableOptions); - catalogProperties.putAll(lakeCatalogProperties); + catalogProperties.putAll(additionalLakeCatalogProperties); // Map "type" to "catalog-type" (equivalent) // Required: either "catalog-type" (standard type) or "catalog-impl" From 6dd060714d06648738787d6c3099ab7f7fbc3a9e Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Wed, 12 Nov 2025 12:01:01 +0800 Subject: [PATCH 08/22] fix quickstart sql --- website/docs/quickstart/lakehouse.md | 55 ++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 7 deletions(-) diff --git a/website/docs/quickstart/lakehouse.md b/website/docs/quickstart/lakehouse.md index 3dcd074bf2..f2b8a7fc12 100644 --- a/website/docs/quickstart/lakehouse.md +++ b/website/docs/quickstart/lakehouse.md @@ -332,6 +332,10 @@ For further information how to store catalog configurations, see [Flink's Catalo ::: ### Create Tables + + + + Running the following SQL to create Fluss tables to be used in this guide: ```sql title="Flink SQL" CREATE TABLE fluss_order ( @@ -366,6 +370,46 @@ CREATE TABLE fluss_nation ( ); ``` + + + + + +Running the following SQL to create Fluss tables to be used in this guide: +```sql title="Flink SQL" +CREATE TABLE fluss_order ( + `order_key` BIGINT, + `cust_key` INT NOT NULL, + `total_price` DECIMAL(15, 2), + `order_date` DATE, + `order_priority` STRING, + `clerk` STRING, + `ptime` AS PROCTIME() +); +``` + +```sql title="Flink SQL" +CREATE TABLE fluss_customer ( + `cust_key` INT NOT NULL, + `name` STRING, + `phone` STRING, + `nation_key` INT NOT NULL, + `acctbal` DECIMAL(15, 2), + `mktsegment` STRING, + PRIMARY KEY (`cust_key`) NOT ENFORCED +); +``` + +```sql title="Flink SQL" +CREATE TABLE fluss_nation ( + `nation_key` INT NOT NULL, + `name` STRING, + PRIMARY KEY (`nation_key`) NOT ENFORCED +); +``` + + + ## Streaming into Fluss First, run the following SQL to sync data from source tables to Fluss tables: @@ -520,13 +564,10 @@ SELECT o.order_key, c.acctbal, c.mktsegment, n.name -FROM ( - SELECT *, PROCTIME() as ptime - FROM `default_catalog`.`default_database`.source_order -) o -LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF o.ptime AS c +FROM fluss_order o +LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c` ON o.cust_key = c.cust_key -LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF o.ptime AS n +LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n` ON c.nation_key = n.nation_key; ``` @@ -714,4 +755,4 @@ After finishing the tutorial, run `exit` to exit Flink SQL CLI Container and the ```shell docker compose down -v ``` -to stop all containers. +to stop all containers. \ No newline at end of file From 544344f098c4ae23d08ab7fb3853dea9f806fe45 Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Wed, 12 Nov 2025 12:11:20 +0800 Subject: [PATCH 09/22] refactor --- .../fluss/flink/lake/LakeFlinkCatalog.java | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index 22fc371f37..fe70681fd3 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -83,10 +83,13 @@ public Catalog getLakeCatalog( dataLakePrefix .length()), Map.Entry::getValue)); + + catalogProperties.putAll( + DataLakeUtils.extractLakeCatalogProperties(tableOptions)); if (lakeFormat == PAIMON) { catalog = PaimonCatalogFactory.create( - catalogName, tableOptions, catalogProperties, classLoader); + catalogName, catalogProperties, classLoader); this.lakeFormat = PAIMON; } else if (lakeFormat == ICEBERG) { catalog = @@ -122,12 +125,8 @@ private PaimonCatalogFactory() {} public static Catalog create( String catalogName, - Configuration tableOptions, - Map additionalLakeCatalogProperties, + Map catalogProperties, ClassLoader classLoader) { - Map catalogProperties = - DataLakeUtils.extractLakeCatalogProperties(tableOptions); - catalogProperties.putAll(additionalLakeCatalogProperties); return FlinkCatalogFactory.createCatalog( catalogName, CatalogContext.create( @@ -145,14 +144,7 @@ private IcebergCatalogFactory() {} // requires Iceberg 1.5.0+. // Using reflection to maintain Java 8 compatibility. // Once Fluss drops Java 8, we can remove the reflection code - public static Catalog create( - String catalogName, - Map additionalLakeCatalogProperties, - Configuration tableOptions) { - Map catalogProperties = - DataLakeUtils.extractLakeCatalogProperties(tableOptions); - catalogProperties.putAll(additionalLakeCatalogProperties); - + public static Catalog create(String catalogName, Map catalogProperties) { // Map "type" to "catalog-type" (equivalent) // Required: either "catalog-type" (standard type) or "catalog-impl" // (fully-qualified custom class, mandatory if "catalog-type" is missing) From 3f5f7d8cf5c04f903e123943b90af4210948b379 Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Wed, 12 Nov 2025 12:18:48 +0800 Subject: [PATCH 10/22] fix compile --- .../java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index fe70681fd3..ed2fd42a8f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -92,9 +92,7 @@ public Catalog getLakeCatalog( catalogName, catalogProperties, classLoader); this.lakeFormat = PAIMON; } else if (lakeFormat == ICEBERG) { - catalog = - IcebergCatalogFactory.create( - catalogName, catalogProperties, tableOptions); + catalog = IcebergCatalogFactory.create(catalogName, catalogProperties); this.lakeFormat = ICEBERG; } else { throw new UnsupportedOperationException( From 9291067e6c035923f7e2eda3d7a1ddedc6da90bf Mon Sep 17 00:00:00 2001 From: maxcwang Date: Fri, 14 Nov 2025 10:18:35 +0800 Subject: [PATCH 11/22] isolate PREFIXES_TO_SKIP_VALIDATE --- .../flink/catalog/FlinkCatalogFactory.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java index d76fd658f4..72bcd6a45f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java @@ -24,11 +24,11 @@ import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Stream; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_SECURITY_PREFIX; import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; @@ -38,6 +38,15 @@ public class FlinkCatalogFactory implements CatalogFactory { public static final String IDENTIFIER = "fluss"; + public static final List PREFIXES_TO_SKIP_VALIDATE = new ArrayList<>(); + + static { + PREFIXES_TO_SKIP_VALIDATE.add(CLIENT_SECURITY_PREFIX); + for (DataLakeFormat value : DataLakeFormat.values()) { + PREFIXES_TO_SKIP_VALIDATE.add(value.toString()); + } + } + @Override public String factoryIdentifier() { return IDENTIFIER; @@ -57,12 +66,7 @@ public Set> optionalOptions() { public FlinkCatalog createCatalog(Context context) { final FactoryUtil.CatalogFactoryHelper helper = FactoryUtil.createCatalogFactoryHelper(this, context); - helper.validateExcept( - Stream.concat( - Stream.of(CLIENT_SECURITY_PREFIX), - Arrays.stream(DataLakeFormat.values()) - .map(DataLakeFormat::toString)) - .toArray(String[]::new)); + helper.validateExcept(PREFIXES_TO_SKIP_VALIDATE.toArray(new String[0])); Map options = context.getOptions(); Map securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX); Map lakeCatalogProperties = extractPrefix(options, DataLakeFormat.class); From af2762672a5bb36caf66f5e45d0c4cb077cfe3bb Mon Sep 17 00:00:00 2001 From: maxcwang Date: Wed, 26 Nov 2025 21:04:27 +0800 Subject: [PATCH 12/22] remove quick start --- website/docs/quickstart/lakehouse.md | 53 ++++------------------------ 1 file changed, 6 insertions(+), 47 deletions(-) diff --git a/website/docs/quickstart/lakehouse.md b/website/docs/quickstart/lakehouse.md index f2b8a7fc12..6c3f23f70e 100644 --- a/website/docs/quickstart/lakehouse.md +++ b/website/docs/quickstart/lakehouse.md @@ -332,10 +332,6 @@ For further information how to store catalog configurations, see [Flink's Catalo ::: ### Create Tables - - - - Running the following SQL to create Fluss tables to be used in this guide: ```sql title="Flink SQL" CREATE TABLE fluss_order ( @@ -370,46 +366,6 @@ CREATE TABLE fluss_nation ( ); ``` - - - - - -Running the following SQL to create Fluss tables to be used in this guide: -```sql title="Flink SQL" -CREATE TABLE fluss_order ( - `order_key` BIGINT, - `cust_key` INT NOT NULL, - `total_price` DECIMAL(15, 2), - `order_date` DATE, - `order_priority` STRING, - `clerk` STRING, - `ptime` AS PROCTIME() -); -``` - -```sql title="Flink SQL" -CREATE TABLE fluss_customer ( - `cust_key` INT NOT NULL, - `name` STRING, - `phone` STRING, - `nation_key` INT NOT NULL, - `acctbal` DECIMAL(15, 2), - `mktsegment` STRING, - PRIMARY KEY (`cust_key`) NOT ENFORCED -); -``` - -```sql title="Flink SQL" -CREATE TABLE fluss_nation ( - `nation_key` INT NOT NULL, - `name` STRING, - PRIMARY KEY (`nation_key`) NOT ENFORCED -); -``` - - - ## Streaming into Fluss First, run the following SQL to sync data from source tables to Fluss tables: @@ -564,10 +520,13 @@ SELECT o.order_key, c.acctbal, c.mktsegment, n.name -FROM fluss_order o -LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c` +FROM ( + SELECT *, PROCTIME() as ptime + FROM `default_catalog`.`default_database`.source_order +) o +LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF o.ptime AS c ON o.cust_key = c.cust_key -LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n` +LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF o.ptime AS n ON c.nation_key = n.nation_key; ``` From bf03205bcfcfd3bb20ad2bd8bcfb099b2b7feb64 Mon Sep 17 00:00:00 2001 From: maxcwang Date: Wed, 26 Nov 2025 21:09:24 +0800 Subject: [PATCH 13/22] remove quick start --- website/docs/quickstart/lakehouse.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/docs/quickstart/lakehouse.md b/website/docs/quickstart/lakehouse.md index 6c3f23f70e..3dcd074bf2 100644 --- a/website/docs/quickstart/lakehouse.md +++ b/website/docs/quickstart/lakehouse.md @@ -714,4 +714,4 @@ After finishing the tutorial, run `exit` to exit Flink SQL CLI Container and the ```shell docker compose down -v ``` -to stop all containers. \ No newline at end of file +to stop all containers. From 51760d9ee488c540c2794c68d4bbc880f0abe5b7 Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Mon, 15 Dec 2025 21:44:59 +0800 Subject: [PATCH 14/22] remove reduant code --- .../org/apache/fluss/utils/PropertiesUtils.java | 17 ----------------- .../flink/catalog/FlinkCatalogFactory.java | 7 ++++++- .../fluss/flink/lake/LakeFlinkCatalog.java | 15 +++------------ 3 files changed, 9 insertions(+), 30 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java index af8041deb9..a05a5a841c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/PropertiesUtils.java @@ -17,9 +17,7 @@ package org.apache.fluss.utils; -import java.util.Arrays; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; /** Utility class for properties related helper functions. */ @@ -59,21 +57,6 @@ public static Map extractPrefix(Map originalMap, Strin .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - public static Map extractPrefix( - Map originalMap, Class> enumClass) { - Set prefixes = - Arrays.stream(enumClass.getEnumConstants()) - .map(Enum::toString) - .collect(Collectors.toSet()); - - return originalMap.entrySet().stream() - .filter( - entry -> - prefixes.stream() - .anyMatch(prefix -> entry.getKey().startsWith(prefix))) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - /** Filter out keys that start with the given prefix from the original map. */ public static Map excludeByPrefix(Map originalMap, String prefix) { return originalMap.entrySet().stream() diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java index 72bcd6a45f..b53f511ec1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -69,7 +70,11 @@ public FlinkCatalog createCatalog(Context context) { helper.validateExcept(PREFIXES_TO_SKIP_VALIDATE.toArray(new String[0])); Map options = context.getOptions(); Map securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX); - Map lakeCatalogProperties = extractPrefix(options, DataLakeFormat.class); + + Map lakeCatalogProperties = new HashMap<>(); + for (DataLakeFormat lakeFormat : DataLakeFormat.values()) { + lakeCatalogProperties.putAll(extractPrefix(options, lakeFormat.toString())); + } return new FlinkCatalog( context.getName(), diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index ed2fd42a8f..b2c1b3f467 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -21,6 +21,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.utils.DataLakeUtils; import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.utils.PropertiesUtils; import org.apache.flink.table.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; @@ -30,7 +31,6 @@ import java.lang.reflect.Method; import java.util.Map; -import java.util.stream.Collectors; import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG; import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; @@ -71,18 +71,9 @@ public Catalog getLakeCatalog( + ConfigOptions.TABLE_DATALAKE_FORMAT.key() + "' is set."); } - String dataLakePrefix = lakeFormat.toString() + "."; Map catalogProperties = - lakeCatalogProperties.entrySet().stream() - .filter(entry -> entry.getKey().startsWith(dataLakePrefix)) - .collect( - Collectors.toMap( - entry -> - entry.getKey() - .substring( - dataLakePrefix - .length()), - Map.Entry::getValue)); + PropertiesUtils.extractAndRemovePrefix( + lakeCatalogProperties, lakeFormat + "."); catalogProperties.putAll( DataLakeUtils.extractLakeCatalogProperties(tableOptions)); From 715997d0769a5c4ba1e9bfa4292a12036615a9d6 Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Mon, 15 Dec 2025 22:46:05 +0800 Subject: [PATCH 15/22] supplier --- .../apache/fluss/flink/catalog/Flink21Catalog.java | 3 ++- .../apache/fluss/flink/catalog/FlinkCatalog.java | 7 ++++--- .../fluss/flink/catalog/FlinkCatalogFactory.java | 13 +++++++------ .../flink/catalog/FlinkCatalogFactoryTest.java | 2 +- .../fluss/flink/catalog/FlinkCatalogITCase.java | 6 +++--- .../fluss/flink/catalog/FlinkCatalogTest.java | 8 ++++---- .../lake/iceberg/flink/FlinkCatalogLakeTest.java | 2 +- 7 files changed, 22 insertions(+), 19 deletions(-) diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java index 5340de17d3..c22c834903 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; /** A {@link FlinkCatalog} used for Flink 2.1. */ public class Flink21Catalog extends FlinkCatalog { @@ -41,7 +42,7 @@ public Flink21Catalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Map lakeCatalogProperties) { + Supplier> lakeCatalogProperties) { super( name, defaultDatabase, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index bb2c0e691d..bf06612a25 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -80,6 +80,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -115,7 +116,7 @@ public class FlinkCatalog extends AbstractCatalog { protected final String bootstrapServers; protected final Map securityConfigs; protected final LakeFlinkCatalog lakeFlinkCatalog; - protected final Map lakeCatalogProperties; + protected final Supplier> lakeCatalogProperties; protected Connection connection; protected Admin admin; @@ -125,7 +126,7 @@ public FlinkCatalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Map lakeCatalogProperties) { + Supplier> lakeCatalogProperties) { super(name, defaultDatabase); this.catalogName = name; this.defaultDatabase = defaultDatabase; @@ -302,7 +303,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) objectPath.getDatabaseName(), tableName, tableInfo.getProperties(), - lakeCatalogProperties); + lakeCatalogProperties.get()); } else { tableInfo = admin.getTableInfo(tablePath).get(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java index b53f511ec1..d1c9bfc488 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java @@ -71,17 +71,18 @@ public FlinkCatalog createCatalog(Context context) { Map options = context.getOptions(); Map securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX); - Map lakeCatalogProperties = new HashMap<>(); - for (DataLakeFormat lakeFormat : DataLakeFormat.values()) { - lakeCatalogProperties.putAll(extractPrefix(options, lakeFormat.toString())); - } - return new FlinkCatalog( context.getName(), helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE), helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS), context.getClassLoader(), securityConfigs, - lakeCatalogProperties); + () -> { + Map lakeCatalogProperties = new HashMap<>(); + for (DataLakeFormat lakeFormat : DataLakeFormat.values()) { + lakeCatalogProperties.putAll(extractPrefix(options, lakeFormat.toString())); + } + return lakeCatalogProperties; + }); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java index 3bc8079424..f47d9c56b8 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java @@ -65,7 +65,7 @@ public void testCreateCatalog() { BOOTSTRAP_SERVERS_NAME, Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), - Collections.emptyMap()); + Collections::emptyMap); checkEquals(flinkCatalog, actualCatalog); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index 1b794d6e52..65756a87da 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -116,7 +116,7 @@ static void beforeAll() { bootstrapServers, Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), - Collections.emptyMap()); + Collections::emptyMap); catalog.open(); } @@ -771,7 +771,7 @@ void testAuthentication() throws Exception { bootstrapServers, Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), - Collections.emptyMap()); + Collections::emptyMap); Catalog finalAuthenticateCatalog = authenticateCatalog; assertThatThrownBy(finalAuthenticateCatalog::open) .cause() @@ -790,7 +790,7 @@ void testAuthentication() throws Exception { bootstrapServers, Thread.currentThread().getContextClassLoader(), clientConfig, - Collections.emptyMap()); + Collections::emptyMap); authenticateCatalog.open(); assertThat(authenticateCatalog.listDatabases()) .containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB)); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index 50490430a5..49ef4be816 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -169,7 +169,7 @@ static void beforeAll() { String.join(",", flussConf.get(BOOTSTRAP_SERVERS)), Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), - Collections.emptyMap()); + Collections::emptyMap); catalog.open(); } @@ -625,7 +625,7 @@ void testDatabase() throws Exception { flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)), Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), - Collections.emptyMap())) + Collections::emptyMap)) .hasMessageContaining("defaultDatabase cannot be null or empty"); } @@ -814,7 +814,7 @@ void testConnectionFailureHandling() { "invalid-bootstrap-server:9092", Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), - Collections.emptyMap()); + Collections::emptyMap); // Test open() throws proper exception assertThatThrownBy(() -> badCatalog.open()) @@ -945,7 +945,7 @@ void testSecurityConfigsIntegration() throws Exception { String.join(",", flussConf.get(BOOTSTRAP_SERVERS)), Thread.currentThread().getContextClassLoader(), securityConfigs, - Collections.emptyMap()); + Collections::emptyMap); securedCatalog.open(); try { diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java index 3e0cac400f..a6380cc444 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java @@ -103,7 +103,7 @@ public void buildCatalog() { bootstrapServers, Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), - Collections.emptyMap()); + Collections::emptyMap); catalog.open(); } } From 7de5bd61ed26509c4b0401b1d20ca6b7a6cac7be Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Mon, 15 Dec 2025 23:42:30 +0800 Subject: [PATCH 16/22] fix --- .../java/org/apache/fluss/flink/catalog/FlinkCatalog.java | 6 +++--- .../java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java | 5 +++-- .../java/org/apache/fluss/flink/lake/LakeTableFactory.java | 2 +- .../apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java | 2 +- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index bf06612a25..e71de5dfe0 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -303,7 +303,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) objectPath.getDatabaseName(), tableName, tableInfo.getProperties(), - lakeCatalogProperties.get()); + lakeCatalogProperties); } else { tableInfo = admin.getTableInfo(tablePath).get(); } @@ -340,7 +340,7 @@ protected CatalogBaseTable getLakeTable( String databaseName, String tableName, Configuration properties, - Map lakeCatalogProperties) + Supplier> lakeCatalogProperties) throws TableNotExistException, CatalogException { String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER); if (tableComponents.length == 1) { @@ -767,7 +767,7 @@ public Map getSecurityConfigs() { } @VisibleForTesting - public Map getLakeCatalogProperties() { + public Supplier> getLakeCatalogProperties() { return lakeCatalogProperties; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index b2c1b3f467..dfbf49b13b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -31,6 +31,7 @@ import java.lang.reflect.Method; import java.util.Map; +import java.util.function.Supplier; import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG; import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; @@ -51,7 +52,7 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) { } public Catalog getLakeCatalog( - Configuration tableOptions, Map lakeCatalogProperties) { + Configuration tableOptions, Supplier> lakeCatalogProperties) { // TODO: Currently, a Fluss cluster only supports a single DataLake storage. // However, in the // future, it may support multiple DataLakes. The following code assumes @@ -73,7 +74,7 @@ public Catalog getLakeCatalog( } Map catalogProperties = PropertiesUtils.extractAndRemovePrefix( - lakeCatalogProperties, lakeFormat + "."); + lakeCatalogProperties.get(), lakeFormat + "."); catalogProperties.putAll( DataLakeUtils.extractLakeCatalogProperties(tableOptions)); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java index ad9918f389..ba496b8452 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java @@ -85,7 +85,7 @@ private DynamicTableSourceFactory getIcebergFactory() { lakeFlinkCatalog.getLakeCatalog( // we can pass empty configuration to get catalog // since the catalog should already be initialized - new Configuration(), Collections.emptyMap()); + new Configuration(), Collections::emptyMap); // Create FlinkDynamicTableFactory with the catalog Class icebergFactoryClass = diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java index f47d9c56b8..a71f63e849 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java @@ -91,7 +91,7 @@ public void testCreateCatalog() { Thread.currentThread().getContextClassLoader()); assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap); - assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap); + assertThat(actualCatalog2.getLakeCatalogProperties().get()).isEqualTo(lakeCatalogMap); } @Test From 3b415a2366f7f311e090a2f7d63d735a63400a1f Mon Sep 17 00:00:00 2001 From: maxcwang Date: Tue, 16 Dec 2025 16:47:38 +0800 Subject: [PATCH 17/22] add test --- .../fluss/flink/catalog/FlinkCatalogITCase.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index 65756a87da..b4075d23b0 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -818,6 +818,20 @@ void testCreateCatalogWithUnexistedDatabase() { "The configured default-database 'non-exist' does not exist in the Fluss cluster."); } + @Test + void testCreateCatalogWithLakeProperties() { + Map properties = new HashMap<>(); + properties.put("paimon.jdbc.password", "pass"); + tEnv.executeSql( + String.format( + "create catalog test_catalog_with_lake_properties with ('type' = 'fluss', '%s' = '%s', 'paimon.jdbc.password' = 'pass')", + BOOTSTRAP_SERVERS.key(), FLUSS_CLUSTER_EXTENSION.getBootstrapServers())); + FlinkCatalog catalog = + (FlinkCatalog) tEnv.getCatalog("test_catalog_with_lake_properties").get(); + + assertOptionsEqual(catalog.getLakeCatalogProperties().get(), properties); + } + /** * Before Flink 2.1, the {@link Schema} did not include an index field. Starting from Flink 2.1, * Flink introduced the concept of an index, and in Fluss, the primary key is considered as an From a6a1fb9c6c84d419e3015fc03af5f99b80423f12 Mon Sep 17 00:00:00 2001 From: maxcwang Date: Tue, 16 Dec 2025 16:59:15 +0800 Subject: [PATCH 18/22] move --- .../java/org/apache/fluss/config/FlussConfigUtils.java | 10 ---------- .../fluss/server/coordinator/MetadataManager.java | 10 +++++++++- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index 5eb3164f4f..fa9c4274c9 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -23,10 +23,8 @@ import java.lang.reflect.Field; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** Utilities of Fluss {@link ConfigOptions}. */ @Internal @@ -47,14 +45,6 @@ public class FlussConfigUtils { Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key()); } - public static final Set SENSITIVE_TABLE_OPTIONS = new HashSet<>(); - - static { - SENSITIVE_TABLE_OPTIONS.add("password"); - SENSITIVE_TABLE_OPTIONS.add("secret"); - SENSITIVE_TABLE_OPTIONS.add("key"); - } - public static boolean isTableStorageConfig(String key) { return key.startsWith(TABLE_PREFIX); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 3bea49361b..a2fc6cf3d6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -62,6 +62,7 @@ import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -69,7 +70,6 @@ import java.util.Set; import java.util.concurrent.Callable; -import static org.apache.fluss.config.FlussConfigUtils.SENSITIVE_TABLE_OPTIONS; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor; @@ -83,6 +83,14 @@ public class MetadataManager { private final int maxBucketNum; private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader; + public static final Set SENSITIVE_TABLE_OPTIONS = new HashSet<>(); + + static { + SENSITIVE_TABLE_OPTIONS.add("password"); + SENSITIVE_TABLE_OPTIONS.add("secret"); + SENSITIVE_TABLE_OPTIONS.add("key"); + } + /** * Creates a new metadata manager. * From c14537aa1a18a78f16ae1944a7fd4b5ff1c3f05c Mon Sep 17 00:00:00 2001 From: maxcwang Date: Tue, 16 Dec 2025 17:55:38 +0800 Subject: [PATCH 19/22] move --- website/docs/engine-flink/ddl.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md index a542db143c..cc5bfcd974 100644 --- a/website/docs/engine-flink/ddl.md +++ b/website/docs/engine-flink/ddl.md @@ -21,13 +21,14 @@ USE CATALOG fluss_catalog; The following properties can be set if using the Fluss catalog: -| Option | Required | Default | Description | -|--------------------------------|----------|-----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| type | required | (none) | Catalog type, must be 'fluss' here. | -| bootstrap.servers | required | (none) | Comma separated list of Fluss servers. | -| default-database | optional | fluss | The default database to use when switching to this catalog. | -| client.security.protocol | optional | PLAINTEXT | The security protocol used to communicate with brokers. Currently, only `PLAINTEXT` and `SASL` are supported, the configuration value is case insensitive. | -| `client.security.{protocol}.*` | optional | (none) | Client-side configuration properties for a specific authentication protocol. E.g., client.security.sasl.jaas.config. More Details in [authentication](../security/authentication.md) | +| Option | Required | Default | Description | +|--------------------------------|----------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| type | required | (none) | Catalog type, must be 'fluss' here. | +| bootstrap.servers | required | (none) | Comma separated list of Fluss servers. | +| default-database | optional | fluss | The default database to use when switching to this catalog. | +| client.security.protocol | optional | PLAINTEXT | The security protocol used to communicate with brokers. Currently, only `PLAINTEXT` and `SASL` are supported, the configuration value is case insensitive. | +| `client.security.{protocol}.*` | optional | (none) | Client-side configuration properties for a specific authentication protocol. E.g., client.security.sasl.jaas.config. More Details in [authentication](../security/authentication.md) | +| `{lake-format}.*` | optional | (none) | Extra properties to be passed to the lake catalog. This is useful for configuring sensitive settings, such as the username and password required for lake catalog authentication. E.g., `paimon.jdbc.password = pass`. | The following statements assume that the current catalog has been switched to the Fluss catalog using the `USE CATALOG ` statement. From 18b017f4152886684743ddd8bc28035e9b8c80de Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Tue, 16 Dec 2025 20:55:14 +0800 Subject: [PATCH 20/22] rebase --- .../java/org/apache/fluss/flink/catalog/Flink21Catalog.java | 2 ++ .../org/apache/fluss/flink/catalog/FlinkCatalog21Test.java | 1 + .../java/org/apache/fluss/flink/catalog/FlinkCatalog.java | 6 +++++- .../org/apache/fluss/flink/catalog/FlinkCatalogTest.java | 5 ++++- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java index bac7d60065..e2b77dfa51 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java @@ -61,6 +61,7 @@ public Flink21Catalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, + Supplier> lakeCatalogProperties, LakeFlinkCatalog lakeFlinkCatalog) { super( name, @@ -68,6 +69,7 @@ public Flink21Catalog( bootstrapServers, classLoader, securityConfigs, + lakeCatalogProperties, lakeFlinkCatalog); } diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java index e66a625e59..d6aa6ef564 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java +++ b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java @@ -43,6 +43,7 @@ protected FlinkCatalog initCatalog( bootstrapServers, Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), + Collections::emptyMap, lakeFlinkCatalog); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index a095995a8d..b8a5186590 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -125,13 +125,15 @@ public FlinkCatalog( String defaultDatabase, String bootstrapServers, ClassLoader classLoader, - Map securityConfigs) { + Map securityConfigs, + Supplier> lakeCatalogProperties) { this( name, defaultDatabase, bootstrapServers, classLoader, securityConfigs, + lakeCatalogProperties, new LakeFlinkCatalog(name, classLoader)); } @@ -142,6 +144,7 @@ public FlinkCatalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, + Supplier> lakeCatalogProperties, LakeFlinkCatalog lakeFlinkCatalog) { super(name, defaultDatabase); this.catalogName = name; @@ -149,6 +152,7 @@ public FlinkCatalog( this.bootstrapServers = bootstrapServers; this.classLoader = classLoader; this.securityConfigs = securityConfigs; + this.lakeCatalogProperties = lakeCatalogProperties; this.lakeFlinkCatalog = lakeFlinkCatalog; } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index c883ddc14b..f17ee59e1d 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -72,6 +72,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; @@ -172,6 +173,7 @@ protected FlinkCatalog initCatalog( bootstrapServers, Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), + Collections::emptyMap, lakeFlinkCatalog); } @@ -1009,7 +1011,8 @@ public MockLakeFlinkCatalog(String catalogName, ClassLoader classLoader) { } @Override - public Catalog getLakeCatalog(Configuration tableOptions) { + public Catalog getLakeCatalog( + Configuration tableOptions, Supplier> lakeCatalogProperties) { return catalog; } From 32826f5f8adfa576bb5f501aec89126a5877f38e Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Tue, 16 Dec 2025 23:01:46 +0800 Subject: [PATCH 21/22] rename --- .../fluss/flink/catalog/Flink21Catalog.java | 8 ++++---- .../flink/catalog/Flink21CatalogFactory.java | 2 +- .../flink/catalog/Flink21CatalogITCase.java | 2 +- .../fluss/flink/catalog/FlinkCatalog.java | 20 +++++++++---------- .../fluss/flink/lake/LakeFlinkCatalog.java | 5 +++-- .../catalog/FlinkCatalogFactoryTest.java | 2 +- .../flink/catalog/FlinkCatalogITCase.java | 2 +- .../fluss/flink/catalog/FlinkCatalogTest.java | 3 ++- 8 files changed, 23 insertions(+), 21 deletions(-) diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java index e2b77dfa51..d14e3af6fb 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java @@ -44,14 +44,14 @@ public Flink21Catalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Supplier> lakeCatalogProperties) { + Supplier> lakeCatalogPropertiesSupplier) { super( name, defaultDatabase, bootstrapServers, classLoader, securityConfigs, - lakeCatalogProperties); + lakeCatalogPropertiesSupplier); } @VisibleForTesting @@ -61,7 +61,7 @@ public Flink21Catalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Supplier> lakeCatalogProperties, + Supplier> lakeCatalogPropertiesSupplier, LakeFlinkCatalog lakeFlinkCatalog) { super( name, @@ -69,7 +69,7 @@ public Flink21Catalog( bootstrapServers, classLoader, securityConfigs, - lakeCatalogProperties, + lakeCatalogPropertiesSupplier, lakeFlinkCatalog); } diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java index 3b28afed7e..cff44ab866 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java @@ -30,6 +30,6 @@ public FlinkCatalog createCatalog(Context context) { catalog.bootstrapServers, catalog.classLoader, catalog.securityConfigs, - catalog.lakeCatalogProperties); + catalog.lakeCatalogPropertiesSupplier); } } diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java index 303fd1dce8..62bf5b9aa0 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java +++ b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java @@ -43,7 +43,7 @@ static void beforeAll() { catalog.bootstrapServers, catalog.classLoader, catalog.securityConfigs, - catalog.lakeCatalogProperties); + catalog.lakeCatalogPropertiesSupplier); catalog.open(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index b8a5186590..cd4c775b57 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -116,7 +116,7 @@ public class FlinkCatalog extends AbstractCatalog { protected final String bootstrapServers; protected final Map securityConfigs; protected final LakeFlinkCatalog lakeFlinkCatalog; - protected final Supplier> lakeCatalogProperties; + protected final Supplier> lakeCatalogPropertiesSupplier; protected Connection connection; protected Admin admin; @@ -126,14 +126,14 @@ public FlinkCatalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Supplier> lakeCatalogProperties) { + Supplier> lakeCatalogPropertiesSupplier) { this( name, defaultDatabase, bootstrapServers, classLoader, securityConfigs, - lakeCatalogProperties, + lakeCatalogPropertiesSupplier, new LakeFlinkCatalog(name, classLoader)); } @@ -144,7 +144,7 @@ public FlinkCatalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, - Supplier> lakeCatalogProperties, + Supplier> lakeCatalogPropertiesSupplier, LakeFlinkCatalog lakeFlinkCatalog) { super(name, defaultDatabase); this.catalogName = name; @@ -152,7 +152,7 @@ public FlinkCatalog( this.bootstrapServers = bootstrapServers; this.classLoader = classLoader; this.securityConfigs = securityConfigs; - this.lakeCatalogProperties = lakeCatalogProperties; + this.lakeCatalogPropertiesSupplier = lakeCatalogPropertiesSupplier; this.lakeFlinkCatalog = lakeFlinkCatalog; } @@ -323,7 +323,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) objectPath.getDatabaseName(), tableName, tableInfo.getProperties(), - lakeCatalogProperties); + lakeCatalogPropertiesSupplier); } else { tableInfo = admin.getTableInfo(tablePath).get(); } @@ -360,7 +360,7 @@ protected CatalogBaseTable getLakeTable( String databaseName, String tableName, Configuration properties, - Supplier> lakeCatalogProperties) + Supplier> lakeCatalogPropertiesSupplier) throws TableNotExistException, CatalogException { String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER); if (tableComponents.length == 1) { @@ -372,7 +372,7 @@ protected CatalogBaseTable getLakeTable( tableName = String.join("", tableComponents); } return lakeFlinkCatalog - .getLakeCatalog(properties, lakeCatalogProperties) + .getLakeCatalog(properties, lakeCatalogPropertiesSupplier) .getTable(new ObjectPath(databaseName, tableName)); } @@ -787,7 +787,7 @@ public Map getSecurityConfigs() { } @VisibleForTesting - public Supplier> getLakeCatalogProperties() { - return lakeCatalogProperties; + public Map getLakeCatalogProperties() { + return lakeCatalogPropertiesSupplier.get(); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index 6631907def..a382825c9a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -52,7 +52,8 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) { } public Catalog getLakeCatalog( - Configuration tableOptions, Supplier> lakeCatalogProperties) { + Configuration tableOptions, + Supplier> lakeCatalogPropertiesSupplier) { // TODO: Currently, a Fluss cluster only supports a single DataLake storage. // However, in the // future, it may support multiple DataLakes. The following code assumes @@ -74,7 +75,7 @@ public Catalog getLakeCatalog( } Map catalogProperties = PropertiesUtils.extractAndRemovePrefix( - lakeCatalogProperties.get(), lakeFormat + "."); + lakeCatalogPropertiesSupplier.get(), lakeFormat + "."); catalogProperties.putAll( DataLakeUtils.extractLakeCatalogProperties(tableOptions)); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java index a71f63e849..f47d9c56b8 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java @@ -91,7 +91,7 @@ public void testCreateCatalog() { Thread.currentThread().getContextClassLoader()); assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap); - assertThat(actualCatalog2.getLakeCatalogProperties().get()).isEqualTo(lakeCatalogMap); + assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap); } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index b4075d23b0..6e640959f9 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -829,7 +829,7 @@ void testCreateCatalogWithLakeProperties() { FlinkCatalog catalog = (FlinkCatalog) tEnv.getCatalog("test_catalog_with_lake_properties").get(); - assertOptionsEqual(catalog.getLakeCatalogProperties().get(), properties); + assertOptionsEqual(catalog.getLakeCatalogProperties(), properties); } /** diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index f17ee59e1d..93c630bb18 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -1012,7 +1012,8 @@ public MockLakeFlinkCatalog(String catalogName, ClassLoader classLoader) { @Override public Catalog getLakeCatalog( - Configuration tableOptions, Supplier> lakeCatalogProperties) { + Configuration tableOptions, + Supplier> lakeCatalogPropertiesSupplier) { return catalog; } From 15de5fd32728b345e661e184c1976029aa7d4d8e Mon Sep 17 00:00:00 2001 From: maxcwang Date: Wed, 17 Dec 2025 10:19:44 +0800 Subject: [PATCH 22/22] fix supplier --- .../apache/fluss/flink/catalog/FlinkCatalog.java | 16 ++++++++++++---- .../fluss/flink/lake/LakeFlinkCatalog.java | 6 ++---- .../fluss/flink/lake/LakeTableFactory.java | 2 +- .../fluss/flink/catalog/FlinkCatalogTest.java | 4 +--- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index cd4c775b57..724983f9af 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -116,6 +116,7 @@ public class FlinkCatalog extends AbstractCatalog { protected final String bootstrapServers; protected final Map securityConfigs; protected final LakeFlinkCatalog lakeFlinkCatalog; + protected volatile Map lakeCatalogProperties; protected final Supplier> lakeCatalogPropertiesSupplier; protected Connection connection; protected Admin admin; @@ -323,7 +324,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) objectPath.getDatabaseName(), tableName, tableInfo.getProperties(), - lakeCatalogPropertiesSupplier); + getLakeCatalogProperties()); } else { tableInfo = admin.getTableInfo(tablePath).get(); } @@ -360,7 +361,7 @@ protected CatalogBaseTable getLakeTable( String databaseName, String tableName, Configuration properties, - Supplier> lakeCatalogPropertiesSupplier) + Map lakeCatalogProperties) throws TableNotExistException, CatalogException { String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER); if (tableComponents.length == 1) { @@ -372,7 +373,7 @@ protected CatalogBaseTable getLakeTable( tableName = String.join("", tableComponents); } return lakeFlinkCatalog - .getLakeCatalog(properties, lakeCatalogPropertiesSupplier) + .getLakeCatalog(properties, lakeCatalogProperties) .getTable(new ObjectPath(databaseName, tableName)); } @@ -788,6 +789,13 @@ public Map getSecurityConfigs() { @VisibleForTesting public Map getLakeCatalogProperties() { - return lakeCatalogPropertiesSupplier.get(); + if (lakeCatalogProperties == null) { + synchronized (this) { + if (lakeCatalogProperties == null) { + lakeCatalogProperties = lakeCatalogPropertiesSupplier.get(); + } + } + } + return lakeCatalogProperties; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index a382825c9a..6e8b2a5e25 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -31,7 +31,6 @@ import java.lang.reflect.Method; import java.util.Map; -import java.util.function.Supplier; import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG; import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; @@ -52,8 +51,7 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) { } public Catalog getLakeCatalog( - Configuration tableOptions, - Supplier> lakeCatalogPropertiesSupplier) { + Configuration tableOptions, Map lakeCatalogProperties) { // TODO: Currently, a Fluss cluster only supports a single DataLake storage. // However, in the // future, it may support multiple DataLakes. The following code assumes @@ -75,7 +73,7 @@ public Catalog getLakeCatalog( } Map catalogProperties = PropertiesUtils.extractAndRemovePrefix( - lakeCatalogPropertiesSupplier.get(), lakeFormat + "."); + lakeCatalogProperties, lakeFormat + "."); catalogProperties.putAll( DataLakeUtils.extractLakeCatalogProperties(tableOptions)); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java index ba496b8452..ad9918f389 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java @@ -85,7 +85,7 @@ private DynamicTableSourceFactory getIcebergFactory() { lakeFlinkCatalog.getLakeCatalog( // we can pass empty configuration to get catalog // since the catalog should already be initialized - new Configuration(), Collections::emptyMap); + new Configuration(), Collections.emptyMap()); // Create FlinkDynamicTableFactory with the catalog Class icebergFactoryClass = diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index 93c630bb18..9b915a74d5 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -72,7 +72,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Supplier; import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; @@ -1012,8 +1011,7 @@ public MockLakeFlinkCatalog(String catalogName, ClassLoader classLoader) { @Override public Catalog getLakeCatalog( - Configuration tableOptions, - Supplier> lakeCatalogPropertiesSupplier) { + Configuration tableOptions, Map lakeCatalogProperties) { return catalog; }