Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1f6001f
init
KaiqiJinWow May 20, 2025
3283154
update
KaiqiJinWow May 20, 2025
074b131
update
KaiqiJinWow May 21, 2025
053c23c
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 9, 2025
7e10891
update
KaiqiJinWow Jun 9, 2025
a343b1b
move the data allow list
KaiqiJinWow Jun 9, 2025
0233915
add tests
KaiqiJinWow Jun 10, 2025
afd5b86
update
KaiqiJinWow Jun 10, 2025
0a01a32
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 10, 2025
029044f
update
KaiqiJinWow Jun 10, 2025
2dad169
refactor
KaiqiJinWow Jun 10, 2025
8a858e8
Merge branch 'stack/refactor_iceberg_compat_check' into stack/add_ice…
KaiqiJinWow Jun 10, 2025
f9cb9ca
update
KaiqiJinWow Jun 10, 2025
35427a4
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 11, 2025
b366ec6
update
KaiqiJinWow Jun 11, 2025
74f1e05
update
KaiqiJinWow Jun 11, 2025
6e76076
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 12, 2025
a681913
refactor test
KaiqiJinWow Jun 12, 2025
158d04b
trigger CI
KaiqiJinWow Jun 12, 2025
27e79fa
move test
KaiqiJinWow Jun 12, 2025
4688720
add e2e tests
KaiqiJinWow Jun 12, 2025
0514737
small refactor
KaiqiJinWow Jun 12, 2025
36d5736
address comment
KaiqiJinWow Jun 13, 2025
b6bc89c
address comment
KaiqiJinWow Jun 13, 2025
84299dd
address comment
KaiqiJinWow Jun 16, 2025
a80d79a
Merge branch 'master' into stack/add_iceberg_compatV3
KaiqiJinWow Jun 16, 2025
f03342c
remove redundant func
KaiqiJinWow Jun 16, 2025
2a93f08
add replace table test cases
KaiqiJinWow Jun 16, 2025
6966bfe
add more tests
KaiqiJinWow Jun 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
import io.delta.kernel.statistics.DataFileStatistics;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.*;
Expand Down Expand Up @@ -245,13 +246,18 @@ static CloseableIterator<Row> generateAppendActions(
"DataWriteContext is not created by the `Transaction.getWriteContext()`");

boolean isIcebergCompatV2Enabled = isIcebergCompatV2Enabled(transactionState);
boolean isIcebergCompatV3Enabled = isIcebergCompatV3Enabled(transactionState);

URI tableRoot = new Path(getTablePath(transactionState)).toUri();
StructType physicalSchema = TransactionStateRow.getPhysicalSchema(transactionState);
return fileStatusIter.map(
dataFileStatus -> {
if (isIcebergCompatV2Enabled) {
IcebergCompatV2MetadataValidatorAndUpdater.validateDataFileStatus(dataFileStatus);
} else if (isIcebergCompatV3Enabled) {
IcebergCompatV3MetadataValidatorAndUpdater.validateDataFileStatus(dataFileStatus);
}
Comment on lines 256 to 260
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use isIcebergCompatEnabled and just call IcebergCompatMetadataValidatorAndUpdater.validateDataFileStatus

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would push back this as IcebergCompatMetadataValidatorAndUpdater.validateDataFileStatus also need a String compatFeatureName to better expose the error message, so it would not reduce the code imo.


AddFile addFileRow =
AddFile.convertDataFileStatus(
physicalSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,23 @@ public class TableConfig<T> {
"needs to be a boolean.",
true);

/**
* Table property that enables modifying the table in accordance with the Delta-Iceberg
* Compatibility V3 protocol.
*
* @see <a
* href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-iceberg-compatibility-v3">
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this link doesn't exist?

* Delta-Iceberg Compatibility V3 Protocol</a>
*/
public static final TableConfig<Boolean> ICEBERG_COMPAT_V3_ENABLED =
new TableConfig<>(
"delta.enableIcebergCompatV3",
"false",
Boolean::valueOf,
value -> true,
"needs to be a boolean.",
true);

/**
* The number of columns to collect stats on for data skipping. A value of -1 means collecting
* stats for all columns.
Expand Down Expand Up @@ -357,6 +374,7 @@ public static class UniversalFormats {
addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP);
addConfig(this, COLUMN_MAPPING_MODE);
addConfig(this, ICEBERG_COMPAT_V2_ENABLED);
addConfig(this, ICEBERG_COMPAT_V3_ENABLED);
addConfig(this, ICEBERG_WRITER_COMPAT_V1_ENABLED);
addConfig(this, COLUMN_MAPPING_MAX_COLUMN_ID);
addConfig(this, DATA_SKIPPING_NUM_INDEXED_COLS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.delta.kernel.internal.clustering.ClusteringUtils;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergUniversalFormatMetadataValidatorAndUpdater;
import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV1MetadataValidatorAndUpdater;
import io.delta.kernel.internal.lang.Lazy;
Expand Down Expand Up @@ -463,12 +464,19 @@ protected Tuple2<Optional<Protocol>, Optional<Metadata>> validateAndUpdateProtoc
newMetadata = icebergWriterCompatV1;
}

// TODO: refactor this method to use a single validator and updater.
Optional<Metadata> icebergCompatV2Metadata =
IcebergCompatV2MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV2Metadata(
isCreateOrReplace, newMetadata.orElse(baseMetadata), newProtocol.orElse(baseProtocol));
if (icebergCompatV2Metadata.isPresent()) {
newMetadata = icebergCompatV2Metadata;
}
Optional<Metadata> icebergCompatV3Metadata =
IcebergCompatV3MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV3Metadata(
isCreateOrReplace, newMetadata.orElse(baseMetadata), newProtocol.orElse(baseProtocol));
if (icebergCompatV3Metadata.isPresent()) {
newMetadata = icebergCompatV3Metadata;
}

/* ----- 4: Update the METADATA with column mapping info if applicable ----- */
// We update the column mapping info here after all configuration changes are finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,19 @@ public static boolean isIcebergCompatV2Enabled(Row transactionState) {
.getOrDefault(TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey(), "false"));
}

/**
 * Get whether icebergCompatV3 is enabled or not from the transaction state {@link Row} returned by
 * {@link Transaction#getTransactionState(Engine)}. Reads the
 * {@code delta.enableIcebergCompatV3} table property from the transaction configuration,
 * defaulting to false when the property is absent.
 *
 * @param transactionState Transaction state {@link Row}
 * @return True if icebergCompatV3 is enabled, false otherwise.
 */
public static boolean isIcebergCompatV3Enabled(Row transactionState) {
return Boolean.parseBoolean(
getConfiguration(transactionState)
.getOrDefault(TableConfig.ICEBERG_COMPAT_V3_ENABLED.getKey(), "false"));
}

/**
* Get the column mapping mode from the transaction state {@link Row} returned by {@link
* Transaction#getTransactionState(Engine)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.tablefeatures.TableFeature;
import io.delta.kernel.types.*;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
Expand Down Expand Up @@ -155,6 +156,45 @@ interface IcebergCompatCheck {
void check(IcebergCompatInputContext inputContext);
}

/**
 * Returns true if {@code dataType} is one of the types writable under icebergCompatV2.
 *
 * <p>Nested types (array/map/struct) are accepted here; their element and field types are
 * validated separately by the recursive schema traversal at the call sites.
 */
protected static boolean isSupportedDataTypesForV2(DataType dataType) {
  // Scalar types permitted by icebergCompatV2.
  boolean supportedScalar =
      dataType instanceof ByteType
          || dataType instanceof ShortType
          || dataType instanceof IntegerType
          || dataType instanceof LongType
          || dataType instanceof FloatType
          || dataType instanceof DoubleType
          || dataType instanceof DecimalType
          || dataType instanceof StringType
          || dataType instanceof BinaryType
          || dataType instanceof BooleanType
          || dataType instanceof DateType
          || dataType instanceof TimestampType
          || dataType instanceof TimestampNTZType;
  // Container types are also allowed.
  boolean supportedNested =
      dataType instanceof ArrayType
          || dataType instanceof MapType
          || dataType instanceof StructType;
  return supportedScalar || supportedNested;
}

protected static boolean isAdditionalSupportedDataTypesForV3(DataType dataType) {
return dataType instanceof VariantType;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also check isSupportedDataTypesForV2 and the new type supported?

}

/**
 * Returns true if {@code dataType} may be used as a partition column type.
 *
 * <p>Unlike {@link #isSupportedDataTypesForV2}, nested types (array/map/struct) are NOT in this
 * list — only scalar types qualify as partition columns.
 */
protected static boolean isAllowedPartitionType(DataType dataType) {
  // Boolean/string/binary and date-time scalars.
  if (dataType instanceof BooleanType
      || dataType instanceof StringType
      || dataType instanceof BinaryType
      || dataType instanceof DateType
      || dataType instanceof TimestampType
      || dataType instanceof TimestampNTZType) {
    return true;
  }
  // Numeric scalars.
  return dataType instanceof ByteType
      || dataType instanceof ShortType
      || dataType instanceof IntegerType
      || dataType instanceof LongType
      || dataType instanceof FloatType
      || dataType instanceof DoubleType
      || dataType instanceof DecimalType;
}

/////////////////////////////////////////////////////////////////////////////////
/// Implementation of {@link IcebergCompatMetadataValidatorAndUpdater} ///
/////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,7 @@ public static void validateDataFileStatus(DataFileStatus dataFileStatus) {
/* stopOnFirstMatch = */ false,
field -> {
DataType dataType = field.getDataType();
return !(dataType instanceof ByteType
|| dataType instanceof ShortType
|| dataType instanceof IntegerType
|| dataType instanceof LongType
|| dataType instanceof FloatType
|| dataType instanceof DoubleType
|| dataType instanceof DecimalType
|| dataType instanceof StringType
|| dataType instanceof BinaryType
|| dataType instanceof BooleanType
|| dataType instanceof DateType
|| dataType instanceof TimestampType
|| dataType instanceof TimestampNTZType
|| dataType instanceof ArrayType
|| dataType instanceof MapType
|| dataType instanceof StructType);
return !isSupportedDataTypesForV2(dataType);
});

