Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,49 @@ public void validate() {}
}
}

/**
* Vend credentials for a table if eligible. Unlike {@link #getTableCredentials}, this method
* respects the same vending gate ({@link #shouldGenerateCredential}) used by
* createTable/loadTable, so local/HDFS tables correctly return empty.
*
* <p>Returns credentials separately (not embedded in a response config map) because {@link
* PlanTableScanResponse} in Iceberg 1.10.1 has no config/credentials field.
*
* @param identifier the table identifier
* @param requestCredential whether the client requested credential vending
* @param privilege the credential privilege level (READ for scans)
* @return credentials response, empty if the table is not eligible for vending
*/
public LoadCredentialsResponse getCredentialsIfEligible(
TableIdentifier identifier, boolean requestCredential, CredentialPrivilege privilege) {
try {
LoadTableResponse loadTableResponse = super.loadTable(identifier);
if (!shouldGenerateCredential(loadTableResponse, requestCredential)) {
return ImmutableLoadCredentialsResponse.builder().build();
}
Credential credential = getCredential(loadTableResponse, privilege);
org.apache.iceberg.rest.credentials.Credential icebergCredential =
new org.apache.iceberg.rest.credentials.Credential() {
@Override
public String prefix() {
return "";
}

@Override
public Map<String, String> config() {
return CredentialPropertyUtils.toIcebergProperties(credential);
}

@Override
public void validate() {}
};
return ImmutableLoadCredentialsResponse.builder().addCredentials(icebergCredential).build();
} catch (ServiceUnavailableException e) {
LOG.warn("Failed to generate scan credentials for table: {}", identifier, e);
return ImmutableLoadCredentialsResponse.builder().build();
}
}

