Conversation

Contributor

@xx789633 xx789633 commented Nov 5, 2025

Purpose

Linked issue: close #1903


@luoyuxia luoyuxia left a comment


@xx789633 Thanks for the PR. But it should allow passing all options, not only the sensitive table options. I imagine something like:
create catalog t1 with (
  'paimon.k1' = 'v1',
  'paimon.k2' = 'v2'
)
to enable passing 'k1' = 'v1' as well as 'k2' = 'v2'.
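The prefix-stripping behavior described above can be sketched as follows. This is an illustrative helper, not the actual Fluss `PropertiesUtils` implementation; the method name and the `paimon.` prefix are assumptions for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixDemo {
    // Hypothetical sketch of the discussed behavior: every option starting
    // with the given prefix is forwarded with the prefix removed; all other
    // options are dropped from the result.
    static Map<String, String> extractAndRemovePrefix(
            Map<String, String> original, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : original.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> catalogOptions = new HashMap<>();
        catalogOptions.put("paimon.k1", "v1");
        catalogOptions.put("paimon.k2", "v2");
        catalogOptions.put("bootstrap.servers", "localhost:9123");

        Map<String, String> lakeOptions =
                extractAndRemovePrefix(catalogOptions, "paimon.");
        System.out.println(lakeOptions); // contains k1=v1 and k2=v2 only
    }
}
```

With this shape, `'paimon.k1' = 'v1'` in the catalog DDL arrives at the lake catalog as `'k1' = 'v1'`.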

@xx789633
Contributor Author

You are correct @luoyuxia . Let me rename it.

Comment on lines 60 to 65
helper.validateExcept(
        Stream.concat(
                        Stream.of(CLIENT_SECURITY_PREFIX),
                        Arrays.stream(DataLakeFormat.values())
                                .map(DataLakeFormat::toString))
                .toArray(String[]::new));
Contributor


I suggest making it extensible. Then if a subclass wants to support new configs, it doesn't need to override createCatalog.

    protected static final List<String> ALLOW_CATALOG_CONFIG_PREFIX = new ArrayList<>();

    static {
        ALLOW_CATALOG_CONFIG_PREFIX.add(CLIENT_SECURITY_PREFIX);
        // support lake catalog options
        for (DataLakeFormat value : DataLakeFormat.values()) {
            ALLOW_CATALOG_CONFIG_PREFIX.add(value.toString());
        }
    }

Contributor Author


Good suggestion. Done.

Contributor

@luoyuxia luoyuxia left a comment


@xx789633 Thanks for the PR. Left minor comments. PTAL.
Also, is it possible to document that users can pass lake-related properties to the Flink catalog via SQL DDL?

Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
}

public static final Set<String> SENSITIVE_TABLE_OPTIONS = new HashSet<>();
Contributor


Why move it here?

Contributor Author


It seems similar configs like ALTERABLE_TABLE_OPTIONS are all in this file.

Contributor


But currently, I don't want to expose them in FlussConfigUtils...

Contributor Author


fixed

.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static <V> Map<String, V> extractPrefix(
Contributor


Why introduce a new method? Can't we reuse extractPrefix(Map<String, V> originalMap, String prefix)?

Contributor


I suggest handling it in FlinkCatalogFactory. The method is hard to reuse...

Contributor Author


Done. Just reused extractPrefix(Map<String, V> originalMap, String prefix).

+ "' is set.");
}
String dataLakePrefix = lakeFormat.toString() + ".";
Map<String, String> catalogProperties =
Contributor


Can be

Map<String, String> catalogProperties =
                            PropertiesUtils.extractAndRemovePrefix(
                                    lakeCatalogProperties, lakeFormat + ".");

?

Contributor Author


done

securityMap.put("client.security.sasl.username", "root");
securityMap.put("client.security.sasl.password", "password");

Map<String, String> lakeCatalogMap = new HashMap<>();
Contributor


Add a test in FlinkCatalogITCase to verify that SQL can pass the properties.

Contributor Author


A new test, testCreateCatalogWithLakeProperties, has been added.

protected final String bootstrapServers;
protected final Map<String, String> securityConfigs;
protected final LakeFlinkCatalog lakeFlinkCatalog;
protected final Map<String, String> lakeCatalogProperties;
Contributor


Could you please make lakeCatalogProperties a Supplier<Map<String, String>> lakeCatalogPropertiesProvider? The reason is that it makes it more pluggable. In our internal use, we retrieve the lake catalog properties from the Flink conf, but we only want to do that retrieval when it's required.

Contributor Author


Makes sense. Done.
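The laziness argument behind the Supplier change can be illustrated with a small sketch. The property-reading function and its contents here are hypothetical stand-ins for the real Flink-conf lookup; the point is that constructing the catalog does not trigger the lookup, only calling get() does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LazySupplierDemo {
    // Counts how often the (potentially expensive) lookup actually runs.
    static int retrievals = 0;

    // Hypothetical stand-in for retrieving lake catalog properties from
    // the Flink configuration.
    static Map<String, String> readLakeProperties() {
        retrievals++;
        Map<String, String> props = new HashMap<>();
        props.put("warehouse", "/tmp/warehouse");
        return props;
    }

    public static void main(String[] args) {
        // Wrapping the lookup in a Supplier defers it: nothing runs yet.
        Supplier<Map<String, String>> lakeCatalogPropertiesSupplier =
                LazySupplierDemo::readLakeProperties;
        System.out.println(retrievals); // still 0: catalog construction is free

        // The lookup only happens when the lake catalog actually needs it.
        Map<String, String> props = lakeCatalogPropertiesSupplier.get();
        System.out.println(retrievals); // 1
        System.out.println(props.get("warehouse"));
    }
}
```

This matches the later comment in the thread: calling get() eagerly in every code path would invoke the supplier repeatedly, while the properties are only needed when creating the lake catalog.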

@xx789633
Contributor Author

Doc is updated. Please take a look @luoyuxia

Contributor

@luoyuxia luoyuxia left a comment


@xx789633 Left some comments again

Map<String, String> securityConfigs) {
super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
Map<String, String> securityConfigs,
Supplier<Map<String, String>> lakeCatalogProperties) {
Contributor


Suggested change
Supplier<Map<String, String>> lakeCatalogProperties) {
Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {

String bootstrapServers,
ClassLoader classLoader,
Map<String, String> securityConfigs,
Supplier<Map<String, String>> lakeCatalogProperties,
Contributor


Ditto.

protected final String bootstrapServers;
protected final Map<String, String> securityConfigs;
protected final LakeFlinkCatalog lakeFlinkCatalog;
protected final Supplier<Map<String, String>> lakeCatalogProperties;
Contributor


Ditto.

objectPath.getDatabaseName(),
tableName,
tableInfo.getProperties(),
lakeCatalogProperties);
Contributor


Nit: we can call lakeCatalogProperties.get() here directly.

Contributor Author


Acquiring the properties here would invoke the supplier function frequently and add some overhead. Actually, the properties are only needed when creating the lake catalog.

}

@VisibleForTesting
public Supplier<Map<String, String>> getLakeCatalogProperties() {
Contributor


Suggested change
public Supplier<Map<String, String>> getLakeCatalogProperties() {
public Map<String, String> getLakeCatalogProperties() {

@luoyuxia luoyuxia merged commit 35d92c5 into apache:main Dec 17, 2025
6 checks passed
LiebingYu added a commit to LiebingYu/fluss that referenced this pull request Dec 17, 2025
LiebingYu added a commit to LiebingYu/fluss that referenced this pull request Dec 18, 2025
vamossagar12 pushed a commit to vamossagar12/fluss that referenced this pull request Dec 20, 2025


Development

Successfully merging this pull request may close these issues.

Allow Flink users provides lake catalog information (e.g., passwords) directly at the time of creating the Fluss catalog

3 participants