diff --git a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java index 22a2175f7bf..289b843530c 100644 --- a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java +++ b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueCatalogOperations.java @@ -18,32 +18,123 @@ */ package org.apache.gravitino.catalog.glue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.SchemaChange; import org.apache.gravitino.connector.CatalogInfo; import org.apache.gravitino.connector.CatalogOperations; import org.apache.gravitino.connector.HasPropertyMetadata; +import org.apache.gravitino.connector.SupportsSchemas; +import org.apache.gravitino.exceptions.ConnectionFailedException; +import org.apache.gravitino.exceptions.NoSuchCatalogException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.NonEmptySchemaException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.sorts.SortDirection; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.types.Type; +import org.apache.gravitino.utils.PrincipalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.DatabaseInput; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetDatabasesRequest; +import software.amazon.awssdk.services.glue.model.GetDatabasesResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.Order; +import software.amazon.awssdk.services.glue.model.SerDeInfo; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; /** * Operations implementation for the AWS Glue Data Catalog connector. * - *

This is a scaffold stub. Full implementation will be added in PR-02 through PR-06. - * - *

TODO PR-04: implement SupportsSchemas (Schema CRUD + exception mapping) - * - *

TODO PR-05: implement TableCatalog (Table CRUD + type detection) + *

Implements schema CRUD (via {@link SupportsSchemas}) and table CRUD (via {@link TableCatalog}) + * backed by the AWS Glue API. */ -public class GlueCatalogOperations implements CatalogOperations { +public class GlueCatalogOperations implements CatalogOperations, SupportsSchemas, TableCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalogOperations.class); + + /** Table property keys that map to StorageDescriptor fields, not to Table.parameters(). */ + private static final Set SD_TABLE_PROPERTY_KEYS = + ImmutableSet.of( + GlueConstants.LOCATION, + GlueConstants.INPUT_FORMAT, + GlueConstants.OUTPUT_FORMAT, + GlueConstants.SERDE_LIB, + GlueConstants.SERDE_NAME); + + /** Property keys that map to top-level TableInput fields, not to Table.parameters(). */ + private static final Set TABLE_LEVEL_KEYS = ImmutableSet.of(GlueConstants.TABLE_TYPE); + + @VisibleForTesting GlueClient glueClient; + + /** Nullable — when null, Glue uses the caller's AWS account ID. */ + @VisibleForTesting String catalogId; - // TODO PR-04: add GlueClient field and catalogId + /** Nullable — when null all table formats are exposed. */ + @VisibleForTesting Set tableFormatFilter; + + private final GlueTypeConverter typeConverter = new GlueTypeConverter(); @Override public void initialize( Map config, CatalogInfo info, HasPropertyMetadata propertiesMetadata) throws RuntimeException { - // TODO PR-04: build GlueClient via GlueClientProvider and store catalogId + this.glueClient = GlueClientProvider.buildClient(config); + this.catalogId = config.get(GlueConstants.AWS_GLUE_CATALOG_ID); + String filterProp = + config.getOrDefault( + GlueConstants.TABLE_FORMAT_FILTER, GlueConstants.DEFAULT_TABLE_FORMAT_FILTER); + if (!GlueConstants.DEFAULT_TABLE_FORMAT_FILTER.equalsIgnoreCase(filterProp)) { + tableFormatFilter = + Arrays.stream(filterProp.split(",")) + .map(String::trim) + .map(s -> s.toLowerCase(Locale.ROOT)) + .collect(Collectors.toSet()); + } } @Override @@ -54,11 +145,560 @@ public void testConnection( String comment, Map properties) throws Exception { - // TODO PR-04: call GlueClient.getDatabases() to verify connectivity + try { + GetDatabasesRequest.Builder req = GetDatabasesRequest.builder().maxResults(1); + applyCatalogId(req::catalogId); + glueClient.getDatabases(req.build()); + } catch (GlueException e) { + throw new ConnectionFailedException(e, "Failed to connect to AWS Glue: %s", e.getMessage()); + } } @Override public void close() throws IOException { - // TODO PR-04: close GlueClient + if (glueClient != null) { + glueClient.close(); + glueClient = null; + } + } + + @Override + public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { + List result = new ArrayList<>(); + String nextToken = null; + try { + do { + GetDatabasesRequest.Builder req = GetDatabasesRequest.builder(); + applyCatalogId(req::catalogId); + if (nextToken != null) req.nextToken(nextToken); + GetDatabasesResponse resp = glueClient.getDatabases(req.build()); + resp.databaseList().stream() + .map(db -> NameIdentifier.of(namespace, db.name())) + .forEach(result::add); + nextToken = resp.nextToken(); + } while (nextToken != null); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "listing schemas under " + namespace); + } + return result.toArray(new NameIdentifier[0]); + } + + @Override + public GlueSchema createSchema( + NameIdentifier ident, String comment, Map properties) + throws NoSuchCatalogException, SchemaAlreadyExistsException { + + Map params = properties != null ? properties : Collections.emptyMap(); + + DatabaseInput input = + DatabaseInput.builder().name(ident.name()).description(comment).parameters(params).build(); + + CreateDatabaseRequest.Builder req = CreateDatabaseRequest.builder().databaseInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.createDatabase(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + + LOG.info("Created Glue schema (database) {}", ident.name()); + + return GlueSchema.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(params) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentUserName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + } + + @Override + public GlueSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { + GetDatabaseRequest.Builder req = GetDatabaseRequest.builder().name(ident.name()); + applyCatalogId(req::catalogId); + try { + GlueSchema schema = + GlueSchema.fromGlueDatabase(glueClient.getDatabase(req.build()).database()); + LOG.info("Loaded Glue schema (database) {}", ident.name()); + return schema; + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + } + + @Override + public GlueSchema alterSchema(NameIdentifier ident, SchemaChange... changes) + throws NoSuchSchemaException { + + GlueSchema current = loadSchema(ident); + + Map newProps = new HashMap<>(current.properties()); + + for (SchemaChange change : changes) { + if (change instanceof SchemaChange.SetProperty) { + SchemaChange.SetProperty sp = (SchemaChange.SetProperty) change; + newProps.put(sp.getProperty(), sp.getValue()); + } else if (change instanceof SchemaChange.RemoveProperty) { + newProps.remove(((SchemaChange.RemoveProperty) change).getProperty()); + } else { + throw new IllegalArgumentException( + "Unsupported schema change: " + change.getClass().getSimpleName()); + } + } + + DatabaseInput input = + DatabaseInput.builder() + .name(ident.name()) + .description(current.comment()) + .parameters(newProps) + .build(); + + UpdateDatabaseRequest.Builder req = + UpdateDatabaseRequest.builder().name(ident.name()).databaseInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.updateDatabase(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + + LOG.info("Altered Glue schema (database) {}", ident.name()); + + return GlueSchema.builder() + .withName(ident.name()) + .withComment(current.comment()) + .withProperties(newProps) + .withAuditInfo(current.auditInfo()) + .build(); + } + + @Override + public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { + if (!cascade) { + GetTablesRequest.Builder tabReq = + GetTablesRequest.builder().databaseName(ident.name()).maxResults(1); + applyCatalogId(tabReq::catalogId); + try { + if (!glueClient.getTables(tabReq.build()).tableList().isEmpty()) { + throw new NonEmptySchemaException( + "Schema %s is not empty. Use cascade=true to drop it with its tables.", ident.name()); + } + } catch (NonEmptySchemaException e) { + throw e; + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException( + e, "checking tables in schema " + ident.name()); + } + } + + DeleteDatabaseRequest.Builder req = DeleteDatabaseRequest.builder().name(ident.name()); + applyCatalogId(req::catalogId); + try { + glueClient.deleteDatabase(req.build()); + LOG.info("Dropped Glue schema (database) {}", ident.name()); + return true; + } catch (EntityNotFoundException e) { + return false; + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "schema " + ident.name()); + } + } + + @Override + public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { + String dbName = schemaName(namespace); + List result = new ArrayList<>(); + String nextToken = null; + try { + do { + GetTablesRequest.Builder req = GetTablesRequest.builder().databaseName(dbName); + applyCatalogId(req::catalogId); + if (nextToken != null) req.nextToken(nextToken); + GetTablesResponse resp = glueClient.getTables(req.build()); + resp.tableList().stream() + .filter(this::matchesFormatFilter) + .map(t -> NameIdentifier.of(namespace, t.name())) + .forEach(result::add); + nextToken = resp.nextToken(); + } while (nextToken != null); + } catch (EntityNotFoundException e) { + throw new NoSuchSchemaException(e, "Schema %s does not exist", dbName); + } catch (GlueException e) { + throw GlueExceptionConverter.toSchemaException(e, "listing tables in schema " + dbName); + } + return result.toArray(new NameIdentifier[0]); + } + + @Override + public GlueTable loadTable(NameIdentifier ident) throws NoSuchTableException { + String dbName = schemaName(ident.namespace()); + GetTableRequest.Builder req = GetTableRequest.builder().databaseName(dbName).name(ident.name()); + applyCatalogId(req::catalogId); + try { + GlueTable table = + GlueTable.fromGlueTable(glueClient.getTable(req.build()).table(), typeConverter); + table.initOpsContext(glueClient, catalogId, dbName); + LOG.info("Loaded Glue table {}.{}", dbName, ident.name()); + return table; + } catch (GlueException e) { + throw GlueExceptionConverter.toTableException(e, "table " + ident.name()); + } + } + + @Override + public GlueTable createTable( + NameIdentifier ident, + Column[] columns, + String comment, + Map properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders, + Index[] indexes) + throws NoSuchSchemaException, TableAlreadyExistsException { + + Preconditions.checkArgument(indexes.length == 0, "Glue catalog does not support indexes"); + + for (Transform t : partitions) { + Preconditions.checkArgument( + t instanceof Transforms.IdentityTransform, + "Glue catalog only supports identity partitioning, got: %s", + t.name()); + Preconditions.checkArgument( + ((Transforms.IdentityTransform) t).fieldName().length == 1, + "Glue catalog does not support nested field partitioning"); + } + + String dbName = schemaName(ident.namespace()); + Map props = properties != null ? properties : Collections.emptyMap(); + + TableInput input = + buildTableInput( + ident.name(), comment, columns, props, partitions, distribution, sortOrders); + + CreateTableRequest.Builder req = + CreateTableRequest.builder().databaseName(dbName).tableInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.createTable(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toTableException(e, "table " + ident.name()); + } + + LOG.info("Created Glue table {}.{}", dbName, ident.name()); + + GlueTable created = + GlueTable.builder() + .withName(ident.name()) + .withComment(comment) + .withColumns(columns) + .withProperties(props) + .withPartitioning(partitions) + .withDistribution(distribution != null ? distribution : Distributions.NONE) + .withSortOrders(sortOrders != null ? sortOrders : new SortOrder[0]) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentUserName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + created.initOpsContext(glueClient, catalogId, dbName); + return created; + } + + @Override + public GlueTable alterTable(NameIdentifier ident, TableChange... changes) + throws NoSuchTableException, IllegalArgumentException { + + GlueTable current = loadTable(ident); + String dbName = schemaName(ident.namespace()); + + String newName = current.name(); + String newComment = current.comment(); + Map newProps = new HashMap<>(current.properties()); + + // Separate data columns from partition columns + int partCount = current.partitioning().length; + List allCols = new ArrayList<>(Arrays.asList(current.columns())); + List dataCols = new ArrayList<>(allCols.subList(0, allCols.size() - partCount)); + List partCols = + new ArrayList<>(allCols.subList(allCols.size() - partCount, allCols.size())); + + for (TableChange change : changes) { + if (change instanceof TableChange.RenameTable) { + newName = ((TableChange.RenameTable) change).getNewName(); + } else if (change instanceof TableChange.UpdateComment) { + newComment = ((TableChange.UpdateComment) change).getNewComment(); + } else if (change instanceof TableChange.SetProperty) { + TableChange.SetProperty sp = (TableChange.SetProperty) change; + newProps.put(sp.getProperty(), sp.getValue()); + } else if (change instanceof TableChange.RemoveProperty) { + newProps.remove(((TableChange.RemoveProperty) change).getProperty()); + } else if (change instanceof TableChange.ColumnChange) { + applyColumnChange(dataCols, partCols, (TableChange.ColumnChange) change); + } else { + throw new IllegalArgumentException( + "Unsupported table change: " + change.getClass().getSimpleName()); + } + } + + List newAllCols = new ArrayList<>(dataCols); + newAllCols.addAll(partCols); + Column[] newColumns = newAllCols.toArray(new Column[0]); + + TableInput input = + buildTableInput( + newName, + newComment, + newColumns, + newProps, + current.partitioning(), + current.distribution(), + current.sortOrder()); + + UpdateTableRequest.Builder req = + UpdateTableRequest.builder().databaseName(dbName).tableInput(input); + applyCatalogId(req::catalogId); + + try { + glueClient.updateTable(req.build()); + } catch (GlueException e) { + throw GlueExceptionConverter.toTableException(e, "table " + ident.name()); + } + + LOG.info("Altered Glue table {}.{}", dbName, ident.name()); + + GlueTable altered = + GlueTable.builder() + .withName(newName) + .withComment(newComment) + .withColumns(newColumns) + .withProperties(newProps) + .withPartitioning(current.partitioning()) + .withDistribution(current.distribution()) + .withSortOrders(current.sortOrder()) + .withAuditInfo(current.auditInfo()) + .build(); + altered.initOpsContext(glueClient, catalogId, dbName); + return altered; + } + + @Override + public boolean dropTable(NameIdentifier ident) { + String dbName = schemaName(ident.namespace()); + DeleteTableRequest.Builder req = + DeleteTableRequest.builder().databaseName(dbName).name(ident.name()); + applyCatalogId(req::catalogId); + try { + glueClient.deleteTable(req.build()); + LOG.info("Dropped Glue table {}.{}", dbName, ident.name()); + return true; + } catch (EntityNotFoundException e) { + return false; + } catch (GlueException e) { + throw GlueExceptionConverter.toTableException(e, "table " + ident.name()); + } + } + + private static String schemaName(Namespace namespace) { + String[] levels = namespace.levels(); + Preconditions.checkArgument( + levels.length >= 2, "Namespace must have at least 2 levels, got: %s", levels.length); + return levels[levels.length - 1]; + } + + // NOTE: parameter type is the Glue SDK Table, not GlueTable (our domain class). + // The Glue SDK's Column model is also referenced by FQN throughout this class because its + // simple name conflicts with the imported org.apache.gravitino.rel.Column. + private boolean matchesFormatFilter(Table table) { + if (tableFormatFilter == null) return true; + String fmt = table.hasParameters() ? table.parameters().get(GlueConstants.TABLE_FORMAT) : null; + String normalized = + fmt != null ? fmt.toLowerCase(Locale.ROOT) : GlueConstants.DEFAULT_TABLE_FORMAT_VALUE; + return tableFormatFilter.contains(normalized); + } + + private TableInput buildTableInput( + String name, + String comment, + Column[] columns, + Map properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders) { + + int partCount = partitions.length; + Preconditions.checkArgument( + columns.length >= partCount, + "columns.length (%s) must be >= number of partition columns (%s)", + columns.length, + partCount); + int dataCount = columns.length - partCount; + + List glueDataCols = new ArrayList<>(); + for (int i = 0; i < dataCount; i++) { + glueDataCols.add(toGlueColumn(columns[i])); + } + + List gluePartCols = new ArrayList<>(); + for (int i = dataCount; i < columns.length; i++) { + gluePartCols.add(toGlueColumn(columns[i])); + } + + // Separate properties into: SD fields, table-level fields, and Table.parameters() + Map serdeParams = new HashMap<>(); + Map tableParams = new HashMap<>(); + for (Map.Entry entry : properties.entrySet()) { + String key = entry.getKey(); + if (key.startsWith(GlueConstants.SERDE_PARAMETER_PREFIX)) { + serdeParams.put( + key.substring(GlueConstants.SERDE_PARAMETER_PREFIX.length()), entry.getValue()); + } else if (!SD_TABLE_PROPERTY_KEYS.contains(key) && !TABLE_LEVEL_KEYS.contains(key)) { + tableParams.put(key, entry.getValue()); + } + } + + SerDeInfo serDe = + SerDeInfo.builder() + .serializationLibrary(properties.get(GlueConstants.SERDE_LIB)) + .name(properties.get(GlueConstants.SERDE_NAME)) + .parameters(serdeParams) + .build(); + + List bucketCols = Collections.emptyList(); + int numBuckets = 0; + if (distribution != null && distribution != Distributions.NONE && distribution.number() > 0) { + numBuckets = distribution.number(); + bucketCols = + Arrays.stream(distribution.expressions()) + .filter(e -> e instanceof NamedReference.FieldReference) + .map(e -> String.join(".", ((NamedReference.FieldReference) e).fieldName())) + .collect(Collectors.toList()); + } + + List glueSortCols = new ArrayList<>(); + if (sortOrders != null) { + for (SortOrder so : sortOrders) { + if (so.expression() instanceof NamedReference.FieldReference) { + String colName = + String.join(".", ((NamedReference.FieldReference) so.expression()).fieldName()); + int order = so.direction() == SortDirection.ASCENDING ? 1 : 0; + glueSortCols.add(Order.builder().column(colName).sortOrder(order).build()); + } + } + } + + StorageDescriptor sd = + StorageDescriptor.builder() + .columns(glueDataCols) + .location(properties.get(GlueConstants.LOCATION)) + .inputFormat(properties.get(GlueConstants.INPUT_FORMAT)) + .outputFormat(properties.get(GlueConstants.OUTPUT_FORMAT)) + .serdeInfo(serDe) + .bucketColumns(bucketCols) + .numberOfBuckets(numBuckets) + .sortColumns(glueSortCols) + .build(); + + return TableInput.builder() + .name(name) + .description(comment) + .tableType(properties.get(GlueConstants.TABLE_TYPE)) + .parameters(tableParams) + .storageDescriptor(sd) + .partitionKeys(gluePartCols) + .build(); + } + + private software.amazon.awssdk.services.glue.model.Column toGlueColumn(Column col) { + return software.amazon.awssdk.services.glue.model.Column.builder() + .name(col.name()) + .type(typeConverter.fromGravitino(col.dataType())) + .comment(col.comment()) + .build(); + } + + private static void applyColumnChange( + List dataCols, List partCols, TableChange.ColumnChange change) { + + String fieldName = change.fieldName()[0]; + + if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn add = (TableChange.AddColumn) change; + dataCols.add( + GlueColumn.builder() + .withName(add.fieldName()[0]) + .withType(add.getDataType()) + .withComment(add.getComment()) + .withNullable(add.isNullable()) + .build()); + + } else if (change instanceof TableChange.DeleteColumn) { + boolean isPartition = partCols.stream().anyMatch(c -> c.name().equals(fieldName)); + Preconditions.checkArgument(!isPartition, "Cannot delete partition column: %s", fieldName); + dataCols.removeIf(c -> c.name().equals(fieldName)); + + } else if (change instanceof TableChange.RenameColumn) { + boolean isPartition = partCols.stream().anyMatch(c -> c.name().equals(fieldName)); + Preconditions.checkArgument(!isPartition, "Cannot rename partition column: %s", fieldName); + String newColName = ((TableChange.RenameColumn) change).getNewName(); + replaceColumn( + dataCols, fieldName, old -> copyColumn(old, newColName, old.dataType(), old.comment())); + + } else if (change instanceof TableChange.UpdateColumnType) { + boolean isPartition = partCols.stream().anyMatch(c -> c.name().equals(fieldName)); + Preconditions.checkArgument( + !isPartition, "Cannot update type of partition column: %s", fieldName); + Type newType = ((TableChange.UpdateColumnType) change).getNewDataType(); + replaceColumn( + dataCols, fieldName, old -> copyColumn(old, old.name(), newType, old.comment())); + + } else if (change instanceof TableChange.UpdateColumnComment) { + String newCmt = ((TableChange.UpdateColumnComment) change).getNewComment(); + if (!replaceColumn( + dataCols, fieldName, old -> copyColumn(old, old.name(), old.dataType(), newCmt))) { + replaceColumn( + partCols, fieldName, old -> copyColumn(old, old.name(), old.dataType(), newCmt)); + } + + } else { + throw new IllegalArgumentException( + "Unsupported column change: " + change.getClass().getSimpleName()); + } + } + + /** Passes {@link #catalogId} to {@code setter} when it is non-null. */ + private void applyCatalogId(Consumer setter) { + if (catalogId != null) setter.accept(catalogId); + } + + /** + * Finds the first column matching {@code name} in {@code cols}, replaces it with the result of + * {@code updater}, and returns {@code true} if a replacement was made. + */ + private static boolean replaceColumn( + List cols, String name, UnaryOperator updater) { + for (int i = 0; i < cols.size(); i++) { + if (cols.get(i).name().equals(name)) { + cols.set(i, updater.apply(cols.get(i))); + return true; + } + } + return false; + } + + private static Column copyColumn(Column src, String name, Type type, String comment) { + return GlueColumn.builder() + .withName(name) + .withType(type) + .withComment(comment) + .withNullable(src.nullable()) + .build(); } } diff --git a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueExceptionConverter.java b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueExceptionConverter.java new file mode 100644 index 00000000000..7857875e0cb --- /dev/null +++ b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueExceptionConverter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.glue; + +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.InvalidInputException; + +/** Converts AWS Glue SDK exceptions to Gravitino exceptions. */ +final class GlueExceptionConverter { + + private GlueExceptionConverter() {} + + /** + * Converts a {@link GlueException} to the appropriate Gravitino schema exception. + * + * @param e the Glue exception to convert + * @param context description of the operation context for error messages + * @return a Gravitino or standard Java runtime exception + */ + static RuntimeException toSchemaException(GlueException e, String context) { + if (e instanceof EntityNotFoundException) { + return new NoSuchSchemaException(e, "%s does not exist", context); + } + if (e instanceof AlreadyExistsException) { + return new SchemaAlreadyExistsException(e, "%s already exists", context); + } + if (e instanceof InvalidInputException) { + return new IllegalArgumentException(context + ": " + e.getMessage(), e); + } + return new RuntimeException("Glue error: " + context, e); + } + + /** + * Converts a {@link GlueException} to the appropriate Gravitino table exception. + * + * @param e the Glue exception to convert + * @param context description of the operation context for error messages + * @return a Gravitino or standard Java runtime exception + */ + static RuntimeException toTableException(GlueException e, String context) { + if (e instanceof EntityNotFoundException) { + return new NoSuchTableException(e, "%s does not exist", context); + } + if (e instanceof AlreadyExistsException) { + return new TableAlreadyExistsException(e, "%s already exists", context); + } + if (e instanceof InvalidInputException) { + return new IllegalArgumentException(context + ": " + e.getMessage(), e); + } + return new RuntimeException("Glue error: " + context, e); + } +} diff --git a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java index d395caf23c2..1134a5ad4f2 100644 --- a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java +++ b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTable.java @@ -26,7 +26,9 @@ import static org.apache.gravitino.catalog.glue.GlueConstants.SERDE_PARAMETER_PREFIX; import static org.apache.gravitino.catalog.glue.GlueConstants.TABLE_TYPE; +import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,6 +39,7 @@ import org.apache.gravitino.connector.TableOperations; import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.SupportsPartitions; import org.apache.gravitino.rel.expressions.NamedReference; import org.apache.gravitino.rel.expressions.distributions.Distribution; import org.apache.gravitino.rel.expressions.distributions.Distributions; @@ -45,6 +48,7 @@ import org.apache.gravitino.rel.expressions.sorts.SortOrders; import org.apache.gravitino.rel.expressions.transforms.Transform; import org.apache.gravitino.rel.expressions.transforms.Transforms; +import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.model.StorageDescriptor; import software.amazon.awssdk.services.glue.model.Table; @@ -58,12 +62,44 @@ @ToString public class GlueTable extends BaseTable { + /** Glue client injected after construction to support partition operations. */ + private GlueClient glueClient; + + /** Nullable — when null, Glue uses the caller's AWS account ID. */ + private String catalogId; + + /** Database (schema) this table belongs to. */ + private String dbName; + private GlueTable() {} + /** + * Injects the Glue connection context required for partition operations. + * + *

