diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java index 7a6f737e02..d14e3af6fb 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; /** A {@link FlinkCatalog} used for Flink 2.1. */ public class Flink21Catalog extends FlinkCatalog { @@ -42,8 +43,15 @@ public Flink21Catalog( String defaultDatabase, String bootstrapServers, ClassLoader classLoader, - Map securityConfigs) { - super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs); + Map securityConfigs, + Supplier> lakeCatalogPropertiesSupplier) { + super( + name, + defaultDatabase, + bootstrapServers, + classLoader, + securityConfigs, + lakeCatalogPropertiesSupplier); } @VisibleForTesting @@ -53,6 +61,7 @@ public Flink21Catalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, + Supplier> lakeCatalogPropertiesSupplier, LakeFlinkCatalog lakeFlinkCatalog) { super( name, @@ -60,6 +69,7 @@ public Flink21Catalog( bootstrapServers, classLoader, securityConfigs, + lakeCatalogPropertiesSupplier, lakeFlinkCatalog); } diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java index 8557a552f6..cff44ab866 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java +++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java @@ -29,6 +29,7 @@ public FlinkCatalog createCatalog(Context context) { catalog.defaultDatabase, catalog.bootstrapServers, catalog.classLoader, - catalog.securityConfigs); + catalog.securityConfigs, + catalog.lakeCatalogPropertiesSupplier); } } diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java index c0b9b91965..62bf5b9aa0 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java +++ b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java @@ -42,7 +42,8 @@ static void beforeAll() { catalog.defaultDatabase, catalog.bootstrapServers, catalog.classLoader, - catalog.securityConfigs); + catalog.securityConfigs, + catalog.lakeCatalogPropertiesSupplier); catalog.open(); } diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java index e66a625e59..d6aa6ef564 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java +++ b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java @@ -43,6 +43,7 @@ protected FlinkCatalog initCatalog( bootstrapServers, Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), + Collections::emptyMap, lakeFlinkCatalog); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 16b05089a4..724983f9af 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -80,6 +80,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -115,6 +116,8 @@ public class FlinkCatalog extends AbstractCatalog { protected final String bootstrapServers; protected final Map securityConfigs; protected final LakeFlinkCatalog lakeFlinkCatalog; + protected volatile Map lakeCatalogProperties; + protected final Supplier> lakeCatalogPropertiesSupplier; protected Connection connection; protected Admin admin; @@ -123,13 +126,15 @@ public FlinkCatalog( String defaultDatabase, String bootstrapServers, ClassLoader classLoader, - Map securityConfigs) { + Map securityConfigs, + Supplier> lakeCatalogPropertiesSupplier) { this( name, defaultDatabase, bootstrapServers, classLoader, securityConfigs, + lakeCatalogPropertiesSupplier, new LakeFlinkCatalog(name, classLoader)); } @@ -140,6 +145,7 @@ public FlinkCatalog( String bootstrapServers, ClassLoader classLoader, Map securityConfigs, + Supplier> lakeCatalogPropertiesSupplier, LakeFlinkCatalog lakeFlinkCatalog) { super(name, defaultDatabase); this.catalogName = name; @@ -147,6 +153,7 @@ public FlinkCatalog( this.bootstrapServers = bootstrapServers; this.classLoader = classLoader; this.securityConfigs = securityConfigs; + this.lakeCatalogPropertiesSupplier = lakeCatalogPropertiesSupplier; this.lakeFlinkCatalog = lakeFlinkCatalog; } @@ -312,8 +319,12 @@ public CatalogBaseTable getTable(ObjectPath objectPath) objectPath.getDatabaseName(), tableName.split("\\" + LAKE_TABLE_SPLITTER)[0]))); } + return getLakeTable( - objectPath.getDatabaseName(), tableName, tableInfo.getProperties()); + objectPath.getDatabaseName(), + tableName, + tableInfo.getProperties(), + getLakeCatalogProperties()); } else { tableInfo = admin.getTableInfo(tablePath).get(); } @@ -347,7 +358,10 @@ public CatalogBaseTable getTable(ObjectPath objectPath) } protected CatalogBaseTable getLakeTable( - String databaseName, String tableName, Configuration properties) + String databaseName, + String tableName, + Configuration properties, + Map lakeCatalogProperties) throws TableNotExistException, CatalogException { String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER); if (tableComponents.length == 1) { @@ -359,7 +373,7 @@ protected CatalogBaseTable getLakeTable( tableName = String.join("", tableComponents); } return lakeFlinkCatalog - .getLakeCatalog(properties) + .getLakeCatalog(properties, lakeCatalogProperties) .getTable(new ObjectPath(databaseName, tableName)); } @@ -772,4 +786,16 @@ public Procedure getProcedure(ObjectPath procedurePath) public Map getSecurityConfigs() { return securityConfigs; } + + @VisibleForTesting + public Map getLakeCatalogProperties() { + if (lakeCatalogProperties == null) { + synchronized (this) { + if (lakeCatalogProperties == null) { + lakeCatalogProperties = lakeCatalogPropertiesSupplier.get(); + } + } + } + return lakeCatalogProperties; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java index 83bd3cd21f..d1c9bfc488 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java @@ -18,12 +18,16 @@ package org.apache.fluss.flink.catalog; import org.apache.fluss.flink.FlinkConnectorOptions; +import org.apache.fluss.metadata.DataLakeFormat; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; @@ -35,6 +39,15 @@ public class FlinkCatalogFactory implements CatalogFactory { public static final String IDENTIFIER = "fluss"; + public static final List PREFIXES_TO_SKIP_VALIDATE = new ArrayList<>(); + + static { + PREFIXES_TO_SKIP_VALIDATE.add(CLIENT_SECURITY_PREFIX); + for (DataLakeFormat value : DataLakeFormat.values()) { + PREFIXES_TO_SKIP_VALIDATE.add(value.toString()); + } + } + @Override public String factoryIdentifier() { return IDENTIFIER; @@ -54,7 +67,7 @@ public Set> optionalOptions() { public FlinkCatalog createCatalog(Context context) { final FactoryUtil.CatalogFactoryHelper helper = FactoryUtil.createCatalogFactoryHelper(this, context); - helper.validateExcept(CLIENT_SECURITY_PREFIX); + helper.validateExcept(PREFIXES_TO_SKIP_VALIDATE.toArray(new String[0])); Map options = context.getOptions(); Map securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX); @@ -63,6 +76,13 @@ public FlinkCatalog createCatalog(Context context) { helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE), helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS), context.getClassLoader(), - securityConfigs); + securityConfigs, + () -> { + Map lakeCatalogProperties = new HashMap<>(); + for (DataLakeFormat lakeFormat : DataLakeFormat.values()) { + lakeCatalogProperties.putAll(extractPrefix(options, lakeFormat.toString())); + } + return lakeCatalogProperties; + }); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java index b517764190..6e8b2a5e25 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java @@ -21,6 +21,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.utils.DataLakeUtils; import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.utils.PropertiesUtils; import org.apache.flink.table.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; @@ -49,7 +50,8 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) { this.classLoader = classLoader; } - public Catalog getLakeCatalog(Configuration tableOptions) { + public Catalog getLakeCatalog( + Configuration tableOptions, Map lakeCatalogProperties) { // TODO: Currently, a Fluss cluster only supports a single DataLake storage. // However, in the // future, it may support multiple DataLakes. The following code assumes @@ -69,12 +71,19 @@ public Catalog getLakeCatalog(Configuration tableOptions) { + ConfigOptions.TABLE_DATALAKE_FORMAT.key() + "' is set."); } + Map catalogProperties = + PropertiesUtils.extractAndRemovePrefix( + lakeCatalogProperties, lakeFormat + "."); + + catalogProperties.putAll( + DataLakeUtils.extractLakeCatalogProperties(tableOptions)); if (lakeFormat == PAIMON) { catalog = - PaimonCatalogFactory.create(catalogName, tableOptions, classLoader); + PaimonCatalogFactory.create( + catalogName, catalogProperties, classLoader); this.lakeFormat = PAIMON; } else if (lakeFormat == ICEBERG) { - catalog = IcebergCatalogFactory.create(catalogName, tableOptions); + catalog = IcebergCatalogFactory.create(catalogName, catalogProperties); this.lakeFormat = ICEBERG; } else { throw new UnsupportedOperationException( @@ -111,9 +120,9 @@ public static class PaimonCatalogFactory { private PaimonCatalogFactory() {} public static Catalog create( - String catalogName, Configuration tableOptions, ClassLoader classLoader) { - Map catalogProperties = - DataLakeUtils.extractLakeCatalogProperties(tableOptions); + String catalogName, + Map catalogProperties, + ClassLoader classLoader) { return FlinkCatalogFactory.createCatalog( catalogName, CatalogContext.create( @@ -131,9 +140,7 @@ private IcebergCatalogFactory() {} // requires Iceberg 1.5.0+. // Using reflection to maintain Java 8 compatibility. // Once Fluss drops Java 8, we can remove the reflection code - public static Catalog create(String catalogName, Configuration tableOptions) { - Map catalogProperties = - DataLakeUtils.extractLakeCatalogProperties(tableOptions); + public static Catalog create(String catalogName, Map catalogProperties) { // Map "type" to "catalog-type" (equivalent) // Required: either "catalog-type" (standard type) or "catalog-impl" // (fully-qualified custom class, mandatory if "catalog-type" is missing) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java index 3f0ff88c67..ad9918f389 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java @@ -25,6 +25,8 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import java.util.Collections; + /** A factory to create {@link DynamicTableSource} for lake table. */ public class LakeTableFactory { private final LakeFlinkCatalog lakeFlinkCatalog; @@ -83,7 +85,7 @@ private DynamicTableSourceFactory getIcebergFactory() { lakeFlinkCatalog.getLakeCatalog( // we can pass empty configuration to get catalog // since the catalog should already be initialized - new Configuration()); + new Configuration(), Collections.emptyMap()); // Create FlinkDynamicTableFactory with the catalog Class icebergFactoryClass = diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java index 4a17cc3c87..f47d9c56b8 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java @@ -64,7 +64,8 @@ public void testCreateCatalog() { DB_NAME, BOOTSTRAP_SERVERS_NAME, Thread.currentThread().getContextClassLoader(), - Collections.emptyMap()); + Collections.emptyMap(), + Collections::emptyMap); checkEquals(flinkCatalog, actualCatalog); @@ -75,7 +76,12 @@ public void testCreateCatalog() { securityMap.put("client.security.sasl.username", "root"); securityMap.put("client.security.sasl.password", "password"); + Map lakeCatalogMap = new HashMap<>(); + lakeCatalogMap.put("paimon.jdbc.user", "admin"); + lakeCatalogMap.put("paimon.jdbc.password", "pass"); + options.putAll(securityMap); + options.putAll(lakeCatalogMap); FlinkCatalog actualCatalog2 = (FlinkCatalog) FactoryUtil.createCatalog( @@ -85,6 +91,7 @@ public void testCreateCatalog() { Thread.currentThread().getContextClassLoader()); assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap); + assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap); } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index 570d7121a8..6e640959f9 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -115,7 +115,8 @@ static void beforeAll() { DEFAULT_DB, bootstrapServers, Thread.currentThread().getContextClassLoader(), - Collections.emptyMap()); + Collections.emptyMap(), + Collections::emptyMap); catalog.open(); } @@ -769,7 +770,8 @@ void testAuthentication() throws Exception { DEFAULT_DB, bootstrapServers, Thread.currentThread().getContextClassLoader(), - Collections.emptyMap()); + Collections.emptyMap(), + Collections::emptyMap); Catalog finalAuthenticateCatalog = authenticateCatalog; assertThatThrownBy(finalAuthenticateCatalog::open) .cause() @@ -787,7 +789,8 @@ void testAuthentication() throws Exception { DEFAULT_DB, bootstrapServers, Thread.currentThread().getContextClassLoader(), - clientConfig); + clientConfig, + Collections::emptyMap); authenticateCatalog.open(); assertThat(authenticateCatalog.listDatabases()) .containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB)); @@ -815,6 +818,20 @@ void testCreateCatalogWithUnexistedDatabase() { "The configured default-database 'non-exist' does not exist in the Fluss cluster."); } + @Test + void testCreateCatalogWithLakeProperties() { + Map properties = new HashMap<>(); + properties.put("paimon.jdbc.password", "pass"); + tEnv.executeSql( + String.format( + "create catalog test_catalog_with_lake_properties with ('type' = 'fluss', '%s' = '%s', 'paimon.jdbc.password' = 'pass')", + BOOTSTRAP_SERVERS.key(), FLUSS_CLUSTER_EXTENSION.getBootstrapServers())); + FlinkCatalog catalog = + (FlinkCatalog) tEnv.getCatalog("test_catalog_with_lake_properties").get(); + + assertOptionsEqual(catalog.getLakeCatalogProperties(), properties); + } + /** * Before Flink 2.1, the {@link Schema} did not include an index field. Starting from Flink 2.1, * Flink introduced the concept of an index, and in Fluss, the primary key is considered as an diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index c32492c4b7..9b915a74d5 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -172,6 +172,7 @@ protected FlinkCatalog initCatalog( bootstrapServers, Thread.currentThread().getContextClassLoader(), Collections.emptyMap(), + Collections::emptyMap, lakeFlinkCatalog); } @@ -645,7 +646,8 @@ void testDatabase() throws Exception { ",", flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)), Thread.currentThread().getContextClassLoader(), - Collections.emptyMap())) + Collections.emptyMap(), + Collections::emptyMap)) .hasMessageContaining("defaultDatabase cannot be null or empty"); } @@ -833,7 +835,8 @@ void testConnectionFailureHandling() { "default", "invalid-bootstrap-server:9092", Thread.currentThread().getContextClassLoader(), - Collections.emptyMap()); + Collections.emptyMap(), + Collections::emptyMap); // Test open() throws proper exception assertThatThrownBy(() -> badCatalog.open()) @@ -963,7 +966,8 @@ void testSecurityConfigsIntegration() throws Exception { DEFAULT_DB, String.join(",", flussConf.get(BOOTSTRAP_SERVERS)), Thread.currentThread().getContextClassLoader(), - securityConfigs); + securityConfigs, + Collections::emptyMap); securedCatalog.open(); try { @@ -1006,7 +1010,8 @@ public MockLakeFlinkCatalog(String catalogName, ClassLoader classLoader) { } @Override - public Catalog getLakeCatalog(Configuration tableOptions) { + public Catalog getLakeCatalog( + Configuration tableOptions, Map lakeCatalogProperties) { return catalog; } diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java index f1305b99d2..a6380cc444 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java @@ -102,7 +102,8 @@ public void buildCatalog() { DEFAULT_DB, bootstrapServers, Thread.currentThread().getContextClassLoader(), - Collections.emptyMap()); + Collections.emptyMap(), + Collections::emptyMap); catalog.open(); } } diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md index a542db143c..cc5bfcd974 100644 --- a/website/docs/engine-flink/ddl.md +++ b/website/docs/engine-flink/ddl.md @@ -21,13 +21,14 @@ USE CATALOG fluss_catalog; The following properties can be set if using the Fluss catalog: -| Option | Required | Default | Description | -|--------------------------------|----------|-----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| type | required | (none) | Catalog type, must be 'fluss' here. | -| bootstrap.servers | required | (none) | Comma separated list of Fluss servers. | -| default-database | optional | fluss | The default database to use when switching to this catalog. | -| client.security.protocol | optional | PLAINTEXT | The security protocol used to communicate with brokers. Currently, only `PLAINTEXT` and `SASL` are supported, the configuration value is case insensitive. | -| `client.security.{protocol}.*` | optional | (none) | Client-side configuration properties for a specific authentication protocol. E.g., client.security.sasl.jaas.config. More Details in [authentication](../security/authentication.md) | +| Option | Required | Default | Description | +|--------------------------------|----------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| type | required | (none) | Catalog type, must be 'fluss' here. | +| bootstrap.servers | required | (none) | Comma separated list of Fluss servers. | +| default-database | optional | fluss | The default database to use when switching to this catalog. | +| client.security.protocol | optional | PLAINTEXT | The security protocol used to communicate with brokers. Currently, only `PLAINTEXT` and `SASL` are supported, the configuration value is case insensitive. | +| `client.security.{protocol}.*` | optional | (none) | Client-side configuration properties for a specific authentication protocol. E.g., client.security.sasl.jaas.config. More Details in [authentication](../security/authentication.md) | +| `{lake-format}.*` | optional | (none) | Extra properties to be passed to the lake catalog. This is useful for configuring sensitive settings, such as the username and password required for lake catalog authentication. E.g., `paimon.jdbc.password = pass`. | The following statements assume that the current catalog has been switched to the Fluss catalog using the `USE CATALOG ` statement.