Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Utilities of Fluss {@link ConfigOptions}. */
@Internal
Expand All @@ -35,6 +37,7 @@ public class FlussConfigUtils {
public static final String TABLE_PREFIX = "table.";
public static final String CLIENT_PREFIX = "client.";
public static final String CLIENT_SECURITY_PREFIX = "client.security.";
public static final String CATALOG_PREFIX = "catalog.";

public static final List<String> ALTERABLE_TABLE_OPTIONS;

Expand All @@ -45,6 +48,14 @@ public class FlussConfigUtils {
Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
}

public static final Set<String> SENSITIVE_TABLE_OPTIONS = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move to here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems similar configs like ALTERABLE_TABLE_OPTIONS are all in this file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But currently, I don't want to expose them in FlussConfigUtils...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


static {
SENSITIVE_TABLE_OPTIONS.add("password");
SENSITIVE_TABLE_OPTIONS.add("secret");
SENSITIVE_TABLE_OPTIONS.add("key");
}

public static boolean isTableStorageConfig(String key) {
return key.startsWith(TABLE_PREFIX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,15 @@ public Flink21Catalog(
String defaultDatabase,
String bootstrapServers,
ClassLoader classLoader,
Map<String, String> securityConfigs) {
super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
Map<String, String> securityConfigs,
Map<String, String> catalogSensitiveProperties) {
super(
name,
defaultDatabase,
bootstrapServers,
classLoader,
securityConfigs,
catalogSensitiveProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public FlinkCatalog createCatalog(Context context) {
catalog.defaultDatabase,
catalog.bootstrapServers,
catalog.classLoader,
catalog.securityConfigs);
catalog.securityConfigs,
catalog.catalogSensitiveProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ static void beforeAll() {
catalog.defaultDatabase,
catalog.bootstrapServers,
catalog.classLoader,
catalog.securityConfigs);
catalog.securityConfigs,
catalog.catalogSensitiveProperties);
catalog.open();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class FlinkCatalog extends AbstractCatalog {
protected final String bootstrapServers;
protected final Map<String, String> securityConfigs;
protected final LakeFlinkCatalog lakeFlinkCatalog;
protected final Map<String, String> catalogSensitiveProperties;
protected Connection connection;
protected Admin admin;

Expand All @@ -123,13 +124,15 @@ public FlinkCatalog(
String defaultDatabase,
String bootstrapServers,
ClassLoader classLoader,
Map<String, String> securityConfigs) {
Map<String, String> securityConfigs,
Map<String, String> catalogSensitiveProperties) {
super(name, defaultDatabase);
this.catalogName = name;
this.defaultDatabase = defaultDatabase;
this.bootstrapServers = bootstrapServers;
this.classLoader = classLoader;
this.securityConfigs = securityConfigs;
this.catalogSensitiveProperties = catalogSensitiveProperties;
this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader);
}

Expand Down Expand Up @@ -294,8 +297,9 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
objectPath.getDatabaseName(),
tableName.split("\\" + LAKE_TABLE_SPLITTER)[0])));
}
return getLakeTable(
objectPath.getDatabaseName(), tableName, tableInfo.getProperties());
Configuration tableProperties = tableInfo.getProperties();
tableProperties.addAll(Configuration.fromMap(catalogSensitiveProperties));
return getLakeTable(objectPath.getDatabaseName(), tableName, tableProperties);
} else {
tableInfo = admin.getTableInfo(tablePath).get();
}
Expand Down Expand Up @@ -754,4 +758,9 @@ public Procedure getProcedure(ObjectPath procedurePath)
public Map<String, String> getSecurityConfigs() {
return securityConfigs;
}

@VisibleForTesting
public Map<String, String> getCatalogSensitiveConfigs() {
return catalogSensitiveProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;

import static org.apache.fluss.config.FlussConfigUtils.CATALOG_PREFIX;
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_SECURITY_PREFIX;
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;

Expand Down Expand Up @@ -54,15 +55,17 @@ public Set<ConfigOption<?>> optionalOptions() {
public FlinkCatalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validateExcept(CLIENT_SECURITY_PREFIX);
helper.validateExcept(CLIENT_SECURITY_PREFIX, CATALOG_PREFIX);
Map<String, String> options = context.getOptions();
Map<String, String> securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX);
Map<String, String> catalogSensitiveProperties = extractPrefix(options, CATALOG_PREFIX);

return new FlinkCatalog(
context.getName(),
helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE),
helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS),
context.getClassLoader(),
securityConfigs);
securityConfigs,
catalogSensitiveProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public void testCreateCatalog() {
DB_NAME,
BOOTSTRAP_SERVERS_NAME,
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap(),
Collections.emptyMap());