Called by {@link GlueCatalogOperations} after constructing a table instance. + */ + void initOpsContext(GlueClient glueClient, String catalogId, String dbName) { + this.glueClient = glueClient; + this.catalogId = catalogId; + this.dbName = dbName; + } + @Override protected TableOperations newOps() { - throw new UnsupportedOperationException( - "Partition operations are not yet supported for GlueTable"); + Preconditions.checkState( + glueClient != null, + "Partition operations require Glue context; call initOpsContext() first"); + String[] partColNames = + Arrays.stream(partitioning) + .filter(t -> t instanceof Transforms.IdentityTransform) + .map(t -> ((Transforms.IdentityTransform) t).fieldName()[0]) + .toArray(String[]::new); + return new GlueTableOperations(glueClient, catalogId, dbName, name, partColNames); + } + + @Override + public SupportsPartitions supportPartitions() throws UnsupportedOperationException { + return (SupportsPartitions) ops(); } /** diff --git a/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java new file mode 100644 index 00000000000..56bc7fe6ca2 --- /dev/null +++ b/catalogs/catalog-glue/src/main/java/org/apache/gravitino/catalog/glue/GlueTableOperations.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.glue; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.gravitino.connector.TableOperations; +import org.apache.gravitino.exceptions.NoSuchPartitionException; +import org.apache.gravitino.exceptions.PartitionAlreadyExistsException; +import org.apache.gravitino.rel.SupportsPartitions; +import org.apache.gravitino.rel.expressions.literals.Literal; +import org.apache.gravitino.rel.expressions.literals.Literals; +import org.apache.gravitino.rel.partitions.IdentityPartition; +import org.apache.gravitino.rel.partitions.Partition; +import org.apache.gravitino.rel.partitions.Partitions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.CreatePartitionRequest; +import software.amazon.awssdk.services.glue.model.DeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetPartitionRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.PartitionInput; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; + +/** + * Table-level partition operations for the AWS Glue Data Catalog. + * + *

Implements {@link SupportsPartitions} for Hive-format identity-partitioned tables. Partition + * names follow the Hive convention: {@code col=val/col2=val2}. + * + *

Only {@link IdentityPartition} is supported; other partition types throw {@link + * IllegalArgumentException}. + */ +class GlueTableOperations implements TableOperations, SupportsPartitions { + + private static final Logger LOG = LoggerFactory.getLogger(GlueTableOperations.class); + + private final GlueClient glueClient; + /** Nullable — when null, Glue uses the caller's AWS account ID. */ + private final String catalogId; + + private final String dbName; + private final String tableName; + /** Ordered partition column names, matching the table's {@code partitionKeys()} order. */ + private final String[] partitionColNames; + + GlueTableOperations( + GlueClient glueClient, + String catalogId, + String dbName, + String tableName, + String[] partitionColNames) { + this.glueClient = glueClient; + this.catalogId = catalogId; + this.dbName = dbName; + this.tableName = tableName; + this.partitionColNames = partitionColNames; + } + + @Override + public String[] listPartitionNames() { + List names = new ArrayList<>(); + String nextToken = null; + try { + do { + GetPartitionsRequest.Builder req = + GetPartitionsRequest.builder().databaseName(dbName).tableName(tableName); + if (catalogId != null) req.catalogId(catalogId); + if (nextToken != null) req.nextToken(nextToken); + GetPartitionsResponse resp = glueClient.getPartitions(req.build()); + for (software.amazon.awssdk.services.glue.model.Partition p : resp.partitions()) { + names.add(buildPartitionName(p.values())); + } + nextToken = resp.nextToken(); + } while (nextToken != null); + } catch (GlueException e) { + throw new RuntimeException("Failed to list partitions for table " + tableName, e); + } + return names.toArray(new String[0]); + } + + @Override + public Partition[] listPartitions() { + List partitions = new ArrayList<>(); + String nextToken = null; + try { + do { + GetPartitionsRequest.Builder req = + GetPartitionsRequest.builder().databaseName(dbName).tableName(tableName); + if (catalogId != null) req.catalogId(catalogId); + if (nextToken != null) req.nextToken(nextToken); + GetPartitionsResponse resp = glueClient.getPartitions(req.build()); + for (software.amazon.awssdk.services.glue.model.Partition p : resp.partitions()) { + partitions.add(toGravitinoPartition(p)); + } + nextToken = resp.nextToken(); + } while (nextToken != null); + } catch (GlueException e) { + throw new RuntimeException("Failed to list partitions for table " + tableName, e); + } + return partitions.toArray(new Partition[0]); + } + + @Override + public Partition getPartition(String partitionName) throws NoSuchPartitionException { + List values = parsePartitionName(partitionName); + GetPartitionRequest.Builder req = + GetPartitionRequest.builder() + .databaseName(dbName) + .tableName(tableName) + .partitionValues(values); + if (catalogId != null) req.catalogId(catalogId); + try { + return toGravitinoPartition(glueClient.getPartition(req.build()).partition()); + } catch (EntityNotFoundException e) { + throw new NoSuchPartitionException( + e, "Partition %s does not exist in table %s", partitionName, tableName); + } catch (GlueException e) { + throw new RuntimeException("Failed to get partition " + partitionName, e); + } + } + + @Override + public Partition addPartition(Partition partition) throws PartitionAlreadyExistsException { + Preconditions.checkArgument( + partition instanceof IdentityPartition, "Glue only supports identity partitions"); + IdentityPartition ip = (IdentityPartition) partition; + Preconditions.checkArgument( + ip.values().length == partitionColNames.length, + "Partition values count (%s) must match partition columns count (%s)", + ip.values().length, + partitionColNames.length); + + List values = new ArrayList<>(ip.values().length); + for (Literal v : ip.values()) { + values.add(v.value() != null ? v.value().toString() : null); + } + + PartitionInput input = + PartitionInput.builder() + .values(values) + .storageDescriptor(StorageDescriptor.builder().build()) + .build(); + + CreatePartitionRequest.Builder req = + CreatePartitionRequest.builder() + .databaseName(dbName) + .tableName(tableName) + .partitionInput(input); + if (catalogId != null) req.catalogId(catalogId); + + try { + glueClient.createPartition(req.build()); + } catch (AlreadyExistsException e) { + throw new PartitionAlreadyExistsException( + e, "Partition %s already exists in table %s", partition.name(), tableName); + } catch (GlueException e) { + throw new RuntimeException("Failed to add partition " + partition.name(), e); + } + + LOG.info("Added partition {} to {}.{}", partition.name(), dbName, tableName); + return Partitions.identity( + partition.name(), ip.fieldNames(), ip.values(), partition.properties()); + } + + @Override + public void close() { + // GlueClient lifecycle is managed by GlueCatalogOperations; nothing to close here. + } + + @Override + public boolean dropPartition(String partitionName) { + List values = parsePartitionName(partitionName); + DeletePartitionRequest.Builder req = + DeletePartitionRequest.builder() + .databaseName(dbName) + .tableName(tableName) + .partitionValues(values); + if (catalogId != null) req.catalogId(catalogId); + try { + glueClient.deletePartition(req.build()); + LOG.info("Dropped partition {} from {}.{}", partitionName, dbName, tableName); + return true; + } catch (EntityNotFoundException e) { + return false; + } catch (GlueException e) { + throw new RuntimeException("Failed to drop partition " + partitionName, e); + } + } + + /** + * Builds a Hive-style partition name (e.g. {@code dt=2024-01-01/country=us}) from an ordered list + * of Glue partition values. + */ + private String buildPartitionName(List values) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partitionColNames.length && i < values.size(); i++) { + if (i > 0) sb.append('/'); + sb.append(partitionColNames[i]).append('=').append(values.get(i)); + } + return sb.toString(); + } + + /** + * Parses a Hive-style partition name (e.g. {@code dt=2024-01-01/country=us}) into an ordered list + * of values, validating that the keys match the table's partition columns in order. + */ + private List parsePartitionName(String partitionName) { + String[] parts = partitionName.split("/"); + Preconditions.checkArgument( + parts.length == partitionColNames.length, + "Partition name '%s' has %s segment(s) but table has %s partition column(s)", + partitionName, + parts.length, + partitionColNames.length); + List values = new ArrayList<>(parts.length); + for (int i = 0; i < parts.length; i++) { + int eq = parts[i].indexOf('='); + Preconditions.checkArgument( + eq >= 0 && parts[i].substring(0, eq).equals(partitionColNames[i]), + "Partition segment '%s' does not match expected column '%s'", + parts[i], + partitionColNames[i]); + values.add(parts[i].substring(eq + 1)); + } + return values; + } + + private IdentityPartition toGravitinoPartition( + software.amazon.awssdk.services.glue.model.Partition gluePartition) { + List values = gluePartition.values(); + String name = buildPartitionName(values); + + String[][] fieldNames = new String[partitionColNames.length][]; + Literal[] literals = new Literal[partitionColNames.length]; + for (int i = 0; i < partitionColNames.length; i++) { + fieldNames[i] = new String[] {partitionColNames[i]}; + literals[i] = Literals.stringLiteral(i < values.size() ? values.get(i) : null); + } + return Partitions.identity(name, fieldNames, literals, Collections.emptyMap()); + } +} diff --git a/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogOperations.java b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogOperations.java new file mode 100644 index 00000000000..1e804a08d41 --- /dev/null +++ b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueCatalogOperations.java @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.glue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.SchemaChange; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.NonEmptySchemaException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.sorts.SortOrders; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Indexes; +import org.apache.gravitino.rel.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; +import software.amazon.awssdk.services.glue.model.GetDatabasesRequest; +import software.amazon.awssdk.services.glue.model.GetDatabasesResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; +import software.amazon.awssdk.services.glue.model.UpdateTableResponse; + +class TestGlueCatalogOperations { + + private GlueCatalogOperations ops; + private GlueClient mockClient; + + @BeforeEach + void setup() { + mockClient = mock(GlueClient.class); + ops = new GlueCatalogOperations(); + ops.glueClient = mockClient; + } + + // ------------------------------------------------------------------------- + // listSchemas + // ------------------------------------------------------------------------- + + @Test + void testListSchemas() { + Namespace ns = Namespace.of("metalake", "catalog"); + Database db1 = Database.builder().name("db1").build(); + Database db2 = Database.builder().name("db2").build(); + Database db3 = Database.builder().name("db3").build(); + Database db4 = Database.builder().name("db4").build(); + + when(mockClient.getDatabases(any(GetDatabasesRequest.class))) + .thenReturn( + GetDatabasesResponse.builder().databaseList(db1, db2).nextToken("token1").build()) + .thenReturn(GetDatabasesResponse.builder().databaseList(db3, db4).nextToken(null).build()); + + NameIdentifier[] result = ops.listSchemas(ns); + + assertEquals(4, result.length); + assertEquals("db1", result[0].name()); + assertEquals("db4", result[3].name()); + } + + @Test + void testListSchemasReturnsEmptyArray() { + Namespace ns = Namespace.of("metalake", "catalog"); + when(mockClient.getDatabases(any(GetDatabasesRequest.class))) + .thenReturn( + GetDatabasesResponse.builder() + .databaseList(Collections.emptyList()) + .nextToken(null) + .build()); + + assertEquals(0, ops.listSchemas(ns).length); + } + + // ------------------------------------------------------------------------- + // createSchema + // ------------------------------------------------------------------------- + + @Test + void testCreateSchema() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + Map props = Map.of("k", "v"); + + GlueSchema schema = ops.createSchema(ident, "my comment", props); + + verify(mockClient).createDatabase(any(CreateDatabaseRequest.class)); + assertEquals("mydb", schema.name()); + assertEquals("my comment", schema.comment()); + assertEquals(props, schema.properties()); + } + + @Test + void testCreateSchemaAlreadyExists() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + when(mockClient.createDatabase(any(CreateDatabaseRequest.class))) + .thenThrow(AlreadyExistsException.builder().message("exists").build()); + + assertThrows( + SchemaAlreadyExistsException.class, + () -> ops.createSchema(ident, null, Collections.emptyMap())); + } + + // ------------------------------------------------------------------------- + // loadSchema + // ------------------------------------------------------------------------- + + @Test + void testLoadSchema() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + Database db = + Database.builder() + .name("mydb") + .description("desc") + .parameters(Map.of("k", "v")) + .createTime(Instant.now()) + .build(); + when(mockClient.getDatabase(any(GetDatabaseRequest.class))) + .thenReturn(GetDatabaseResponse.builder().database(db).build()); + + GlueSchema schema = ops.loadSchema(ident); + + assertEquals("mydb", schema.name()); + assertEquals("desc", schema.comment()); + assertEquals(Map.of("k", "v"), schema.properties()); + } + + @Test + void testLoadSchemaNotFound() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "missing"); + when(mockClient.getDatabase(any(GetDatabaseRequest.class))) + .thenThrow(EntityNotFoundException.builder().message("not found").build()); + + assertThrows(NoSuchSchemaException.class, () -> ops.loadSchema(ident)); + } + + // ------------------------------------------------------------------------- + // alterSchema + // ------------------------------------------------------------------------- + + @Test + void testAlterSchemaSetProperty() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + Database db = + Database.builder() + .name("mydb") + .parameters(Map.of("existing", "val")) + .createTime(Instant.now()) + .build(); + when(mockClient.getDatabase(any(GetDatabaseRequest.class))) + .thenReturn(GetDatabaseResponse.builder().database(db).build()); + when(mockClient.updateDatabase(any(UpdateDatabaseRequest.class))) + .thenReturn(UpdateDatabaseResponse.builder().build()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(UpdateDatabaseRequest.class); + + GlueSchema result = ops.alterSchema(ident, SchemaChange.setProperty("newKey", "newVal")); + + verify(mockClient).updateDatabase(captor.capture()); + Map sentParams = captor.getValue().databaseInput().parameters(); + assertEquals("val", sentParams.get("existing")); + assertEquals("newVal", sentParams.get("newKey")); + assertEquals("newVal", result.properties().get("newKey")); + } + + @Test + void testAlterSchemaRemoveProperty() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + Database db = + Database.builder() + .name("mydb") + .parameters(Map.of("toRemove", "v")) + .createTime(Instant.now()) + .build(); + when(mockClient.getDatabase(any(GetDatabaseRequest.class))) + .thenReturn(GetDatabaseResponse.builder().database(db).build()); + when(mockClient.updateDatabase(any(UpdateDatabaseRequest.class))) + .thenReturn(UpdateDatabaseResponse.builder().build()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(UpdateDatabaseRequest.class); + + ops.alterSchema(ident, SchemaChange.removeProperty("toRemove")); + + verify(mockClient).updateDatabase(captor.capture()); + assertFalse(captor.getValue().databaseInput().parameters().containsKey("toRemove")); + } + + @Test + void testAlterSchemaUnsupportedChange() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + Database db = + Database.builder().name("mydb").parameters(Map.of()).createTime(Instant.now()).build(); + when(mockClient.getDatabase(any(GetDatabaseRequest.class))) + .thenReturn(GetDatabaseResponse.builder().database(db).build()); + + SchemaChange unsupported = mock(SchemaChange.class); + + assertThrows(IllegalArgumentException.class, () -> ops.alterSchema(ident, unsupported)); + } + + @Test + void testAlterSchemaNotFound() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "missing"); + when(mockClient.getDatabase(any(GetDatabaseRequest.class))) + .thenThrow(EntityNotFoundException.builder().message("not found").build()); + + assertThrows( + NoSuchSchemaException.class, + () -> ops.alterSchema(ident, SchemaChange.setProperty("k", "v"))); + } + + // ------------------------------------------------------------------------- + // dropSchema + // ------------------------------------------------------------------------- + + @Test + void testDropSchema() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + when(mockClient.getTables(any(GetTablesRequest.class))) + .thenReturn(GetTablesResponse.builder().tableList(List.of()).build()); + + boolean dropped = ops.dropSchema(ident, false); + + verify(mockClient).deleteDatabase(any(DeleteDatabaseRequest.class)); + assertTrue(dropped); + } + + @Test + void testDropSchemaNonEmpty() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + Table t = Table.builder().name("t1").build(); + when(mockClient.getTables(any(GetTablesRequest.class))) + .thenReturn(GetTablesResponse.builder().tableList(t).build()); + + assertThrows(NonEmptySchemaException.class, () -> ops.dropSchema(ident, false)); + verify(mockClient, never()).deleteDatabase(any(DeleteDatabaseRequest.class)); + } + + @Test + void testDropSchemaCascadeTrue() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + + boolean dropped = ops.dropSchema(ident, true); + + verify(mockClient, never()).getTables(any(GetTablesRequest.class)); + verify(mockClient).deleteDatabase(any(DeleteDatabaseRequest.class)); + assertTrue(dropped); + } + + @Test + void testDropSchemaNotFound() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "missing"); + when(mockClient.deleteDatabase(any(DeleteDatabaseRequest.class))) + .thenThrow(EntityNotFoundException.builder().message("not found").build()); + + assertFalse(ops.dropSchema(ident, true)); + } + + @Test + void testDropSchemaWithCatalogId() { + ops.catalogId = "123456789012"; + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb"); + when(mockClient.getTables(any(GetTablesRequest.class))) + .thenReturn(GetTablesResponse.builder().tableList(List.of()).build()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(DeleteDatabaseRequest.class); + + ops.dropSchema(ident, false); + + verify(mockClient).deleteDatabase(captor.capture()); + assertEquals("123456789012", captor.getValue().catalogId()); + } + + // ------------------------------------------------------------------------- + // listTables + // ------------------------------------------------------------------------- + + @Test + void testListTables() { + Namespace ns = Namespace.of("metalake", "catalog", "mydb"); + Table t1 = Table.builder().name("t1").build(); + Table t2 = Table.builder().name("t2").build(); + Table t3 = Table.builder().name("t3").build(); + + when(mockClient.getTables(any(GetTablesRequest.class))) + .thenReturn(GetTablesResponse.builder().tableList(t1, t2).nextToken("tok").build()) + .thenReturn(GetTablesResponse.builder().tableList(t3).nextToken(null).build()); + + NameIdentifier[] result = ops.listTables(ns); + + assertEquals(3, result.length); + assertEquals("t1", result[0].name()); + assertEquals("t3", result[2].name()); + } + + @Test + void testListTablesSchemaNotFound() { + Namespace ns = Namespace.of("metalake", "catalog", "missing"); + when(mockClient.getTables(any(GetTablesRequest.class))) + .thenThrow(EntityNotFoundException.builder().message("not found").build()); + + assertThrows(NoSuchSchemaException.class, () -> ops.listTables(ns)); + } + + @Test + void testListTablesWithFormatFilter() { + ops.tableFormatFilter = java.util.Set.of("iceberg"); + Namespace ns = Namespace.of("metalake", "catalog", "mydb"); + + Table icebergTable = + Table.builder() + .name("ice_tbl") + .parameters(Map.of(GlueConstants.TABLE_FORMAT, "ICEBERG")) + .build(); + Table hiveTable = Table.builder().name("hive_tbl").parameters(Collections.emptyMap()).build(); + + when(mockClient.getTables(any(GetTablesRequest.class))) + .thenReturn( + GetTablesResponse.builder().tableList(icebergTable, hiveTable).nextToken(null).build()); + + NameIdentifier[] result = ops.listTables(ns); + + assertEquals(1, result.length); + assertEquals("ice_tbl", result[0].name()); + } + + // ------------------------------------------------------------------------- + // loadTable + // ------------------------------------------------------------------------- + + @Test + void testLoadTable() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable"); + Table glueTable = + Table.builder() + .name("mytable") + .description("desc") + .storageDescriptor( + StorageDescriptor.builder() + .columns( + software.amazon.awssdk.services.glue.model.Column.builder() + .name("id") + .type("bigint") + .build()) + .build()) + .createTime(Instant.now()) + .build(); + when(mockClient.getTable(any(GetTableRequest.class))) + .thenReturn(GetTableResponse.builder().table(glueTable).build()); + + GlueTable result = ops.loadTable(ident); + + assertEquals("mytable", result.name()); + assertEquals("desc", result.comment()); + assertEquals(1, result.columns().length); + assertEquals("id", result.columns()[0].name()); + } + + @Test + void testLoadTableNotFound() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "missing"); + when(mockClient.getTable(any(GetTableRequest.class))) + .thenThrow(EntityNotFoundException.builder().message("not found").build()); + + assertThrows(NoSuchTableException.class, () -> ops.loadTable(ident)); + } + + // ------------------------------------------------------------------------- + // createTable + // ------------------------------------------------------------------------- + + @Test + void testCreateTable() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable"); + Column[] columns = { + GlueColumn.builder().withName("id").withType(Types.LongType.get()).withNullable(true).build(), + GlueColumn.builder() + .withName("name") + .withType(Types.StringType.get()) + .withNullable(true) + .build() + }; + + GlueTable result = + ops.createTable( + ident, + columns, + "my comment", + Map.of(GlueConstants.LOCATION, "s3://bucket/path"), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + SortOrders.NONE, + Indexes.EMPTY_INDEXES); + + verify(mockClient).createTable(any(CreateTableRequest.class)); + assertEquals("mytable", result.name()); + assertEquals("my comment", result.comment()); + assertEquals(2, result.columns().length); + } + + @Test + void testCreateTableAlreadyExists() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable"); + when(mockClient.createTable(any(CreateTableRequest.class))) + .thenThrow(AlreadyExistsException.builder().message("exists").build()); + + assertThrows( + TableAlreadyExistsException.class, + () -> + ops.createTable( + ident, + new Column[0], + null, + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + SortOrders.NONE, + Indexes.EMPTY_INDEXES)); + } + + @Test + void testCreateTableRejectsIndexes() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable"); + + assertThrows( + IllegalArgumentException.class, + () -> + ops.createTable( + ident, + new Column[0], + null, + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + SortOrders.NONE, + new org.apache.gravitino.rel.indexes.Index[] { + mock(org.apache.gravitino.rel.indexes.Index.class) + })); + } + + @Test + void testCreateTableStorageDescriptorProperties() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "mytable"); + Map props = + Map.of( + GlueConstants.LOCATION, "s3://my-bucket/path", + GlueConstants.INPUT_FORMAT, "org.apache.hadoop.mapred.TextInputFormat", + GlueConstants.OUTPUT_FORMAT, + "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + GlueConstants.TABLE_TYPE, "EXTERNAL_TABLE"); + + ArgumentCaptor captor = ArgumentCaptor.forClass(CreateTableRequest.class); + + ops.createTable( + ident, + new Column[0], + "comment", + props, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + SortOrders.NONE, + Indexes.EMPTY_INDEXES); + + verify(mockClient).createTable(captor.capture()); + CreateTableRequest req = captor.getValue(); + assertEquals("EXTERNAL_TABLE", req.tableInput().tableType()); + assertEquals("s3://my-bucket/path", req.tableInput().storageDescriptor().location()); + assertFalse(req.tableInput().parameters().containsKey(GlueConstants.LOCATION)); + assertFalse(req.tableInput().parameters().containsKey(GlueConstants.TABLE_TYPE)); + } + + // ------------------------------------------------------------------------- + // alterTable + // ------------------------------------------------------------------------- + + @Test + void testAlterTable() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "old"); + Table glueTable = + Table.builder() + .name("old") + .description("old comment") + .storageDescriptor(StorageDescriptor.builder().build()) + .createTime(Instant.now()) + .build(); + when(mockClient.getTable(any(GetTableRequest.class))) + .thenReturn(GetTableResponse.builder().table(glueTable).build()); + when(mockClient.updateTable(any(UpdateTableRequest.class))) + .thenReturn(UpdateTableResponse.builder().build()); + + ArgumentCaptor captor = ArgumentCaptor.forClass(UpdateTableRequest.class); + + GlueTable result = + ops.alterTable(ident, TableChange.rename("new"), TableChange.updateComment("new comment")); + + verify(mockClient).updateTable(captor.capture()); + assertEquals("new", captor.getValue().tableInput().name()); + assertEquals("new comment", result.comment()); + } + + @Test + void testAlterTableSetProperty() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "t"); + Table glueTable = + Table.builder() + .name("t") + .parameters(Map.of("existing", "v1")) + .storageDescriptor(StorageDescriptor.builder().build()) + .createTime(Instant.now()) + .build(); + when(mockClient.getTable(any(GetTableRequest.class))) + .thenReturn(GetTableResponse.builder().table(glueTable).build()); + when(mockClient.updateTable(any(UpdateTableRequest.class))) + .thenReturn(UpdateTableResponse.builder().build()); + + GlueTable result = ops.alterTable(ident, TableChange.setProperty("newKey", "newVal")); + + assertEquals("newVal", result.properties().get("newKey")); + assertEquals("v1", result.properties().get("existing")); + } + + @Test + void testAlterTableAddColumn() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "t"); + Table glueTable = + Table.builder() + .name("t") + .storageDescriptor( + StorageDescriptor.builder() + .columns( + List.of( + software.amazon.awssdk.services.glue.model.Column.builder() + .name("id") + .type("bigint") + .build())) + .build()) + .createTime(Instant.now()) + .build(); + when(mockClient.getTable(any(GetTableRequest.class))) + .thenReturn(GetTableResponse.builder().table(glueTable).build()); + when(mockClient.updateTable(any(UpdateTableRequest.class))) + .thenReturn(UpdateTableResponse.builder().build()); + + GlueTable result = + ops.alterTable( + ident, TableChange.addColumn(new String[] {"email"}, Types.StringType.get())); + + assertEquals(2, result.columns().length); + assertEquals("email", result.columns()[1].name()); + } + + @Test + void testAlterTableNotFound() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "missing"); + when(mockClient.getTable(any(GetTableRequest.class))) + .thenThrow(EntityNotFoundException.builder().message("not found").build()); + + assertThrows( + NoSuchTableException.class, () -> ops.alterTable(ident, TableChange.updateComment("x"))); + } + + // ------------------------------------------------------------------------- + // dropTable + // ------------------------------------------------------------------------- + + @Test + void testDropTable() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "t"); + + assertTrue(ops.dropTable(ident)); + verify(mockClient).deleteTable(any(DeleteTableRequest.class)); + } + + @Test + void testDropTableNotFound() { + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "missing"); + when(mockClient.deleteTable(any(DeleteTableRequest.class))) + .thenThrow(EntityNotFoundException.builder().message("not found").build()); + + assertFalse(ops.dropTable(ident)); + } + + @Test + void testDropTableWithCatalogId() { + ops.catalogId = "123456789012"; + NameIdentifier ident = NameIdentifier.of("metalake", "catalog", "mydb", "t"); + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteTableRequest.class); + + ops.dropTable(ident); + + verify(mockClient).deleteTable(captor.capture()); + assertEquals("123456789012", captor.getValue().catalogId()); + assertEquals("mydb", captor.getValue().databaseName()); + assertEquals("t", captor.getValue().name()); + } +} diff --git a/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueTableOperations.java b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueTableOperations.java new file mode 100644 index 00000000000..14e294eef56 --- /dev/null +++ b/catalogs/catalog-glue/src/test/java/org/apache/gravitino/catalog/glue/TestGlueTableOperations.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.glue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import org.apache.gravitino.exceptions.NoSuchPartitionException; +import org.apache.gravitino.exceptions.PartitionAlreadyExistsException; +import org.apache.gravitino.rel.expressions.literals.Literals; +import org.apache.gravitino.rel.partitions.IdentityPartition; +import org.apache.gravitino.rel.partitions.Partition; +import org.apache.gravitino.rel.partitions.Partitions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.CreatePartitionRequest; +import software.amazon.awssdk.services.glue.model.DeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetPartitionRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; + +class TestGlueTableOperations { + + private GlueClient mockClient; + private GlueTableOperations ops; + + @BeforeEach + void setup() { + mockClient = mock(GlueClient.class); + ops = + new GlueTableOperations(mockClient, null, "mydb", "mytable", new String[] {"dt", "region"}); + } + + // ------------------------------------------------------------------------- + // listPartitionNames + // ------------------------------------------------------------------------- + + @Test + void testListPartitionNames() { + software.amazon.awssdk.services.glue.model.Partition p1 = + software.amazon.awssdk.services.glue.model.Partition.builder() + .values("2024-01-01", "us") + .build(); + software.amazon.awssdk.services.glue.model.Partition p2 = + software.amazon.awssdk.services.glue.model.Partition.builder() + .values("2024-01-02", "eu") + .build(); + + when(mockClient.getPartitions(any(GetPartitionsRequest.class))) + .thenReturn(GetPartitionsResponse.builder().partitions(p1).nextToken("tok").build()) + .thenReturn(GetPartitionsResponse.builder().partitions(p2).nextToken(null).build()); + + String[] names = ops.listPartitionNames(); + + assertEquals(2, names.length); + assertEquals("dt=2024-01-01/region=us", names[0]); + assertEquals("dt=2024-01-02/region=eu", names[1]); + } + + @Test + void testListPartitionNamesReturnsEmptyArray() { + when(mockClient.getPartitions(any(GetPartitionsRequest.class))) + .thenReturn(GetPartitionsResponse.builder().partitions(List.of()).nextToken(null).build()); + + assertEquals(0, ops.listPartitionNames().length); + } + + // ------------------------------------------------------------------------- + // listPartitions + // ------------------------------------------------------------------------- + + @Test + void testListPartitions() { + software.amazon.awssdk.services.glue.model.Partition p = + software.amazon.awssdk.services.glue.model.Partition.builder() + .values("2024-01-01", "us") + .build(); + + when(mockClient.getPartitions(any(GetPartitionsRequest.class))) + .thenReturn(GetPartitionsResponse.builder().partitions(p).nextToken(null).build()); + + Partition[] partitions = ops.listPartitions(); + + assertEquals(1, partitions.length); + IdentityPartition ip = (IdentityPartition) partitions[0]; + assertEquals("dt=2024-01-01/region=us", ip.name()); + assertEquals("2024-01-01", ip.values()[0].value()); + assertEquals("us", ip.values()[1].value()); + } + + // ------------------------------------------------------------------------- + // getPartition + // ------------------------------------------------------------------------- + + @Test + void testGetPartition() { + software.amazon.awssdk.services.glue.model.Partition gluePartition = + software.amazon.awssdk.services.glue.model.Partition.builder() + .values("2024-01-01", "us") + .build(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(GetPartitionRequest.class); + when(mockClient.getPartition(any(GetPartitionRequest.class))) + .thenReturn(GetPartitionResponse.builder().partition(gluePartition).build()); + + Partition result = ops.getPartition("dt=2024-01-01/region=us"); + + verify(mockClient).getPartition(captor.capture()); + assertEquals(List.of("2024-01-01", "us"), captor.getValue().partitionValues()); + assertEquals("dt=2024-01-01/region=us", result.name()); + } + + @Test + void testGetPartitionNotFound() { + when(mockClient.getPartition(any(GetPartitionRequest.class))) + .thenThrow(EntityNotFoundException.builder().message("not found").build()); + + assertThrows(NoSuchPartitionException.class, () -> ops.getPartition("dt=2024-01-01/region=us")); + } + + // ------------------------------------------------------------------------- + // addPartition + // ------------------------------------------------------------------------- + + @Test + void testAddPartition() { + IdentityPartition partition = + Partitions.identity( + "dt=2024-01-01/region=us", + new String[][] {{"dt"}, {"region"}}, + new org.apache.gravitino.rel.expressions.literals.Literal[] { + Literals.stringLiteral("2024-01-01"), Literals.stringLiteral("us") + }, + java.util.Collections.emptyMap()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(CreatePartitionRequest.class); + + Partition result = ops.addPartition(partition); + + verify(mockClient).createPartition(captor.capture()); + assertEquals(List.of("2024-01-01", "us"), captor.getValue().partitionInput().values()); + assertEquals("dt=2024-01-01/region=us", result.name()); + } + + @Test + void testAddPartitionAlreadyExists() { + IdentityPartition partition = + Partitions.identity( + "dt=2024-01-01/region=us", + new String[][] {{"dt"}, {"region"}}, + new org.apache.gravitino.rel.expressions.literals.Literal[] { + Literals.stringLiteral("2024-01-01"), Literals.stringLiteral("us") + }, + java.util.Collections.emptyMap()); + + when(mockClient.createPartition(any(CreatePartitionRequest.class))) + .thenThrow(AlreadyExistsException.builder().message("exists").build()); + + assertThrows(PartitionAlreadyExistsException.class, () -> ops.addPartition(partition)); + } + + @Test + void testAddPartitionRejectsNonIdentity() { + Partition nonIdentity = mock(Partition.class); + assertThrows(IllegalArgumentException.class, () -> ops.addPartition(nonIdentity)); + } + + // ------------------------------------------------------------------------- + // dropPartition + // ------------------------------------------------------------------------- + + @Test + void testDropPartition() { + ArgumentCaptor captor = + ArgumentCaptor.forClass(DeletePartitionRequest.class); + + boolean result = ops.dropPartition("dt=2024-01-01/region=us"); + + verify(mockClient).deletePartition(captor.capture()); + assertTrue(result); + assertEquals(List.of("2024-01-01", "us"), captor.getValue().partitionValues()); + assertEquals("mydb", captor.getValue().databaseName()); + assertEquals("mytable", captor.getValue().tableName()); + } + + @Test + void testDropPartitionNotFound() { + when(mockClient.deletePartition(any(DeletePartitionRequest.class))) + .thenThrow(EntityNotFoundException.builder().message("not found").build()); + + assertFalse(ops.dropPartition("dt=2024-01-01/region=us")); + } + + @Test + void testDropPartitionWithCatalogId() { + GlueTableOperations opsWithId = + new GlueTableOperations(mockClient, "123456789012", "mydb", "mytable", new String[] {"dt"}); + ArgumentCaptor captor = + ArgumentCaptor.forClass(DeletePartitionRequest.class); + + opsWithId.dropPartition("dt=2024-01-01"); + + verify(mockClient).deletePartition(captor.capture()); + assertEquals("123456789012", captor.getValue().catalogId()); + } +}