Merged
29 commits
1f6001f
init
KaiqiJinWow May 20, 2025
3283154
update
KaiqiJinWow May 20, 2025
074b131
update
KaiqiJinWow May 21, 2025
053c23c
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 9, 2025
7e10891
update
KaiqiJinWow Jun 9, 2025
a343b1b
move the data allow list
KaiqiJinWow Jun 9, 2025
0233915
add tests
KaiqiJinWow Jun 10, 2025
afd5b86
update
KaiqiJinWow Jun 10, 2025
0a01a32
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 10, 2025
029044f
update
KaiqiJinWow Jun 10, 2025
2dad169
refactor
KaiqiJinWow Jun 10, 2025
8a858e8
Merge branch 'stack/refactor_iceberg_compat_check' into stack/add_ice…
KaiqiJinWow Jun 10, 2025
f9cb9ca
update
KaiqiJinWow Jun 10, 2025
35427a4
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 11, 2025
b366ec6
update
KaiqiJinWow Jun 11, 2025
74f1e05
update
KaiqiJinWow Jun 11, 2025
6e76076
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 12, 2025
a681913
refactor test
KaiqiJinWow Jun 12, 2025
158d04b
trigger CI
KaiqiJinWow Jun 12, 2025
27e79fa
move test
KaiqiJinWow Jun 12, 2025
4688720
add e2e tests
KaiqiJinWow Jun 12, 2025
0514737
small refactor
KaiqiJinWow Jun 12, 2025
36d5736
address comment
KaiqiJinWow Jun 13, 2025
b6bc89c
address comment
KaiqiJinWow Jun 13, 2025
84299dd
address comment
KaiqiJinWow Jun 16, 2025
a80d79a
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 16, 2025
f03342c
remove redundant func
KaiqiJinWow Jun 16, 2025
2a93f08
add replace table test cases
KaiqiJinWow Jun 16, 2025
6966bfe
add more tests
KaiqiJinWow Jun 17, 2025
11 changes: 9 additions & 2 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java
@@ -37,6 +37,7 @@
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
import io.delta.kernel.statistics.DataFileStatistics;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.*;
@@ -164,14 +165,15 @@ static CloseableIterator<FilteredColumnarBatch> transformLogicalData(
// - generating the default value columns
// - generating the generated columns

boolean isIcebergCompatV2Enabled = isIcebergCompatV2Enabled(transactionState);
boolean isIcebergCompatEnabled =
isIcebergCompatV2Enabled(transactionState) || isIcebergCompatV3Enabled(transactionState);
blockIfColumnMappingEnabled(transactionState);

// TODO: set the correct schema once writing into column mapping enabled table is supported.
String tablePath = getTablePath(transactionState);
return dataIter.map(
filteredBatch -> {
if (isIcebergCompatV2Enabled) {
if (isIcebergCompatEnabled) {
// don't remove the partition columns for iceberg compat (v2 or v3) enabled tables
return filteredBatch;
}
@@ -245,13 +247,18 @@ static CloseableIterator<Row> generateAppendActions(
"DataWriteContext is not created by the `Transaction.getWriteContext()`");

boolean isIcebergCompatV2Enabled = isIcebergCompatV2Enabled(transactionState);
boolean isIcebergCompatV3Enabled = isIcebergCompatV3Enabled(transactionState);

URI tableRoot = new Path(getTablePath(transactionState)).toUri();
StructType physicalSchema = TransactionStateRow.getPhysicalSchema(transactionState);
return fileStatusIter.map(
dataFileStatus -> {
if (isIcebergCompatV2Enabled) {
IcebergCompatV2MetadataValidatorAndUpdater.validateDataFileStatus(dataFileStatus);
} else if (isIcebergCompatV3Enabled) {
IcebergCompatV3MetadataValidatorAndUpdater.validateDataFileStatus(dataFileStatus);
}
Comment on lines 256 to 260

Collaborator:
You can use isIcebergCompatEnabled and just call IcebergCompatMetadataValidatorAndUpdater.validateDataFileStatus.

Collaborator (Author):
I would push back on this, as IcebergCompatMetadataValidatorAndUpdater.validateDataFileStatus also needs a String compatFeatureName to better expose the error message, so it would not reduce the code IMO.
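For reference, a minimal sketch of what the reviewer's consolidation would look like in generateAppendActions. This is purely illustrative: it assumes the shared base-class helper were exposed to Transaction and that a compatFeatureName string were resolved from the transaction state, neither of which is true in the merged code, which keeps the two per-version calls shown above.

    // Hypothetical consolidation (not the merged code): branch once on the combined flag.
    // Assumes validateDataFileStatus(DataFileStatus, String) is callable from here and that
    // compatFeatureName ("icebergCompatV2" or "icebergCompatV3") has been resolved beforehand.
    if (isIcebergCompatEnabled) {
      IcebergCompatMetadataValidatorAndUpdater.validateDataFileStatus(dataFileStatus, compatFeatureName);
    }

As the author notes, the shared helper still needs the feature name for its error message, so the single-branch form mostly relocates code rather than removing it.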


AddFile addFileRow =
AddFile.convertDataFileStatus(
physicalSchema,
@@ -330,7 +330,7 @@ public static KernelException icebergCompatRequiredFeatureMissing(
format("%s: requires the feature '%s' to be enabled.", compatVersion, feature));
}

public static KernelException enablingIcebergWriterCompatV1OnExistingTable(String key) {
public static KernelException enablingIcebergCompatFeatureOnExistingTable(String key) {
return new KernelException(
String.format(
"Cannot enable %s on an existing table. "
@@ -346,7 +346,7 @@ public static KernelException icebergWriterCompatInvalidPhysicalName(List<String
invalidFields));
}

public static KernelException disablingIcebergWriterCompatV1OnExistingTable(String key) {
public static KernelException disablingIcebergCompatFeatureOnExistingTable(String key) {
return new KernelException(
String.format("Disabling %s on an existing table is not allowed.", key));
}
@@ -245,6 +245,7 @@ public class TableConfig<T> {
/**
* Table property that enables modifying the table in accordance with the Delta-Iceberg
* Compatibility V3 protocol. TODO: add the delta protocol link once updated
* [https://github.com/delta-io/delta/issues/4574]
*/
public static final TableConfig<Boolean> ICEBERG_COMPAT_V3_ENABLED =
new TableConfig<>(
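For context, a minimal sketch of how a connector might turn the new property on when creating a table through the Kernel transaction builder. The property key ("delta.enableIcebergCompatV3") and the exact builder calls are assumptions modeled on the existing icebergCompatV2 conventions, not something this diff adds.

    // Sketch: create a table with icebergCompatV3 enabled (key assumed to mirror the V2 naming).
    import io.delta.kernel.Operation;
    import io.delta.kernel.Table;
    import io.delta.kernel.Transaction;
    import io.delta.kernel.engine.Engine;
    import io.delta.kernel.types.StructType;
    import java.util.Collections;

    public final class CreateIcebergCompatV3Table {
      public static Transaction startCreate(Engine engine, String tablePath, StructType schema) {
        return Table.forPath(engine, tablePath)
            .createTransactionBuilder(engine, "example-connector", Operation.CREATE_TABLE)
            .withSchema(engine, schema)
            // On an existing table, flipping this flag is rejected by validateIcebergCompatV3Change.
            .withTableProperties(
                engine, Collections.singletonMap("delta.enableIcebergCompatV3", "true"))
            .build(engine);
      }
    }

On a new table the validator also fills in the required dependent properties (column mapping mode and row tracking), per the enforcers defined later in this PR.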
@@ -38,6 +38,7 @@
import io.delta.kernel.internal.clustering.ClusteringUtils;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergUniversalFormatMetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV1MetadataValidatorAndUpdater;
import io.delta.kernel.internal.lang.Lazy;
@@ -450,6 +451,10 @@ protected Tuple2<Optional<Protocol>, Optional<Metadata>> validateAndUpdateProtoc
metadata ->
IcebergWriterCompatV1MetadataValidatorAndUpdater.validateIcebergWriterCompatV1Change(
baseMetadata.getConfiguration(), metadata.getConfiguration(), isCreateOrReplace));
newMetadata.ifPresent(
metadata ->
IcebergCompatV3MetadataValidatorAndUpdater.validateIcebergCompatV3Change(
baseMetadata.getConfiguration(), metadata.getConfiguration(), isCreateOrReplace));

// We must do our icebergWriterCompatV1 checks/updates FIRST since it has stricter column
// mapping requirements (id mode) than icebergCompatV2. It also may enable icebergCompatV2.
@@ -463,12 +468,19 @@ protected Tuple2<Optional<Protocol>, Optional<Metadata>> validateAndUpdateProtoc
newMetadata = icebergWriterCompatV1;
}

// TODO: refactor this method to use a single validator and updater.
Optional<Metadata> icebergCompatV2Metadata =
IcebergCompatV2MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV2Metadata(
isCreateOrReplace, newMetadata.orElse(baseMetadata), newProtocol.orElse(baseProtocol));
if (icebergCompatV2Metadata.isPresent()) {
newMetadata = icebergCompatV2Metadata;
}
Optional<Metadata> icebergCompatV3Metadata =
IcebergCompatV3MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV3Metadata(
isCreateOrReplace, newMetadata.orElse(baseMetadata), newProtocol.orElse(baseProtocol));
if (icebergCompatV3Metadata.isPresent()) {
newMetadata = icebergCompatV3Metadata;
}

/* ----- 4: Update the METADATA with column mapping info if applicable ----- */
// We update the column mapping info here after all configuration changes are finished
@@ -628,6 +640,8 @@ private Optional<Metadata> validateMetadataChangeAndUpdateMetadata(
oldMetadata.getConfiguration(), newMetadata.getConfiguration(), isCreateOrReplace);
IcebergWriterCompatV1MetadataValidatorAndUpdater.validateIcebergWriterCompatV1Change(
oldMetadata.getConfiguration(), newMetadata.getConfiguration(), isCreateOrReplace);
IcebergCompatV3MetadataValidatorAndUpdater.validateIcebergCompatV3Change(
oldMetadata.getConfiguration(), newMetadata.getConfiguration(), isCreateOrReplace);
IcebergUniversalFormatMetadataValidatorAndUpdater.validate(newMetadata);
Optional<Metadata> updatedMetadata = Optional.empty();
// Validate the conditions for schema evolution and the updated schema if applicable
@@ -32,6 +32,7 @@
import io.delta.kernel.internal.util.SchemaUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.DataFileStatus;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -58,6 +59,19 @@
* </ul>
*/
public abstract class IcebergCompatMetadataValidatorAndUpdater {

/**
* Returns whether Iceberg compatibility is enabled for the given table metadata. This checks
* whether either the `icebergCompatV2` or the `icebergCompatV3` table property is enabled.
*
* @param metadata The table metadata to check.
* @return true if either Iceberg compatibility V2 or V3 is enabled; false otherwise.
*/
public static boolean isIcebergCompatEnabled(Metadata metadata) {
return TableConfig.ICEBERG_COMPAT_V2_ENABLED.fromMetadata(metadata)
|| TableConfig.ICEBERG_COMPAT_V3_ENABLED.fromMetadata(metadata);
}

/////////////////////////////////////////////////////////////////////////////////
/// Interfaces for defining checks for the compat validation and updating ///
/////////////////////////////////////////////////////////////////////////////////
@@ -174,6 +188,10 @@ Optional<Metadata> validateAndUpdate(
|| ColumnMapping.ColumnMappingMode.ID == value,
ColumnMapping.ColumnMappingMode.NAME.value);

protected static final IcebergCompatRequiredTablePropertyEnforcer ROW_TRACKING_ENABLED =
new IcebergCompatRequiredTablePropertyEnforcer<>(
TableConfig.ROW_TRACKING_ENABLED, (value) -> value, "true");

/**
* Defines checks for compatibility with the targeted iceberg features (icebergCompatV1 or
* icebergCompatV2 etc.)
@@ -377,7 +395,7 @@ Optional<Metadata> validateAndUpdateMetadata(IcebergCompatInputContext inputCont
}
}

// check for IcebergV2 compatibility checks
// run the Iceberg compatibility checks
for (IcebergCompatCheck icebergCompatCheck : icebergCompatChecks()) {
icebergCompatCheck.check(inputContext);
}
@@ -394,4 +412,50 @@ Optional<Metadata> validateAndUpdateMetadata(IcebergCompatInputContext inputCont
abstract List<TableFeature> requiredDependencyTableFeatures();

abstract List<IcebergCompatCheck> icebergCompatChecks();

/////////////////////////////
/// Helper function ///
/////////////////////////////

/**
* Validates the given {@link DataFileStatus} that is being added as an {@code add} action to the
* Delta log. Currently, it checks that the statistics are not empty.
*
* @param dataFileStatus The {@link DataFileStatus} to validate.
* @param compatFeatureName The name of the compatibility feature being validated (e.g.
* "icebergCompatV2").
*/
protected static void validateDataFileStatus(
DataFileStatus dataFileStatus, String compatFeatureName) {
if (!dataFileStatus.getStatistics().isPresent()) {
// presence of stats means always has a non-null `numRecords`
throw DeltaErrors.icebergCompatMissingNumRecordsStats(compatFeatureName, dataFileStatus);
}
}

/**
* Blocks the Iceberg compat config-related changes that we do not support and for which we throw
* a {@link KernelException}:
*
* <ul>
* <li>Disabling on an existing table (true to false)
* <li>Enabling on an existing table (false to true)
* </ul>
*/
protected static void blockConfigChangeOnExistingTable(
TableConfig<Boolean> tableConfig,
Map<String, String> oldConfig,
Map<String, String> newConfig,
boolean isNewTable) {
if (!isNewTable) {
boolean wasEnabled = tableConfig.fromMetadata(oldConfig);
boolean isEnabled = tableConfig.fromMetadata(newConfig);
if (!wasEnabled && isEnabled) {
throw DeltaErrors.enablingIcebergCompatFeatureOnExistingTable(tableConfig.getKey());
}
if (wasEnabled && !isEnabled) {
throw DeltaErrors.disablingIcebergCompatFeatureOnExistingTable(tableConfig.getKey());
}
}
}
}
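As a quick illustration of the data-file check this shared helper now centralizes, here is a hedged sketch using the V2 public wrapper that delegates to it. The DataFileStatus constructor arguments are assumptions about that utility class, not part of this diff.

    // Sketch: a file committed without statistics fails the iceberg compat check.
    DataFileStatus noStats =
        new DataFileStatus(
            "part-00000.parquet", /* size */ 1024L, /* modificationTime */ 0L,
            java.util.Optional.empty()); // no statistics -> no numRecords (constructor shape assumed)
    // Expected to throw DeltaErrors.icebergCompatMissingNumRecordsStats("icebergCompatV2", noStats)
    IcebergCompatV2MetadataValidatorAndUpdater.validateDataFileStatus(noStats);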
@@ -19,7 +19,6 @@
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;

import io.delta.kernel.internal.DeltaErrors;
import io.delta.kernel.internal.TableConfig;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
@@ -56,11 +55,7 @@ public static Optional<Metadata> validateAndUpdateIcebergCompatV2Metadata(
* @param dataFileStatus The {@link DataFileStatus} to validate.
*/
public static void validateDataFileStatus(DataFileStatus dataFileStatus) {
if (!dataFileStatus.getStatistics().isPresent()) {
// presence of stats means always has a non-null `numRecords`
throw DeltaErrors.icebergCompatMissingNumRecordsStats(
INSTANCE.compatFeatureName(), dataFileStatus);
}
validateDataFileStatus(dataFileStatus, INSTANCE.compatFeatureName());
}

/// //////////////////////////////////////////////////////////////////////////////
@@ -0,0 +1,122 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.icebergcompat;

import static io.delta.kernel.internal.tablefeatures.TableFeatures.*;
import static java.util.stream.Collectors.toList;

import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.TableConfig;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.tablefeatures.TableFeature;
import io.delta.kernel.utils.DataFileStatus;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

/** Utility methods for validation and compatibility checks for Iceberg V3. */
public class IcebergCompatV3MetadataValidatorAndUpdater
extends IcebergCompatMetadataValidatorAndUpdater {

/**
* Validates that any change to property {@link TableConfig#ICEBERG_COMPAT_V3_ENABLED} is valid.
* Currently, the changes we support are
*
* <ul>
* <li>No change in enablement (true to true or false to false)
* <li>Enabling but only on a new table (false to true)
* </ul>
*
* The changes that we do not support and for which we throw a {@link KernelException} are
*
* <ul>
* <li>Disabling on an existing table (true to false)
* <li>Enabling on an existing table (false to true)
* </ul>
*/
public static void validateIcebergCompatV3Change(
Map<String, String> oldConfig, Map<String, String> newConfig, boolean isNewTable) {
blockConfigChangeOnExistingTable(
TableConfig.ICEBERG_COMPAT_V3_ENABLED, oldConfig, newConfig, isNewTable);
}

/**
* Validate and update the given Iceberg V3 metadata.
*
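* @param isCreatingNewTable Whether this transaction is creating or replacing the table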
* @param newMetadata Metadata after the current updates
* @param newProtocol Protocol after the current updates
* @return The updated metadata if the metadata is valid and updated, otherwise empty.
* @throws UnsupportedOperationException if the metadata is not compatible with Iceberg V3
* requirements
*/
public static Optional<Metadata> validateAndUpdateIcebergCompatV3Metadata(
boolean isCreatingNewTable, Metadata newMetadata, Protocol newProtocol) {
return INSTANCE.validateAndUpdateMetadata(
new IcebergCompatInputContext(
INSTANCE.compatFeatureName(), isCreatingNewTable, newMetadata, newProtocol));
}

/**
* Validates the given {@link DataFileStatus} that is being added as an {@code add} action to the
* Delta log. Currently, it checks that the statistics are not empty.
*
* @param dataFileStatus The {@link DataFileStatus} to validate.
*/
public static void validateDataFileStatus(DataFileStatus dataFileStatus) {
validateDataFileStatus(dataFileStatus, INSTANCE.compatFeatureName());
}

/// //////////////////////////////////////////////////////////////////////////////
/// Define the compatibility and update checks for icebergCompatV3 ///
/// //////////////////////////////////////////////////////////////////////////////

private static final IcebergCompatV3MetadataValidatorAndUpdater INSTANCE =
new IcebergCompatV3MetadataValidatorAndUpdater();

@Override
String compatFeatureName() {
return "icebergCompatV3";
}

@Override
TableConfig<Boolean> requiredDeltaTableProperty() {
return TableConfig.ICEBERG_COMPAT_V3_ENABLED;
}

@Override
List<IcebergCompatRequiredTablePropertyEnforcer> requiredDeltaTableProperties() {
return Stream.of(COLUMN_MAPPING_REQUIREMENT, ROW_TRACKING_ENABLED).collect(toList());
}

@Override
List<TableFeature> requiredDependencyTableFeatures() {
return Stream.of(ICEBERG_COMPAT_V3_W_FEATURE, COLUMN_MAPPING_RW_FEATURE, ROW_TRACKING_W_FEATURE)
.collect(toList());
}

@Override
List<IcebergCompatCheck> icebergCompatChecks() {
return Stream.of(
V3_CHECK_HAS_SUPPORTED_TYPES,
CHECK_ONLY_ICEBERG_COMPAT_V3_ENABLED,
CHECK_HAS_ALLOWED_PARTITION_TYPES,
CHECK_HAS_NO_PARTITION_EVOLUTION,
CHECK_HAS_SUPPORTED_TYPE_WIDENING)
.collect(toList());
}
}
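To make the enforced lifecycle concrete, here is a small usage sketch of the config-change rule defined above. The config maps are hypothetical values; the calls themselves are the ones added in this file and in TableConfig.

    // Turning icebergCompatV3 on for an existing table is rejected ...
    java.util.Map<String, String> oldConfig = java.util.Collections.emptyMap();
    java.util.Map<String, String> newConfig =
        java.util.Collections.singletonMap(TableConfig.ICEBERG_COMPAT_V3_ENABLED.getKey(), "true");
    try {
      IcebergCompatV3MetadataValidatorAndUpdater.validateIcebergCompatV3Change(
          oldConfig, newConfig, /* isNewTable */ false);
    } catch (KernelException e) {
      // expected: enablingIcebergCompatFeatureOnExistingTable(...)
    }
    // ... while the same change on a brand-new (or replaced) table passes validation.
    IcebergCompatV3MetadataValidatorAndUpdater.validateIcebergCompatV3Change(
        oldConfig, newConfig, /* isNewTable */ true);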
@@ -40,24 +40,6 @@ abstract class IcebergWriterCompatMetadataValidatorAndUpdater
/// Interfaces for defining validations and updates necessary to support IcebergWriterCompats
// ///
/////////////////////////////////////////////////////////////////////////////////
public static void validateIcebergWriterCompatChange(
Map<String, String> oldConfig,
Map<String, String> newConfig,
boolean isNewTable,
TableConfig<Boolean> writerCompatProperty) {
if (!isNewTable) {
boolean wasEnabled = writerCompatProperty.fromMetadata(oldConfig);
boolean isEnabled = writerCompatProperty.fromMetadata(newConfig);
if (!wasEnabled && isEnabled) {
throw DeltaErrors.enablingIcebergWriterCompatV1OnExistingTable(
writerCompatProperty.getKey());
}
if (wasEnabled && !isEnabled) {
throw DeltaErrors.disablingIcebergWriterCompatV1OnExistingTable(
writerCompatProperty.getKey());
}
}
}

/**
* Common property enforcer for Column Mapping ID mode requirement. This is identical across all
Expand Down
@@ -85,8 +85,8 @@ public class IcebergWriterCompatV1MetadataValidatorAndUpdater
*/
public static void validateIcebergWriterCompatV1Change(
Map<String, String> oldConfig, Map<String, String> newConfig, boolean isNewTable) {
validateIcebergWriterCompatChange(
oldConfig, newConfig, isNewTable, TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED);
blockConfigChangeOnExistingTable(
TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED, oldConfig, newConfig, isNewTable);
}

/**