-
Couldn't load subscription status.
- Fork 1.9k
[Kernel] Introduce IcebergCompatV3 #4595
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
1f6001f
3283154
074b131
053c23c
7e10891
a343b1b
0233915
afd5b86
0a01a32
029044f
2dad169
8a858e8
f9cb9ca
35427a4
b366ec6
74f1e05
6e76076
a681913
158d04b
27e79fa
4688720
0514737
36d5736
b6bc89c
84299dd
a80d79a
f03342c
2a93f08
6966bfe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,202 @@ | ||
| /* | ||
| * Copyright (2023) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.delta.kernel.internal.icebergcompat; | ||
|
|
||
| import static io.delta.kernel.internal.tablefeatures.TableFeatures.*; | ||
| import static io.delta.kernel.internal.util.Preconditions.checkArgument; | ||
| import static java.util.Collections.singletonList; | ||
| import static java.util.stream.Collectors.toList; | ||
|
|
||
| import io.delta.kernel.internal.DeltaErrors; | ||
| import io.delta.kernel.internal.TableConfig; | ||
| import io.delta.kernel.internal.actions.Metadata; | ||
| import io.delta.kernel.internal.actions.Protocol; | ||
| import io.delta.kernel.internal.tablefeatures.TableFeature; | ||
| import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode; | ||
| import io.delta.kernel.internal.util.SchemaUtils; | ||
| import io.delta.kernel.internal.util.Tuple2; | ||
| import io.delta.kernel.types.*; | ||
| import io.delta.kernel.utils.DataFileStatus; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.stream.Stream; | ||
|
|
||
| /** Utility methods for validation and compatibility checks for Iceberg V3. */ | ||
| public class IcebergCompatV3MetadataValidatorAndUpdater | ||
KaiqiJinWow marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| extends IcebergCompatMetadataValidatorAndUpdater { | ||
| /** | ||
| * Validate and update the given Iceberg V3 metadata. | ||
| * | ||
| * @param newMetadata Metadata after the current updates | ||
| * @param newProtocol Protocol after the current updates | ||
| * @return The updated metadata if the metadata is valid and updated, otherwise empty. | ||
| * @throws UnsupportedOperationException if the metadata is not compatible with Iceberg V3 | ||
| * requirements | ||
| */ | ||
| public static Optional<Metadata> validateAndUpdateIcebergCompatV3Metadata( | ||
| boolean isCreatingNewTable, Metadata newMetadata, Protocol newProtocol) { | ||
| return INSTANCE.validateAndUpdateMetadata( | ||
| new IcebergCompatInputContext(isCreatingNewTable, newMetadata, newProtocol)); | ||
| } | ||
|
|
||
| /** | ||
| * Validate the given {@link DataFileStatus} that is being added as a {@code add} action to Delta | ||
| * Log. Currently, it checks that the statistics are not empty. | ||
| * | ||
| * @param dataFileStatus The {@link DataFileStatus} to validate. | ||
| */ | ||
| public static void validateDataFileStatus(DataFileStatus dataFileStatus) { | ||
KaiqiJinWow marked this conversation as resolved.
Show resolved
Hide resolved
KaiqiJinWow marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (!dataFileStatus.getStatistics().isPresent()) { | ||
| // presence of stats means always has a non-null `numRecords` | ||
| throw DeltaErrors.icebergCompatMissingNumRecordsStats( | ||
| INSTANCE.compatFeatureName(), dataFileStatus); | ||
| } | ||
| } | ||
|
|
||
| /// ////////////////////////////////////////////////////////////////////////////// | ||
| /// Define the compatibility and update checks for icebergCompatV3 /// | ||
| /// ////////////////////////////////////////////////////////////////////////////// | ||
|
|
||
| private static final IcebergCompatV3MetadataValidatorAndUpdater INSTANCE = | ||
| new IcebergCompatV3MetadataValidatorAndUpdater(); | ||
|
|
||
| private static final IcebergCompatRequiredTablePropertyEnforcer ICEBERG_COMPAT_V3_CM_REQUIREMENT = | ||
|
||
| new IcebergCompatRequiredTablePropertyEnforcer<>( | ||
| TableConfig.COLUMN_MAPPING_MODE, | ||
| (value) -> ColumnMappingMode.NAME == value || ColumnMappingMode.ID == value, | ||
| ColumnMappingMode.NAME.value); | ||
|
|
||
| private static final IcebergCompatCheck ICEBERG_COMPAT_V3_CHECK_NO_LOWER_COMPAT_ENABLED = | ||
| (inputContext) -> { | ||
| List<String> incompatibleProperties = | ||
| Arrays.asList("delta.enableIcebergCompatV1", "delta.enableIcebergCompatV2"); | ||
|
|
||
| incompatibleProperties.forEach( | ||
| prop -> { | ||
| if (Boolean.parseBoolean( | ||
| inputContext.newMetadata.getConfiguration().getOrDefault(prop, "false"))) { | ||
| throw DeltaErrors.icebergCompatIncompatibleVersionEnabled( | ||
| INSTANCE.compatFeatureName(), prop); | ||
| } | ||
|
||
| }); | ||
| }; | ||
|
|
||
| private static final IcebergCompatCheck ICEBERG_COMPAT_V3_CHECK_HAS_SUPPORTED_TYPES = | ||
| (inputContext) -> { | ||
| List<Tuple2<List<String>, StructField>> matches = | ||
| SchemaUtils.filterRecursively( | ||
| inputContext.newMetadata.getSchema(), | ||
| /* recurseIntoMapAndArrayTypes= */ true, | ||
| /* stopOnFirstMatch = */ false, | ||
| field -> { | ||
| DataType dataType = field.getDataType(); | ||
| return !(dataType instanceof ByteType | ||
| || dataType instanceof ShortType | ||
| || dataType instanceof IntegerType | ||
| || dataType instanceof LongType | ||
| || dataType instanceof FloatType | ||
| || dataType instanceof DoubleType | ||
| || dataType instanceof DecimalType | ||
| || dataType instanceof StringType | ||
| || dataType instanceof BinaryType | ||
| || dataType instanceof BooleanType | ||
| || dataType instanceof DateType | ||
| || dataType instanceof TimestampType | ||
| || dataType instanceof TimestampNTZType | ||
| || dataType instanceof ArrayType | ||
| || dataType instanceof MapType | ||
| || dataType instanceof StructType); | ||
KaiqiJinWow marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| }); | ||
|
|
||
| if (!matches.isEmpty()) { | ||
| throw DeltaErrors.icebergCompatUnsupportedTypeColumns( | ||
| INSTANCE.compatFeatureName(), | ||
| matches.stream().map(tuple -> tuple._2.getDataType()).collect(toList())); | ||
| } | ||
| }; | ||
|
|
||
| private static final IcebergCompatCheck ICEBERG_COMPAT_V3_CHECK_HAS_ALLOWED_PARTITION_TYPES = | ||
|
||
| (inputContext) -> | ||
| inputContext | ||
| .newMetadata | ||
| .getPartitionColNames() | ||
| .forEach( | ||
| partitionCol -> { | ||
| int partitionFieldIndex = | ||
| inputContext.newMetadata.getSchema().indexOf(partitionCol); | ||
| checkArgument( | ||
| partitionFieldIndex != -1, | ||
| "Partition column %s not found in the schema", | ||
| partitionCol); | ||
| DataType dataType = | ||
| inputContext.newMetadata.getSchema().at(partitionFieldIndex).getDataType(); | ||
| boolean validType = | ||
| dataType instanceof ByteType | ||
| || dataType instanceof ShortType | ||
| || dataType instanceof IntegerType | ||
| || dataType instanceof LongType | ||
| || dataType instanceof FloatType | ||
| || dataType instanceof DoubleType | ||
| || dataType instanceof DecimalType | ||
| || dataType instanceof StringType | ||
| || dataType instanceof BinaryType | ||
| || dataType instanceof BooleanType | ||
| || dataType instanceof DateType | ||
| || dataType instanceof TimestampType | ||
| || dataType instanceof TimestampNTZType; | ||
| if (!validType) { | ||
| throw DeltaErrors.icebergCompatUnsupportedTypePartitionColumn( | ||
| INSTANCE.compatFeatureName(), dataType); | ||
| } | ||
| }); | ||
|
|
||
| private static final IcebergCompatCheck ICEBERG_COMPAT_V3_CHECK_HAS_NO_PARTITION_EVOLUTION = | ||
| (inputContext) -> { | ||
| // TODO: Kernel doesn't support replace table yet. When it is supported, extend | ||
| // this to allow checking the partition columns aren't changed | ||
| }; | ||
|
|
||
| @Override | ||
| String compatFeatureName() { | ||
| return "icebergCompatV3"; | ||
| } | ||
|
|
||
| @Override | ||
| TableConfig<Boolean> requiredDeltaTableProperty() { | ||
| return TableConfig.ICEBERG_COMPAT_V3_ENABLED; | ||
| } | ||
|
|
||
| @Override | ||
| List<IcebergCompatRequiredTablePropertyEnforcer> requiredDeltaTableProperties() { | ||
| return singletonList(ICEBERG_COMPAT_V3_CM_REQUIREMENT); | ||
| } | ||
|
|
||
| @Override | ||
| List<TableFeature> requiredDependencyTableFeatures() { | ||
| return Stream.of(ICEBERG_COMPAT_V3_W_FEATURE, COLUMN_MAPPING_RW_FEATURE).collect(toList()); | ||
|
||
| } | ||
|
|
||
| @Override | ||
| List<IcebergCompatCheck> icebergCompatChecks() { | ||
| return Stream.of( | ||
| ICEBERG_COMPAT_V3_CHECK_NO_LOWER_COMPAT_ENABLED, | ||
| ICEBERG_COMPAT_V3_CHECK_HAS_SUPPORTED_TYPES, | ||
| ICEBERG_COMPAT_V3_CHECK_HAS_ALLOWED_PARTITION_TYPES, | ||
| ICEBERG_COMPAT_V3_CHECK_HAS_NO_PARTITION_EVOLUTION) | ||
| .collect(toList()); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like this link doesn't exists?