AWS: Fix Catalog URI within VendedCredentialsProvider #12612

Merged
merged 19 commits into from
Apr 1, 2025
@@ -199,7 +199,9 @@ public AwsCredentialsProvider credentialsProvider(
String accessKeyId, String secretAccessKey, String sessionToken) {
if (refreshCredentialsEnabled && !Strings.isNullOrEmpty(refreshCredentialsEndpoint)) {
clientCredentialsProviderProperties.put(
-VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
+VendedCredentialsProvider.CREDENTIALS_ENDPOINT, refreshCredentialsEndpoint);
+clientCredentialsProviderProperties.put(
+VendedCredentialsProvider.URI, allProperties.get(CatalogProperties.URI));
Contributor

Rather than renaming the existing property and introducing a new one, we could just do:

      Optional.ofNullable(allProperties.get(CatalogProperties.URI))
          .ifPresent(
              catalogUri ->
                  clientCredentialsProviderProperties.put(CatalogProperties.URI, catalogUri));

Optional.ofNullable(allProperties.get(OAuth2Properties.TOKEN))
Contributor

Tangential question: here we are transferring the TOKEN property from allProperties to clientCredentialsProviderProperties.

But what about the other auth properties? I'm especially concerned about OAuth2Properties.OAUTH2_SERVER_URI. If that property is not retained, and the token endpoint is not the default one, the credentials provider won't be able to fetch a token successfully.

The same could be said of properties like SCOPE, RESOURCE or AUDIENCE. Why aren't we transferring those as well?

The only way currently to pass an auth server URL to the provider would be to "wrap" it in a property like client.credentials-provider.oauth2-server-uri=http://auth-server.com/tokens. Is that the recommended approach?
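
For illustration, the wrapping approach mentioned above could look roughly like the following. This is a standalone sketch using only the JDK: `PrefixedPropertiesSketch` and `stripPrefix` are hypothetical names, and the assumption that the `client.credentials-provider.` prefix is stripped before the properties reach the provider is not something confirmed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixedPropertiesSketch {
  // Prefix taken from the example property in the comment above.
  static final String PREFIX = "client.credentials-provider.";

  // Hypothetical helper: keeps only the entries whose key starts with the prefix
  // and returns them with the prefix removed from each key.
  static Map<String, String> stripPrefix(Map<String, String> properties, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      if (entry.getKey().startsWith(prefix)) {
        result.put(entry.getKey().substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> catalogProperties = new HashMap<>();
    catalogProperties.put(PREFIX + "oauth2-server-uri", "http://auth-server.com/tokens");
    // An unprefixed auth property is dropped, which is the concern raised above.
    catalogProperties.put("scope", "catalog");

    Map<String, String> providerProperties = stripPrefix(catalogProperties, PREFIX);
    System.out.println(providerProperties);
  }
}
```

Under these assumptions, only the wrapped `oauth2-server-uri` entry reaches the provider; every unwrapped auth property would have to be duplicated under the prefix.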

Contributor

I agree. In hindsight, it's probably better to just pass all properties to the credentials provider. Could you please update it to:

    if (refreshCredentialsEnabled && !Strings.isNullOrEmpty(refreshCredentialsEndpoint)) {
      clientCredentialsProviderProperties.putAll(allProperties);
      clientCredentialsProviderProperties.put(
          VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
      return credentialsProvider(VendedCredentialsProvider.class.getName());
    }
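
The ordering in this suggestion matters: `putAll` runs first, so the explicit credentials endpoint written afterwards takes precedence over any same-named key. A minimal standalone sketch with plain `java.util` maps, where `PropertyHandoffSketch` is a hypothetical class and the key strings are assumptions for illustration (`credentials.uri` is the `VendedCredentialsProvider.URI` value shown in this PR; `uri`, `token`, and `oauth2-server-uri` are assumed values of the corresponding Iceberg constants):

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyHandoffSketch {
  // Hypothetical helper: copies every catalog property to the provider, then sets
  // the credentials endpoint last so it overrides any same-named key copied earlier.
  static Map<String, String> providerProperties(
      Map<String, String> allProperties, String refreshCredentialsEndpoint) {
    Map<String, String> providerProps = new HashMap<>();
    providerProps.putAll(allProperties);
    providerProps.put("credentials.uri", refreshCredentialsEndpoint);
    return providerProps;
  }

  public static void main(String[] args) {
    Map<String, String> all = new HashMap<>();
    all.put("uri", "http://catalog.com/api/v1"); // assumed catalog URI key
    all.put("token", "some-token"); // assumed OAuth2 token key
    all.put("oauth2-server-uri", "http://auth-server.com/tokens");

    Map<String, String> result =
        providerProperties(all, "http://catalog.com/api/v1/creds-refresh/whatever");
    // The auth server URI is retained alongside the credentials endpoint.
    System.out.println(result.get("oauth2-server-uri"));
    System.out.println(result.get("credentials.uri"));
  }
}
```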

.ifPresent(
token ->
@@ -24,7 +24,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
@@ -43,7 +43,8 @@
import software.amazon.awssdk.utils.cache.RefreshResult;

public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAutoCloseable {
-public static final String URI = "credentials.uri";
+public static final String URI = "credentials.catalog.uri";
Contributor

Instead of renaming this and introducing another property, we could also do it like this:

--- a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.rest.ErrorHandlers;
@@ -47,12 +48,16 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
   private volatile HTTPClient client;
   private final Map<String, String> properties;
   private final CachedSupplier<AwsCredentials> credentialCache;
+  private final String catalogEndpoint;
+  private final String credentialsEndpoint;
   private AuthManager authManager;
   private AuthSession authSession;

   private VendedCredentialsProvider(Map<String, String> properties) {
     Preconditions.checkArgument(null != properties, "Invalid properties: null");
-    Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null");
+    Preconditions.checkArgument(null != properties.get(URI), "Invalid credentials URI: null");
+    this.credentialsEndpoint = properties.get(URI);
+    this.catalogEndpoint = properties.getOrDefault(CatalogProperties.URI, credentialsEndpoint);
     this.properties = properties;
     this.credentialCache =
         CachedSupplier.builder(() -> credentialFromProperties().orElseGet(this::refreshCredential))
@@ -82,7 +87,7 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
       synchronized (this) {
         if (null == client) {
           authManager = AuthManagers.loadAuthManager("s3-credentials-refresh", properties);
-          HTTPClient httpClient = HTTPClient.builder(properties).uri(properties.get(URI)).build();
+          HTTPClient httpClient = HTTPClient.builder(properties).uri(catalogEndpoint).build();
           authSession = authManager.catalogSession(httpClient, properties);
           client = httpClient.withAuthSession(authSession);
         }
@@ -95,7 +100,7 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
   private LoadCredentialsResponse fetchCredentials() {
     return httpClient()
         .get(
-            properties.get(URI),
+            credentialsEndpoint,
             null,
             LoadCredentialsResponse.class,
             Map.of(),

Contributor

Can you also please update the tests as follows:

diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java
index 51aca88943..a48f6b4dff 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java
@@ -26,6 +26,7 @@ import static org.mockserver.model.HttpResponse.response;
 
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.exceptions.RESTException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.rest.HttpMethod;
@@ -48,7 +49,12 @@ import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 public class TestVendedCredentialsProvider {
 
   private static final int PORT = 3232;
-  private static final String URI = String.format("http://127.0.0.1:%d/v1/credentials", PORT);
+  private static final String CREDENTIALS_URI =
+      String.format("http://127.0.0.1:%d/v1/credentials", PORT);
+  private static final String CATALOG_URI = String.format("http://127.0.0.1:%d/v1", PORT);
+  private static final ImmutableMap<String, String> PROPERTIES =
+      ImmutableMap.of(
+          VendedCredentialsProvider.URI, CREDENTIALS_URI, CatalogProperties.URI, CATALOG_URI);
   private static ClientAndServer mockServer;
 
   @BeforeAll
@@ -73,7 +79,7 @@ public class TestVendedCredentialsProvider {
         .hasMessage("Invalid properties: null");
     assertThatThrownBy(() -> VendedCredentialsProvider.create(ImmutableMap.of()))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Invalid URI: null");
+        .hasMessage("Invalid credentials URI: null");
 
     try (VendedCredentialsProvider provider =
         VendedCredentialsProvider.create(
@@ -95,8 +101,7 @@ public class TestVendedCredentialsProvider {
             .withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       assertThatThrownBy(provider::resolveCredentials)
           .isInstanceOf(IllegalStateException.class)
           .hasMessage("Invalid S3 Credentials: empty");
@@ -124,8 +129,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       assertThatThrownBy(provider::resolveCredentials)
           .isInstanceOf(IllegalStateException.class)
           .hasMessage("Invalid S3 Credentials: s3.session-token not set");
@@ -155,8 +159,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       assertThatThrownBy(provider::resolveCredentials)
           .isInstanceOf(IllegalStateException.class)
           .hasMessage("Invalid S3 Credentials: s3.session-token-expires-at-ms not set");
@@ -187,8 +190,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       AwsCredentials awsCredentials = provider.resolveCredentials();
 
       verifyCredentials(awsCredentials, credential);
@@ -226,8 +228,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       AwsCredentials awsCredentials = provider.resolveCredentials();
       verifyCredentials(awsCredentials, credential);
 
@@ -294,8 +295,7 @@ public class TestVendedCredentialsProvider {
         response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
     mockServer.when(mockRequest).respond(mockResponse);
 
-    try (VendedCredentialsProvider provider =
-        VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
+    try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
       assertThatThrownBy(provider::resolveCredentials)
           .isInstanceOf(IllegalStateException.class)
           .hasMessage("Invalid S3 Credentials: only one S3 credential should exist");
@@ -346,7 +346,7 @@ public class TestVendedCredentialsProvider {
         VendedCredentialsProvider.create(
             ImmutableMap.of(
                 VendedCredentialsProvider.URI,
-                URI,
+                CREDENTIALS_URI,
                 S3FileIOProperties.ACCESS_KEY_ID,
                 "randomAccessKeyFromProperties",
                 S3FileIOProperties.SECRET_ACCESS_KEY,
@@ -398,7 +398,7 @@ public class TestVendedCredentialsProvider {
         VendedCredentialsProvider.create(
             ImmutableMap.of(
                 VendedCredentialsProvider.URI,
-                URI,
+                CREDENTIALS_URI,
                 S3FileIOProperties.ACCESS_KEY_ID,
                 "randomAccessKeyFromProperties",
                 S3FileIOProperties.SECRET_ACCESS_KEY,
@@ -451,7 +451,7 @@ public class TestVendedCredentialsProvider {
         VendedCredentialsProvider.create(
             ImmutableMap.of(
                 VendedCredentialsProvider.URI,
-                URI,
+                CREDENTIALS_URI,
                 S3FileIOProperties.ACCESS_KEY_ID,
                 "randomAccessKeyFromProperties",
                 S3FileIOProperties.SECRET_ACCESS_KEY,

Contributor Author

Sounds good.

Contributor

@wolflex888 I don't see this being applied to the tests. Can you please update the tests so that we can get this merged?

+public static final String CREDENTIALS_ENDPOINT = "credentials.endpoint";
private volatile HTTPClient client;
private final Map<String, String> properties;
private final CachedSupplier<AwsCredentials> credentialCache;
@@ -53,6 +54,8 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
private VendedCredentialsProvider(Map<String, String> properties) {
Preconditions.checkArgument(null != properties, "Invalid properties: null");
Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null");
+Preconditions.checkArgument(
+null != properties.get(CREDENTIALS_ENDPOINT), "Invalid endpoint: null");
this.properties = properties;
this.credentialCache =
CachedSupplier.builder(() -> credentialFromProperties().orElseGet(this::refreshCredential))
@@ -95,7 +98,7 @@ private RESTClient httpClient() {
private LoadCredentialsResponse fetchCredentials() {
return httpClient()
.get(
-properties.get(URI),
+properties.get(CREDENTIALS_ENDPOINT),
Contributor

I agree we need two properties.

But the problem, as I see it, is that this property, even with the changes in this PR, is currently an absolute URL that was computed in AwsClientProperties:

    this.refreshCredentialsEndpoint =
        RESTUtil.resolveEndpoint(
            properties.get(CatalogProperties.URI), properties.get(REFRESH_CREDENTIALS_ENDPOINT));

But I think this property must be a relative path instead.

    this.refreshCredentialsEndpoint = properties.get(REFRESH_CREDENTIALS_ENDPOINT);

In that case we would have the following properties:

| Property | Absolute? | Example |
| --- | --- | --- |
| credentials.catalog.uri | Yes | http://catalog.com/api/v1 |
| credentials.endpoint | No | creds-refresh/whatever |

The HTTP client then must be constructed with credentials.catalog.uri as the base URI for this to work with both internal and external IDPs.
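
A minimal sketch of the base-URI idea, using only the JDK. `EndpointSketch.resolve` is a hypothetical helper written for illustration; it is not `RESTUtil.resolveEndpoint` or the `HTTPClient` implementation, and its rule (leave `http://` or `https://` endpoints untouched, otherwise join them to the base) is an assumption about the intended behavior. The `idp.example.com` host is likewise made up.

```java
public class EndpointSketch {
  // Hypothetical helper: returns the endpoint unchanged when it is already absolute,
  // otherwise appends it to the base URI with exactly one '/' between the two parts.
  static String resolve(String baseUri, String endpoint) {
    if (endpoint.startsWith("http://") || endpoint.startsWith("https://")) {
      return endpoint;
    }
    String base = baseUri.endsWith("/") ? baseUri.substring(0, baseUri.length() - 1) : baseUri;
    String path = endpoint.startsWith("/") ? endpoint.substring(1) : endpoint;
    return base + "/" + path;
  }

  public static void main(String[] args) {
    // Relative endpoint: resolved against the catalog URI.
    System.out.println(resolve("http://catalog.com/api/v1", "creds-refresh/whatever"));
    // Absolute endpoint (for example an external IDP): passed through as-is.
    System.out.println(resolve("http://catalog.com/api/v1", "http://idp.example.com/creds"));
  }
}
```

With a rule like this, a single client rooted at the catalog URI can serve both the relative and the absolute case, so the caller does not need to pre-resolve the endpoint.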

Contributor

See my other comment above. We don't need two properties, since we can just pass the catalog URI via:

      Optional.ofNullable(allProperties.get(CatalogProperties.URI))
          .ifPresent(
              catalogUri ->
                  clientCredentialsProviderProperties.put(CatalogProperties.URI, catalogUri));

The absolute/relative path handling is already done in

    this.refreshCredentialsEndpoint =
        RESTUtil.resolveEndpoint(
            properties.get(CatalogProperties.URI), properties.get(REFRESH_CREDENTIALS_ENDPOINT));

Contributor

Right, but my point was: why bother absolutizing refreshCredentialsEndpoint in AwsClientProperties? The HTTP client could handle that internally.

null,
LoadCredentialsResponse.class,
Map.of(),