Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS: Fix Catalog URI within VendedCredentialsProvider #12612

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

wolflex888
Copy link

Currently, when httpclient being initiated here, It's using refresh endpoint as the base URI. this will leads to error when the http client is constructing OAuth2 URI to refresh the tokens. This PR separates out two URIs so HTTPClient can be initiated with catalog base URI.

@github-actions github-actions bot added the AWS label Mar 22, 2025
@nastra nastra self-requested a review March 24, 2025 16:02
VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
VendedCredentialsProvider.CREDENTIALS_ENDPOINT, refreshCredentialsEndpoint);
clientCredentialsProviderProperties.put(
VendedCredentialsProvider.URI, allProperties.get(CatalogProperties.URI));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than introducing/renaming the existing property, we could just do

      Optional.ofNullable(allProperties.get(allProperties.get(CatalogProperties.URI)))
          .ifPresent(
              catalogUri ->
                  clientCredentialsProviderProperties.put(CatalogProperties.URI, catalogUri));

@@ -43,7 +43,8 @@
import software.amazon.awssdk.utils.cache.RefreshResult;

public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAutoCloseable {
public static final String URI = "credentials.uri";
public static final String URI = "credentials.catalog.uri";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of renaming this and introducing another property, we can do it also like this:

--- a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.rest.ErrorHandlers;
@@ -47,12 +48,16 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
   private volatile HTTPClient client;
   private final Map<String, String> properties;
   private final CachedSupplier<AwsCredentials> credentialCache;
+  private final String catalogEndpoint;
+  private final String credentialsEndpoint;
   private AuthManager authManager;
   private AuthSession authSession;

   private VendedCredentialsProvider(Map<String, String> properties) {
     Preconditions.checkArgument(null != properties, "Invalid properties: null");
-    Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null");
+    Preconditions.checkArgument(null != properties.get(URI), "Invalid credentials URI: null");
+    this.credentialsEndpoint = properties.get(URI);
+    this.catalogEndpoint = properties.getOrDefault(CatalogProperties.URI, credentialsEndpoint);
     this.properties = properties;
     this.credentialCache =
         CachedSupplier.builder(() -> credentialFromProperties().orElseGet(this::refreshCredential))
@@ -82,7 +87,7 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
       synchronized (this) {
         if (null == client) {
           authManager = AuthManagers.loadAuthManager("s3-credentials-refresh", properties);
-          HTTPClient httpClient = HTTPClient.builder(properties).uri(properties.get(URI)).build();
+          HTTPClient httpClient = HTTPClient.builder(properties).uri(catalogEndpoint).build();
           authSession = authManager.catalogSession(httpClient, properties);
           client = httpClient.withAuthSession(authSession);
         }
@@ -95,7 +100,7 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
   private LoadCredentialsResponse fetchCredentials() {
     return httpClient()
         .get(
-            properties.get(URI),
+            credentialsEndpoint,
             null,
             LoadCredentialsResponse.class,
             Map.of(),

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you also please update the tests to

diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java
index 51aca88943..a48f6b4dff 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java
@@ -26,6 +26,7 @@ import static org.mockserver.model.HttpResponse.response;
 
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.exceptions.RESTException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.rest.HttpMethod;
@@ -48,7 +49,12 @@ import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 public class TestVendedCredentialsProvider {
 
   private static final int PORT = 3232;
-  private static final String URI = String.format("http://127.0.0.1:%d/v1/credentials", PORT);
+  private static final String CREDENTIALS_URI =
+      String.format("http://127.0.0.1:%d/v1/credentials", PORT);
+  private static final String CATALOG_URI = String.format("http://127.0.0.1:%d/v1", PORT);
+  private static final ImmutableMap<String, String> PROPERTIES =
+      ImmutableMap.of(
+          VendedCredentialsProvider.URI, CREDENTIALS_URI, CatalogProperties.URI, CATALOG_URI);
   private static ClientAndServer mockServer;
 
   @BeforeAll
@@ -73,7 +79,7 @@ public class TestVendedCredentialsProvider {
         .hasMessage("Invalid properties: null");
     assertThatThrownBy(() -> VendedCredentialsProvider.create(ImmutableMap.of()))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Invalid URI: null");
+        .hasMessage("Invalid credentials URI: null");
 
     try (VendedCredentialsProvider provider =
         VendedCredentialsProvider.create(
@@ -95,8 +101,7 @@ public class TestVendedCredentialsProvider {
             .withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       assertThatThrownBy(provider::resolveCredentials)
           .isInstanceOf(IllegalStateException.class)
           .hasMessage("Invalid S3 Credentials: empty");
@@ -124,8 +129,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       assertThatThrownBy(provider::resolveCredentials)
           .isInstanceOf(IllegalStateException.class)
           .hasMessage("Invalid S3 Credentials: s3.session-token not set");
@@ -155,8 +159,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       assertThatThrownBy(provider::resolveCredentials)
           .isInstanceOf(IllegalStateException.class)
           .hasMessage("Invalid S3 Credentials: s3.session-token-expires-at-ms not set");
@@ -187,8 +190,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       AwsCredentials awsCredentials = provider.resolveCredentials();
 
       verifyCredentials(awsCredentials, credential);
@@ -226,8 +228,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       AwsCredentials awsCredentials = provider.resolveCredentials();
       verifyCredentials(awsCredentials, credential);
 
@@ -294,8 +295,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       assertThatThrownBy(provider::resolveCredentials)
           .isInstanceOf(IllegalStateException.class)
           .hasMessage("Invalid S3 Credentials: only one S3 credential should exist");
@@ -346,7 +346,7 @@ public class TestVendedCredentialsProvider {
         VendedCredentialsProvider.create(
             ImmutableMap.of(
                 VendedCredentialsProvider.URI,
-                URI,
+                CREDENTIALS_URI,
                 S3FileIOProperties.ACCESS_KEY_ID,
                 "randomAccessKeyFromProperties",
                 S3FileIOProperties.SECRET_ACCESS_KEY,
@@ -398,7 +398,7 @@ public class TestVendedCredentialsProvider {
         VendedCredentialsProvider.create(
             ImmutableMap.of(
                 VendedCredentialsProvider.URI,
-                URI,
+                CREDENTIALS_URI,
                 S3FileIOProperties.ACCESS_KEY_ID,
                 "randomAccessKeyFromProperties",
                 S3FileIOProperties.SECRET_ACCESS_KEY,
@@ -451,7 +451,7 @@ public class TestVendedCredentialsProvider {
         VendedCredentialsProvider.create(
             ImmutableMap.of(
                 VendedCredentialsProvider.URI,
-                URI,
+                CREDENTIALS_URI,
                 S3FileIOProperties.ACCESS_KEY_ID,
                 "randomAccessKeyFromProperties",
                 S3FileIOProperties.SECRET_ACCESS_KEY,

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wolflex888 I don't see this being applied to the tests. Can you please update the tests so that we can get this merged?

@nastra nastra added this to the Iceberg 1.9.0 milestone Mar 25, 2025
@ajantha-bhat
Copy link
Member

@adutra: Can you please also take a look at it?

@@ -95,7 +98,7 @@ private RESTClient httpClient() {
private LoadCredentialsResponse fetchCredentials() {
return httpClient()
.get(
properties.get(URI),
properties.get(CREDENTIALS_ENDPOINT),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we need two properties.

But the problem, as I see it, is that this property, even with the changes in this PR, is currently an absolute URL that was computed in AwsClientProperties:

    this.refreshCredentialsEndpoint =
        RESTUtil.resolveEndpoint(
            properties.get(CatalogProperties.URI), properties.get(REFRESH_CREDENTIALS_ENDPOINT));

But I think this property must be a relative path instead.

    this.refreshCredentialsEndpoint = properties.get(REFRESH_CREDENTIALS_ENDPOINT);

In that case we would have the following properties:

Property Absolute? Example
credentials.catalog.uri Yes http://catalog.com/api/v1
credentials.endpoint No creds-refresh/whatever

The HTTP client then must be constructed with credentials.catalog.uri as the base URI for this to work with both internal and external IDPs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my other comment above. we don't need two properties as we can just pass the catalog URI via

      Optional.ofNullable(allProperties.get(allProperties.get(CatalogProperties.URI)))
          .ifPresent(
              catalogUri ->
                  clientCredentialsProviderProperties.put(CatalogProperties.URI, catalogUri));

The absolute/relative path handling is already done in

this.refreshCredentialsEndpoint =
RESTUtil.resolveEndpoint(
properties.get(CatalogProperties.URI), properties.get(REFRESH_CREDENTIALS_ENDPOINT));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but my point was: why bother absolutizing refreshCredentialsEndpoint in AwsClientProperties? The HTTP client could handle that internally.

VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
VendedCredentialsProvider.CREDENTIALS_ENDPOINT, refreshCredentialsEndpoint);
clientCredentialsProviderProperties.put(
VendedCredentialsProvider.URI, allProperties.get(CatalogProperties.URI));
Optional.ofNullable(allProperties.get(OAuth2Properties.TOKEN))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tangential question: here we are transferring the TOKEN property from allPropeties to clientCredentialsProviderProperties.

But what about the other auth properties? I'm especially concerned about OAuth2Properties.OAUTH2_SERVER_URI. If that property is not retained, and the token endpoint is not the default one, the credentials provider won't be able to fetch a token successfully.

The same could be said of properties like SCOPE, RESOURCE or AUDIENCE. Why aren't we transferring those as well?

The only way currently to pass an auth server URL to the provider would be to "wrap" it in a property like client.credentials-provider.oauth2-server-uri=http://auth-server.com/tokens. Is that the recommended approach?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, in hindsight it's probably better to just pass all properties to the credentialsprovider. Could you please update it to

    if (refreshCredentialsEnabled && !Strings.isNullOrEmpty(refreshCredentialsEndpoint)) {
      clientCredentialsProviderProperties.putAll(allProperties);
      clientCredentialsProviderProperties.put(
          VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
      return credentialsProvider(VendedCredentialsProvider.class.getName());
    }

@ajantha-bhat
Copy link
Member

@wolflex888: Will you be updating the PR? I am planning 1.9.0 tomorrow evening and this issue is marked for 1.9.0 milestone.

@wolflex888 wolflex888 requested review from nastra and adutra March 26, 2025 22:41
@@ -58,6 +62,8 @@ private VendedCredentialsProvider(Map<String, String> properties) {
CachedSupplier.builder(() -> credentialFromProperties().orElseGet(this::refreshCredential))
.cachedValueName(VendedCredentialsProvider.class.getName())
.build();
this.catalogEndpoint = properties.get(CatalogProperties.URI);
Copy link
Contributor

@nastra nastra Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please update Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null"); to Preconditions.checkArgument(null != properties.get(URI), "Invalid credentials endpoint: null"); and also add Preconditions.checkArgument(null != properties.get(CatalogProperties.URI), "Invalid catalog endpoint: null");.

Please also update invalidOrMissingUri() and add a check where the catalog URI isn't provided

@@ -48,7 +49,12 @@
public class TestVendedCredentialsProvider {

private static final int PORT = 3232;
private static final String URI = String.format("http://127.0.0.1:%d/v1/credentials", PORT);
private static final String CREDENTIALS_URI =
String.format("http://127.0.0.1:%d/v1/credentials", PORT);
Copy link
Contributor

@danielcweeks danielcweeks Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically you can point to any URI, but I think we need to include a test pointing to a path consistent with the spec (e.g. /v1/{prefix}/namespaces/{namespace}/tables/{table}/credentials) and we should test both absolute and relative (which I believe this change will address as well.)

@@ -45,6 +40,12 @@
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;

import static org.assertj.core.api.Assertions.assertThat;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imports have been re-organized and so the build will fail

this.properties = properties;
this.credentialCache =
CachedSupplier.builder(() -> credentialFromProperties().orElseGet(this::refreshCredential))
.cachedValueName(VendedCredentialsProvider.class.getName())
.build();
this.catalogEndpoint = properties.get(CatalogProperties.URI);
this.credentialsEndpoint = RESTUtil.resolveEndpoint(catalogEndpoint, properties.get(URI));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already being done in AwsClientProperties, so it should only do this.credentialsEndpoint = properties.get(URI)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is corrected.

@@ -26,6 +26,7 @@

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.apache.iceberg.CatalogProperties;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests still need to be updated. See also my comment in #12612 (comment)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests are updated.

@nastra
Copy link
Contributor

nastra commented Mar 28, 2025

thanks @wolflex888 for fixing this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants