-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AWS: Fix Catalog URI within VendedCredentialsProvider #12612
Changes from 8 commits
b240283
589d1a8
065c331
240e607
63e8c63
7e1fece
050fd91
6a86de7
5fa5ea9
2b72dd0
b9e2fb5
4deddbd
9b16f2d
e6be4c4
f0804eb
ee6929b
28f218b
1ddd6f2
ab59591
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,11 +24,13 @@ | |
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
import org.apache.iceberg.CatalogProperties; | ||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
import org.apache.iceberg.relocated.com.google.common.base.Strings; | ||
import org.apache.iceberg.rest.ErrorHandlers; | ||
import org.apache.iceberg.rest.HTTPClient; | ||
import org.apache.iceberg.rest.RESTClient; | ||
import org.apache.iceberg.rest.RESTUtil; | ||
import org.apache.iceberg.rest.auth.AuthManager; | ||
import org.apache.iceberg.rest.auth.AuthManagers; | ||
import org.apache.iceberg.rest.auth.AuthSession; | ||
|
@@ -47,17 +49,23 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut | |
private volatile HTTPClient client; | ||
private final Map<String, String> properties; | ||
private final CachedSupplier<AwsCredentials> credentialCache; | ||
private final String catalogEndpoint; | ||
private final String credentialsEndpoint; | ||
private AuthManager authManager; | ||
private AuthSession authSession; | ||
|
||
private VendedCredentialsProvider(Map<String, String> properties) { | ||
Preconditions.checkArgument(null != properties, "Invalid properties: null"); | ||
Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null"); | ||
Preconditions.checkArgument(null != properties.get(URI), "Invalid credentials endpoint: null"); | ||
Preconditions.checkArgument( | ||
null != properties.get(CatalogProperties.URI), "Invalid catalog endpoint: null"); | ||
this.properties = properties; | ||
this.credentialCache = | ||
CachedSupplier.builder(() -> credentialFromProperties().orElseGet(this::refreshCredential)) | ||
.cachedValueName(VendedCredentialsProvider.class.getName()) | ||
.build(); | ||
this.catalogEndpoint = properties.get(CatalogProperties.URI); | ||
this.credentialsEndpoint = RESTUtil.resolveEndpoint(catalogEndpoint, properties.get(URI)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is already being done in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is corrected. |
||
} | ||
|
||
@Override | ||
|
@@ -82,7 +90,7 @@ private RESTClient httpClient() { | |
synchronized (this) { | ||
if (null == client) { | ||
authManager = AuthManagers.loadAuthManager("s3-credentials-refresh", properties); | ||
HTTPClient httpClient = HTTPClient.builder(properties).uri(properties.get(URI)).build(); | ||
HTTPClient httpClient = HTTPClient.builder(properties).uri(catalogEndpoint).build(); | ||
authSession = authManager.catalogSession(httpClient, properties); | ||
client = httpClient.withAuthSession(authSession); | ||
} | ||
|
@@ -95,7 +103,7 @@ private RESTClient httpClient() { | |
private LoadCredentialsResponse fetchCredentials() { | ||
return httpClient() | ||
.get( | ||
properties.get(URI), | ||
credentialsEndpoint, | ||
null, | ||
LoadCredentialsResponse.class, | ||
Map.of(), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
|
||
import java.time.Instant; | ||
import java.time.temporal.ChronoUnit; | ||
import org.apache.iceberg.CatalogProperties; | ||
nastra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
import org.apache.iceberg.exceptions.RESTException; | ||
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
import org.apache.iceberg.rest.HttpMethod; | ||
|
@@ -48,7 +49,12 @@ | |
public class TestVendedCredentialsProvider { | ||
|
||
private static final int PORT = 3232; | ||
private static final String URI = String.format("http://127.0.0.1:%d/v1/credentials", PORT); | ||
private static final String CREDENTIALS_URI = | ||
String.format("http://127.0.0.1:%d/v1/credentials", PORT); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically you can point to any URI, but I think we need to include a test pointing to a path consistent with the spec (e.g. |
||
private static final String CATALOG_URI = String.format("http://127.0.0.1:%d/v1", PORT); | ||
private static final ImmutableMap<String, String> PROPERTIES = | ||
ImmutableMap.of( | ||
VendedCredentialsProvider.URI, CREDENTIALS_URI, CatalogProperties.URI, CATALOG_URI); | ||
private static ClientAndServer mockServer; | ||
|
||
@BeforeAll | ||
|
@@ -73,7 +79,13 @@ public void invalidOrMissingUri() { | |
.hasMessage("Invalid properties: null"); | ||
assertThatThrownBy(() -> VendedCredentialsProvider.create(ImmutableMap.of())) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessage("Invalid URI: null"); | ||
.hasMessage("Invalid credentials URI: null"); | ||
assertThatThrownBy( | ||
() -> | ||
VendedCredentialsProvider.create( | ||
ImmutableMap.of("credentials.uri", "/credentials/uri"))) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessage("Invalid catalog endpoint: null"); | ||
|
||
try (VendedCredentialsProvider provider = | ||
VendedCredentialsProvider.create( | ||
|
@@ -95,8 +107,7 @@ public void noS3Credentials() { | |
.withStatusCode(200); | ||
mockServer.when(mockRequest).respond(mockResponse); | ||
|
||
try (VendedCredentialsProvider provider = | ||
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { | ||
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) { | ||
assertThatThrownBy(provider::resolveCredentials) | ||
.isInstanceOf(IllegalStateException.class) | ||
.hasMessage("Invalid S3 Credentials: empty"); | ||
|
@@ -124,8 +135,7 @@ public void accessKeyIdAndSecretAccessKeyWithoutToken() { | |
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); | ||
mockServer.when(mockRequest).respond(mockResponse); | ||
|
||
try (VendedCredentialsProvider provider = | ||
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { | ||
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) { | ||
assertThatThrownBy(provider::resolveCredentials) | ||
.isInstanceOf(IllegalStateException.class) | ||
.hasMessage("Invalid S3 Credentials: s3.session-token not set"); | ||
|
@@ -155,8 +165,7 @@ public void expirationNotSet() { | |
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); | ||
mockServer.when(mockRequest).respond(mockResponse); | ||
|
||
try (VendedCredentialsProvider provider = | ||
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { | ||
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) { | ||
assertThatThrownBy(provider::resolveCredentials) | ||
.isInstanceOf(IllegalStateException.class) | ||
.hasMessage("Invalid S3 Credentials: s3.session-token-expires-at-ms not set"); | ||
|
@@ -187,8 +196,7 @@ public void nonExpiredToken() { | |
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); | ||
mockServer.when(mockRequest).respond(mockResponse); | ||
|
||
try (VendedCredentialsProvider provider = | ||
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { | ||
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) { | ||
AwsCredentials awsCredentials = provider.resolveCredentials(); | ||
|
||
verifyCredentials(awsCredentials, credential); | ||
|
@@ -226,8 +234,7 @@ public void expiredToken() { | |
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); | ||
mockServer.when(mockRequest).respond(mockResponse); | ||
|
||
try (VendedCredentialsProvider provider = | ||
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { | ||
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) { | ||
AwsCredentials awsCredentials = provider.resolveCredentials(); | ||
verifyCredentials(awsCredentials, credential); | ||
|
||
|
@@ -294,8 +301,7 @@ public void multipleS3Credentials() { | |
response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); | ||
mockServer.when(mockRequest).respond(mockResponse); | ||
|
||
try (VendedCredentialsProvider provider = | ||
VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { | ||
try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(PROPERTIES)) { | ||
assertThatThrownBy(provider::resolveCredentials) | ||
.isInstanceOf(IllegalStateException.class) | ||
.hasMessage("Invalid S3 Credentials: only one S3 credential should exist"); | ||
|
@@ -346,7 +352,7 @@ public void nonExpiredTokenInProperties() { | |
VendedCredentialsProvider.create( | ||
ImmutableMap.of( | ||
VendedCredentialsProvider.URI, | ||
URI, | ||
CREDENTIALS_URI, | ||
S3FileIOProperties.ACCESS_KEY_ID, | ||
"randomAccessKeyFromProperties", | ||
S3FileIOProperties.SECRET_ACCESS_KEY, | ||
|
@@ -398,7 +404,7 @@ public void expiredTokenInProperties() { | |
VendedCredentialsProvider.create( | ||
ImmutableMap.of( | ||
VendedCredentialsProvider.URI, | ||
URI, | ||
CREDENTIALS_URI, | ||
S3FileIOProperties.ACCESS_KEY_ID, | ||
"randomAccessKeyFromProperties", | ||
S3FileIOProperties.SECRET_ACCESS_KEY, | ||
|
@@ -451,7 +457,7 @@ public void invalidTokenInProperties() { | |
VendedCredentialsProvider.create( | ||
ImmutableMap.of( | ||
VendedCredentialsProvider.URI, | ||
URI, | ||
CREDENTIALS_URI, | ||
S3FileIOProperties.ACCESS_KEY_ID, | ||
"randomAccessKeyFromProperties", | ||
S3FileIOProperties.SECRET_ACCESS_KEY, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please update
Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null");
toPreconditions.checkArgument(null != properties.get(URI), "Invalid credentials endpoint: null");
and also addPreconditions.checkArgument(null != properties.get(CatalogProperties.URI), "Invalid catalog endpoint: null");
.Please also update
invalidOrMissingUri()
and add a check where the catalog URI isn't provided