Skip to content

Add connector SPI for returning redactable properties #24562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,31 @@
package io.trino.spi.connector;

import com.google.errorprone.annotations.CheckReturnValue;
import io.trino.spi.Experimental;

import java.util.Map;
import java.util.Set;

public interface ConnectorFactory
{
String getName();

@CheckReturnValue
Connector create(String catalogName, Map<String, String> config, ConnectorContext context);

/**
* Returns property names that may contain security-sensitive information.
* In addition to properties that are explicitly known to the connector as
* security-sensitive, it may also return properties that are unknown or unused.
* In other words, if the connector cannot classify a property, it should treat it as
* security-sensitive by default for safety.
* <p>
* The engine uses the properties returned by this method to mask the corresponding
* values, preventing the leakage of security-sensitive information.
*/
@Experimental(eta = "2025-12-31")
default Set<String> getSecuritySensitivePropertyNames(String catalogName, Map<String, String> config, ConnectorContext context)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the method take the full config map and not just the names?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this PR should go in until we have more clarity on exactly how it will be used by the engine. An SPI in a vacuum and without usages doesn't make sense to include.

The PR linked in the description is still work in progress, so let's narrow down on that first.

Copy link
Member Author

@piotrrzysko piotrrzysko Feb 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the method take the full config map and not just the names?

We use the values to get security-sensitive properties from Airlift. Here’s an example:

@Override
public Set<String> getSecuritySensitivePropertyNames(String catalogName, Map<String, String> config, ConnectorContext context)
{
Bootstrap app = createBootstrap(catalogName, config, context);
Set<ConfigPropertyMetadata> usedProperties = app
.quiet()
.skipErrorReporting()
.configure();
return ConfigUtils.getSecuritySensitivePropertyNames(config, usedProperties);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martint that's because config properties are dependent on each other, and without the actual values you don't know which ones will be bound during bootstrap

{
return Set.copyOf(config.keySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ public class FileSystemModule
private final NodeManager nodeManager;
private final OpenTelemetry openTelemetry;
private final boolean coordinatorFileCaching;
private final boolean quietBootstrap;

public FileSystemModule(String catalogName, NodeManager nodeManager, OpenTelemetry openTelemetry, boolean coordinatorFileCaching)
// Constructs the file-system Guice module for one catalog.
// quietBootstrap suppresses bootstrap logging/error reporting so the module can be
// configured purely to enumerate properties (e.g. for sensitive-property discovery)
// without side effects.
public FileSystemModule(String catalogName, NodeManager nodeManager, OpenTelemetry openTelemetry, boolean coordinatorFileCaching, boolean quietBootstrap)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
// booleans cannot be null; no requireNonNull needed for these flags
this.coordinatorFileCaching = coordinatorFileCaching;
this.quietBootstrap = quietBootstrap;
}

@Override
Expand All @@ -84,7 +86,8 @@ protected void setup(Binder binder)
!config.isNativeS3Enabled(),
catalogName,
nodeManager,
openTelemetry);
openTelemetry,
quietBootstrap);

loader.configure().forEach((name, securitySensitive) ->
consumeProperty(new ConfigPropertyMetadata(name, securitySensitive)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public HdfsFileSystemLoader(
boolean s3Enabled,
String catalogName,
NodeManager nodeManager,
OpenTelemetry openTelemetry)
OpenTelemetry openTelemetry,
boolean quietBootstrap)
{
Class<?> clazz = tryLoadExistingHdfsManager();

Expand Down Expand Up @@ -75,8 +76,8 @@ public HdfsFileSystemLoader(
}

try (var _ = new ThreadContextClassLoader(classLoader)) {
manager = clazz.getConstructor(Map.class, boolean.class, boolean.class, boolean.class, String.class, NodeManager.class, OpenTelemetry.class)
.newInstance(config, azureEnabled, gcsEnabled, s3Enabled, catalogName, nodeManager, openTelemetry);
manager = clazz.getConstructor(Map.class, boolean.class, boolean.class, boolean.class, String.class, NodeManager.class, OpenTelemetry.class, boolean.class)
.newInstance(config, azureEnabled, gcsEnabled, s3Enabled, catalogName, nodeManager, openTelemetry, quietBootstrap);
}
catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public HdfsFileSystemManager(
boolean s3Enabled,
String catalogName,
NodeManager nodeManager,
OpenTelemetry openTelemetry)
OpenTelemetry openTelemetry,
boolean quietBootstrap)
{
List<Module> modules = new ArrayList<>();

Expand Down Expand Up @@ -82,7 +83,9 @@ public HdfsFileSystemManager(
bootstrap = new Bootstrap(modules)
.doNotInitializeLogging()
.setRequiredConfigurationProperties(Map.of())
.setOptionalConfigurationProperties(config);
.setOptionalConfigurationProperties(config)
.withQuiet(quietBootstrap)
.withSkipErrorReporting(quietBootstrap);
}

public Map<String, Boolean> configure()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ void testManager()
true,
"test",
new TestingNodeManager(),
OpenTelemetry.noop());
OpenTelemetry.noop(),
false);

assertThat(manager.configure().keySet()).containsExactly("hive.dfs.verify-checksum", "hive.s3.region");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.plugin.base.config;

import com.google.common.collect.ImmutableSet;
import io.airlift.configuration.ConfigPropertyMetadata;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class ConfigUtils
{
    private ConfigUtils() {}

    /**
     * Determines which of the supplied configuration properties should be treated as
     * security-sensitive.
     * <p>
     * Every property starts out classified as sensitive; a property is cleared only when
     * the bootstrap metadata explicitly marks it as used and NOT security-sensitive.
     * Unknown/unused properties therefore remain sensitive by default.
     *
     * @param config all configuration properties supplied for the catalog
     * @param usedProperties metadata for the properties actually bound during bootstrap
     * @return the names of properties that must be masked
     */
    public static Set<String> getSecuritySensitivePropertyNames(Map<String, String> config, Set<ConfigPropertyMetadata> usedProperties)
    {
        Set<String> candidates = new HashSet<>(config.keySet());

        // Remove only the properties that are known to be safe; everything else stays.
        usedProperties.stream()
                .filter(metadata -> !metadata.securitySensitive())
                .map(ConfigPropertyMetadata::name)
                .forEach(candidates::remove);

        return ImmutableSet.copyOf(candidates);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import com.google.inject.Injector;
import com.google.inject.Module;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.configuration.ConfigPropertyMetadata;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.base.config.ConfigUtils;
import io.trino.spi.NodeManager;
import io.trino.spi.VersionEmbedder;
import io.trino.spi.catalog.CatalogName;
Expand All @@ -26,6 +28,7 @@
import io.trino.spi.type.TypeManager;

import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -58,6 +61,28 @@ public Connector create(String catalogName, Map<String, String> requiredConfig,
requireNonNull(requiredConfig, "requiredConfig is null");
checkStrictSpiVersionMatch(context, this);

Bootstrap app = createBootstrap(catalogName, requiredConfig, context);

Injector injector = app.initialize();

return injector.getInstance(JdbcConnector.class);
}

@Override
public Set<String> getSecuritySensitivePropertyNames(String catalogName, Map<String, String> config, ConnectorContext context)
{
    // Run a quiet, error-suppressed configuration pass so that property metadata can be
    // collected without initializing the connector or reporting binding failures.
    Bootstrap bootstrap = createBootstrap(catalogName, config, context)
            .quiet()
            .skipErrorReporting();

    Set<ConfigPropertyMetadata> usedProperties = bootstrap.configure();

    // Anything not explicitly classified as non-sensitive is reported as sensitive.
    return ConfigUtils.getSecuritySensitivePropertyNames(config, usedProperties);
}

private Bootstrap createBootstrap(String catalogName, Map<String, String> config, ConnectorContext context)
{
Bootstrap app = new Bootstrap(
binder -> binder.bind(TypeManager.class).toInstance(context.getTypeManager()),
binder -> binder.bind(NodeManager.class).toInstance(context.getNodeManager()),
Expand All @@ -67,11 +92,8 @@ public Connector create(String catalogName, Map<String, String> requiredConfig,
new JdbcModule(),
module.get());

Injector injector = app
return app
.doNotInitializeLogging()
.setRequiredConfigurationProperties(requiredConfig)
.initialize();

return injector.getInstance(JdbcConnector.class);
.setRequiredConfigurationProperties(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
*/
package io.trino.plugin.deltalake;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.configuration.ConfigPropertyMetadata;
import io.airlift.json.JsonModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
Expand All @@ -29,6 +31,7 @@
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
import io.trino.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.trino.plugin.base.config.ConfigUtils;
import io.trino.plugin.base.jmx.ConnectorObjectNameGeneratorModule;
import io.trino.plugin.base.jmx.MBeanServerModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
Expand Down Expand Up @@ -67,6 +70,9 @@ public class DeltaLakeConnectorFactory
{
public static final String CONNECTOR_NAME = "delta_lake";

private static final Module DEFAULT_ADDITIONAL_MODULE = EMPTY_MODULE;
private static final Optional<Module> DEFAULT_METASTORE_MODULE = Optional.empty();

@Override
public String getName()
{
Expand All @@ -77,9 +83,22 @@ public String getName()
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
checkStrictSpiVersionMatch(context, this);
return createConnector(catalogName, config, context, Optional.empty(), EMPTY_MODULE);
return createConnector(catalogName, config, context, DEFAULT_METASTORE_MODULE, DEFAULT_ADDITIONAL_MODULE);
}

@Override
public Set<String> getSecuritySensitivePropertyNames(String catalogName, Map<String, String> config, ConnectorContext context)
{
    // The plugin classloader must be active while Guice modules are instantiated.
    try (ThreadContextClassLoader _ = new ThreadContextClassLoader(DeltaLakeConnectorFactory.class.getClassLoader())) {
        // quietBootstrap=true: configure only, without logging or error reporting.
        Bootstrap app = createBootstrap(catalogName, config, context, DEFAULT_METASTORE_MODULE, DEFAULT_ADDITIONAL_MODULE, true);
        return ConfigUtils.getSecuritySensitivePropertyNames(config, app.configure());
    }
}

@VisibleForTesting
public static Connector createConnector(
String catalogName,
Map<String, String> config,
Expand All @@ -89,32 +108,9 @@ public static Connector createConnector(
{
ClassLoader classLoader = DeltaLakeConnectorFactory.class.getClassLoader();
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(
new MBeanModule(),
new ConnectorObjectNameGeneratorModule("io.trino.plugin.deltalake", "trino.plugin.deltalake"),
new JsonModule(),
new MBeanServerModule(),
new CatalogNameModule(catalogName),
metastoreModule.orElse(new DeltaLakeMetastoreModule()),
new DeltaLakeModule(),
new DeltaLakeSecurityModule(),
new DeltaLakeSynchronizerModule(),
new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry(), false),
binder -> {
binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
binder.bind(Tracer.class).toInstance(context.getTracer());
binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName));
},
module);

Injector injector = app
.doNotInitializeLogging()
.setRequiredConfigurationProperties(config)
.initialize();
Bootstrap app = createBootstrap(catalogName, config, context, metastoreModule, module, false);

Injector injector = app.initialize();

LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
Expand Down Expand Up @@ -159,4 +155,41 @@ public static Connector createConnector(
functionProvider);
}
}

/**
 * Builds the (uninitialized) Airlift {@code Bootstrap} for the Delta Lake connector.
 * Shared by {@code createConnector} (quietBootstrap=false, full initialization follows)
 * and {@code getSecuritySensitivePropertyNames} (quietBootstrap=true, configure-only).
 *
 * @param metastoreModule optional override; falls back to {@code DeltaLakeMetastoreModule}
 * @param module extra module supplied by the caller (e.g. for tests)
 * @param quietBootstrap when true, suppresses bootstrap logging and error reporting
 */
private static Bootstrap createBootstrap(
String catalogName,
Map<String, String> config,
ConnectorContext context,
Optional<Module> metastoreModule,
Module module,
boolean quietBootstrap)
{
Bootstrap app = new Bootstrap(
new MBeanModule(),
new ConnectorObjectNameGeneratorModule("io.trino.plugin.deltalake", "trino.plugin.deltalake"),
new JsonModule(),
new MBeanServerModule(),
new CatalogNameModule(catalogName),
metastoreModule.orElse(new DeltaLakeMetastoreModule()),
new DeltaLakeModule(),
new DeltaLakeSecurityModule(),
new DeltaLakeSynchronizerModule(),
// quietBootstrap is propagated so file-system bootstrap stays silent too
new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry(), false, quietBootstrap),
binder -> {
// Bind engine-provided services from the ConnectorContext into the injector
binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
binder.bind(Tracer.class).toInstance(context.getTracer());
binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName));
},
module);

return app
.withQuiet(quietBootstrap)
.withSkipErrorReporting(quietBootstrap)
.doNotInitializeLogging()
.setRequiredConfigurationProperties(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

import java.io.File;
import java.nio.file.Files;
import java.util.Map;
import java.util.Set;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Iterables.getOnlyElement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestDeltaLakePlugin
Expand Down Expand Up @@ -223,6 +226,26 @@ public void testFileBasedAccessControl()
verify(tempFile.delete());
}

@Test
void testGetSecuritySensitivePropertyNames()
{
ConnectorFactory factory = getConnectorFactory();
// Mix of sensitive, non-sensitive, unknown, and invalid-valued properties:
// unknown/unclassifiable properties must be reported as sensitive by default.
Map<String, String> config = ImmutableMap.of(
"non-existent-property", "value",
"fs.hadoop.enabled", "true",
"hive.azure.abfs.oauth.client-id", "test-client-id", // security-sensitive property from trino-hdfs
"hive.azure.adl-proxy-host", "proxy-host:9800", // non-sensitive property from trino-hdfs
"hive.dfs-timeout", "invalidValue", // property from trino-hdfs with invalid value
"hive.metastore.uri", "thrift://foo:1234",
"hive.metastore.thrift.client.ssl.key-password", "password",
"delta.checkpoint-row-statistics-writing.enabled", "shouldBeBoolean");

Set<String> sensitiveProperties = factory.getSecuritySensitivePropertyNames("catalog", config, new TestingConnectorContext());

// Invalid values must not fail the call; only the unknown property and the two
// explicitly sensitive ones should be masked.
assertThat(sensitiveProperties)
.containsOnly("non-existent-property", "hive.azure.abfs.oauth.client-id", "hive.metastore.thrift.client.ssl.key-password");
}

private static ConnectorFactory getConnectorFactory()
{
return getOnlyElement(new DeltaLakePlugin().getConnectorFactories());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void setUp()
new DeltaLakeMetastoreModule(),
new DeltaLakeModule(),
// test setup
new FileSystemModule("test", context.getNodeManager(), context.getOpenTelemetry(), false));
new FileSystemModule("test", context.getNodeManager(), context.getOpenTelemetry(), false, false));

Injector injector = app
.doNotInitializeLogging()
Expand Down
Loading
Loading