checkEquals(flinkCatalog, actualCatalog);
Expand All @@ -75,7 +76,12 @@ public void testCreateCatalog() {
securityMap.put("client.security.sasl.username", "root");
securityMap.put("client.security.sasl.password", "password");

Map<String, String> catalogSensitiveMap = new HashMap<>();
catalogSensitiveMap.put("catalog.paimon.jdbc.user", "admin");
catalogSensitiveMap.put("catalog.paimon.jdbc.password", "pass");

options.putAll(securityMap);
options.putAll(catalogSensitiveMap);
FlinkCatalog actualCatalog2 =
(FlinkCatalog)
FactoryUtil.createCatalog(
Expand All @@ -85,6 +91,7 @@ public void testCreateCatalog() {
Thread.currentThread().getContextClassLoader());

assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap);
assertThat(actualCatalog2.getCatalogSensitiveConfigs()).isEqualTo(catalogSensitiveMap);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ static void beforeAll() {
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap(),
Collections.emptyMap());
catalog.open();
}
Expand Down Expand Up @@ -750,6 +751,7 @@ void testAuthentication() throws Exception {
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap(),
Collections.emptyMap());
Catalog finalAuthenticateCatalog = authenticateCatalog;
assertThatThrownBy(finalAuthenticateCatalog::open)
Expand All @@ -768,7 +770,8 @@ void testAuthentication() throws Exception {
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
clientConfig);
clientConfig,
Collections.emptyMap());
authenticateCatalog.open();
assertThat(authenticateCatalog.listDatabases())
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ static void beforeAll() {
DEFAULT_DB,
String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap(),
Collections.emptyMap());
catalog.open();
}
Expand Down Expand Up @@ -627,6 +628,7 @@ void testDatabase() throws Exception {
",",
flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap(),
Collections.emptyMap()))
.hasMessageContaining("defaultDatabase cannot be null or empty");
}
Expand Down Expand Up @@ -815,6 +817,7 @@ void testConnectionFailureHandling() {
"default",
"invalid-bootstrap-server:9092",
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap(),
Collections.emptyMap());

// Test open() throws proper exception
Expand Down Expand Up @@ -945,7 +948,8 @@ void testSecurityConfigsIntegration() throws Exception {
DEFAULT_DB,
String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
Thread.currentThread().getContextClassLoader(),
securityConfigs);
securityConfigs,
Collections.emptyMap());
securedCatalog.open();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void buildCatalog() {
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap(),
Collections.emptyMap());
catalog.open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;

import static org.apache.fluss.config.FlussConfigUtils.SENSITIVE_TABLE_OPTIONS;
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties;
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor;

Expand All @@ -83,14 +83,6 @@ public class MetadataManager {
private final int maxBucketNum;
private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader;

public static final Set<String> SENSITIVE_TABLE_OPTIOINS = new HashSet<>();

static {
SENSITIVE_TABLE_OPTIOINS.add("password");
SENSITIVE_TABLE_OPTIOINS.add("secret");
SENSITIVE_TABLE_OPTIOINS.add("key");
}

/**
* Creates a new metadata manager.
*
Expand Down Expand Up @@ -525,7 +517,7 @@ public void removeSensitiveTableOptions(Map<String, String> tableLakeOptions) {
Iterator<Map.Entry<String, String>> iterator = tableLakeOptions.entrySet().iterator();
while (iterator.hasNext()) {
String key = iterator.next().getKey().toLowerCase();
if (SENSITIVE_TABLE_OPTIOINS.stream().anyMatch(key::contains)) {
if (SENSITIVE_TABLE_OPTIONS.stream().anyMatch(key::contains)) {
iterator.remove();
}
}
Expand Down