
Redact sensitive information in catalog queries #24563

Draft
wants to merge 12 commits into
base: master
14 changes: 14 additions & 0 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
@@ -122,6 +122,8 @@ public class FeaturesConfig

private boolean faultTolerantExecutionExchangeEncryptionEnabled = true;

private boolean statementRedactingEnabled = true;

public enum DataIntegrityVerification
{
NONE,
@@ -514,6 +516,18 @@ public FeaturesConfig setFaultTolerantExecutionExchangeEncryptionEnabled(boolean
return this;
}

public boolean isStatementRedactingEnabled()
{
return statementRedactingEnabled;
}

@Config("deprecated.statement-redacting-enabled")
public FeaturesConfig setStatementRedactingEnabled(boolean statementRedactingEnabled)
{
this.statementRedactingEnabled = statementRedactingEnabled;
return this;
}

public void applyFaultTolerantExecutionDefaults()
{
exchangeCompressionCodec = LZ4;
@@ -20,6 +20,8 @@
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.connector.ConnectorName;

import java.util.Set;

@ThreadSafe
public interface CatalogFactory
{
@@ -28,4 +30,6 @@ public interface CatalogFactory
CatalogConnector createCatalog(CatalogProperties catalogProperties);

CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName connectorName, Connector connector);

Set<String> getSecuritySensitivePropertyNames(CatalogProperties catalogProperties);
}
@@ -180,6 +180,12 @@ public Set<CatalogHandle> getActiveCatalogs()
.collect(toImmutableSet());
}

@Override
public CatalogProperties createCatalogProperties(CatalogName catalogName, ConnectorName connectorName, Map<String, String> properties)
{
return catalogStore.createCatalogProperties(catalogName, connectorName, properties);
}

@Override
public void ensureCatalogsLoaded(Session session, List<CatalogProperties> catalogs)
{
@@ -270,7 +276,7 @@ public void createCatalog(CatalogName catalogName, ConnectorName connectorName,
return;
}

CatalogProperties catalogProperties = catalogStore.createCatalogProperties(catalogName, connectorName, properties);
CatalogProperties catalogProperties = createCatalogProperties(catalogName, connectorName, properties);

// get or create catalog for the handle
CatalogConnector catalog = allCatalogs.computeIfAbsent(
@@ -13,6 +13,7 @@
*/
package io.trino.connector;

import com.google.common.collect.ImmutableSet;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.inject.Inject;
import io.airlift.configuration.secrets.SecretsResolver;
@@ -45,6 +46,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@@ -144,6 +146,25 @@ public CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName
return createCatalog(catalogHandle, connectorName, connector, Optional.empty());
}

@Override
public Set<String> getSecuritySensitivePropertyNames(CatalogProperties catalogProperties)
{
ConnectorFactory connectorFactory = connectorFactories.get(catalogProperties.connectorName());
if (connectorFactory == null) {
// If someone tries to use a non-existent connector, we assume they
Member

This should be an error, actually. It should throw IllegalArgumentException.

Member Author

But then the query will fail during redaction. The idea is to avoid disrupting the natural flow and let it fail where it normally would if redaction didn't exist.

Member

Why would a query fail during redaction if it hasn’t first failed during analysis? I.e., it’s a condition that should never occur.

Member Author

We perform redaction at a very early stage (before the query state machine is created) to modify query text exposed in query events and QueryInfo. I believe that verifying the existence of a given connector happens only during execution, for example, in CreateCatalogTask.

Member

I’m very confused about the purpose of this change, then. If redaction happens before analysis, how are the analyzer and execution engine able to see the unredacted values so that they can do their job?

Can you describe the technical approach at a high level so that I don’t have to reverse engineer what the code is trying to achieve?

Member Author

Please let me know if the following helps:

Problem

Currently, when we execute a CREATE CATALOG statement containing plaintext secrets, unredacted query text is exposed via the REST API, the system.runtime.queries table, and query events.

Goal

Instead of displaying the query text in its raw form in the locations mentioned above, such as:

CREATE CATALOG test USING postgresql  
WITH (  
   "connection-user" = 'bob',  
   "connection-password" = '1234'  
)  

we aim to redact security-sensitive property values:

CREATE CATALOG test USING postgresql  
WITH (  
   "connection-user" = 'bob',  
   "connection-password" = '***'  
)  
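As a rough illustration of the goal above, here is a minimal sketch of masking the values of sensitive properties in a property map. The class and method names (`PropertyMaskingSketch`, `maskSensitive`) are hypothetical and do not appear in this PR; the actual change works on the statement AST rather than on a plain map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not a class from this PR.
class PropertyMaskingSketch
{
    // Returns a copy of the properties where every sensitive value is replaced with "***".
    // Insertion order is preserved so the output reads like the original statement.
    static Map<String, String> maskSensitive(Map<String, String> properties, Set<String> sensitiveNames)
    {
        Map<String, String> masked = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            boolean sensitive = sensitiveNames.contains(entry.getKey());
            masked.put(entry.getKey(), sensitive ? "***" : entry.getValue());
        }
        return masked;
    }

    public static void main(String[] args)
    {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connection-user", "bob");
        properties.put("connection-password", "1234");

        // prints {connection-user=bob, connection-password=***}
        System.out.println(maskSensitive(properties, Set.of("connection-password")));
    }
}
```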

Proposed Solution

The REST API, the system.runtime.queries table, and query events obtain query text from the QueryInfo object. Based on our research, the query text contained in QueryInfo is not interpreted anywhere in the engine.

The QueryInfo object is created by the QueryStateMachine. To redact the query text, we propose performing redaction after the query is parsed (so that the AST is available for traversal and redaction) but before the QueryStateMachine is created.
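The ordering described above can be sketched roughly as follows. Everything here is a simplified stand-in: `RedactionOrderingSketch`, `redactForDisplay`, and `dispatch` are hypothetical names, and a regex over one hard-coded property stands in for the AST traversal that the PR actually proposes. The point is only that the displayed text is derived before the query is registered, while execution keeps the original text.

```java
// Hypothetical illustration only; not a class from this PR.
class RedactionOrderingSketch
{
    // Stand-in for AST-based redaction: masks the literal assigned to one known property.
    static String redactForDisplay(String sql)
    {
        return sql.replaceAll("(\"connection-password\"\\s*=\\s*)'[^']*'", "$1'***'");
    }

    // Returns {text recorded for display, text handed to execution}.
    static String[] dispatch(String sql)
    {
        // Redaction runs after parsing but before the query state is created,
        // so only the redacted text is ever recorded for display.
        String displayed = redactForDisplay(sql);
        // Analysis and execution continue to work with the unredacted statement.
        String executed = sql;
        return new String[] {displayed, executed};
    }

    public static void main(String[] args)
    {
        String sql = "CREATE CATALOG test USING postgresql WITH (\"connection-user\" = 'bob', \"connection-password\" = '1234')";
        String[] result = dispatch(sql);
        System.out.println("displayed: " + result[0]);
        System.out.println("executed:  " + result[1]);
    }
}
```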

Since redaction occurs at an early stage of query processing, we need to duplicate some logic that is typically performed during analysis and execution. For example, this includes evaluating catalog properties. Additionally, we do not want to disrupt the normal query processing flow; therefore, we ensure the query never fails due to redaction. If, for any reason, redaction is not possible, we will resort to masking all properties.
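A minimal sketch of the "never fail, fall back to masking everything" behaviour described above, assuming the lookup of sensitive property names can throw (for example, for an unknown connector). The names `RedactionFallbackSketch` and `sensitiveNamesOrAll` are hypothetical, and the lookup is modelled as a plain `Function` rather than the SPI call used in the PR.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical illustration only; not a class from this PR.
class RedactionFallbackSketch
{
    // Asks the lookup which properties are sensitive. If the lookup fails for any reason,
    // every property is treated as sensitive so that redaction itself never fails the query.
    static Set<String> sensitiveNamesOrAll(
            Map<String, String> properties,
            Function<Map<String, String>, Set<String>> lookup)
    {
        try {
            return Set.copyOf(lookup.apply(properties));
        }
        catch (RuntimeException e) {
            return Set.copyOf(properties.keySet());
        }
    }

    public static void main(String[] args)
    {
        Map<String, String> properties = Map.of("connection-user", "bob", "connection-password", "1234");

        // Lookup succeeds: only the reported property is considered sensitive.
        System.out.println(sensitiveNamesOrAll(properties, p -> Set.of("connection-password")));

        // Lookup fails: all property names are returned, so all values would be masked.
        Set<String> fallback = sensitiveNamesOrAll(properties, p -> {
            throw new IllegalStateException("unknown connector");
        });
        System.out.println(fallback.size());
    }
}
```

With this shape, a misspelled connector name still fails later in the normal execution path, and the only effect on the displayed query text is that more values are masked than strictly necessary.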

To identify security-sensitive properties for a given connector, we propose introducing a new SPI to expose them: #24562

// misspelled the name and, for safety, we redact all the properties.
return ImmutableSet.copyOf(catalogProperties.properties().keySet());
}

ConnectorContext context = createConnectorContext(catalogProperties.catalogHandle());
String catalogName = catalogProperties.catalogHandle().getCatalogName().toString();
Map<String, String> config = secretsResolver.getResolvedConfiguration(catalogProperties.properties());

try (ThreadContextClassLoader _ = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
return connectorFactory.getSecuritySensitivePropertyNames(catalogName, config, context);
}
}

private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName connectorName, Connector connector, Optional<CatalogProperties> catalogProperties)
{
Tracer tracer = createTracer(catalogHandle);
@@ -196,7 +217,16 @@ private Connector createConnector(
ConnectorFactory connectorFactory,
Map<String, String> properties)
{
ConnectorContext context = new ConnectorContextInstance(
ConnectorContext context = createConnectorContext(catalogHandle);

try (ThreadContextClassLoader _ = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
return connectorFactory.create(catalogName, properties, context);
}
}

private ConnectorContext createConnectorContext(CatalogHandle catalogHandle)
{
return new ConnectorContextInstance(
catalogHandle,
openTelemetry,
createTracer(catalogHandle),
@@ -206,10 +236,6 @@ private Connector createConnector(
new InternalMetadataProvider(metadata, typeManager),
pageSorter,
pageIndexerFactory);

try (ThreadContextClassLoader _ = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
return connectorFactory.create(catalogName, properties, context);
}
}

private Tracer createTracer(CatalogHandle catalogHandle)
@@ -19,6 +19,7 @@
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.connector.ConnectorName;

import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkState;
@@ -51,6 +52,12 @@ public CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName
return getDelegate().createCatalog(catalogHandle, connectorName, connector);
}

@Override
public Set<String> getSecuritySensitivePropertyNames(CatalogProperties catalogProperties)
{
return getDelegate().getSecuritySensitivePropertyNames(catalogProperties);
}

private CatalogFactory getDelegate()
{
CatalogFactory catalogFactory = delegate.get();
@@ -211,6 +211,12 @@ public Set<CatalogHandle> getActiveCatalogs()
return ImmutableSet.of();
}

@Override
public CatalogProperties createCatalogProperties(CatalogName catalogName, ConnectorName connectorName, Map<String, String> properties)
{
throw new TrinoException(NOT_SUPPORTED, "CREATE CATALOG is not supported by the static catalog store");
}

@Override
public ConnectorServices getConnectorServices(CatalogHandle catalogHandle)
{
@@ -13,6 +13,7 @@
*/
package io.trino.dispatcher;

import com.google.common.base.Function;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -44,6 +45,8 @@
import io.trino.spi.TrinoException;
import io.trino.spi.resourcegroups.SelectionContext;
import io.trino.spi.resourcegroups.SelectionCriteria;
import io.trino.sql.RedactedQuery;
import io.trino.sql.SensitiveStatementRedactor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.weakref.jmx.Flatten;
@@ -84,6 +87,7 @@ public class DispatchManager
private final SessionPropertyDefaults sessionPropertyDefaults;
private final SessionPropertyManager sessionPropertyManager;
private final Tracer tracer;
private final SensitiveStatementRedactor sensitiveStatementRedactor;

private final int maxQueryLength;

@@ -107,6 +111,7 @@ public DispatchManager(
SessionPropertyDefaults sessionPropertyDefaults,
SessionPropertyManager sessionPropertyManager,
Tracer tracer,
SensitiveStatementRedactor sensitiveStatementRedactor,
QueryManagerConfig queryManagerConfig,
DispatchExecutor dispatchExecutor,
QueryMonitor queryMonitor)
@@ -121,6 +126,7 @@ public DispatchManager(
this.sessionPropertyDefaults = requireNonNull(sessionPropertyDefaults, "sessionPropertyDefaults is null");
this.sessionPropertyManager = sessionPropertyManager;
this.tracer = requireNonNull(tracer, "tracer is null");
this.sensitiveStatementRedactor = requireNonNull(sensitiveStatementRedactor, "sensitiveStatementRedactor is null");

this.maxQueryLength = queryManagerConfig.getMaxQueryLength();

@@ -240,7 +246,7 @@ private <C> void createQueryInternal(QueryId queryId, Span querySpan, Slug slug,
DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
Member

this automatically also handles things like event listener and QueryResource right?

It might be worth explicitly calling this out in the commit message (although you do imply it by mentioning anything using QueryInfo/BasicQueryInfo).

Member Author

this automatically also handles things like event listener and QueryResource right?

Correct.

I extracted the tests confirming that into separate commits to avoid distracting from the core redaction functionality.

I refined the commit message and included your suggestion.

session,
sessionContext.getTransactionId(),
query,
getRedactedQueryProvider(preparedQuery, query),
preparedQuery,
slug,
selectionContext.getResourceGroupId());
@@ -280,6 +286,11 @@ private <C> void createQueryInternal(QueryId queryId, Span querySpan, Slug slug,
}
}

private Function<Session, RedactedQuery> getRedactedQueryProvider(PreparedQuery preparedQuery, String query)
{
return session -> sensitiveStatementRedactor.redact(query, preparedQuery, session);
}

private boolean queryCreated(DispatchQuery dispatchQuery)
{
boolean queryAdded = queryTracker.addQuery(dispatchQuery);
@@ -17,16 +17,18 @@
import io.trino.execution.QueryPreparer.PreparedQuery;
import io.trino.server.protocol.Slug;
import io.trino.spi.resourcegroups.ResourceGroupId;
import io.trino.sql.RedactedQuery;
import io.trino.transaction.TransactionId;

import java.util.Optional;
import java.util.function.Function;

public interface DispatchQueryFactory
{
DispatchQuery createDispatchQuery(
Session session,
Optional<TransactionId> transactionId,
String query,
Function<Session, RedactedQuery> queryProvider,
PreparedQuery preparedQuery,
Slug slug,
ResourceGroupId resourceGroup);
@@ -38,12 +38,14 @@
import io.trino.server.protocol.Slug;
import io.trino.spi.TrinoException;
import io.trino.spi.resourcegroups.ResourceGroupId;
import io.trino.sql.RedactedQuery;
import io.trino.sql.tree.Statement;
import io.trino.transaction.TransactionId;
import io.trino.transaction.TransactionManager;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.util.StatementUtils.getQueryType;
@@ -108,7 +110,7 @@ public LocalDispatchQueryFactory(
public DispatchQuery createDispatchQuery(
Session session,
Optional<TransactionId> existingTransactionId,
String query,
Function<Session, RedactedQuery> queryProvider,
PreparedQuery preparedQuery,
Slug slug,
ResourceGroupId resourceGroup)
@@ -117,8 +119,7 @@ public DispatchQuery createDispatchQuery(
PlanOptimizersStatsCollector planOptimizersStatsCollector = new PlanOptimizersStatsCollector(queryReportedRuleStatsLimit);
QueryStateMachine stateMachine = QueryStateMachine.begin(
existingTransactionId,
query,
preparedQuery.getPrepareSql(),
queryProvider,
session,
locationFactory.createQueryLocation(session.getQueryId()),
resourceGroup,
@@ -13,6 +13,7 @@
*/
package io.trino.execution;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import io.trino.Session;
@@ -78,6 +79,20 @@ public ListenableFuture<Void> execute(
CatalogName catalog = new CatalogName(statement.getCatalogName().getValue().toLowerCase(ENGLISH));
accessControl.checkCanCreateCatalog(session.toSecurityContext(), catalog.toString());

Map<String, String> properties = evaluateProperties(statement, session, plannerContext, accessControl, parameters);

ConnectorName connectorName = new ConnectorName(statement.getConnectorName().toString());
catalogManager.createCatalog(catalog, connectorName, properties, statement.isNotExists());
return immediateVoidFuture();
}

public static Map<String, String> evaluateProperties(
CreateCatalog statement,
Session session,
PlannerContext plannerContext,
AccessControl accessControl,
List<Expression> parameters)
{
Map<String, String> properties = new HashMap<>();
for (Property property : statement.getProperties()) {
if (property.isSetToDefault()) {
@@ -97,9 +112,6 @@ public ListenableFuture<Void> execute(
INVALID_CATALOG_PROPERTY,
"catalog property"));
}

ConnectorName connectorName = new ConnectorName(statement.getConnectorName().toString());
catalogManager.createCatalog(catalog, connectorName, properties, statement.isNotExists());
return immediateVoidFuture();
return ImmutableMap.copyOf(properties);
}
}
@@ -86,7 +86,7 @@ else if (wrappedStatement instanceof ExecuteImmediate executeImmediateStatement)
}
validateParameters(statement, parameters);

return new PreparedQuery(statement, parameters, prepareSql);
return new PreparedQuery(wrappedStatement, statement, parameters, prepareSql);
}

private static void validateParameters(Statement node, List<Expression> parameterValues)
@@ -102,17 +102,24 @@ private static void validateParameters(Statement node, List<Expression> paramete

public static class PreparedQuery
{
private final Statement wrappedStatement;
private final Statement statement;
private final List<Expression> parameters;
private final Optional<String> prepareSql;

public PreparedQuery(Statement statement, List<Expression> parameters, Optional<String> prepareSql)
public PreparedQuery(Statement wrappedStatement, Statement statement, List<Expression> parameters, Optional<String> prepareSql)
{
this.wrappedStatement = requireNonNull(wrappedStatement, "wrappedStatement is null");
this.statement = requireNonNull(statement, "statement is null");
this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null"));
this.prepareSql = requireNonNull(prepareSql, "prepareSql is null");
}

public Statement getWrappedStatement()
{
return wrappedStatement;
}

public Statement getStatement()
{
return statement;