if (!matches.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.icebergcompat;

import static io.delta.kernel.internal.tablefeatures.TableFeatures.*;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;

import io.delta.kernel.internal.DeltaErrors;
import io.delta.kernel.internal.TableConfig;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.tablefeatures.TableFeature;
import io.delta.kernel.internal.types.TypeWideningChecker;
import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode;
import io.delta.kernel.internal.util.SchemaIterable;
import io.delta.kernel.internal.util.SchemaUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.DataFileStatus;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

/** Utility methods for validation and compatibility checks for Iceberg V3. */
public class IcebergCompatV3MetadataValidatorAndUpdater
extends IcebergCompatMetadataValidatorAndUpdater {
/**
 * Validate and update the given Iceberg V3 metadata.
 *
 * @param isCreatingNewTable Whether the transaction is creating (or replacing) the table, as
 *     opposed to updating an existing one.
 * @param newMetadata Metadata after the current updates
 * @param newProtocol Protocol after the current updates
 * @return The updated metadata if the metadata is valid and updated, otherwise empty.
 * @throws UnsupportedOperationException if the metadata is not compatible with Iceberg V3
 * requirements
 */
public static Optional<Metadata> validateAndUpdateIcebergCompatV3Metadata(
boolean isCreatingNewTable, Metadata newMetadata, Protocol newProtocol) {
return INSTANCE.validateAndUpdateMetadata(
new IcebergCompatInputContext(isCreatingNewTable, newMetadata, newProtocol));
}

/**
 * Validate the given {@link DataFileStatus} that is being added as a {@code add} action to Delta
 * Log. Currently, it checks that the statistics are not empty.
 *
 * @param dataFileStatus The {@link DataFileStatus} to validate.
 */
public static void validateDataFileStatus(DataFileStatus dataFileStatus) {
  if (dataFileStatus.getStatistics().isPresent()) {
    // Presence of stats guarantees a non-null `numRecords`, which is all that is required here.
    return;
  }
  throw DeltaErrors.icebergCompatMissingNumRecordsStats(
      INSTANCE.compatFeatureName(), dataFileStatus);
}

/////////////////////////////////////////////////////////////////////////////////
/// Define the compatibility and update checks for icebergCompatV3            ///
/////////////////////////////////////////////////////////////////////////////////

private static final IcebergCompatV3MetadataValidatorAndUpdater INSTANCE =
new IcebergCompatV3MetadataValidatorAndUpdater();

private static final IcebergCompatRequiredTablePropertyEnforcer ICEBERG_COMPAT_V3_CM_REQUIREMENT =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create a follow-up task to define this check in the base class and reuse it. You can post that as a precursor PR to this PR.

new IcebergCompatRequiredTablePropertyEnforcer<>(
TableConfig.COLUMN_MAPPING_MODE,
(value) -> ColumnMappingMode.NAME == value || ColumnMappingMode.ID == value,
ColumnMappingMode.NAME.value);

private static final IcebergCompatCheck ICEBERG_COMPAT_V3_CHECK_NO_LOWER_COMPAT_ENABLED =
(inputContext) -> {
List<String> incompatibleProperties =
Arrays.asList("delta.enableIcebergCompatV1", "delta.enableIcebergCompatV2");

incompatibleProperties.forEach(
prop -> {
if (Boolean.parseBoolean(
inputContext.newMetadata.getConfiguration().getOrDefault(prop, "false"))) {
throw DeltaErrors.icebergCompatIncompatibleVersionEnabled(
INSTANCE.compatFeatureName(), prop);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you check if you use the utility in TableConfig.PROP.fromMetadata here?

});
};

// Rejects any schema column whose type is outside the icebergCompatV3 supported set:
// everything icebergCompatV2 supports, plus the additional V3-only types (e.g. variant).
private static final IcebergCompatCheck ICEBERG_COMPAT_V3_CHECK_HAS_SUPPORTED_TYPES =
    (inputContext) -> {
      List<Tuple2<List<String>, StructField>> unsupportedColumns =
          SchemaUtils.filterRecursively(
              inputContext.newMetadata.getSchema(),
              /* recurseIntoMapAndArrayTypes= */ true,
              /* stopOnFirstMatch = */ false,
              field -> {
                DataType type = field.getDataType();
                boolean supported =
                    isSupportedDataTypesForV2(type) || isAdditionalSupportedDataTypesForV3(type);
                return !supported;
              });

      if (!unsupportedColumns.isEmpty()) {
        throw DeltaErrors.icebergCompatUnsupportedTypeColumns(
            INSTANCE.compatFeatureName(),
            unsupportedColumns.stream().map(match -> match._2.getDataType()).collect(toList()));
      }
    };

private static final IcebergCompatCheck ICEBERG_COMPAT_V3_CHECK_HAS_ALLOWED_PARTITION_TYPES =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment here. create common check in the base class.

(inputContext) ->
inputContext
.newMetadata
.getPartitionColNames()
.forEach(
partitionCol -> {
int partitionFieldIndex =
inputContext.newMetadata.getSchema().indexOf(partitionCol);
checkArgument(
partitionFieldIndex != -1,
"Partition column %s not found in the schema",
partitionCol);
DataType dataType =
inputContext.newMetadata.getSchema().at(partitionFieldIndex).getDataType();
if (!isAllowedPartitionType(dataType)) {
throw DeltaErrors.icebergCompatUnsupportedTypePartitionColumn(
INSTANCE.compatFeatureName(), dataType);
}
});

// Intentionally a no-op for now — see the TODO below. The check is still registered in
// icebergCompatChecks() so the hook is in place once replace-table support lands.
private static final IcebergCompatCheck ICEBERG_COMPAT_V3_CHECK_HAS_NO_PARTITION_EVOLUTION =
(inputContext) -> {
// TODO: Kernel doesn't support replace table yet. When it is supported, extend
// this to allow checking the partition columns aren't changed
};

private static final IcebergCompatCheck ICEBERG_COMPAT_V3_CHECK_HAS_SUPPORTED_TYPE_WIDENING =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

(inputContext) -> {
Protocol protocol = inputContext.newProtocol;
if (!protocol.supportsFeature(TYPE_WIDENING_RW_FEATURE)
&& !protocol.supportsFeature(TYPE_WIDENING_RW_PREVIEW_FEATURE)) {
return;
}
for (SchemaIterable.SchemaElement element :
new SchemaIterable(inputContext.newMetadata.getSchema())) {
for (TypeChange typeChange : element.getField().getTypeChanges()) {
if (!TypeWideningChecker.isIcebergV2Compatible(
typeChange.getFrom(), typeChange.getTo())) {
throw DeltaErrors.icebergCompatUnsupportedTypeWidening(
INSTANCE.compatFeatureName(), typeChange);
}
}
}
};

/** Feature name reported in icebergCompatV3 validation error messages. */
@Override
String compatFeatureName() {
return "icebergCompatV3";
}

/** The delta table property ({@code delta.enableIcebergCompatV3}) that gates these checks. */
@Override
TableConfig<Boolean> requiredDeltaTableProperty() {
return TableConfig.ICEBERG_COMPAT_V3_ENABLED;
}

/**
 * Table properties enforced when icebergCompatV3 is enabled; currently only the column-mapping
 * mode requirement (NAME or ID, defaulting to NAME).
 */
@Override
List<IcebergCompatRequiredTablePropertyEnforcer> requiredDeltaTableProperties() {
return singletonList(ICEBERG_COMPAT_V3_CM_REQUIREMENT);
}

/**
 * Table features the protocol must support for icebergCompatV3: the V3 feature itself plus
 * column mapping and row tracking.
 */
@Override
List<TableFeature> requiredDependencyTableFeatures() {
return Stream.of(ICEBERG_COMPAT_V3_W_FEATURE, COLUMN_MAPPING_RW_FEATURE, ROW_TRACKING_W_FEATURE)
.collect(toList());
}

/** The compatibility checks run against the new metadata/protocol for icebergCompatV3. */
@Override
List<IcebergCompatCheck> icebergCompatChecks() {
return Stream.of(
ICEBERG_COMPAT_V3_CHECK_NO_LOWER_COMPAT_ENABLED,
ICEBERG_COMPAT_V3_CHECK_HAS_SUPPORTED_TYPES,
ICEBERG_COMPAT_V3_CHECK_HAS_ALLOWED_PARTITION_TYPES,
ICEBERG_COMPAT_V3_CHECK_HAS_NO_PARTITION_EVOLUTION,
ICEBERG_COMPAT_V3_CHECK_HAS_SUPPORTED_TYPE_WIDENING)
.collect(toList());
}
}
Loading
Loading