From 5432c5a0a58bccec23587efec665b3daf532e019 Mon Sep 17 00:00:00 2001 From: Louis Pieterse Date: Thu, 4 Jun 2026 18:49:22 +0100 Subject: [PATCH] Support OAuth2 user tokens for Iceberg REST catalogs Store the authenticated access token under a tamper-proof internal$authenticated: extra credential key. Iceberg REST catalogs with session=USER and security=OAUTH2 use the authenticated `token` or `credential` for per-user REST catalog requests. Cache a RESTSessionCatalog per user with JWT exp-based TTL. Strip internal credentials from worker task updates. Reject client-submitted internal$ credentials at the coordinator. --- .../HttpRequestSessionContextFactory.java | 9 +- .../server/remotetask/HttpRemoteTask.java | 11 +- .../security/oauth2/OAuth2Authenticator.java | 8 + .../server/security/oauth2/OAuth2Config.java | 34 +++ .../TestHttpRequestSessionContextFactory.java | 24 +- .../server/remotetask/TestHttpRemoteTask.java | 41 +++ .../server/security/TestResourceSecurity.java | 53 ++++ .../security/oauth2/TestOAuth2Config.java | 21 ++ .../trino/spi/security/ExtraCredentials.java | 34 +++ .../java/io/trino/spi/security/Identity.java | 2 +- .../io/trino/spi/security/TestIdentity.java | 16 + .../main/sphinx/object-storage/metastores.md | 121 +++++++- docs/src/main/sphinx/security/oauth2.md | 36 +++ plugin/trino-iceberg/pom.xml | 15 - .../rest/IcebergRestCatalogModule.java | 18 +- .../catalog/rest/OAuth2SecurityConfig.java | 10 +- .../rest/OAuth2SecurityProperties.java | 8 + .../rest/OAuth2SessionCredentials.java | 91 ++++++ .../rest/TrinoIcebergRestCatalogFactory.java | 274 ++++++++++++++++-- .../catalog/rest/TrinoRestCatalog.java | 48 ++- .../plugin/iceberg/TestIcebergPlugin.java | 12 + .../rest/TestOAuth2SecurityConfig.java | 18 +- .../TestTrinoIcebergRestCatalogFactory.java | 127 ++++++++ .../catalog/rest/TestTrinoRestCatalog.java | 101 ++++++- 24 files changed, 1036 insertions(+), 96 deletions(-) create mode 100644 core/trino-spi/src/main/java/io/trino/spi/security/ExtraCredentials.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SessionCredentials.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoIcebergRestCatalogFactory.java diff --git a/core/trino-main/src/main/java/io/trino/server/HttpRequestSessionContextFactory.java b/core/trino-main/src/main/java/io/trino/server/HttpRequestSessionContextFactory.java index 9d7751b80f73..bed7ba627a9c 100644 --- a/core/trino-main/src/main/java/io/trino/server/HttpRequestSessionContextFactory.java +++ b/core/trino-main/src/main/java/io/trino/server/HttpRequestSessionContextFactory.java @@ -60,6 +60,7 @@ import static io.trino.client.ProtocolHeaders.detectProtocol; import static io.trino.server.ServletSecurityUtils.authenticatedIdentity; import static io.trino.spi.security.AccessDeniedException.denySetRole; +import static io.trino.spi.security.ExtraCredentials.isInternalExtraCredential; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Locale.ENGLISH; @@ -259,11 +260,15 @@ private Identity buildSessionIdentity(Optional authenticatedIdentity, if (systemRole.getType() == Type.ROLE) { systemEnabledRoles.add(systemRole.getRole().orElseThrow()); } + // Authenticated credentials (placed by the server-side authenticator under internal$* + // keys) take precedence over client-supplied credentials with the same name. + Map extraCredentials = new HashMap<>(parseExtraCredentials(protocolHeaders, headers)); + authenticatedIdentity.map(Identity::getExtraCredentials).ifPresent(extraCredentials::putAll); Identity newIdentity = authenticatedIdentity .map(identity -> Identity.from(identity).withUser(user)) .orElseGet(() -> Identity.forUser(user)) .withAdditionalConnectorRoles(parseConnectorRoleHeaders(protocolHeaders, headers)) - .withAdditionalExtraCredentials(parseExtraCredentials(protocolHeaders, headers)) + .withExtraCredentials(extraCredentials) .withAdditionalGroups(groupProvider.getGroups(user)) .withEnabledRoles(systemEnabledRoles.build()) .build(); @@ -343,7 +348,7 @@ private static Map parseExtraCredentials(ProtocolHeaders protoco { Map credentials = parseProperty(headers, protocolHeaders.requestExtraCredential()); for (String name : credentials.keySet()) { - assertRequest(!name.startsWith("internal$"), "Invalid extra credential name: %s", name); + assertRequest(!isInternalExtraCredential(name), "Invalid extra credential name"); } return credentials; } diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java index 280229d4c712..5da9961b77c8 100644 --- a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java +++ b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.SetMultimap; import com.google.common.net.MediaType; @@ -126,6 +127,7 @@ import static io.trino.spi.HostAddress.fromUri; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.REMOTE_TASK_ERROR; +import static io.trino.spi.security.ExtraCredentials.isInternalExtraCredential; import static io.trino.util.Failures.toFailure; import static java.lang.Math.addExact; import static java.lang.Math.clamp; @@ -142,6 +144,7 @@ public final class HttpRemoteTask private final TaskId taskId; private final Session session; + private final Map workerExtraCredentials; private final Span stageSpan; private final String nodeId; private final AtomicBoolean speculative; @@ -266,6 +269,7 @@ public HttpRemoteTask( try (SetThreadName _ = new SetThreadName("HttpRemoteTask-" + taskId)) { this.taskId = taskId; this.session = session; + this.workerExtraCredentials = extraCredentialsForWorker(session.getIdentity().getExtraCredentials()); this.stageSpan = stageSpan; this.nodeId = node.getNodeIdentifier(); this.speculative = new AtomicBoolean(speculative); @@ -777,7 +781,7 @@ private void sendUpdateInternal() Optional fragment = sendPlan.get() ? Optional.of(planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty(); TaskUpdateRequest updateRequest = new TaskUpdateRequest( session.toSessionRepresentation(), - session.getIdentity().getExtraCredentials(), + workerExtraCredentials, stageSpan, fragment, tableCredentials, @@ -820,6 +824,11 @@ private void sendUpdateInternal() executor); } + private static Map extraCredentialsForWorker(Map extraCredentials) + { + return ImmutableMap.copyOf(Maps.filterKeys(extraCredentials, key -> !isInternalExtraCredential(key))); + } + private synchronized List getSplitAssignments(int currentSplitBatchSize) { return Stream.concat(planFragment.getPartitionedSourceNodes().stream(), planFragment.getRemoteSourceNodes().stream()) diff --git a/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2Authenticator.java b/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2Authenticator.java index de095ca6e1df..3019d64594d9 100644 --- a/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2Authenticator.java +++ b/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2Authenticator.java @@ -13,6 +13,7 @@ */ package io.trino.server.security.oauth2; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import io.airlift.log.Logger; import io.trino.server.security.AbstractBearerAuthenticator; @@ -34,6 +35,7 @@ import static io.trino.server.security.UserMapping.createUserMapping; import static io.trino.server.security.oauth2.OAuth2TokenExchangeResource.getInitiateUri; import static io.trino.server.security.oauth2.OAuth2TokenExchangeResource.getTokenUri; +import static io.trino.spi.security.ExtraCredentials.authenticatedExtraCredentialName; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -46,14 +48,17 @@ public class OAuth2Authenticator private final UserMapping userMapping; private final TokenPairSerializer tokenPairSerializer; private final TokenRefresher tokenRefresher; + private final Optional accessTokenExtraCredentialName; @Inject public OAuth2Authenticator(OAuth2Client client, OAuth2Config config, TokenRefresher tokenRefresher, TokenPairSerializer tokenPairSerializer) { + requireNonNull(config, "config is null"); this.client = requireNonNull(client, "service is null"); this.principalField = config.getPrincipalField(); this.tokenRefresher = requireNonNull(tokenRefresher, "tokenRefresher is null"); this.tokenPairSerializer = requireNonNull(tokenPairSerializer, "tokenPairSerializer is null"); + this.accessTokenExtraCredentialName = config.getAccessTokenExtraCredentialName(); userMapping = createUserMapping(config.getUserMappingPattern(), config.getUserMappingFile()); } @@ -80,6 +85,9 @@ protected Optional createIdentity(String token) } Identity.Builder builder = Identity.forUser(userMapping.mapUser(principal.get())); builder.withPrincipal(new BasicPrincipal(principal.get())); + accessTokenExtraCredentialName.ifPresent(name -> builder.withAdditionalExtraCredentials(ImmutableMap.of( + name, tokenPair.accessToken(), + authenticatedExtraCredentialName(name), tokenPair.accessToken()))); return Optional.of(builder.build()); } diff --git a/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2Config.java b/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2Config.java index 1711bc5d4f4c..b5a554a88c5c 100644 --- a/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2Config.java +++ b/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2Config.java @@ -22,6 +22,7 @@ import io.airlift.configuration.validation.FileExists; import io.airlift.units.Duration; import io.airlift.units.MinDuration; +import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotNull; import java.io.File; @@ -31,7 +32,9 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static com.google.common.base.Strings.emptyToNull; import static io.trino.server.security.oauth2.OAuth2Service.OPENID_SCOPE; +import static io.trino.spi.security.ExtraCredentials.isInternalExtraCredential; public class OAuth2Config { @@ -47,6 +50,7 @@ public class OAuth2Config private Optional jwtType = Optional.empty(); private Optional userMappingPattern = Optional.empty(); private Optional userMappingFile = Optional.empty(); + private Optional accessTokenExtraCredentialName = Optional.empty(); private boolean enableRefreshTokens; private boolean enableDiscovery = true; @@ -218,6 +222,36 @@ public OAuth2Config setUserMappingFile(File userMappingFile) return this; } + public Optional getAccessTokenExtraCredentialName() + { + return accessTokenExtraCredentialName; + } + + @Config("http-server.authentication.oauth2.access-token-extra-credential-name") + @ConfigDescription("Extra credential name for storing the authenticated OAuth2 access token") + public OAuth2Config setAccessTokenExtraCredentialName(String accessTokenExtraCredentialName) + { + this.accessTokenExtraCredentialName = Optional.ofNullable(emptyToNull(accessTokenExtraCredentialName)); + return this; + } + + @AssertTrue(message = "OAuth2 access token extra credential name must not start with internal$") + public boolean isAccessTokenExtraCredentialNameNotInternal() + { + return accessTokenExtraCredentialName + .map(name -> !isInternalExtraCredential(name)) + .orElse(true); + } + + @AssertTrue(message = "OAuth2 access token extra credential name must not contain whitespace, comma, or equals") + public boolean isAccessTokenExtraCredentialNameValid() + { + return accessTokenExtraCredentialName + .map(name -> name.chars().noneMatch(character -> + Character.isWhitespace(character) || character == ',' || character == '=')) + .orElse(true); + } + public boolean isEnableRefreshTokens() { return enableRefreshTokens; diff --git a/core/trino-main/src/test/java/io/trino/server/TestHttpRequestSessionContextFactory.java b/core/trino-main/src/test/java/io/trino/server/TestHttpRequestSessionContextFactory.java index 5b189d8ad4cf..df324ed751b7 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestHttpRequestSessionContextFactory.java +++ b/core/trino-main/src/test/java/io/trino/server/TestHttpRequestSessionContextFactory.java @@ -106,6 +106,28 @@ private static void assertSessionContext(ProtocolHeaders protocolHeaders) assertThat(context.getIdentity().getExtraCredentials()).isEqualTo(ImmutableMap.of("test.token.foo", "bar", "test.token.abc", "xyz")); } + @Test + public void testAuthenticatedExtraCredentialsOverrideRequestExtraCredentials() + { + MultivaluedMap headers = new GuavaMultivaluedMap<>(ImmutableListMultimap.builder() + .put(TRINO_HEADERS.requestUser(), "testUser") + .put(TRINO_HEADERS.requestExtraCredential(), "token=request-token") + .put(TRINO_HEADERS.requestExtraCredential(), "request-only=request-value") + .build()); + + SessionContext context = sessionContextFactory(TRINO_HEADERS).createSessionContext( + headers, + Optional.of("testRemote"), + Optional.of(Identity.forUser("testUser") + .withExtraCredentials(ImmutableMap.of("token", "authenticated-token", "authenticated-only", "authenticated-value")) + .build())); + + assertThat(context.getIdentity().getExtraCredentials()).isEqualTo(ImmutableMap.of( + "token", "authenticated-token", + "request-only", "request-value", + "authenticated-only", "authenticated-value")); + } + @Test public void testMappedUser() { @@ -186,7 +208,7 @@ public void testInternalExtraCredentialName() .build()); assertInvalidSession(TRINO_HEADERS, headers) - .hasMessage("Invalid extra credential name: internal$abc"); + .hasMessage("Invalid extra credential name"); } private static AbstractThrowableAssert assertInvalidSession(ProtocolHeaders protocolHeaders, MultivaluedMap headers) diff --git a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java index 80e1a34a55d2..96cdf2e865ac 100644 --- a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java +++ b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java @@ -71,6 +71,7 @@ import io.trino.spi.connector.DynamicFilter; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.security.Identity; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeOperators; @@ -139,6 +140,7 @@ import static io.trino.server.InternalHeaders.TRINO_MAX_WAIT; import static io.trino.spi.StandardErrorCode.REMOTE_TASK_ERROR; import static io.trino.spi.StandardErrorCode.REMOTE_TASK_MISMATCH; +import static io.trino.spi.security.ExtraCredentials.authenticatedExtraCredentialName; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.sql.planner.TestingPlannerContext.PLANNER_CONTEXT; import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; @@ -220,6 +222,38 @@ public void testRegular() httpRemoteTaskFactory.stop(); } + @Test + @Timeout(30) + public void testInternalExtraCredentialsAreNotSentToWorkers() + throws Exception + { + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, FailureScenario.NO_FAILURE); + HttpRemoteTaskFactory httpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource); + Session session = Session.builder(TEST_SESSION) + .setIdentity(Identity.forUser(TEST_SESSION.getUser()) + .withExtraCredentials(ImmutableMap.of( + "token", "access-token", + authenticatedExtraCredentialName("token"), "access-token")) + .build()) + .build(); + RemoteTask remoteTask = createRemoteTask(httpRemoteTaskFactory, ImmutableSet.of(), session); + + testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo()); + remoteTask.start(); + remoteTask.addSplits(ImmutableMultimap.of(TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit()))); + + poll(() -> testingTaskResource.getLatestExtraCredentials().containsKey("token")); + assertThat(testingTaskResource.getLatestExtraCredentials()) + .containsEntry("token", "access-token") + .doesNotContainKey(authenticatedExtraCredentialName("token")); + + remoteTask.cancel(); + poll(() -> remoteTask.getTaskStatus().state().isDone()); + + httpRemoteTaskFactory.stop(); + } + @Test @Timeout(30) public void testDynamicFilterFetcherFailure() @@ -781,6 +815,7 @@ public static class TestingTaskResource private TaskState taskState; private long taskInstanceId = INITIAL_TASK_INSTANCE_ID; private Map latestDynamicFilterFromCoordinator = ImmutableMap.of(); + private Map latestExtraCredentials = ImmutableMap.of(); private long statusFetchCounter; private long createOrUpdateCounter; @@ -830,6 +865,7 @@ public synchronized TaskInfo createOrUpdateTask( dynamicFiltersSentCounter++; latestDynamicFilterFromCoordinator = taskUpdateRequest.dynamicFilterDomains(); } + latestExtraCredentials = taskUpdateRequest.extraCredentials(); createOrUpdateCounter++; lastActivityNanos.set(System.nanoTime()); return buildTaskInfo(); @@ -944,6 +980,11 @@ public synchronized long getCreateOrUpdateCounter() return createOrUpdateCounter; } + public synchronized Map getLatestExtraCredentials() + { + return latestExtraCredentials; + } + public synchronized long getDynamicFiltersFetchCounter() { return dynamicFiltersFetchCounter; diff --git a/core/trino-main/src/test/java/io/trino/server/security/TestResourceSecurity.java b/core/trino-main/src/test/java/io/trino/server/security/TestResourceSecurity.java index 612a1a521166..5c4bbdba003d 100644 --- a/core/trino-main/src/test/java/io/trino/server/security/TestResourceSecurity.java +++ b/core/trino-main/src/test/java/io/trino/server/security/TestResourceSecurity.java @@ -36,6 +36,7 @@ import io.trino.server.protocol.PreparedStatementEncoder; import io.trino.server.protocol.spooling.QueryDataEncoder; import io.trino.server.security.oauth2.ChallengeFailedException; +import io.trino.server.security.oauth2.OAuth2Authenticator; import io.trino.server.security.oauth2.OAuth2Client; import io.trino.server.security.oauth2.TokenPairSerializer; import io.trino.server.security.oauth2.TokenPairSerializer.TokenPair; @@ -106,6 +107,7 @@ import static io.trino.server.ui.OAuthWebUiCookie.OAUTH2_COOKIE; import static io.trino.spi.security.AccessDeniedException.denyImpersonateUser; import static io.trino.spi.security.AccessDeniedException.denyReadSystemInformationAccess; +import static io.trino.spi.security.ExtraCredentials.authenticatedExtraCredentialName; import static jakarta.servlet.http.HttpServletResponse.SC_FORBIDDEN; import static jakarta.servlet.http.HttpServletResponse.SC_OK; import static jakarta.servlet.http.HttpServletResponse.SC_SEE_OTHER; @@ -632,6 +634,57 @@ public void testOAuth2Authenticator() verifyOAuth2Authenticator(false, true, Optional.empty()); } + @Test + public void testOAuth2AccessTokenExtraCredential() + throws Exception + { + assertOAuth2AccessTokenExtraCredential("token"); + assertOAuth2AccessTokenExtraCredential("credential"); + } + + private void assertOAuth2AccessTokenExtraCredential(String credentialName) + throws Exception + { + CookieManager cookieManager = new CookieManager(); + OkHttpClient client = this.client.newBuilder() + .cookieJar(new JavaNetCookieJar(cookieManager)) + .build(); + + try (TokenServer tokenServer = new TokenServer(Optional.empty()); + TestingTrinoServer server = TestingTrinoServer.builder() + .setProperties(ImmutableMap.builder() + .putAll(SECURE_PROPERTIES) + .put("web-ui.enabled", "false") + .put("http-server.authentication.type", "oauth2") + .putAll(getOAuth2Properties(tokenServer)) + .put("http-server.authentication.oauth2.access-token-extra-credential-name", credentialName) + .buildOrThrow()) + .setAdditionalModule(oauth2Module(tokenServer)) + .setSystemAccessControl(TestSystemAccessControl.NO_IMPERSONATION) + .build()) { + HttpServerInfo httpServerInfo = server.getInstance(Key.get(HttpServerInfo.class)); + URI baseUri = httpServerInfo.getHttpsUri(); + + OAuthBearer bearer = assertAuthenticateOAuth2Bearer(client, getAuthorizedUserLocation(baseUri), "http://example.com/authorize"); + assertOk( + client, + uriBuilderFrom(baseUri) + .replacePath("/oauth2/callback/") + .addParameter("code", "TEST_CODE") + .addParameter("state", bearer.state()) + .toString()); + + String oauthToken = getOauthToken(client, bearer.tokenServer()); + List authenticators = server.getInstance(new Key<>() {}); + assertThat(authenticators).hasSize(1); + assertThat(authenticators.get(0)).isInstanceOf(OAuth2Authenticator.class); + Identity identity = ((OAuth2Authenticator) authenticators.get(0)).authenticate(null, oauthToken); + assertThat(identity.getExtraCredentials()) + .containsEntry(credentialName, tokenServer.getAccessToken()) + .containsEntry(authenticatedExtraCredentialName(credentialName), tokenServer.getAccessToken()); + } + } + private void verifyOAuth2Authenticator(boolean webUiEnabled, boolean refreshTokensEnabled, Optional principalField) throws Exception { diff --git a/core/trino-main/src/test/java/io/trino/server/security/oauth2/TestOAuth2Config.java b/core/trino-main/src/test/java/io/trino/server/security/oauth2/TestOAuth2Config.java index 830d03f39495..df20ab98c10c 100644 --- a/core/trino-main/src/test/java/io/trino/server/security/oauth2/TestOAuth2Config.java +++ b/core/trino-main/src/test/java/io/trino/server/security/oauth2/TestOAuth2Config.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.units.Duration; +import jakarta.validation.constraints.AssertTrue; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -28,6 +29,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.testing.ValidationAssertions.assertFailsValidation; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -49,6 +51,7 @@ public void testDefaults() .setJwtType(null) .setUserMappingPattern(null) .setUserMappingFile(null) + .setAccessTokenExtraCredentialName(null) .setEnableRefreshTokens(false) .setEnableDiscovery(true)); } @@ -71,6 +74,7 @@ public void testExplicitPropertyMappings() .put("http-server.authentication.oauth2.jwt-type", "at+jwt") .put("http-server.authentication.oauth2.user-mapping.pattern", "(.*)@something") .put("http-server.authentication.oauth2.user-mapping.file", userMappingFile.toString()) + .put("http-server.authentication.oauth2.access-token-extra-credential-name", "token") .put("http-server.authentication.oauth2.refresh-tokens", "true") .put("http-server.authentication.oauth2.oidc.discovery", "false") .buildOrThrow(); @@ -88,9 +92,26 @@ public void testExplicitPropertyMappings() .setJwtType("at+jwt") .setUserMappingPattern("(.*)@something") .setUserMappingFile(userMappingFile.toFile()) + .setAccessTokenExtraCredentialName("token") .setEnableRefreshTokens(true) .setEnableDiscovery(false); assertFullMapping(properties, expected); } + + @Test + public void testInvalidAccessTokenExtraCredentialName() + { + assertFailsValidation( + new OAuth2Config().setAccessTokenExtraCredentialName("internal$token"), + "accessTokenExtraCredentialNameNotInternal", + "OAuth2 access token extra credential name must not start with internal$", + AssertTrue.class); + + assertFailsValidation( + new OAuth2Config().setAccessTokenExtraCredentialName("bad token"), + "accessTokenExtraCredentialNameValid", + "OAuth2 access token extra credential name must not contain whitespace, comma, or equals", + AssertTrue.class); + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/security/ExtraCredentials.java b/core/trino-spi/src/main/java/io/trino/spi/security/ExtraCredentials.java new file mode 100644 index 000000000000..b6a49fad18d2 --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/security/ExtraCredentials.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.security; + +import static java.util.Objects.requireNonNull; + +public final class ExtraCredentials +{ + private static final String INTERNAL_EXTRA_CREDENTIAL_PREFIX = "internal$"; + private static final String AUTHENTICATED_EXTRA_CREDENTIAL_PREFIX = INTERNAL_EXTRA_CREDENTIAL_PREFIX + "authenticated:"; + + private ExtraCredentials() {} + + public static boolean isInternalExtraCredential(String name) + { + return requireNonNull(name, "name is null").startsWith(INTERNAL_EXTRA_CREDENTIAL_PREFIX); + } + + public static String authenticatedExtraCredentialName(String name) + { + return AUTHENTICATED_EXTRA_CREDENTIAL_PREFIX + requireNonNull(name, "name is null"); + } +} diff --git a/core/trino-spi/src/main/java/io/trino/spi/security/Identity.java b/core/trino-spi/src/main/java/io/trino/spi/security/Identity.java index f5644620d82f..dc16ceaa14a1 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/security/Identity.java +++ b/core/trino-spi/src/main/java/io/trino/spi/security/Identity.java @@ -160,7 +160,7 @@ public String toString() } // Do not print any internal credential keys List filteredCredentials = extraCredentials.keySet().stream() - .filter(key -> !key.contains("$internal")) + .filter(key -> !ExtraCredentials.isInternalExtraCredential(key)) .collect(toCollection(ArrayList::new)); if (filteredCredentials.size() != extraCredentials.size()) { filteredCredentials.add("..."); diff --git a/core/trino-spi/src/test/java/io/trino/spi/security/TestIdentity.java b/core/trino-spi/src/test/java/io/trino/spi/security/TestIdentity.java index d8aead9bd226..8577a5d57d21 100644 --- a/core/trino-spi/src/test/java/io/trino/spi/security/TestIdentity.java +++ b/core/trino-spi/src/test/java/io/trino/spi/security/TestIdentity.java @@ -80,4 +80,20 @@ public void testHashCode() assertThat(otherIdentity.hashCode()) .isEqualTo(TEST_IDENTITY.hashCode()); } + + @Test + public void testToStringHidesInternalExtraCredentials() + { + Identity identity = Identity.forUser("user") + .withExtraCredentials(ImmutableMap.of( + "token", "credential", + ExtraCredentials.authenticatedExtraCredentialName("token"), "credential")) + .build(); + + assertThat(identity.toString()) + .contains("token") + .contains("...") + .doesNotContain(ExtraCredentials.authenticatedExtraCredentialName("token")) + .doesNotContain("credential"); + } } diff --git a/docs/src/main/sphinx/object-storage/metastores.md b/docs/src/main/sphinx/object-storage/metastores.md index 37e143929bcb..1488bfcdd282 100644 --- a/docs/src/main/sphinx/object-storage/metastores.md +++ b/docs/src/main/sphinx/object-storage/metastores.md @@ -60,7 +60,7 @@ are also available. They are discussed later in this topic. metastore calls for that partition. - `false` * - `hive.metastore-cache.cache-missing-stats` - - Enable caching the fact that table statistics for a specific table are + - Enable caching the fact that table statistics for a specific table are missing to prevent future metastore calls. - `false` * - `hive.metastore-cache-ttl` @@ -212,7 +212,7 @@ properties: ### Iceberg-specific Hive catalog configuration properties When using the Hive catalog, the Iceberg connector supports the same -{ref}`general Thrift metastore configuration properties ` +{ref}`general Thrift metastore configuration properties ` as previously described with the following additional property: :::{list-table} Iceberg Hive catalog configuration property @@ -487,11 +487,15 @@ following properties: - Warehouse identifier/location for the catalog (optional). Example: `s3://my_bucket/warehouse_location` * - `iceberg.rest-catalog.security` - - The type of security to use (default: `NONE`). Possible values are `NONE`, - `SIGV4`, `GOOGLE` or `OAUTH2`. `OAUTH2` requires either a `token` or a `credential`. + - The type of security to use (default: `NONE`). Possible values are `NONE`, + `SIGV4`, `GOOGLE` or `OAUTH2`. `OAUTH2` requires OAuth2 credentials + configured as catalog properties or supplied as OAuth2-authenticated user + session extra credentials. * - `iceberg.rest-catalog.session` - Session information included when communicating with the REST Catalog. - Options are `NONE` or `USER` (default: `NONE`). + Options are `NONE` or `USER` (default: `NONE`). When set to `USER`, the + REST catalog can use the OAuth2-authenticated `token` or `credential` extra + credential from the user session for `OAUTH2` security. * - `iceberg.rest-catalog.connection-timeout` - Maximum time [Duration](prop-type-duration) allowed for socket connection requests to complete before timing out. @@ -502,22 +506,29 @@ following properties: - [Duration](prop-type-duration) to keep authentication session in cache. Defaults to `1h`. * - `iceberg.rest-catalog.oauth2.token` - The bearer token used for interactions with the server. A `token` or - `credential` is required for `OAUTH2` security. Example: `AbCdEf123456` + `credential` is required for `OAUTH2` security unless + `iceberg.rest-catalog.session=USER` supplies the credential from the user + session. Use this for direct token forwarding. * - `iceberg.rest-catalog.oauth2.credential` - The credential to exchange for a token in the OAuth2 client credentials flow - with the server. A `token` or `credential` is required for `OAUTH2` - security. Example: `AbCdEf123456` + with the server. A `token` or `credential` is required for `OAUTH2` security + unless `iceberg.rest-catalog.session=USER` supplies the credential from the + user session. Use this when the catalog OAuth2 client must acquire a token. * - `iceberg.rest-catalog.oauth2.scope` - Scope to be used when communicating with the REST Catalog. Applicable only when using `credential`. * - `iceberg.rest-catalog.oauth2.server-uri` - - The endpoint to retrieve access token from OAuth2 Server. + - The endpoint to retrieve an access token from an OAuth2 server. This is not + needed when forwarding a user token directly. * - `iceberg.rest-catalog.oauth2.token-refresh-enabled` - Controls whether a token should be refreshed if information about its expiration time is available. Defaults to `true` * - `iceberg.rest-catalog.oauth2.token-exchange-enabled` - - Controls whether to use the token exchange flow to acquire new tokens. - Defaults to `true` + - Controls whether the Iceberg REST OAuth2 client uses the + [OAuth 2.0 Token Exchange](https://datatracker.ietf.org/doc/html/rfc8693) + flow to acquire new tokens. The exchange is performed by the Iceberg REST + OAuth2 client and the configured OAuth2 server, not by Trino. + Defaults to `true`. Set to `false` when forwarding a user token directly. * - `iceberg.rest-catalog.vended-credentials-enabled` - Use credentials provided by the REST backend for file system access. Defaults to `false`. @@ -529,12 +540,12 @@ following properties: * - `iceberg.rest-catalog.signing-name` - AWS SigV4 signing service name. Defaults to `execute-api`. * - `iceberg.rest-catalog.google-project-id` - - Google Cloud project name. This property must be set when `iceberg.rest-catalog.security` + - Google Cloud project name. This property must be set when `iceberg.rest-catalog.security` config property is set to `GOOGLE`. Example: `development-123456`. * - `iceberg.rest-catalog.case-insensitive-name-matching` - Match namespace, table, and view names case insensitively. Defaults to `false`. * - `iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl` - - [Duration](prop-type-duration) for which case-insensitive namespace, table, + - [Duration](prop-type-duration) for which case-insensitive namespace, table, and view names are cached. Defaults to `1m`. * - `iceberg.rest-catalog.http-headers` - Additional *non-sensitive* HTTP headers to include with requests to the REST catalog. @@ -550,6 +561,88 @@ iceberg.catalog.type=rest iceberg.rest-catalog.uri=http://iceberg-with-rest:8181 ``` +When Trino authenticates users with OAuth2, the Iceberg REST catalog can use the +authenticated user's access token for REST catalog authorization. There are two +common patterns: forwarding the access token directly to the REST catalog, or +using the access token as input to a downstream +[OAuth 2.0 Token Exchange](https://datatracker.ietf.org/doc/html/rfc8693) flow. + +#### OAuth2 user token forwarding + +Use direct token forwarding when the REST catalog can validate and authorize the +same OAuth2 access token that Trino receives during user authentication. The +token issuer, audience, signature, and principal claims must be accepted by the +REST catalog. + +Configure the coordinator to store the authenticated OAuth2 access token as the +`token` extra credential: + +```properties +http-server.authentication.oauth2.access-token-extra-credential-name=token +``` + +Then configure the Iceberg catalog to use `OAUTH2` security with the `USER` +session. Do not configure `iceberg.rest-catalog.oauth2.token`, +`iceberg.rest-catalog.oauth2.credential`, or +`iceberg.rest-catalog.oauth2.server-uri` when the value must come from the user +session and be forwarded directly. Static catalog properties take precedence +over session extra credentials: + +```properties +connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=https://catalog.example.com/api/catalog +iceberg.rest-catalog.warehouse=warehouse +iceberg.rest-catalog.security=OAUTH2 +iceberg.rest-catalog.session=USER +iceberg.rest-catalog.oauth2.token-exchange-enabled=false +iceberg.rest-catalog.oauth2.token-refresh-enabled=false +iceberg.rest-catalog.vended-credentials-enabled=true +``` + +In this configuration, the REST catalog receives the OAuth2 token associated +with the authenticated Trino user. Client-supplied extra credentials named +`token` are not used. If the REST catalog provides storage credentials, +`iceberg.rest-catalog.vended-credentials-enabled=true` allows Trino to use +those credentials for file system access. + +#### OAuth2 token exchange + +Use token exchange when the REST catalog or its authorization server requires a +catalog-scoped token instead of the original Trino user token. Trino stores the +authenticated user's OAuth2 access token in the session as the `credential` +extra credential. The Iceberg REST OAuth2 client uses it as the credential for +its OAuth2 flow with the configured server. Trino does not perform the token +exchange itself. + +Configure the coordinator to store the authenticated OAuth2 access token as the +`credential` extra credential: + +```properties +http-server.authentication.oauth2.access-token-extra-credential-name=credential +``` + +Then configure the Iceberg catalog with the OAuth2 token endpoint used for the +exchange: + +```properties +connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=https://catalog.example.com/api/catalog +iceberg.rest-catalog.warehouse=warehouse +iceberg.rest-catalog.security=OAUTH2 +iceberg.rest-catalog.session=USER +iceberg.rest-catalog.oauth2.server-uri=https://auth.example.com/oauth2/token +iceberg.rest-catalog.oauth2.token-exchange-enabled=true +iceberg.rest-catalog.oauth2.token-refresh-enabled=true +iceberg.rest-catalog.vended-credentials-enabled=true +``` + +In this configuration, Trino does not exchange the token. The authenticated +access token is supplied to the Iceberg REST OAuth2 client as the credential, +and the configured OAuth2 server returns the token used for REST catalog +requests. + `iceberg.security` must be `read_only` when connecting to Databricks Unity catalog using an Iceberg REST catalog: @@ -578,7 +671,7 @@ fs.gcs.enabled=true gcs.json-key-file-path=/path/to/gcs_keyfile.json ``` -The REST catalog supports [view management](sql-view-management) +The REST catalog supports [view management](sql-view-management) using the [Iceberg View specification](https://iceberg.apache.org/view-spec/). The REST catalog does not support [materialized view management](sql-materialized-view-management). diff --git a/docs/src/main/sphinx/security/oauth2.md b/docs/src/main/sphinx/security/oauth2.md index 58fdd328448b..3277e186bc53 100644 --- a/docs/src/main/sphinx/security/oauth2.md +++ b/docs/src/main/sphinx/security/oauth2.md @@ -151,6 +151,11 @@ The following configuration properties are available: - The field of the access token used for the Trino user principal. Defaults to `sub`. Other commonly used fields include `sAMAccountName`, `name`, `upn`, and `email`. +* - `http-server.authentication.oauth2.access-token-extra-credential-name` + - Extra credential name used to store the authenticated OAuth2 access token + in the session `Identity`. The value must not use the reserved `internal$` + prefix, and must not contain whitespace, commas, or equals signs. Disabled + by default. * - `http-server.authentication.oauth2.oidc.discovery` - Enable reading the [OIDC provider metadata](https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata). Default is `true`. @@ -169,6 +174,37 @@ The following configuration properties are available: authentication server when logging out from Trino. ::: +### Access token extra credential + +Some connectors can use an end-user OAuth2 access token when they communicate +with downstream services. This supports deployments where the downstream service +authorizes the user token directly, or uses it as input to a token exchange +flow. Trino stores and forwards the authenticated access token; it does not +perform token exchange itself. Set +`http-server.authentication.oauth2.access-token-extra-credential-name` to add +the authenticated OAuth2 access token to the session `Identity` extra +credentials using the configured name. Use `token` when a connector forwards +the access token directly. Use `credential` when a connector uses the token as +input to a token exchange flow: + +```properties +http-server.authentication.oauth2.access-token-extra-credential-name=token +``` + +```properties +http-server.authentication.oauth2.access-token-extra-credential-name=credential +``` + +Only enable this for deployments where the configured connectors are trusted to +receive the user's OAuth2 access token. If a client also supplies an extra +credential with the same name, the authenticated OAuth2 token takes precedence. + +The {ref}`Iceberg REST catalog ` can use this with +`iceberg.rest-catalog.session=USER` and +`iceberg.rest-catalog.security=OAUTH2`. The Iceberg REST catalog +documentation includes examples for forwarding the user token directly and for +using the token as input to a downstream token exchange. + (trino-oauth2-refresh-tokens)= ### Refresh tokens diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index e13b0a5cf1b0..8d49d52dabd8 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -93,21 +93,6 @@ units - - io.jsonwebtoken - jjwt-api - - - - io.jsonwebtoken - jjwt-impl - - - - io.jsonwebtoken - jjwt-jackson - - io.trino trino-cache diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java index 077d938ea98a..e2633fa27f5d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java @@ -22,7 +22,11 @@ import io.trino.spi.TrinoException; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; +import static io.airlift.bootstrap.ClosingBinder.closingBinder; import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.Security.OAUTH2; +import static io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType.USER; +import static io.trino.spi.StandardErrorCode.CONFIGURATION_INVALID; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; public class IcebergRestCatalogModule @@ -40,11 +44,23 @@ protected void setup(Binder binder) }); binder.bind(IcebergRestCatalogPropertiesProvider.class).in(Scopes.SINGLETON); - binder.bind(TrinoCatalogFactory.class).to(TrinoIcebergRestCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(TrinoIcebergRestCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(TrinoCatalogFactory.class).to(TrinoIcebergRestCatalogFactory.class); + closingBinder(binder).registerCloseable(TrinoIcebergRestCatalogFactory.class); newOptionalBinder(binder, IcebergFileSystemFactory.class).setBinding().to(IcebergRestCatalogFileSystemFactory.class).in(Scopes.SINGLETON); IcebergConfig icebergConfig = buildConfigObject(IcebergConfig.class); IcebergRestCatalogConfig restCatalogConfig = buildConfigObject(IcebergRestCatalogConfig.class); + if (restCatalogConfig.getSecurity() == OAUTH2 && + restCatalogConfig.getSessionType() != USER) { + OAuth2SecurityConfig oauth2SecurityConfig = buildConfigObject(OAuth2SecurityConfig.class); + if (oauth2SecurityConfig.getCredential().isEmpty() && oauth2SecurityConfig.getToken().isEmpty()) { + throw new TrinoException(CONFIGURATION_INVALID, "OAuth2 REST catalog requires iceberg.rest-catalog.oauth2.credential or iceberg.rest-catalog.oauth2.token when iceberg.rest-catalog.session is not USER"); + } + } + if (restCatalogConfig.getSessionType() == USER && restCatalogConfig.getSecurity() != OAUTH2) { + throw new TrinoException(CONFIGURATION_INVALID, "iceberg.rest-catalog.session=USER requires iceberg.rest-catalog.security=OAUTH2"); + } if (restCatalogConfig.isVendedCredentialsEnabled() && icebergConfig.isRegisterTableProcedureEnabled()) { throw new TrinoException(NOT_SUPPORTED, "Using the `register_table` procedure with vended credentials is currently not supported"); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityConfig.java index 66da457f7904..58ad13d5b9fa 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityConfig.java @@ -111,15 +111,9 @@ public OAuth2SecurityConfig setTokenExchangeEnabled(boolean tokenExchangeEnabled return this; } - @AssertTrue(message = "OAuth2 requires a credential or token") - public boolean credentialOrTokenPresent() - { - return credential != null || token != null; - } - - @AssertTrue(message = "Scope is applicable only when using credential") + @AssertTrue(message = "iceberg.rest-catalog.oauth2.scope must not be set with token") public boolean scopePresentOnlyWithCredential() { - return !(token != null && scope != null); + return scope == null || token == null; } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityProperties.java index 5e2c9de49267..3b9653082bbb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityProperties.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import io.airlift.log.Logger; import org.apache.iceberg.rest.auth.AuthProperties; import org.apache.iceberg.rest.auth.OAuth2Properties; @@ -25,6 +26,8 @@ public class OAuth2SecurityProperties implements SecurityProperties { + private static final Logger log = Logger.get(OAuth2SecurityProperties.class); + private final Map securityProperties; @Inject @@ -32,6 +35,11 @@ public OAuth2SecurityProperties(OAuth2SecurityConfig securityConfig) { requireNonNull(securityConfig, "securityConfig is null"); + if (securityConfig.getCredential().isPresent() && securityConfig.getToken().isPresent()) { + log.warn("Both iceberg.rest-catalog.oauth2.credential and iceberg.rest-catalog.oauth2.token are configured; " + + "token takes precedence and credential is ignored"); + } + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); propertiesBuilder.put(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_OAUTH2); securityConfig.getCredential().ifPresent( diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SessionCredentials.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SessionCredentials.java new file mode 100644 index 000000000000..7be6f09483d8 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SessionCredentials.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import io.trino.spi.security.ConnectorIdentity; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static io.trino.spi.security.ExtraCredentials.authenticatedExtraCredentialName; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; +import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN; + +final class OAuth2SessionCredentials +{ + private static final List OAUTH2_SESSION_CREDENTIALS = List.of(TOKEN, CREDENTIAL); + + private OAuth2SessionCredentials() {} + + static Map fromIdentity(ConnectorIdentity identity) + { + requireNonNull(identity, "identity is null"); + return fromExtraCredentials(identity.getExtraCredentials()); + } + + static Map fromExtraCredentials(Map extraCredentials) + { + requireNonNull(extraCredentials, "extraCredentials is null"); + + ImmutableMap.Builder credentials = ImmutableMap.builder(); + for (String credentialName : OAUTH2_SESSION_CREDENTIALS) { + String authenticatedCredentialName = authenticatedExtraCredentialName(credentialName); + if (extraCredentials.containsKey(authenticatedCredentialName)) { + credentials.put(credentialName, extraCredentials.get(authenticatedCredentialName)); + } + } + return credentials.buildOrThrow(); + } + + static Optional cacheKey(ConnectorIdentity identity) + { + requireNonNull(identity, "identity is null"); + + Hasher hasher = Hashing.murmur3_128().newHasher(); + boolean hasAny = false; + for (String credentialName : OAUTH2_SESSION_CREDENTIALS) { + String value = identity.getExtraCredentials().get(authenticatedExtraCredentialName(credentialName)); + if (value != null) { + hasher.putUnencodedChars(credentialName); + hasher.putByte((byte) 0); + hasher.putUnencodedChars(value); + hasher.putByte((byte) 0); + hasAny = true; + } + } + return hasAny ? Optional.of(hasher.hash().toString()) : Optional.empty(); + } + + static Map catalogPropertiesWithSessionCredentials( + Map catalogProperties, + Map sessionCredentials) + { + requireNonNull(catalogProperties, "catalogProperties is null"); + requireNonNull(sessionCredentials, "sessionCredentials is null"); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.putAll(catalogProperties); + sessionCredentials.forEach((key, value) -> { + if (!catalogProperties.containsKey(key)) { + properties.put(key, value); + } + }); + return properties.buildOrThrow(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java index 30caca4b05c5..71975de7bdf0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java @@ -17,6 +17,7 @@ import com.google.common.collect.Maps; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; +import io.airlift.log.Logger; import io.trino.cache.EvictableCacheBuilder; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergFileSystemFactory; @@ -26,6 +27,7 @@ import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType; import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory; import io.trino.spi.NodeVersion; +import io.trino.spi.TrinoException; import io.trino.spi.catalog.CatalogName; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.TypeManager; @@ -36,17 +38,34 @@ import org.apache.iceberg.rest.RESTSessionCatalog; import org.apache.iceberg.rest.RESTUtil; +import java.io.IOException; +import java.util.Base64; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; +import static org.apache.iceberg.rest.auth.OAuth2Properties.OAUTH2_SERVER_URI; import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN; +import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN_EXCHANGE_ENABLED; +import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN_REFRESH_ENABLED; public class TrinoIcebergRestCatalogFactory - implements TrinoCatalogFactory + implements TrinoCatalogFactory, AutoCloseable { + private static final Logger log = Logger.get(TrinoIcebergRestCatalogFactory.class); + private static final int USER_SESSION_CATALOG_CACHE_SIZE = 1000; + private static final Set CREDENTIAL_KEYS = Set.of(TOKEN, CREDENTIAL); + private static final Set SESSION_PROPERTY_KEYS = Set.of(OAUTH2_SERVER_URI, TOKEN_REFRESH_ENABLED, TOKEN_EXCHANGE_ENABLED); + private final IcebergFileSystemFactory fileSystemFactory; private final ForwardingFileIoFactory fileIoFactory; private final CatalogName catalogName; @@ -62,8 +81,13 @@ public class TrinoIcebergRestCatalogFactory private final boolean caseInsensitiveNameMatching; private final Cache remoteNamespaceMappingCache; private final Cache remoteTableMappingCache; + private final ConcurrentMap userSessionCatalogCache = new ConcurrentHashMap<>(); + private final AtomicBoolean missingCredentialsWarned = new AtomicBoolean(false); + private final long sessionTimeoutMillis; + + private final Object catalogLock = new Object(); - @GuardedBy("this") + @GuardedBy("catalogLock") private RESTSessionCatalog icebergCatalog; @Inject @@ -101,41 +125,28 @@ public TrinoIcebergRestCatalogFactory( .expireAfterWrite(restConfig.getCaseInsensitiveNameMatchingCacheTtl().toMillis(), MILLISECONDS) .shareNothingWhenDisabled() .build(); + this.sessionTimeoutMillis = restConfig.getSessionTimeout().toMillis(); } @Override - public synchronized TrinoCatalog create(ConnectorIdentity identity) + public TrinoCatalog create(ConnectorIdentity identity) { - // Creation of the RESTSessionCatalog is lazy due to required network calls - // for authorization and config route - if (icebergCatalog == null) { - RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog( - config -> HTTPClient.builder(config) - .uri(config.get(CatalogProperties.URI)) - .withHeaders(RESTUtil.configHeaders(config)) - .build(), - (context, config) -> { - ConnectorIdentity currentIdentity = (context.wrappedIdentity() != null) - ? ((ConnectorIdentity) context.wrappedIdentity()) - : ConnectorIdentity.ofUser("fake"); - return fileIoFactory.create(fileSystemFactory.create(currentIdentity, config), true, config); - }); - icebergCatalogInstance.initialize(catalogName.toString(), catalogPropertiesProvider.catalogProperties()); - - icebergCatalog = icebergCatalogInstance; - } + requireNonNull(identity, "identity is null"); + RESTSessionCatalog restSessionCatalog = restSessionCatalog(identity); // `OAuth2Properties.SCOPE` is not set as scope passed through credentials is unused in // https://github.com/apache/iceberg/blob/229d8f6fcd109e6c8943ea7cbb41dab746c6d0ed/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java#L714-L721 - Map credentials = Maps.filterKeys(securityProperties.get(), key -> Set.of(TOKEN, CREDENTIAL).contains(key)); + Map credentials = Maps.filterKeys(securityProperties.get(), CREDENTIAL_KEYS::contains); + Map sessionProperties = Maps.filterKeys(securityProperties.get(), SESSION_PROPERTY_KEYS::contains); return new TrinoRestCatalog( fileSystemFactory, - icebergCatalog, + restSessionCatalog, catalogName, security, sessionType, credentials, + sessionProperties, nestedNamespaceEnabled, trinoVersion, typeManager, @@ -143,6 +154,221 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity) caseInsensitiveNameMatching, remoteNamespaceMappingCache, remoteTableMappingCache, - viewEndpointsEnabled); + viewEndpointsEnabled, + missingCredentialsWarned); + } + + private RESTSessionCatalog restSessionCatalog(ConnectorIdentity identity) + { + if (security == Security.OAUTH2 && sessionType == SessionType.USER) { + Map sessionCredentials = OAuth2SessionCredentials.fromIdentity(identity); + Map catalogProperties = OAuth2SessionCredentials.catalogPropertiesWithSessionCredentials(catalogPropertiesProvider.catalogProperties(), sessionCredentials); + + return OAuth2SessionCredentials.cacheKey(identity) + .map(cacheKey -> cachedRestSessionCatalog(cacheKey, sessionCredentials, catalogProperties, identity)) + .orElseGet(() -> sharedRestSessionCatalog(catalogProperties)); + } + + // Creation of the RESTSessionCatalog is lazy due to required network calls + // for authorization and config route. + return sharedRestSessionCatalog(catalogPropertiesProvider.catalogProperties()); + } + + private RESTSessionCatalog sharedRestSessionCatalog(Map catalogProperties) + { + synchronized (catalogLock) { + if (icebergCatalog == null) { + icebergCatalog = newRestSessionCatalog(catalogProperties); + } + return icebergCatalog; + } + } + + private RESTSessionCatalog cachedRestSessionCatalog(String cacheKey, Map sessionCredentials, Map catalogProperties, ConnectorIdentity identity) + { + long now = System.currentTimeMillis(); + long expiresAtMillis = sessionCredentialsExpirationMillis(sessionCredentials, now); + if (expiresAtMillis <= now) { + throw new TrinoException(PERMISSION_DENIED, format("OAuth2 credentials for user '%s' have already expired", identity.getUser())); + } + + while (true) { + CachedRestSessionCatalog cached = userSessionCatalogCache.get(cacheKey); + if (cached != null && !cached.expired(now)) { + return cached.catalog(); + } + + if (cached == null) { + CachedRestSessionCatalog newCached = new CachedRestSessionCatalog(newRestSessionCatalog(catalogProperties), expiresAtMillis); + CachedRestSessionCatalog previous = userSessionCatalogCache.putIfAbsent(cacheKey, newCached); + if (previous == null) { + cleanUpUserSessionCatalogCache(now); + return newCached.catalog(); + } + + closeCatalog(newCached); + if (!previous.expired(now)) { + return previous.catalog(); + } + userSessionCatalogCache.remove(cacheKey, previous); + continue; + } + + // cached != null and expired: re-read before the expensive network call in case + // another thread already replaced it. + CachedRestSessionCatalog current = userSessionCatalogCache.get(cacheKey); + if (current != null && !current.expired(now)) { + return current.catalog(); + } + + CachedRestSessionCatalog newCached = new CachedRestSessionCatalog(newRestSessionCatalog(catalogProperties), expiresAtMillis); + if (userSessionCatalogCache.replace(cacheKey, cached, newCached)) { + cleanUpUserSessionCatalogCache(now); + return newCached.catalog(); + } + closeCatalog(newCached); + } + } + + private RESTSessionCatalog newRestSessionCatalog(Map catalogProperties) + { + RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog( + config -> HTTPClient.builder(config) + .uri(config.get(CatalogProperties.URI)) + .withHeaders(RESTUtil.configHeaders(config)) + .build(), + (context, config) -> { + ConnectorIdentity currentIdentity = (context.wrappedIdentity() != null) + ? ((ConnectorIdentity) context.wrappedIdentity()) + : ConnectorIdentity.ofUser("fake"); + return fileIoFactory.create(fileSystemFactory.create(currentIdentity, config), true, config); + }); + icebergCatalogInstance.initialize(catalogName.toString(), catalogProperties); + return icebergCatalogInstance; + } + + private long sessionCredentialsExpirationMillis(Map sessionCredentials, long now) + { + String token = sessionCredentials.get(TOKEN); + if (token == null) { + return now + sessionTimeoutMillis; + } + return tokenExpirationMillis(token).orElse(now + sessionTimeoutMillis); + } + + // Parses the "exp" claim from the JWT payload using simple string matching. + // Returns empty on any parse failure, causing the caller to use sessionTimeoutMillis as TTL. + private static OptionalLong tokenExpirationMillis(String token) + { + String[] segments = token.split("\\."); + if (segments.length < 2) { + return OptionalLong.empty(); + } + + try { + String payload = new String(Base64.getUrlDecoder().decode(segments[1]), UTF_8); + int expirationKeyIndex = payload.indexOf("\"exp\""); + if (expirationKeyIndex < 0) { + return OptionalLong.empty(); + } + + int separatorIndex = payload.indexOf(':', expirationKeyIndex); + if (separatorIndex < 0) { + return OptionalLong.empty(); + } + + int start = separatorIndex + 1; + while (start < payload.length() && Character.isWhitespace(payload.charAt(start))) { + start++; + } + + int end = start; + while (end < payload.length() && Character.isDigit(payload.charAt(end))) { + end++; + } + if (start == end) { + return OptionalLong.empty(); + } + + return OptionalLong.of(Long.parseLong(payload.substring(start, end)) * 1000); + } + catch (IllegalArgumentException e) { + return OptionalLong.empty(); + } + } + + private void cleanUpUserSessionCatalogCache(long now) + { + // TODO: RESTSessionCatalog instances evicted from this cache are not closed because + // TrinoRestCatalog instances holding references to them have no lifecycle callback + // when a Trino transaction completes. Closing them prematurely causes active requests + // to fail. Evicted catalogs are only closed when the factory itself shuts down. + // A reference-counting scheme or Closeable wrapper would fix this properly. + userSessionCatalogCache.forEach((cacheKey, cached) -> { + if (cached.expired(now)) { + userSessionCatalogCache.remove(cacheKey, cached); + } + }); + + int entriesToEvict = userSessionCatalogCache.size() - USER_SESSION_CATALOG_CACHE_SIZE; + if (entriesToEvict <= 0) { + return; + } + + for (Map.Entry entry : userSessionCatalogCache.entrySet()) { + if (entriesToEvict <= 0) { + break; + } + if (userSessionCatalogCache.remove(entry.getKey(), entry.getValue())) { + entriesToEvict--; + } + } + } + + private static void closeCatalog(CachedRestSessionCatalog catalog) + { + if (catalog == null) { + return; + } + + try { + catalog.catalog().close(); + } + catch (IOException | RuntimeException e) { + log.warn(e, "Failed to close Iceberg REST session catalog"); + } + } + + @Override + public void close() + { + RESTSessionCatalog catalogToClose; + synchronized (catalogLock) { + catalogToClose = icebergCatalog; + } + if (catalogToClose != null) { + try { + catalogToClose.close(); + } + catch (IOException | RuntimeException e) { + log.warn(e, "Failed to close Iceberg REST catalog"); + } + } + + userSessionCatalogCache.values().forEach(TrinoIcebergRestCatalogFactory::closeCatalog); + userSessionCatalogCache.clear(); + } + + private record CachedRestSessionCatalog(RESTSessionCatalog catalog, long expiresAtMillis) + { + private CachedRestSessionCatalog + { + requireNonNull(catalog, "catalog is null"); + } + + private boolean expired(long now) + { + return expiresAtMillis <= now; + } } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 8451ad550d6e..70e47ec57c8d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -22,8 +22,6 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; -import io.jsonwebtoken.impl.DefaultJwtBuilder; -import io.jsonwebtoken.jackson.io.JacksonSerializer; import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; @@ -68,7 +66,6 @@ import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.rest.RESTSessionCatalog; -import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.iceberg.view.ReplaceViewVersion; import org.apache.iceberg.view.SQLViewRepresentation; import org.apache.iceberg.view.UpdateViewProperties; @@ -79,12 +76,12 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.function.UnaryOperator; @@ -117,7 +114,6 @@ public class TrinoRestCatalog private static final int PER_QUERY_CACHE_SIZE = 1000; private static final String NAMESPACE_SEPARATOR = "."; - private final IcebergFileSystemFactory fileSystemFactory; private final RESTSessionCatalog restSessionCatalog; private final CatalogName catalogName; @@ -125,6 +121,7 @@ public class TrinoRestCatalog private final Security security; private final SessionType sessionType; private final Map credentials; + private final Map sessionProperties; private final boolean nestedNamespaceEnabled; private final String trinoVersion; private final boolean useUniqueTableLocation; @@ -132,6 +129,7 @@ public class TrinoRestCatalog private final Cache remoteNamespaceMappingCache; private final Cache remoteTableMappingCache; private final boolean viewEndpointsEnabled; + private final AtomicBoolean missingCredentialsWarned; private final Cache tableCache = EvictableCacheBuilder.newBuilder() .maximumSize(PER_QUERY_CACHE_SIZE) @@ -144,6 +142,7 @@ public TrinoRestCatalog( Security security, SessionType sessionType, Map credentials, + Map sessionProperties, boolean nestedNamespaceEnabled, String trinoVersion, TypeManager typeManager, @@ -151,7 +150,8 @@ public TrinoRestCatalog( boolean caseInsensitiveNameMatching, Cache remoteNamespaceMappingCache, Cache remoteTableMappingCache, - boolean viewEndpointsEnabled) + boolean viewEndpointsEnabled, + AtomicBoolean missingCredentialsWarned) { this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null"); @@ -159,6 +159,7 @@ public TrinoRestCatalog( this.security = requireNonNull(security, "security is null"); this.sessionType = requireNonNull(sessionType, "sessionType is null"); this.credentials = ImmutableMap.copyOf(requireNonNull(credentials, "credentials is null")); + this.sessionProperties = ImmutableMap.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); this.nestedNamespaceEnabled = nestedNamespaceEnabled; this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); @@ -167,6 +168,7 @@ public TrinoRestCatalog( this.remoteNamespaceMappingCache = requireNonNull(remoteNamespaceMappingCache, "remoteNamespaceMappingCache is null"); this.remoteTableMappingCache = requireNonNull(remoteTableMappingCache, "remoteTableMappingCache is null"); this.viewEndpointsEnabled = viewEndpointsEnabled; + this.missingCredentialsWarned = requireNonNull(missingCredentialsWarned, "missingCredentialsWarned is null"); } @Override @@ -605,10 +607,7 @@ private TableIdentifier toRemoteObject(ConnectorSession session, SchemaTableName if (!remoteView.name().equals(schemaTableName.getTableName())) { return remoteView; } - if (remoteView.name().equals(schemaTableName.getTableName()) && remoteTable.name().equals(schemaTableName.getTableName())) { - return remoteTable; - } - throw new RuntimeException("Unable to find remote object"); + return remoteTable; } @Override @@ -910,24 +909,19 @@ private SessionCatalog.SessionContext convert(ConnectorSession session) "trinoCatalog", catalogName.toString(), "trinoVersion", trinoVersion); - Map claims = ImmutableMap.builder() + // Per-request properties take precedence over static OAuth2 session properties. + Map sessionProperties = ImmutableMap.builder() + .putAll(this.sessionProperties) .putAll(properties) - .buildOrThrow(); - - String subjectJwt = new DefaultJwtBuilder() - .subject(session.getUser()) - .issuer(trinoVersion) - .issuedAt(new Date()) - .claims(claims) - .json(new JacksonSerializer<>()) - .compact(); - - Map credentials = ImmutableMap.builder() - .putAll(session.getIdentity().getExtraCredentials()) - .put(OAuth2Properties.JWT_TOKEN_TYPE, subjectJwt) - .buildOrThrow(); - - yield new SessionCatalog.SessionContext(sessionId, session.getUser(), credentials, properties, session.getIdentity()); + .buildKeepingLast(); + + Map sessionCredentials = OAuth2SessionCredentials.fromExtraCredentials(session.getIdentity().getExtraCredentials()); + if (sessionCredentials.isEmpty() && missingCredentialsWarned.compareAndSet(false, true)) { + log.warn("No OAuth2 credentials found for user session; REST catalog requests may be rejected. " + + "Ensure http-server.authentication.oauth2.access-token-extra-credential-name is configured."); + } + + yield new SessionCatalog.SessionContext(sessionId, session.getUser(), sessionCredentials, sessionProperties, session.getIdentity()); } }; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java index c0b428e268c7..e8f553365984 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java @@ -275,6 +275,18 @@ public void testRestCatalogValidations() .shutdown()) .isInstanceOf(ApplicationConfigurationException.class) .hasMessageContaining("Using the `register_table` procedure with vended credentials is currently not supported"); + + assertThatThrownBy(() -> factory.create( + "test", + Map.of( + "iceberg.catalog.type", "rest", + "iceberg.rest-catalog.uri", "https://foo:1234", + "iceberg.rest-catalog.security", "OAUTH2", + "bootstrap.quiet", "true"), + new TestingConnectorContext()) + .shutdown()) + .isInstanceOf(ApplicationConfigurationException.class) + .hasMessageContaining("OAuth2 REST catalog requires iceberg.rest-catalog.oauth2.credential or iceberg.rest-catalog.oauth2.token when iceberg.rest-catalog.session is not USER"); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestOAuth2SecurityConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestOAuth2SecurityConfig.java index 5ad16dc3999d..d27596e330f3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestOAuth2SecurityConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestOAuth2SecurityConfig.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.catalog.rest; import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.auth.AuthProperties; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.junit.jupiter.api.Test; @@ -58,8 +59,23 @@ public void testExplicitPropertyMappings() .setServerUri(URI.create("http://localhost:8080/realms/iceberg/protocol/openid-connect/token")) .setTokenRefreshEnabled(false) .setTokenExchangeEnabled(false); - assertThat(expected.credentialOrTokenPresent()).isTrue(); assertThat(expected.scopePresentOnlyWithCredential()).isFalse(); assertFullMapping(properties, expected); } + + @Test + public void testUserSessionAllowsSessionOAuth2Credentials() + { + OAuth2SecurityProperties securityProperties = new OAuth2SecurityProperties( + new OAuth2SecurityConfig() + .setServerUri(URI.create("https://auth.example.com/oauth2/token")) + .setTokenExchangeEnabled(true)); + + assertThat(securityProperties.get()) + .containsEntry(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_OAUTH2) + .containsEntry(OAuth2Properties.OAUTH2_SERVER_URI, "https://auth.example.com/oauth2/token") + .containsEntry(OAuth2Properties.TOKEN_EXCHANGE_ENABLED, "true") + .doesNotContainKey(OAuth2Properties.TOKEN) + .doesNotContainKey(OAuth2Properties.CREDENTIAL); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoIcebergRestCatalogFactory.java new file mode 100644 index 000000000000..823f6a9ceaaf --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoIcebergRestCatalogFactory.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.trino.spi.security.ConnectorIdentity; +import org.apache.iceberg.CatalogProperties; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static io.trino.spi.security.ExtraCredentials.authenticatedExtraCredentialName; +import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; +import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestTrinoIcebergRestCatalogFactory +{ + @Test + public void testUserSessionOAuth2AddsSessionCredentialsToInitialCatalogProperties() + { + Map sessionCredentials = OAuth2SessionCredentials.fromIdentity( + ConnectorIdentity.forUser("alice") + .withExtraCredentials(ImmutableMap.of( + TOKEN, "client-supplied-token", + authenticatedExtraCredentialName(TOKEN), "alice-token", + CREDENTIAL, "client-supplied-credential", + authenticatedExtraCredentialName(CREDENTIAL), "alice-credential", + "unrelated", "secret")) + .build()); + + Map properties = OAuth2SessionCredentials.catalogPropertiesWithSessionCredentials( + ImmutableMap.of(CatalogProperties.URI, "https://catalog.example.com"), + sessionCredentials); + + assertThat(properties) + .containsEntry(CatalogProperties.URI, "https://catalog.example.com") + .containsEntry(TOKEN, "alice-token") + .containsEntry(CREDENTIAL, "alice-credential") + .doesNotContainKey("unrelated"); + } + + @Test + public void testStaticCatalogCredentialsTakePrecedenceForInitialCatalogProperties() + { + Map sessionCredentials = OAuth2SessionCredentials.fromIdentity( + ConnectorIdentity.forUser("alice") + .withExtraCredentials(ImmutableMap.of( + authenticatedExtraCredentialName(TOKEN), "alice-token", + authenticatedExtraCredentialName(CREDENTIAL), "alice-credential")) + .build()); + + Map properties = OAuth2SessionCredentials.catalogPropertiesWithSessionCredentials( + ImmutableMap.of( + CatalogProperties.URI, "https://catalog.example.com", + TOKEN, "catalog-token", + CREDENTIAL, "catalog-credential"), + sessionCredentials); + + assertThat(properties) + .containsEntry(TOKEN, "catalog-token") + .containsEntry(CREDENTIAL, "catalog-credential"); + } + + @Test + public void testClientSuppliedOAuth2SessionCredentialsAreIgnored() + { + Map sessionCredentials = OAuth2SessionCredentials.fromIdentity( + ConnectorIdentity.forUser("alice") + .withExtraCredentials(ImmutableMap.of( + TOKEN, "client-supplied-token", + CREDENTIAL, "client-supplied-credential")) + .build()); + + Map properties = OAuth2SessionCredentials.catalogPropertiesWithSessionCredentials( + ImmutableMap.of(CatalogProperties.URI, "https://catalog.example.com"), + sessionCredentials); + + assertThat(properties) + .doesNotContainKey(TOKEN) + .doesNotContainKey(CREDENTIAL); + } + + @Test + public void testUserSessionOAuth2CacheKeyUsesOnlyAuthenticatedCredentials() + { + String cacheKey = OAuth2SessionCredentials.cacheKey( + ConnectorIdentity.forUser("alice") + .withExtraCredentials(ImmutableMap.of( + TOKEN, "client-supplied-token", + authenticatedExtraCredentialName(TOKEN), "alice-token", + "unrelated", "secret")) + .build()) + .orElseThrow(); + + assertThat(cacheKey) + .isNotBlank() + .matches("[0-9a-f]+"); + + assertThat(OAuth2SessionCredentials.cacheKey( + ConnectorIdentity.forUser("alice") + .withExtraCredentials(ImmutableMap.of( + TOKEN, "different-client-supplied-token", + authenticatedExtraCredentialName(TOKEN), "alice-token", + "unrelated", "different-secret")) + .build())) + .contains(cacheKey); + + assertThat(OAuth2SessionCredentials.cacheKey( + ConnectorIdentity.forUser("alice") + .withExtraCredentials(ImmutableMap.of(authenticatedExtraCredentialName(TOKEN), "bob-token")) + .build())) + .isPresent() + .hasValueSatisfying(differentCacheKey -> assertThat(differentCacheKey).isNotEqualTo(cacheKey)); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java index f8566ff46442..733779fdb5b8 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java @@ -28,11 +28,14 @@ import io.trino.spi.TrinoException; import io.trino.spi.catalog.CatalogName; import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.ConnectorViewDefinition.ViewColumn; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.security.PrincipalType; import io.trino.spi.security.TrinoPrincipal; +import io.trino.testing.TestingConnectorSession; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.RESTException; @@ -48,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; @@ -58,8 +62,10 @@ import static io.trino.metastore.TableInfo.ExtendedRelationType.OTHER_VIEW; import static io.trino.plugin.iceberg.IcebergTestUtils.TABLE_STATISTICS_READER; import static io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType.NONE; +import static io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType.USER; import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog; import static io.trino.plugin.iceberg.delete.DeletionVectorWriter.UNSUPPORTED_DELETION_VECTOR_WRITER; +import static io.trino.spi.security.ExtraCredentials.authenticatedExtraCredentialName; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.sql.planner.TestingPlannerContext.PLANNER_CONTEXT; @@ -67,6 +73,10 @@ import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; import static java.util.Locale.ENGLISH; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; +import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN; +import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN_EXCHANGE_ENABLED; +import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN_REFRESH_ENABLED; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -121,6 +131,7 @@ private static TrinoRestCatalog createTrinoRestCatalog( Security.NONE, NONE, ImmutableMap.of(), + ImmutableMap.of(), nestedNamespaceEnabled, "test", TESTING_TYPE_MANAGER, @@ -128,7 +139,35 @@ private static TrinoRestCatalog createTrinoRestCatalog( caseInsensitiveNameMatching, EvictableCacheBuilder.newBuilder().expireAfterWrite(1000, MILLISECONDS).shareNothingWhenDisabled().build(), EvictableCacheBuilder.newBuilder().expireAfterWrite(1000, MILLISECONDS).shareNothingWhenDisabled().build(), - true); + true, + new AtomicBoolean(false)); + } + + private static TrinoRestCatalog createUserSessionTrinoRestCatalog(RESTSessionCatalog restSessionCatalog) + { + return createUserSessionTrinoRestCatalog(restSessionCatalog, ImmutableMap.of()); + } + + private static TrinoRestCatalog createUserSessionTrinoRestCatalog(RESTSessionCatalog restSessionCatalog, Map sessionProperties) + { + String catalogName = "iceberg_rest"; + return new TrinoRestCatalog( + new DefaultIcebergFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY), + restSessionCatalog, + new CatalogName(catalogName), + Security.OAUTH2, + USER, + ImmutableMap.of(), + sessionProperties, + false, + "test", + TESTING_TYPE_MANAGER, + true, + false, + EvictableCacheBuilder.newBuilder().expireAfterWrite(1000, MILLISECONDS).shareNothingWhenDisabled().build(), + EvictableCacheBuilder.newBuilder().expireAfterWrite(1000, MILLISECONDS).shareNothingWhenDisabled().build(), + true, + new AtomicBoolean(false)); } @Test @@ -224,6 +263,41 @@ public void testCaseInsensitiveNamespaceLookupIgnoresNamespaceDeletedDuringRecur assertThat(catalog.namespaceExists(SESSION, "existing")).isTrue(); } + @Test + public void testUserSessionPassesOAuth2ExtraCredentialsToRestCatalogRequests() + { + CapturingListNamespacesCatalog restSessionCatalog = new CapturingListNamespacesCatalog(); + TrinoCatalog catalog = createUserSessionTrinoRestCatalog( + restSessionCatalog, + ImmutableMap.of( + TOKEN_REFRESH_ENABLED, "false", + TOKEN_EXCHANGE_ENABLED, "false")); + + ConnectorSession session = TestingConnectorSession.builder() + .setIdentity(ConnectorIdentity.forUser("alice") + .withExtraCredentials(ImmutableMap.of( + TOKEN, "client-supplied-token", + authenticatedExtraCredentialName(TOKEN), "alice-token", + CREDENTIAL, "client-supplied-credential", + authenticatedExtraCredentialName(CREDENTIAL), "alice-credential", + "unrelated", "secret")) + .build()) + .build(); + + assertThat(catalog.listNamespaces(session)).isEmpty(); + + assertThat(restSessionCatalog.credentials()) + .containsEntry(TOKEN, "alice-token") + .containsEntry(CREDENTIAL, "alice-credential") + .doesNotContainKey("unrelated"); + assertThat(restSessionCatalog.properties()) + .containsEntry(TOKEN_REFRESH_ENABLED, "false") + .containsEntry(TOKEN_EXCHANGE_ENABLED, "false") + .containsEntry("user", "alice") + .containsEntry("trinoCatalog", "iceberg_rest") + .containsEntry("trinoVersion", "test"); + } + @Test public void testNestedListNamespacesPropagatesRecursiveRestFailures() { @@ -248,6 +322,31 @@ public List listNamespaces(SessionContext context, Namespace namespac .hasMessage("catalog failure"); } + private static class CapturingListNamespacesCatalog + extends RESTSessionCatalog + { + private Map credentials = ImmutableMap.of(); + private Map properties = ImmutableMap.of(); + + @Override + public List listNamespaces(SessionContext context, Namespace namespace) + { + credentials = context.credentials(); + properties = context.properties(); + return ImmutableList.of(); + } + + public Map credentials() + { + return credentials; + } + + public Map properties() + { + return properties; + } + } + private static class NamespaceDeletedDuringRecursiveListingCatalog extends RESTSessionCatalog {