@Override
public void close() throws Exception {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -52,6 +54,8 @@
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.credential.CredentialPrivilege;
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapper;
import org.apache.gravitino.iceberg.service.IcebergRESTUtils;
Expand All @@ -72,6 +76,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
Expand Down Expand Up @@ -99,15 +104,18 @@ public class IcebergTableOperations {

private ObjectMapper icebergObjectMapper;
private IcebergTableOperationDispatcher tableOperationDispatcher;
private IcebergCatalogWrapperManager icebergCatalogWrapperManager;

@Context private HttpServletRequest httpRequest;

@Inject
public IcebergTableOperations(
IcebergMetricsManager icebergMetricsManager,
IcebergTableOperationDispatcher tableOperationDispatcher) {
IcebergTableOperationDispatcher tableOperationDispatcher,
IcebergCatalogWrapperManager icebergCatalogWrapperManager) {
this.icebergMetricsManager = icebergMetricsManager;
this.tableOperationDispatcher = tableOperationDispatcher;
this.icebergCatalogWrapperManager = icebergCatalogWrapperManager;
this.icebergObjectMapper = IcebergObjectMapper.getInstance();
}

Expand Down Expand Up @@ -509,27 +517,50 @@ public Response planTableScan(
@Encoded() @PathParam("namespace") @AuthorizationMetadata(type = EntityType.SCHEMA)
String namespace,
@Encoded() @PathParam("table") @AuthorizationMetadata(type = EntityType.TABLE) String table,
PlanTableScanRequest scanRequest) {
PlanTableScanRequest scanRequest,
@HeaderParam(X_ICEBERG_ACCESS_DELEGATION) String accessDelegation) {
boolean isCredentialVending = isCredentialVending(accessDelegation);
String catalogName = IcebergRESTUtils.getCatalogName(prefix);
Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
String tableName = RESTUtil.decodeString(table);
LOG.info(
"Plan table scan, catalog: {}, namespace: {}, table: {}",
"Plan table scan, catalog: {}, namespace: {}, table: {}, "
+ "accessDelegation: {}, isCredentialVending: {}",
catalogName,
icebergNS,
tableName);
tableName,
accessDelegation,
isCredentialVending);

try {
return Utils.doAs(
httpRequest,
() -> {
TableIdentifier tableIdentifier = TableIdentifier.of(icebergNS, tableName);
IcebergRequestContext context =
new IcebergRequestContext(httpServletRequest(), catalogName);
new IcebergRequestContext(httpServletRequest(), catalogName, isCredentialVending);

PlanTableScanResponse scanResponse =
tableOperationDispatcher.planTableScan(context, tableIdentifier, scanRequest);

if (isCredentialVending) {
try {
LoadCredentialsResponse credentialsResponse =
icebergCatalogWrapperManager
.getCatalogWrapper(catalogName)
.getCredentialsIfEligible(tableIdentifier, true, CredentialPrivilege.READ);
if (!credentialsResponse.credentials().isEmpty()) {
return buildScanResponseWithCredentials(scanResponse, credentialsResponse);
}
} catch (Exception e) {
LOG.warn(
"Failed to vend credentials for scan on table {}, "
+ "returning scan response without credentials",
tableIdentifier,
e);
}
}

return IcebergRESTUtils.ok(scanResponse);
});
} catch (Exception e) {
Expand Down Expand Up @@ -623,6 +654,19 @@ private boolean isCredentialVending(String accessDelegation) {
}
}

private Response buildScanResponseWithCredentials(
PlanTableScanResponse scanResponse, LoadCredentialsResponse credentialsResponse) {
ObjectNode responseNode = icebergObjectMapper.valueToTree(scanResponse);
ArrayNode credArray = responseNode.putArray("storage-credentials");
for (Credential cred : credentialsResponse.credentials()) {
ObjectNode credNode = credArray.addObject();
credNode.put("prefix", cred.prefix());
ObjectNode configNode = credNode.putObject("config");
cred.config().forEach(configNode::put);
}
return Response.ok(responseNode, MediaType.APPLICATION_JSON).build();
}

private NameIdentifier[] toNameIdentifiers(
ListTablesResponse listTablesResponse, String metalake, String catalogName) {
List<TableIdentifier> identifiers = listTablesResponse.identifiers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;

Expand All @@ -29,8 +30,9 @@ public class MockIcebergTableOperations extends IcebergTableOperations {
@Inject
public MockIcebergTableOperations(
IcebergMetricsManager icebergMetricsManager,
IcebergTableOperationDispatcher tableOperationDispatcher) {
super(icebergMetricsManager, tableOperationDispatcher);
IcebergTableOperationDispatcher tableOperationDispatcher,
IcebergCatalogWrapperManager icebergCatalogWrapperManager) {
super(icebergMetricsManager, tableOperationDispatcher, icebergCatalogWrapperManager);
}

// HTTP request is null in Jersey test, create a mock request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1046,4 +1046,109 @@ void testLoadTableSnapshotsAllReturnsAllSnapshots(Namespace namespace) {
allTableResponse.tableMetadata().snapshots().size(),
"Default load and snapshots=all should return the same number of snapshots");
}

@ParameterizedTest
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
void testPlanTableScanWithCredentialVending(Namespace namespace) {
verifyCreateNamespaceSucc(namespace);
PlanTableScanRequest emptyRequest = PlanTableScanRequest.builder().build();

// scan without credential vending -- no storage-credentials in response
String tableName = "scan_cred_no_header";
verifyCreateTableSucc(namespace, tableName);
Response noCredResponse = doPlanTableScan(namespace, tableName, emptyRequest);
Assertions.assertEquals(Status.OK.getStatusCode(), noCredResponse.getStatus());
try {
JsonNode noCredJson = JsonUtil.mapper().readTree(noCredResponse.readEntity(String.class));
Assertions.assertFalse(
noCredJson.has("storage-credentials"),
"Response should not have storage-credentials without header");
} catch (Exception e) {
throw new RuntimeException(e);
}

// scan with credential vending on local table -- no storage-credentials
String localTableName = "scan_cred_local";
Response localCreateResponse =
doCreateTableWithCredentialVending(
namespace, localTableName, "file:///tmp/" + localTableName);
Assertions.assertEquals(Status.OK.getStatusCode(), localCreateResponse.getStatus());
Response localScanResponse =
doPlanTableScanWithCredentialVending(namespace, localTableName, emptyRequest);
Assertions.assertEquals(Status.OK.getStatusCode(), localScanResponse.getStatus());
try {
JsonNode localScanJson =
JsonUtil.mapper().readTree(localScanResponse.readEntity(String.class));
Assertions.assertFalse(
localScanJson.has("storage-credentials"),
"Local table should not have storage-credentials");
} catch (Exception e) {
throw new RuntimeException(e);
}

// scan with credential vending on S3 table -- should have storage-credentials
String s3TableName = "scan_cred_s3";
String s3Location = "s3://dummy-bucket/" + s3TableName;
Response s3CreateResponse =
doCreateTableWithCredentialVending(namespace, s3TableName, s3Location);
Assertions.assertEquals(Status.OK.getStatusCode(), s3CreateResponse.getStatus());
Response s3ScanResponse =
doPlanTableScanWithCredentialVending(namespace, s3TableName, emptyRequest);
Assertions.assertEquals(Status.OK.getStatusCode(), s3ScanResponse.getStatus());
try {
JsonNode s3ScanJson = JsonUtil.mapper().readTree(s3ScanResponse.readEntity(String.class));
Assertions.assertTrue(
s3ScanJson.has("storage-credentials"), "S3 table should have storage-credentials");
JsonNode credentials = s3ScanJson.get("storage-credentials");
Assertions.assertTrue(credentials.isArray() && credentials.size() > 0);
JsonNode firstCred = credentials.get(0);
Assertions.assertTrue(firstCred.has("config"));
Assertions.assertEquals(
DummyCredentialProvider.DUMMY_CREDENTIAL_TYPE,
firstCred.get("config").get(Credential.CREDENTIAL_TYPE).asText());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@ParameterizedTest
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
void testPlanTableScanRemoteSigningNotSupported(Namespace namespace) {
verifyCreateNamespaceSucc(namespace);
verifyCreateTableSucc(namespace, "scan_remote_signing");
PlanTableScanRequest request = PlanTableScanRequest.builder().build();
Response response =
getTableClientBuilder(namespace, Optional.of("scan_remote_signing/scan"))
.header(IcebergTableOperations.X_ICEBERG_ACCESS_DELEGATION, "remote-signing")
.post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
Assertions.assertEquals(406, response.getStatus());
String errorBody = response.readEntity(String.class);
Assertions.assertTrue(
errorBody.contains("remote signing") || errorBody.contains("remote-signing"),
"Error message should mention remote signing: " + errorBody);
}

@ParameterizedTest
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
void testPlanTableScanInvalidAccessDelegation(Namespace namespace) {
verifyCreateNamespaceSucc(namespace);
verifyCreateTableSucc(namespace, "scan_invalid_delegation");
PlanTableScanRequest request = PlanTableScanRequest.builder().build();
Response response =
getTableClientBuilder(namespace, Optional.of("scan_invalid_delegation/scan"))
.header(IcebergTableOperations.X_ICEBERG_ACCESS_DELEGATION, "invalid-value")
.post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
Assertions.assertEquals(400, response.getStatus());
String errorBody = response.readEntity(String.class);
Assertions.assertTrue(
errorBody.contains("vended-credentials") && errorBody.contains("illegal"),
"Error message should mention valid values: " + errorBody);
}

private Response doPlanTableScanWithCredentialVending(
Namespace ns, String tableName, PlanTableScanRequest request) {
return getTableClientBuilder(ns, Optional.of(tableName + "/scan"))
.header(IcebergTableOperations.X_ICEBERG_ACCESS_DELEGATION, "vended-credentials")
.post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
}
}