Commit 0bc4199

[Kernel] Support column mapping mode in REPLACE TABLE including with fieldId re-use (#4518)
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

PR #1 in this stack (#4520) separates some of this work out. This PR re-enables support for column mapping tables in REPLACE TABLE and allows fieldId re-use when the type and nullability are compatible. To check that compatibility we essentially re-use our schema evolution code: valid schema transformations in the normal evolution case are also valid changes when reusing fieldIds across REPLACE. Any fields in the replace schema that are not reusing existing fieldIds are treated as new fields during the schema evolution validation. This validation checks that nullability changes are valid (a field cannot change from nullable to non-nullable) and that type changes are valid (for now, types cannot change at all). Columns can be moved, and nested columns dropped, as long as the other invariants hold.

Note that this currently restricts REPLACE to operations that keep the same column mapping mode. We will loosen this restriction in a future PR; for now it narrows the scope of this one.

## How was this patch tested?

Adds unit tests.
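For context, here is a minimal caller-side sketch of what this PR unblocks. It is an illustration only: the engine setup, schema, and property values are assumptions, not code from this PR, though `Operation.REPLACE_TABLE` and the builder methods are part of the existing Kernel API.

```java
import java.util.Collections;

import io.delta.kernel.Operation;
import io.delta.kernel.Table;
import io.delta.kernel.Transaction;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StructType;

public class ReplaceWithColumnMapping {
  // Starts a REPLACE TABLE transaction on a table that has column mapping
  // enabled. Before this PR, build() threw UnsupportedOperationException for
  // any REPLACE where the effective column mapping mode was not NONE.
  public static Transaction startReplace(Engine engine, String tablePath) {
    StructType newSchema = new StructType().add("id", IntegerType.INTEGER);
    return Table.forPath(engine, tablePath)
        .createTransactionBuilder(engine, "my-engine" /* engineInfo */, Operation.REPLACE_TABLE)
        .withSchema(engine, newSchema)
        // For now the mode must match the existing table's mode ("name" is
        // assumed here); changing modes in a REPLACE still throws.
        .withTableProperties(
            engine, Collections.singletonMap("delta.columnMapping.mode", "name"))
        .build(engine);
  }
}
```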
1 parent 2b1a0bd · commit 0bc4199

4 files changed: +647 −50 lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 46 additions & 11 deletions
```diff
@@ -19,13 +19,13 @@
 import static io.delta.kernel.internal.DeltaErrors.tableAlreadyExists;
 import static io.delta.kernel.internal.TransactionImpl.DEFAULT_READ_VERSION;
 import static io.delta.kernel.internal.TransactionImpl.DEFAULT_WRITE_VERSION;
-import static io.delta.kernel.internal.util.ColumnMapping.getColumnMappingMode;
 import static io.delta.kernel.internal.util.ColumnMapping.isColumnMappingModeEnabled;
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
 import static io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames;
 import static io.delta.kernel.internal.util.VectorUtils.buildArrayValue;
 import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue;
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 
@@ -300,7 +300,12 @@ protected TransactionImpl buildTransactionInternal(
             : ClusteringUtils.getClusteringColumnsOptional(latestSnapshot.get());
     Tuple2<Optional<Protocol>, Optional<Metadata>> updatedProtocolAndMetadata =
         validateAndUpdateProtocolAndMetadata(
-            engine, baseMetadata, baseProtocol, isCreateOrReplace, existingClusteringCols);
+            engine,
+            baseMetadata,
+            baseProtocol,
+            isCreateOrReplace,
+            existingClusteringCols,
+            latestSnapshot);
     Optional<Protocol> newProtocol = updatedProtocolAndMetadata._1;
     Optional<Metadata> newMetadata = updatedProtocolAndMetadata._2;
 
@@ -321,11 +326,6 @@ protected TransactionImpl buildTransactionInternal(
 
     // Block this for now - in a future PR we will enable this
     if (operation == Operation.REPLACE_TABLE) {
-      if (getColumnMappingMode(newMetadata.orElse(baseMetadata).getConfiguration())
-          != ColumnMappingMode.NONE) {
-        throw new UnsupportedOperationException(
-            "REPLACE TABLE is not yet supported with column mapping");
-      }
       if (newProtocol.orElse(baseProtocol).supportsFeature(TableFeatures.ROW_TRACKING_W_FEATURE)) {
         // Block this for now to be safe, we will return to this in the future
         throw new UnsupportedOperationException(
@@ -365,7 +365,8 @@ protected Tuple2<Optional<Protocol>, Optional<Metadata>> validateAndUpdateProtoc
       Metadata baseMetadata,
       Protocol baseProtocol,
       boolean isCreateOrReplace,
-      Optional<List<Column>> existingClusteringCols) {
+      Optional<List<Column>> existingClusteringCols,
+      Optional<SnapshotImpl> latestSnapshot) {
     if (isCreateOrReplace) {
       checkArgument(!existingClusteringCols.isPresent());
     }
@@ -477,7 +478,11 @@ protected Tuple2<Optional<Protocol>, Optional<Metadata>> validateAndUpdateProtoc
     // Now that all the config and schema changes have been made validate the old vs new metadata
     if (newMetadata.isPresent()) {
       validateMetadataChange(
-          existingClusteringCols, baseMetadata, newMetadata.get(), isCreateOrReplace);
+          existingClusteringCols,
+          baseMetadata,
+          newMetadata.get(),
+          isCreateOrReplace,
+          latestSnapshot);
     }
 
     return new Tuple2(newProtocol, newMetadata);
@@ -572,7 +577,8 @@ private void validateMetadataChange(
       Optional<List<Column>> existingClusteringCols,
       Metadata oldMetadata,
       Metadata newMetadata,
-      boolean isCreateOrReplace) {
+      boolean isCreateOrReplace,
+      Optional<SnapshotImpl> latestSnapshot) {
     ColumnMapping.verifyColumnMappingChange(
         oldMetadata.getConfiguration(), newMetadata.getConfiguration(), isCreateOrReplace);
     IcebergWriterCompatV1MetadataValidatorAndUpdater.validateIcebergWriterCompatV1Change(
@@ -611,7 +617,36 @@ private void validateMetadataChange(
           oldMetadata,
           newMetadata,
           clusteringColumnPhysicalNames,
-          false /* allowNewRequiredFields*/);
+          false /* allowNewRequiredFields */);
+    }
+
+    // For replace table we need to do special validation in the case of fieldId re-use
+    if (isCreateOrReplace && latestSnapshot.isPresent()) {
+      // For now, we don't support changing column mapping mode during replace, in a future PR we
+      // will loosen this restriction
+      ColumnMappingMode oldMode =
+          ColumnMapping.getColumnMappingMode(latestSnapshot.get().getMetadata().getConfiguration());
+      ColumnMappingMode newMode =
+          ColumnMapping.getColumnMappingMode(newMetadata.getConfiguration());
+      if (oldMode != newMode) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Changing column mapping mode from %s to %s is not currently supported in Kernel "
+                    + "during REPLACE TABLE operations",
+                oldMode, newMode));
+      }
+
+      // We only need to check fieldId re-use when cmMode != none
+      if (newMode != ColumnMappingMode.NONE) {
+        SchemaUtils.validateUpdatedSchema(
+            latestSnapshot.get().getMetadata(),
+            newMetadata,
+            // We already validate clustering columns elsewhere for isCreateOrReplace no need to
+            // duplicate this check here
+            emptySet() /* clusteringCols */,
+            // We allow new non-null fields in REPLACE since we know all existing data is removed
+            true /* allowNewRequiredFields */);
+      }
     }
   }
 
```
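To make the fieldId re-use rules concrete, here is a hedged illustration of schema pairs that the `SchemaUtils.validateUpdatedSchema` call added above would accept or reject during REPLACE. The `field` helper and the example schemas are hypothetical, not code from this PR; the `delta.columnMapping.id` and `delta.columnMapping.physicalName` keys are the standard column mapping field metadata keys.

```java
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.FieldMetadata;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

public class FieldIdReuseExamples {
  // Hypothetical helper: builds a field carrying column mapping metadata.
  static StructField field(String name, DataType type, boolean nullable, long id) {
    FieldMetadata cm =
        FieldMetadata.builder()
            .putLong("delta.columnMapping.id", id)
            .putString("delta.columnMapping.physicalName", "col-" + id)
            .build();
    return new StructField(name, type, nullable, cm);
  }

  // Existing table: fieldId 1 is a nullable string column "a".
  static final StructType EXISTING =
      new StructType().add(field("a", StringType.STRING, true, 1));

  // Accepted: fieldId 1 keeps its type, and renaming the logical column is
  // fine because the physical name stays keyed to the fieldId. The new
  // non-null field "b" uses a fresh fieldId and is allowed because REPLACE
  // removes all existing data (allowNewRequiredFields = true).
  static final StructType VALID_REPLACE =
      new StructType()
          .add(field("a_renamed", StringType.STRING, true, 1))
          .add(field("b", LongType.LONG, false, 2));

  // Rejected: fieldId 1 changes type from string to long; type changes are
  // not allowed for now (the same applies to nullable -> non-nullable).
  static final StructType INVALID_REPLACE =
      new StructType().add(field("a", LongType.LONG, true, 1));
}
```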

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/ColumnMappingSuiteBase.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -129,8 +129,9 @@ trait ColumnMappingSuiteBase extends VectorTestUtils {
       metadata: Metadata,
       isNewTable: Boolean = true,
       enableIcebergCompatV2: Boolean = true,
-      enableIcebergWriterCompatV1: Boolean = false): Unit = {
-    var fieldId: Long = 0L
+      enableIcebergWriterCompatV1: Boolean = false,
+      initialFieldId: Long = 0L): Unit = {
+    var fieldId: Long = initialFieldId
 
     def nextFieldId: Long = {
       fieldId += 1
```
