Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS: Fix Catalog URI within VendedCredentialsProvider #12612

Merged
merged 19 commits into from
Apr 1, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@

import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.s3.VendedCredentialsProvider;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SerializableMap;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
Expand Down Expand Up @@ -198,12 +196,9 @@ public <T extends S3CrtAsyncClientBuilder> void applyClientCredentialConfigurati
public AwsCredentialsProvider credentialsProvider(
String accessKeyId, String secretAccessKey, String sessionToken) {
if (refreshCredentialsEnabled && !Strings.isNullOrEmpty(refreshCredentialsEndpoint)) {
clientCredentialsProviderProperties.putAll(allProperties);
clientCredentialsProviderProperties.put(
VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
Optional.ofNullable(allProperties.get(OAuth2Properties.TOKEN))
.ifPresent(
token ->
clientCredentialsProviderProperties.putIfAbsent(OAuth2Properties.TOKEN, token));
return credentialsProvider(VendedCredentialsProvider.class.getName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
import org.apache.iceberg.rest.auth.AuthSession;
Expand All @@ -47,6 +49,8 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
private volatile HTTPClient client;
private final Map<String, String> properties;
private final CachedSupplier<AwsCredentials> credentialCache;
private final String catalogEndpoint;
private final String credentialsEndpoint;
private AuthManager authManager;
private AuthSession authSession;

Expand All @@ -58,6 +62,8 @@ private VendedCredentialsProvider(Map<String, String> properties) {
CachedSupplier.builder(() -> credentialFromProperties().orElseGet(this::refreshCredential))
.cachedValueName(VendedCredentialsProvider.class.getName())
.build();
this.catalogEndpoint = properties.get(CatalogProperties.URI);
Copy link
Contributor

@nastra nastra Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please update Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null"); to Preconditions.checkArgument(null != properties.get(URI), "Invalid credentials endpoint: null"); and also add Preconditions.checkArgument(null != properties.get(CatalogProperties.URI), "Invalid catalog endpoint: null");.

Please also update invalidOrMissingUri() and add a check where the catalog URI isn't provided

this.credentialsEndpoint = RESTUtil.resolveEndpoint(catalogEndpoint, properties.get(URI));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already being done in AwsClientProperties, so it should only do this.credentialsEndpoint = properties.get(URI)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is corrected.

}

@Override
Expand All @@ -82,7 +88,7 @@ private RESTClient httpClient() {
synchronized (this) {
if (null == client) {
authManager = AuthManagers.loadAuthManager("s3-credentials-refresh", properties);
HTTPClient httpClient = HTTPClient.builder(properties).uri(properties.get(URI)).build();
HTTPClient httpClient = HTTPClient.builder(properties).uri(catalogEndpoint).build();
authSession = authManager.catalogSession(httpClient, properties);
client = httpClient.withAuthSession(authSession);
}
Expand All @@ -95,7 +101,7 @@ private RESTClient httpClient() {
private LoadCredentialsResponse fetchCredentials() {
return httpClient()
.get(
properties.get(URI),
credentialsEndpoint,
null,
LoadCredentialsResponse.class,
Map.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.HttpMethod;
Expand All @@ -48,7 +49,12 @@
public class TestVendedCredentialsProvider {

private static final int PORT = 3232;
private static final String URI = String.format("http://127.0.0.1:%d/v1/credentials", PORT);
private static final String CREDENTIALS_URI =
String.format("http://127.0.0.1:%d/v1/credentials", PORT);
Copy link
Contributor

@danielcweeks danielcweeks Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically you can point to any URI, but I think we need to include a test pointing to a path consistent with the spec (e.g. /v1/{prefix}/namespaces/{namespace}/tables/{table}/credentials) and we should test both absolute and relative (which I believe this change will address as well.)

private static final String CATALOG_URI = String.format("http://127.0.0.1:%d/v1", PORT);
private static final ImmutableMap<String, String> PROPERTIES =
ImmutableMap.of(
VendedCredentialsProvider.URI, CREDENTIALS_URI, CatalogProperties.URI, CATALOG_URI);
private static ClientAndServer mockServer;

@BeforeAll
Expand All @@ -73,7 +79,7 @@ public void invalidOrMissingUri() {
.hasMessage("Invalid properties: null");
assertThatThrownBy(() -> VendedCredentialsProvider.create(ImmutableMap.of()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid URI: null");
.hasMessage("Invalid credentials URI: null");

try (VendedCredentialsProvider provider =
VendedCredentialsProvider.create(
Expand All @@ -95,8 +101,7 @@ public void noS3Credentials() {
.withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

try (VendedCredentialsProvider provider =
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
assertThatThrownBy(provider::resolveCredentials)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Invalid S3 Credentials: empty");
Expand Down Expand Up @@ -124,8 +129,7 @@ public void accessKeyIdAndSecretAccessKeyWithoutToken() {
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

try (VendedCredentialsProvider provider =
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
assertThatThrownBy(provider::resolveCredentials)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Invalid S3 Credentials: s3.session-token not set");
Expand Down Expand Up @@ -155,8 +159,7 @@ public void expirationNotSet() {
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

try (VendedCredentialsProvider provider =
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
assertThatThrownBy(provider::resolveCredentials)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Invalid S3 Credentials: s3.session-token-expires-at-ms not set");
Expand Down Expand Up @@ -187,8 +190,7 @@ public void nonExpiredToken() {
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

try (VendedCredentialsProvider provider =
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
AwsCredentials awsCredentials = provider.resolveCredentials();

verifyCredentials(awsCredentials, credential);
Expand Down Expand Up @@ -226,8 +228,7 @@ public void expiredToken() {
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

try (VendedCredentialsProvider provider =
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
AwsCredentials awsCredentials = provider.resolveCredentials();
verifyCredentials(awsCredentials, credential);

Expand Down Expand Up @@ -294,8 +295,7 @@ public void multipleS3Credentials() {
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
mockServer.when(mockRequest).respond(mockResponse);

try (VendedCredentialsProvider provider =
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) {
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) {
assertThatThrownBy(provider::resolveCredentials)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Invalid S3 Credentials: only one S3 credential should exist");
Expand Down Expand Up @@ -346,7 +346,7 @@ public void nonExpiredTokenInProperties() {
VendedCredentialsProvider.create(
ImmutableMap.of(
VendedCredentialsProvider.URI,
URI,
CREDENTIALS_URI,
S3FileIOProperties.ACCESS_KEY_ID,
"randomAccessKeyFromProperties",
S3FileIOProperties.SECRET_ACCESS_KEY,
Expand Down Expand Up @@ -398,7 +398,7 @@ public void expiredTokenInProperties() {
VendedCredentialsProvider.create(
ImmutableMap.of(
VendedCredentialsProvider.URI,
URI,
CREDENTIALS_URI,
S3FileIOProperties.ACCESS_KEY_ID,
"randomAccessKeyFromProperties",
S3FileIOProperties.SECRET_ACCESS_KEY,
Expand Down Expand Up @@ -451,7 +451,7 @@ public void invalidTokenInProperties() {
VendedCredentialsProvider.create(
ImmutableMap.of(
VendedCredentialsProvider.URI,
URI,
CREDENTIALS_URI,
S3FileIOProperties.ACCESS_KEY_ID,
"randomAccessKeyFromProperties",
S3FileIOProperties.SECRET_ACCESS_KEY,
Expand Down
Loading