diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java index e350cc5af8f4..e46e254dd33e 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.gcp.GCPProperties; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -41,6 +42,9 @@ public class OAuth2RefreshCredentialsHandler implements OAuth2CredentialsWithRefresh.OAuth2RefreshHandler, AutoCloseable { private final Map properties; + private final String credentialsEndpoint; + // will be used to refresh the OAuth2 token + private final String catalogEndpoint; private volatile HTTPClient client; private AuthManager authManager; private AuthSession authSession; @@ -49,6 +53,11 @@ private OAuth2RefreshCredentialsHandler(Map properties) { Preconditions.checkArgument( null != properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT), "Invalid credentials endpoint: null"); + Preconditions.checkArgument( + null != properties.get(CatalogProperties.URI), "Invalid catalog endpoint: null"); + this.credentialsEndpoint = + properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT); + this.catalogEndpoint = properties.get(CatalogProperties.URI); this.properties = properties; } @@ -58,7 +67,7 @@ public AccessToken refreshAccessToken() { LoadCredentialsResponse response = httpClient() .get( - properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT), + credentialsEndpoint, null, LoadCredentialsResponse.class, Map.of(), @@ -99,10 +108,7 @@ private RESTClient httpClient() { synchronized (this) { if (null == client) { authManager = AuthManagers.loadAuthManager("gcs-credentials-refresh", properties); - HTTPClient httpClient = - HTTPClient.builder(properties) - .uri(properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT)) - .build(); + HTTPClient httpClient = HTTPClient.builder(properties).uri(catalogEndpoint).build(); authSession = authManager.catalogSession(httpClient, properties); client = httpClient.withAuthSession(authSession); } diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java index 37fd5e65dcd7..627123abfa3c 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java @@ -44,6 +44,7 @@ import java.util.Random; import java.util.stream.StreamSupport; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.gcp.GCPProperties; @@ -238,6 +239,8 @@ public void refreshCredentialsEndpointSet() { try (GCSFileIO fileIO = new GCSFileIO()) { fileIO.initialize( ImmutableMap.of( + CatalogProperties.URI, + "http://catalog-endpoint", GCS_OAUTH2_TOKEN, "gcsToken", GCS_OAUTH2_TOKEN_EXPIRES_AT, diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java index c538745f2767..b1c8e6838c0d 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java @@ -25,6 +25,8 @@ import com.google.auth.oauth2.AccessToken; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Map; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.exceptions.BadRequestException; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.gcp.GCPProperties; @@ -45,7 +47,15 @@ public class OAuth2RefreshCredentialsHandlerTest { private static final int PORT = 3333; - private static final String URI = String.format("http://127.0.0.1:%d/v1/credentials", PORT); + private static final String CREDENTIALS_URI = + String.format("http://127.0.0.1:%d/v1/credentials", PORT); + private static final String CATALOG_URI = String.format("http://127.0.0.1:%d/v1/", PORT); + private static final Map PROPERTIES = + ImmutableMap.of( + GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, + CREDENTIALS_URI, + CatalogProperties.URI, + CATALOG_URI); private static ClientAndServer mockServer; @BeforeAll @@ -65,18 +75,33 @@ public void before() { @Test public void invalidOrMissingUri() { - assertThatThrownBy(() -> OAuth2RefreshCredentialsHandler.create(ImmutableMap.of())) + assertThatThrownBy( + () -> + OAuth2RefreshCredentialsHandler.create( + ImmutableMap.of(CatalogProperties.URI, CATALOG_URI))) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid credentials endpoint: null"); + assertThatThrownBy( + () -> + OAuth2RefreshCredentialsHandler.create( + ImmutableMap.of( + GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, CREDENTIALS_URI))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid catalog endpoint: null"); + assertThatThrownBy( () -> OAuth2RefreshCredentialsHandler.create( ImmutableMap.of( - GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, "invalid uri")) + GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, + "invalid uri", + CatalogProperties.URI, + CATALOG_URI)) .refreshAccessToken()) .isInstanceOf(RESTException.class) - .hasMessageStartingWith("Failed to create request URI from base invalid uri"); + .hasMessageStartingWith( + "Failed to create request URI from base %sinvalid uri", CATALOG_URI); } @Test @@ -87,9 +112,7 @@ public void badRequest() { HttpResponse mockResponse = HttpResponse.response().withStatusCode(400); mockServer.when(mockRequest).respond(mockResponse); - OAuth2RefreshCredentialsHandler handler = - OAuth2RefreshCredentialsHandler.create( - ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES); assertThatThrownBy(handler::refreshAccessToken) .isInstanceOf(BadRequestException.class) @@ -108,9 +131,7 @@ public void noGcsCredentialInResponse() { .withStatusCode(200); mockServer.when(mockRequest).respond(mockResponse); - OAuth2RefreshCredentialsHandler handler = - OAuth2RefreshCredentialsHandler.create( - ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES); assertThatThrownBy(handler::refreshAccessToken) .isInstanceOf(IllegalStateException.class) @@ -134,9 +155,7 @@ public void noGcsToken() { .withStatusCode(200); mockServer.when(mockRequest).respond(mockResponse); - OAuth2RefreshCredentialsHandler handler = - OAuth2RefreshCredentialsHandler.create( - ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES); assertThatThrownBy(handler::refreshAccessToken) .isInstanceOf(IllegalStateException.class) @@ -160,9 +179,7 @@ public void tokenWithoutExpiration() { .withStatusCode(200); mockServer.when(mockRequest).respond(mockResponse); - OAuth2RefreshCredentialsHandler handler = - OAuth2RefreshCredentialsHandler.create( - ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES); assertThatThrownBy(handler::refreshAccessToken) .isInstanceOf(IllegalStateException.class) @@ -191,9 +208,7 @@ public void tokenWithExpiration() { .withStatusCode(200); mockServer.when(mockRequest).respond(mockResponse); - OAuth2RefreshCredentialsHandler handler = - OAuth2RefreshCredentialsHandler.create( - ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES); AccessToken accessToken = handler.refreshAccessToken(); assertThat(accessToken.getTokenValue()) @@ -253,9 +268,7 @@ public void multipleGcsCredentials() { .withStatusCode(200); mockServer.when(mockRequest).respond(mockResponse); - OAuth2RefreshCredentialsHandler handler = - OAuth2RefreshCredentialsHandler.create( - ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + OAuth2RefreshCredentialsHandler handler = OAuth2RefreshCredentialsHandler.create(PROPERTIES); assertThatThrownBy(handler::refreshAccessToken) .isInstanceOf(IllegalStateException.class)