Skip to content

[Kernel][icebergWriterCompatV1] Add a check that map struct keys don't evolve #4525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.internal.DeltaErrors;
import io.delta.kernel.internal.TableConfig;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.skipping.StatsSchemaHelper;
import io.delta.kernel.types.*;
Expand Down Expand Up @@ -95,6 +96,7 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab
* <li>Column names contain only valid characters
* <li>Data types are supported
* <li>Physical column name consistency is preserved in the new schema
* <li>If IcebergWriterCompatV1 is enabled, that map struct keys have not changed
* <li>ToDo: No new non-nullable fields are added or no tightening of nullable fields
* <li>ToDo: Nested IDs for array/map types are preserved in the new schema for IcebergCompatV2
* <li>ToDo: No type changes
Expand All @@ -121,7 +123,8 @@ public static void validateUpdatedSchema(
ColumnMapping.getColumnMappingMode(newMetadata.getConfiguration()),
clusteringColumnPhysicalNames,
currentMaxFieldId,
allowNewRequiredFields);
allowNewRequiredFields,
TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED.fromMetadata(newMetadata.getConfiguration()));
}

/**
Expand Down Expand Up @@ -438,7 +441,8 @@ private static void validateSchemaEvolution(
ColumnMappingMode columnMappingMode,
Set<String> clusteringColumnPhysicalNames,
int currentMaxFieldId,
boolean allowNewRequiredFields) {
boolean allowNewRequiredFields,
boolean icebergWriterCompatV1Enabled) {
switch (columnMappingMode) {
case ID:
case NAME:
Expand All @@ -447,7 +451,8 @@ private static void validateSchemaEvolution(
newSchema,
clusteringColumnPhysicalNames,
currentMaxFieldId,
allowNewRequiredFields);
allowNewRequiredFields,
icebergWriterCompatV1Enabled);
return;
case NONE:
throw new UnsupportedOperationException(
Expand All @@ -467,14 +472,16 @@ private static void validateSchemaEvolutionById(
StructType newSchema,
Set<String> clusteringColumnPhysicalNames,
int oldMaxFieldId,
boolean allowNewRequiredFields) {
boolean allowNewRequiredFields,
boolean icebergWriterCompatV1Enabled) {
Map<Integer, StructField> currentFieldsById = fieldsById(currentSchema);
Map<Integer, StructField> updatedFieldsById = fieldsById(newSchema);
SchemaChanges schemaChanges = computeSchemaChangesById(currentFieldsById, updatedFieldsById);
validatePhysicalNameConsistency(schemaChanges.updatedFields());
// Validates that the updated schema does not contain breaking changes in terms of types and
// nullability
validateUpdatedSchemaCompatibility(schemaChanges, oldMaxFieldId, allowNewRequiredFields);
validateUpdatedSchemaCompatibility(
schemaChanges, oldMaxFieldId, allowNewRequiredFields, icebergWriterCompatV1Enabled);
validateClusteringColumnsNotDropped(
schemaChanges.removedFields(), clusteringColumnPhysicalNames);
// ToDo Potentially validate IcebergCompatV2 nested IDs
Expand All @@ -499,7 +506,10 @@ private static void validateClusteringColumnsNotDropped(
* <p>ToDo: Prevent moving fields outside of their containing struct
*/
private static void validateUpdatedSchemaCompatibility(
SchemaChanges schemaChanges, int oldMaxFieldId, boolean allowNewRequiredFields) {
SchemaChanges schemaChanges,
int oldMaxFieldId,
boolean allowNewRequiredFields,
boolean icebergWriterCompatV1Enabled) {
for (StructField addedField : schemaChanges.addedFields()) {
if (!allowNewRequiredFields && !addedField.isNullable()) {
throw new KernelException(
Expand All @@ -518,7 +528,7 @@ private static void validateUpdatedSchemaCompatibility(
for (Tuple2<StructField, StructField> updatedFields : schemaChanges.updatedFields()) {
// ToDo: See if recursion can be avoided by incorporating map key/value and array element
// updates in updatedFields
validateFieldCompatibility(updatedFields._1, updatedFields._2);
validateFieldCompatibility(updatedFields._1, updatedFields._2, icebergWriterCompatV1Enabled);
}
}

Expand All @@ -527,7 +537,8 @@ private static void validateUpdatedSchemaCompatibility(
* modified, dropped, or added fields to structs. Validates that a field's nullability is not
* tightened
*/
private static void validateFieldCompatibility(StructField existingField, StructField newField) {
private static void validateFieldCompatibility(
StructField existingField, StructField newField, boolean icebergWriterCompatV1Enabled) {
if (existingField.isNullable() && !newField.isNullable()) {
throw new KernelException(
String.format(
Expand All @@ -548,23 +559,45 @@ private static void validateFieldCompatibility(StructField existingField, Struct
for (StructField newNestedField : newStruct.fields()) {
StructField existingNestedField = existingNestedFields.get(getColumnId(newNestedField));
if (existingNestedField != null) {
validateFieldCompatibility(existingNestedField, newNestedField);
validateFieldCompatibility(
existingNestedField, newNestedField, icebergWriterCompatV1Enabled);
}
}
} else if (existingField.getDataType() instanceof MapType
&& newField.getDataType() instanceof MapType) {
MapType existingMapType = (MapType) existingField.getDataType();
MapType newMapType = (MapType) newField.getDataType();

validateFieldCompatibility(existingMapType.getKeyField(), newMapType.getKeyField());
validateFieldCompatibility(existingMapType.getValueField(), newMapType.getValueField());
if (icebergWriterCompatV1Enabled
&& existingMapType.getKeyType() instanceof StructType
&& newMapType.getKeyType() instanceof StructType) {
// Enforce that we don't change map struct keys. This is a requirement for
// IcebergWriterCompatV1
StructType currentKeyType = (StructType) existingMapType.getKeyType();
StructType newKeyType = (StructType) newMapType.getKeyType();
if (!currentKeyType.equals(newKeyType)) {
throw new KernelException(
String.format(
"Cannot change the type key of Map field %s from %s to %s",
newField.getName(), currentKeyType, newKeyType));
}
}

validateFieldCompatibility(
existingMapType.getKeyField(), newMapType.getKeyField(), icebergWriterCompatV1Enabled);
validateFieldCompatibility(
existingMapType.getValueField(),
newMapType.getValueField(),
icebergWriterCompatV1Enabled);
} else if (existingField.getDataType() instanceof ArrayType
&& newField.getDataType() instanceof ArrayType) {
ArrayType existingArrayType = (ArrayType) existingField.getDataType();
ArrayType newArrayType = (ArrayType) newField.getDataType();

validateFieldCompatibility(
existingArrayType.getElementField(), newArrayType.getElementField());
existingArrayType.getElementField(),
newArrayType.getElementField(),
icebergWriterCompatV1Enabled);
} else if (!existingField.getDataType().equivalent(newField.getDataType())) {
throw new KernelException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.reflect.ClassTag

import io.delta.kernel.exceptions.KernelException
import io.delta.kernel.internal.TableConfig
import io.delta.kernel.internal.actions.{Format, Metadata}
import io.delta.kernel.internal.types.DataTypeJsonSerDe
import io.delta.kernel.internal.util.ColumnMapping.{COLUMN_MAPPING_ID_KEY, COLUMN_MAPPING_MODE_KEY, COLUMN_MAPPING_NESTED_IDS_KEY, COLUMN_MAPPING_PHYSICAL_NAME_KEY, ColumnMappingMode}
Expand Down Expand Up @@ -1251,4 +1252,76 @@ class SchemaUtilsSuite extends AnyFunSuite {
assert(results === flattenedTestSchema.filterKeys(expectedColumns.contains))
}
}

///////////////////////////////////////////////////////////////////////////
// validateNoMapStructKeyChanges
///////////////////////////////////////////////////////////////////////////

private val updatedSchemasWithChangedMaps = Table(
("schemaBefore", "updatedSchemaWithChangedMapKey"),
// add a col
(
mapWithStructKey,
new StructType()
.add(
"map",
new MapType(
new StructType()
.add("id", IntegerType.INTEGER, true, fieldMetadata(id = 2, physicalName = "id"))
.add("id2", IntegerType.INTEGER, true, fieldMetadata(id = 3, physicalName = "id2")),
IntegerType.INTEGER,
false),
true,
fieldMetadata(id = 1, physicalName = "map"))),
(
new StructType()
.add(
"map",
new MapType(
new StructType()
.add("id", IntegerType.INTEGER, true, fieldMetadata(id = 2, physicalName = "id"))
.add("id2", IntegerType.INTEGER, true, fieldMetadata(id = 3, physicalName = "id2")),
IntegerType.INTEGER,
false),
true,
fieldMetadata(id = 1, physicalName = "map")),
mapWithStructKey),
(
new StructType()
.add(
"top_level_struct",
new StructType().add(
"map",
new MapType(
new StructType()
.add("id", IntegerType.INTEGER, true, fieldMetadata(id = 3, physicalName = "id")),
IntegerType.INTEGER,
false),
true,
fieldMetadata(2, "map")),
fieldMetadata(1, "top_level_struct")),
new StructType()
.add(
"top_level_struct",
new StructType().add(
"map",
new MapType(
new StructType()
.add("id", IntegerType.INTEGER, true, fieldMetadata(id = 3, physicalName = "id"))
.add("id2", IntegerType.INTEGER, true, fieldMetadata(id = 4, physicalName = "id")),
IntegerType.INTEGER,
false),
true,
fieldMetadata(2, "map")),
fieldMetadata(1, "top_level_struct"))))

test("validateNoMapStructKeyChanges fails when map struct changes") {
val tblProperties = Map(
TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED.getKey -> "true",
ColumnMapping.COLUMN_MAPPING_MODE_KEY -> "id")
assertSchemaEvolutionFailure[KernelException](
updatedSchemasWithChangedMaps,
"Cannot change the type key of Map field map from .*",
tableProperties = tblProperties)
}
}
Loading