Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ static CloseableIterator<FilteredColumnarBatch> transformLogicalData(
}

ColumnarBatch data = filteredBatch.getData();
if (!data.getSchema().equals(tableSchema)) {
if (!data.getSchema().equivalentIgnoreCollations(tableSchema)) {
throw dataSchemaMismatch(tablePath, tableSchema, data.getSchema());
}
for (String partitionColName : partitionColNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,13 @@ public Set<CollationIdentifier> getReferencedCollations() {
Set<CollationIdentifier> referencedCollations = new HashSet<>();

if (this.getCollationIdentifier().isPresent()) {
referencedCollations = Collections.singleton(this.getCollationIdentifier().get());
referencedCollations.add(this.getCollationIdentifier().get());
}

for (Expression child : children) {
if (child instanceof Predicate) {
if (child instanceof DataSkippingPredicate) {
referencedCollations =
immutableUnion(
referencedCollations, ((DataSkippingPredicate) child).getReferencedCollations());
referencedCollations.addAll(((DataSkippingPredicate) child).getReferencedCollations());
} else {
throw new IllegalStateException(
String.format(
Expand All @@ -98,7 +96,7 @@ public Set<CollationIdentifier> getReferencedCollations() {
}
}
}
return referencedCollations;
return Collections.unmodifiableSet(referencedCollations);
}

/** @return an unmodifiable set containing all elements from both sets. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public static boolean isSkippingEligibleDataType(DataType dataType, boolean isCo
* |-- a: struct (nullable = true)
* | |-- b: struct (nullable = true)
* | | |-- c: long (nullable = true)
* | | |-- d: string (nullable = true)
* </pre>
*
* <p>Collected Statistics:
Expand All @@ -102,22 +103,35 @@ public static boolean isSkippingEligibleDataType(DataType dataType, boolean isCo
* | | |-- a: struct (nullable = false)
* | | | |-- b: struct (nullable = false)
* | | | | |-- c: long (nullable = true)
* | | | | |-- d: string (nullable = true)
* | |-- maxValues: struct (nullable = false)
* | | |-- a: struct (nullable = false)
* | | | |-- b: struct (nullable = false)
* | | | | |-- c: long (nullable = true)
* | | | | |-- d: string (nullable = true)
* | |-- nullCount: struct (nullable = false)
* | | |-- a: struct (nullable = false)
* | | | |-- b: struct (nullable = false)
* | | | | |-- c: long (nullable = true)
* | | | | |-- d: string (nullable = true)
* | |-- statsWithCollation: struct (nullable = true)
* | | |-- collationName: struct (nullable = true)
* | | | |-- min: struct (nullable = false)
* | | | | |-- a: struct (nullable = false)
* | | | | | |-- b: struct (nullable = false)
* | | | | | | |-- d: string (nullable = true)
* | | | |-- max: struct (nullable = false)
* | | | | |-- a: struct (nullable = false)
* | | | | | |-- b: struct (nullable = false)
* | | | | | | |-- d: string (nullable = true)
* | |-- tightBounds: boolean (nullable = true)
* </pre>
*/
public static StructType getStatsSchema(
StructType dataSchema, Set<CollationIdentifier> collationIdentifiers) {
StructType statsSchema = new StructType().add(NUM_RECORDS, LongType.LONG, true);

StructType minMaxStatsSchema = getMinMaxStatsSchema(dataSchema, false);
StructType minMaxStatsSchema = getMinMaxStatsSchema(dataSchema);
if (minMaxStatsSchema.length() > 0) {
statsSchema = statsSchema.add(MIN, minMaxStatsSchema, true).add(MAX, minMaxStatsSchema, true);
}
Expand Down Expand Up @@ -275,6 +289,16 @@ public boolean isSkippingEligibleNullCountColumn(Column column) {
}
};

/**
* Given a data schema returns the expected schema for a min or max statistics column. This means
* 1) replace logical names with physical names 2) set nullable=true 3) only keep stats eligible
* fields (i.e. don't include fields with isSkippingEligibleDataType=false). Collation-aware
* statistics are not included.
*/
private static StructType getMinMaxStatsSchema(StructType dataSchema) {
return getMinMaxStatsSchema(dataSchema, /* isCollatedSkipping */ false);
}

/**
* Given a data schema returns the expected schema for a min or max statistics column. This means
* 1) replace logical names with physical names 2) set nullable=true 3) only keep stats eligible
Expand Down Expand Up @@ -306,15 +330,16 @@ private static StructType getMinMaxStatsSchema(
private static StructType getCollatedStatsSchema(
StructType dataSchema, Set<CollationIdentifier> collationIdentifiers) {
StructType statsWithCollation = new StructType();
StructType collatedMinMaxStatsSchema = getMinMaxStatsSchema(dataSchema, true);
for (CollationIdentifier collationIdentifier : collationIdentifiers) {
if (collatedMinMaxStatsSchema.length() > 0) {
StructType minMaxSchemaForCollationAwareFields =
getMinMaxStatsSchema(dataSchema, /* isCollatedSkipping */ true);
if (minMaxSchemaForCollationAwareFields.length() > 0) {
for (CollationIdentifier collationIdentifier : collationIdentifiers) {
statsWithCollation =
statsWithCollation.add(
collationIdentifier.toString(),
new StructType()
.add(MIN, collatedMinMaxStatsSchema, true)
.add(MAX, collatedMinMaxStatsSchema, true),
.add(MIN, minMaxSchemaForCollationAwareFields, true)
.add(MAX, minMaxSchemaForCollationAwareFields, true),
true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ private void validateLiteralType(StructField field, Literal literal) {
// Variant stats in JSON are Z85 encoded strings, all other stats should match the field type
DataType expectedLiteralType =
field.getDataType() instanceof VariantType ? StringType.STRING : field.getDataType();
if (literal.getDataType() == null || !literal.getDataType().equals(expectedLiteralType)) {
if (literal.getDataType() == null
|| !literal.getDataType().equivalentIgnoreCollations(expectedLiteralType)) {
throw DeltaErrors.statsTypeMismatch(
field.getName(), expectedLiteralType, literal.getDataType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@ public boolean equivalent(DataType dataType) {
&& ((ArrayType) dataType).getElementType().equivalent(getElementType());
}

/**
* Are the data types same? The collations could be different.
*
* @param dataType
* @return
*/
@Override
public boolean equivalentIgnoreCollations(DataType dataType) {
if (this == dataType) {
return true;
}
if (dataType == null || getClass() != dataType.getClass()) {
return false;
}
ArrayType arrayType = (ArrayType) dataType;
return (elementField == null && arrayType.elementField == null)
|| (elementField != null
&& elementField.equivalentIgnoreCollations(arrayType.elementField));
}

@Override
public boolean isNested() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ public boolean equivalent(DataType dataType) {
return equals(dataType);
}

/**
* Are the data types same? The collations could be different.
*
* @param dataType
* @return
*/
public boolean equivalentIgnoreCollations(DataType dataType) {
return equals(dataType);
}

/**
* Returns true iff this data is a nested data type (it logically parameterized by other types).
*
Expand Down
21 changes: 21 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/types/MapType.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ public boolean equivalent(DataType dataType) {
&& ((MapType) dataType).isValueContainsNull() == isValueContainsNull();
}

/**
* Are the data types same? The collations could be different.
*
* @param dataType
* @return
*/
@Override
public boolean equivalentIgnoreCollations(DataType dataType) {
if (this == dataType) {
return true;
}
if (dataType == null || getClass() != dataType.getClass()) {
return false;
}
MapType mapType = (MapType) dataType;
return ((keyField == null && mapType.keyField == null)
|| (keyField != null && keyField.equivalentIgnoreCollations(mapType.keyField)))
&& ((valueField == null && mapType.valueField == null)
|| (valueField != null && valueField.equivalentIgnoreCollations(mapType.valueField)));
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public CollationIdentifier getCollationIdentifier() {
return collationIdentifier;
}

@Override
public boolean equivalentIgnoreCollations(DataType dataType) {
return dataType instanceof StringType;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof StringType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,21 @@ public boolean equals(Object o) {
&& Objects.equals(typeChanges, that.typeChanges);
}

/** @return whether the struct fields are equal, ignoring collations */
public boolean equivalentIgnoreCollations(StructField other) {
if (this == other) {
return true;
}
if (other == null) {
return false;
}
return nullable == other.nullable
&& name.equals(other.name)
&& dataType.equivalentIgnoreCollations(other.dataType)
&& metadata.equals(other.metadata)
&& Objects.equals(typeChanges, other.typeChanges);
}

@Override
public int hashCode() {
return Objects.hash(name, dataType, nullable, metadata, typeChanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,28 @@ public boolean equivalent(DataType dataType) {
.allMatch(result -> result);
}

/**
* Are the data types same? The collations could be different.
*
* @param dataType
* @return
*/
@Override
public boolean equivalentIgnoreCollations(DataType dataType) {
if (this == dataType) {
return true;
}
if (dataType == null || getClass() != dataType.getClass()) {
return false;
}
StructType structType = (StructType) dataType;
return this.length() == structType.length()
&& fieldNames.equals(structType.fieldNames)
&& IntStream.range(0, this.length())
.mapToObj(i -> this.at(i).equivalentIgnoreCollations(structType.at(i)))
.allMatch(result -> result);
}

@Override
public boolean isNested() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,118 @@ class DataSkippingUtilsSuite extends AnyFunSuite with TestUtils {
new StructType())
}

test("pruneStatsSchema - collated min/max columns") {
val utf8Lcase = CollationIdentifier.fromString("SPARK.UTF8_LCASE")
val unicode = CollationIdentifier.fromString("ICU.UNICODE")
val testSchema = new StructType()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - I think you could define vals to make this simpler that can be reused?

val nestedField = ...
val s1Field = ...
val allFields = ... // (s1 + i1 + i2 + nested)

.add(
MIN,
new StructType()
.add("s1", StringType.STRING)
.add("i1", INTEGER)
.add("i2", INTEGER)
.add("nested", new StructType().add("s2", StringType.STRING)))
.add(
MAX,
new StructType()
.add("s1", StringType.STRING)
.add("i1", INTEGER)
.add("i2", INTEGER)
.add("nested", new StructType().add("s2", StringType.STRING)))
.add(
STATS_WITH_COLLATION,
new StructType()
.add(
utf8Lcase.toString,
new StructType()
.add(
MIN,
new StructType()
.add("s1", StringType.STRING)
.add("nested", new StructType().add("s2", StringType.STRING)))
.add(
MAX,
new StructType()
.add("s1", StringType.STRING)
.add("nested", new StructType().add("s2", StringType.STRING))))
.add(
unicode.toString,
new StructType()
.add(
MIN,
new StructType()
.add("s1", StringType.STRING)
.add("nested", new StructType().add("s2", StringType.STRING)))
.add(
MAX,
new StructType()
.add("s1", StringType.STRING)
.add("nested", new StructType().add("s2", StringType.STRING)))))

val testCases = Seq(
(
Set(nestedCol(s"$MIN.nested.s2"), nestedCol(s"$MAX.i1")),
new StructType()
.add(
MIN,
new StructType()
.add("nested", new StructType().add("s2", StringType.STRING)))
.add(
MAX,
new StructType()
.add("i1", INTEGER))),
(
Set(
collatedStatsCol(utf8Lcase, MIN, "s1"),
collatedStatsCol(unicode, MAX, "nested.s2")),
new StructType()
.add(
STATS_WITH_COLLATION,
new StructType()
.add(
utf8Lcase.toString,
new StructType()
.add(
MIN,
new StructType().add("s1", StringType.STRING)))
.add(
unicode.toString,
new StructType()
.add(
MAX,
new StructType().add(
"nested",
new StructType().add("s2", StringType.STRING)))))),
(
Set(
nestedCol(s"$MIN.i2"),
collatedStatsCol(utf8Lcase, MAX, "nested.s2"),
collatedStatsCol(utf8Lcase, MIN, "nested.s2")),
new StructType()
.add(
MIN,
new StructType()
.add("i2", INTEGER))
.add(
STATS_WITH_COLLATION,
new StructType()
.add(
utf8Lcase.toString,
new StructType()
.add(
MIN,
new StructType()
.add("nested", new StructType().add("s2", StringType.STRING)))
.add(
MAX,
new StructType()
.add("nested", new StructType().add("s2", StringType.STRING)))))))

testCases.foreach { case (referencedCols, expectedSchema) =>
checkPruneStatsSchema(testSchema, referencedCols, expectedSchema)
}
}

// TODO: add tests for remaining operators
test("check constructDataSkippingFilter") {
val testCases = Seq(
Expand Down
Loading
Loading