Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ static CloseableIterator<FilteredColumnarBatch> transformLogicalData(
}

ColumnarBatch data = filteredBatch.getData();
if (!data.getSchema().equals(tableSchema)) {
if (!data.getSchema().equivalentIgnoreCollations(tableSchema)) {
throw dataSchemaMismatch(tablePath, tableSchema, data.getSchema());
}
for (String partitionColName : partitionColNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,8 @@ private CloseableIterator<FilteredColumnarBatch> applyDataSkipping(
// pruning it after is much simpler
StructType prunedStatsSchema =
DataSkippingUtils.pruneStatsSchema(
getStatsSchema(metadata.getDataSchema()), dataSkippingFilter.getReferencedCols());
getStatsSchema(metadata.getDataSchema(), dataSkippingFilter.getReferencedCollations()),
dataSkippingFilter.getReferencedCols());

// Skipping happens in two steps:
// 1. The predicate produces false for any file whose stats prove we can safely skip it. A
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ public class DataSkippingPredicate extends Predicate {
/** Set of {@link Column}s referenced by the predicate or any of its child expressions */
private final Set<Column> referencedCols;

/**
* Set of {@link CollationIdentifier}s referenced by this predicate or any of its child
* expressions
*/
private final Set<CollationIdentifier> collationIdentifiers;

/**
* @param name the predicate name
* @param children list of expressions that are input to this predicate.
Expand All @@ -42,7 +36,6 @@ public class DataSkippingPredicate extends Predicate {
DataSkippingPredicate(String name, List<Expression> children, Set<Column> referencedCols) {
super(name, children);
this.referencedCols = Collections.unmodifiableSet(referencedCols);
this.collationIdentifiers = Collections.unmodifiableSet(new HashSet<>());
}

/**
Expand All @@ -59,7 +52,6 @@ public class DataSkippingPredicate extends Predicate {
Set<Column> referencedCols) {
super(name, children, collationIdentifier);
this.referencedCols = Collections.unmodifiableSet(referencedCols);
this.collationIdentifiers = Collections.singleton(collationIdentifier);
}

/**
Expand All @@ -73,8 +65,6 @@ public class DataSkippingPredicate extends Predicate {
DataSkippingPredicate(String name, DataSkippingPredicate left, DataSkippingPredicate right) {
super(name, Arrays.asList(left, right));
this.referencedCols = immutableUnion(left.referencedCols, right.referencedCols);
this.collationIdentifiers =
immutableUnion(left.collationIdentifiers, right.collationIdentifiers);
}

/** @return set of columns referenced by this predicate or any of its child expressions */
Expand All @@ -87,7 +77,26 @@ public Set<Column> getReferencedCols() {
* expressions
*/
public Set<CollationIdentifier> getReferencedCollations() {
return collationIdentifiers;
Set<CollationIdentifier> referencedCollations = new HashSet<>();

if (this.getCollationIdentifier().isPresent()) {
referencedCollations.add(this.getCollationIdentifier().get());
}

for (Expression child : children) {
if (child instanceof Predicate) {
if (child instanceof DataSkippingPredicate) {
referencedCollations.addAll(((DataSkippingPredicate) child).getReferencedCollations());
} else {
throw new IllegalStateException(
String.format(
"Expected child Predicate of DataSkippingPredicate to also be a"
+ " DataSkippingPredicate, but found %s",
child.getClass().getName()));
}
}
}
return Collections.unmodifiableSet(referencedCollations);
}

/** @return an unmodifiable set containing all elements from both sets. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public static boolean isSkippingEligibleDataType(DataType dataType, boolean isCo
* |-- a: struct (nullable = true)
* | |-- b: struct (nullable = true)
* | | |-- c: long (nullable = true)
* | | |-- d: string (nullable = true)
* </pre>
*
* <p>Collected Statistics:
Expand All @@ -102,18 +103,32 @@ public static boolean isSkippingEligibleDataType(DataType dataType, boolean isCo
* | | |-- a: struct (nullable = false)
* | | | |-- b: struct (nullable = false)
* | | | | |-- c: long (nullable = true)
* | | | | |-- d: string (nullable = true)
* | |-- maxValues: struct (nullable = false)
* | | |-- a: struct (nullable = false)
* | | | |-- b: struct (nullable = false)
* | | | | |-- c: long (nullable = true)
* | | | | |-- d: string (nullable = true)
* | |-- nullCount: struct (nullable = false)
* | | |-- a: struct (nullable = false)
* | | | |-- b: struct (nullable = false)
* | | | | |-- c: long (nullable = true)
* | | | | |-- d: string (nullable = true)
* | |-- statsWithCollation: struct (nullable = true)
* | | |-- collationName: struct (nullable = true)
* | | | |-- min: struct (nullable = false)
* | | | | |-- a: struct (nullable = false)
* | | | | | |-- b: struct (nullable = false)
* | | | | | | |-- d: string (nullable = true)
* | | | |-- max: struct (nullable = false)
* | | | | |-- a: struct (nullable = false)
* | | | | | |-- b: struct (nullable = false)
* | | | | | | |-- d: string (nullable = true)
* | |-- tightBounds: boolean (nullable = true)
* </pre>
*/
public static StructType getStatsSchema(StructType dataSchema) {
public static StructType getStatsSchema(
StructType dataSchema, Set<CollationIdentifier> collationIdentifiers) {
StructType statsSchema = new StructType().add(NUM_RECORDS, LongType.LONG, true);

StructType minMaxStatsSchema = getMinMaxStatsSchema(dataSchema);
Expand All @@ -128,6 +143,11 @@ public static StructType getStatsSchema(StructType dataSchema) {

statsSchema = statsSchema.add(TIGHT_BOUNDS, BooleanType.BOOLEAN, true);

StructType collatedMinMaxStatsSchema = getCollatedStatsSchema(dataSchema, collationIdentifiers);
if (collatedMinMaxStatsSchema.length() > 0) {
statsSchema = statsSchema.add(STATS_WITH_COLLATION, collatedMinMaxStatsSchema, true);
}

return statsSchema;
}

Expand Down Expand Up @@ -272,24 +292,60 @@ public boolean isSkippingEligibleNullCountColumn(Column column) {
/**
* Given a data schema returns the expected schema for a min or max statistics column. This means
* 1) replace logical names with physical names 2) set nullable=true 3) only keep stats eligible
* fields (i.e. don't include fields with isSkippingEligibleDataType=false)
* fields (i.e. don't include fields with isSkippingEligibleDataType=false). Collation-aware
* statistics are not included.
*/
private static StructType getMinMaxStatsSchema(StructType dataSchema) {
return getMinMaxStatsSchema(dataSchema, /* isCollatedSkipping */ false);
}

/**
* Given a data schema returns the expected schema for a min or max statistics column. This means
* 1) replace logical names with physical names 2) set nullable=true 3) only keep stats eligible
* fields (i.e. don't include fields with isSkippingEligibleDataType=false). In case when
* isCollatedSkipping is true, only `StringType` fields are eligible.
*/
private static StructType getMinMaxStatsSchema(
StructType dataSchema, boolean isCollatedSkipping) {
List<StructField> fields = new ArrayList<>();
for (StructField field : dataSchema.fields()) {
if (isSkippingEligibleDataType(field.getDataType(), false)) {
if (isSkippingEligibleDataType(field.getDataType(), isCollatedSkipping)) {
fields.add(new StructField(getPhysicalName(field), field.getDataType(), true));
} else if (field.getDataType() instanceof StructType) {
fields.add(
new StructField(
getPhysicalName(field),
getMinMaxStatsSchema((StructType) field.getDataType()),
getMinMaxStatsSchema((StructType) field.getDataType(), isCollatedSkipping),
true));
}
}
return new StructType(fields);
}

/**
* Given a data schema and a set of collation identifiers returns the expected schema for
* collation-aware statistics columns. This means 1) replace logical names with physical names 2)
* set nullable=true 3) only keep collated-stats eligible fields (`StringType` fields)
*/
private static StructType getCollatedStatsSchema(
StructType dataSchema, Set<CollationIdentifier> collationIdentifiers) {
StructType statsWithCollation = new StructType();
StructType minMaxSchemaForCollationAwareFields =
getMinMaxStatsSchema(dataSchema, /* isCollatedSkipping */ true);
if (minMaxSchemaForCollationAwareFields.length() > 0) {
for (CollationIdentifier collationIdentifier : collationIdentifiers) {
statsWithCollation =
statsWithCollation.add(
collationIdentifier.toString(),
new StructType()
.add(MIN, minMaxSchemaForCollationAwareFields, true)
.add(MAX, minMaxSchemaForCollationAwareFields, true),
true);
}
}
return statsWithCollation;
}

/**
* Given a data schema returns the expected schema for a null_count statistics column. This means
* 1) replace logical names with physical names 2) set nullable=true 3) use LongType for all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ private void validateLiteralType(StructField field, Literal literal) {
// Variant stats in JSON are Z85 encoded strings, all other stats should match the field type
DataType expectedLiteralType =
field.getDataType() instanceof VariantType ? StringType.STRING : field.getDataType();
if (literal.getDataType() == null || !literal.getDataType().equals(expectedLiteralType)) {
if (literal.getDataType() == null
|| !literal.getDataType().equivalentIgnoreCollations(expectedLiteralType)) {
throw DeltaErrors.statsTypeMismatch(
field.getName(), expectedLiteralType, literal.getDataType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@ public boolean equivalent(DataType dataType) {
&& ((ArrayType) dataType).getElementType().equivalent(getElementType());
}

/**
* Are the data types same? The collations could be different.
*
* @param dataType
* @return
*/
@Override
public boolean equivalentIgnoreCollations(DataType dataType) {
if (this == dataType) {
return true;
}
if (dataType == null || getClass() != dataType.getClass()) {
return false;
}
ArrayType arrayType = (ArrayType) dataType;
return (elementField == null && arrayType.elementField == null)
|| (elementField != null
&& elementField.equivalentIgnoreCollations(arrayType.elementField));
}

@Override
public boolean isNested() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ public boolean equivalent(DataType dataType) {
return equals(dataType);
}

/**
* Are the data types same? The collations could be different.
*
* @param dataType
* @return
*/
public boolean equivalentIgnoreCollations(DataType dataType) {
return equals(dataType);
}

/**
* Returns true iff this data is a nested data type (it logically parameterized by other types).
*
Expand Down
21 changes: 21 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/types/MapType.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ public boolean equivalent(DataType dataType) {
&& ((MapType) dataType).isValueContainsNull() == isValueContainsNull();
}

/**
* Are the data types same? The collations could be different.
*
* @param dataType
* @return
*/
@Override
public boolean equivalentIgnoreCollations(DataType dataType) {
if (this == dataType) {
return true;
}
if (dataType == null || getClass() != dataType.getClass()) {
return false;
}
MapType mapType = (MapType) dataType;
return ((keyField == null && mapType.keyField == null)
|| (keyField != null && keyField.equivalentIgnoreCollations(mapType.keyField)))
&& ((valueField == null && mapType.valueField == null)
|| (valueField != null && valueField.equivalentIgnoreCollations(mapType.valueField)));
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public CollationIdentifier getCollationIdentifier() {
return collationIdentifier;
}

@Override
public boolean equivalentIgnoreCollations(DataType dataType) {
return dataType instanceof StringType;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof StringType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,21 @@ public boolean equals(Object o) {
&& Objects.equals(typeChanges, that.typeChanges);
}

/** @return whether the struct fields are equal, ignoring collations */
public boolean equivalentIgnoreCollations(StructField other) {
if (this == other) {
return true;
}
if (other == null) {
return false;
}
return nullable == other.nullable
&& name.equals(other.name)
&& dataType.equivalentIgnoreCollations(other.dataType)
&& metadata.equals(other.metadata)
&& Objects.equals(typeChanges, other.typeChanges);
}

@Override
public int hashCode() {
return Objects.hash(name, dataType, nullable, metadata, typeChanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,28 @@ public boolean equivalent(DataType dataType) {
.allMatch(result -> result);
}

/**
* Are the data types same? The collations could be different.
*
* @param dataType
* @return
*/
@Override
public boolean equivalentIgnoreCollations(DataType dataType) {
if (this == dataType) {
return true;
}
if (dataType == null || getClass() != dataType.getClass()) {
return false;
}
StructType structType = (StructType) dataType;
return this.length() == structType.length()
&& fieldNames.equals(structType.fieldNames)
&& IntStream.range(0, this.length())
.mapToObj(i -> this.at(i).equivalentIgnoreCollations(structType.at(i)))
.allMatch(result -> result);
}

@Override
public boolean isNested() {
return true;
Expand Down
Loading
Loading