Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ private CloseableIterator<FilteredColumnarBatch> applyDataSkipping(
// pruning it after is much simpler
StructType prunedStatsSchema =
DataSkippingUtils.pruneStatsSchema(
getStatsSchema(metadata.getDataSchema()), dataSkippingFilter.getReferencedCols());
getStatsSchema(metadata.getDataSchema(), dataSkippingFilter.getReferencedCollations()), dataSkippingFilter.getReferencedCols());

// Skipping happens in two steps:
// 1. The predicate produces false for any file whose stats prove we can safely skip it. A
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.CollationIdentifier;

import java.util.*;

/** A {@link Predicate} with a set of columns referenced by the expression. */
Expand All @@ -26,6 +28,9 @@ public class DataSkippingPredicate extends Predicate {
/** Set of {@link Column}s referenced by the predicate or any of its child expressions */
private final Set<Column> referencedCols;

/** Set of {@link CollationIdentifier}s referenced by this predicate or any of its child expressions */
private final Set<CollationIdentifier> collationIdentifiers;

/**
* @param name the predicate name
* @param children list of expressions that are input to this predicate.
Expand All @@ -35,6 +40,24 @@ public class DataSkippingPredicate extends Predicate {
DataSkippingPredicate(String name, List<Expression> children, Set<Column> referencedCols) {
super(name, children);
this.referencedCols = Collections.unmodifiableSet(referencedCols);
this.collationIdentifiers = Collections.unmodifiableSet(new HashSet<>());
}

/**
* @param name the predicate name
* @param children list of expressions that are input to this predicate.
* @param collationIdentifier collation identifier used for this predicate
* @param referencedCols set of columns referenced by this predicate or any of its child
* expressions
*/
DataSkippingPredicate(
String name,
List<Expression> children,
CollationIdentifier collationIdentifier,
Set<Column> referencedCols) {
super(name, children, collationIdentifier);
this.referencedCols = Collections.unmodifiableSet(referencedCols);
this.collationIdentifiers = Collections.singleton(collationIdentifier);
}

/**
Expand All @@ -46,18 +69,32 @@ public class DataSkippingPredicate extends Predicate {
* @param right right input to this predicate
*/
DataSkippingPredicate(String name, DataSkippingPredicate left, DataSkippingPredicate right) {
this(
name,
Arrays.asList(left, right),
new HashSet<Column>() {
{
addAll(left.getReferencedCols());
addAll(right.getReferencedCols());
}
});
super(name, Arrays.asList(left, right));
this.referencedCols =
Collections.unmodifiableSet(
new HashSet<Column>() {
{
addAll(left.getReferencedCols());
addAll(right.getReferencedCols());
}
});
this.collationIdentifiers =
Collections.unmodifiableSet(
new HashSet<CollationIdentifier>() {
{
addAll(left.getReferencedCollations());
addAll(right.getReferencedCollations());
}
});
}

/** @return set of columns referenced by this predicate or any of its child expressions */
public Set<Column> getReferencedCols() {
return referencedCols;
}

/** @return set of collation identifiers referenced by this predicate or any of its child expressions */
public Set<CollationIdentifier> getReferencedCollations() {
return collationIdentifiers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.*;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.CollationIdentifier;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import java.util.*;
Expand Down Expand Up @@ -259,14 +260,15 @@ private static Optional<DataSkippingPredicate> constructDataSkippingFilter(
case "IS NOT DISTINCT FROM":
Expression left = getLeft(dataFilters);
Expression right = getRight(dataFilters);
Optional<CollationIdentifier> collationIdentifier = dataFilters.getCollationIdentifier();

if (left instanceof Column && right instanceof Literal) {
Column leftCol = (Column) left;
Literal rightLit = (Literal) right;
if (schemaHelper.isSkippingEligibleMinMaxColumn(leftCol)
if (schemaHelper.isSkippingEligibleMinMaxColumn(leftCol, collationIdentifier.isPresent())
&& schemaHelper.isSkippingEligibleLiteral(rightLit)) {
return constructComparatorDataSkippingFilters(
dataFilters.getName(), leftCol, rightLit, schemaHelper);
dataFilters.getName(), leftCol, rightLit, schemaHelper, collationIdentifier);
}
} else if (right instanceof Column && left instanceof Literal) {
return constructDataSkippingFilter(reverseComparatorFilter(dataFilters), schemaHelper);
Expand All @@ -284,7 +286,7 @@ private static Optional<DataSkippingPredicate> constructDataSkippingFilter(

/** Construct the skipping predicate for a given comparator */
private static Optional<DataSkippingPredicate> constructComparatorDataSkippingFilters(
String comparator, Column leftCol, Literal rightLit, StatsSchemaHelper schemaHelper) {
String comparator, Column leftCol, Literal rightLit, StatsSchemaHelper schemaHelper, Optional<CollationIdentifier> collationIdentifier) {

switch (comparator.toUpperCase(Locale.ROOT)) {

Expand All @@ -295,35 +297,35 @@ private static Optional<DataSkippingPredicate> constructComparatorDataSkippingFi
new DataSkippingPredicate(
"AND",
constructBinaryDataSkippingPredicate(
"<=", schemaHelper.getMinColumn(leftCol), rightLit),
"<=", schemaHelper.getMinColumn(leftCol, collationIdentifier), rightLit, collationIdentifier),
constructBinaryDataSkippingPredicate(
">=", schemaHelper.getMaxColumn(leftCol), rightLit)));
">=", schemaHelper.getMaxColumn(leftCol, collationIdentifier), rightLit, collationIdentifier)));

// Match any file whose min is less than the requested upper bound.
case "<":
return Optional.of(
constructBinaryDataSkippingPredicate(
"<", schemaHelper.getMinColumn(leftCol), rightLit));
"<", schemaHelper.getMinColumn(leftCol, collationIdentifier), rightLit, collationIdentifier));

// Match any file whose min is less than or equal to the requested upper bound
case "<=":
return Optional.of(
constructBinaryDataSkippingPredicate(
"<=", schemaHelper.getMinColumn(leftCol), rightLit));
"<=", schemaHelper.getMinColumn(leftCol, collationIdentifier), rightLit, collationIdentifier));

// Match any file whose max is larger than the requested lower bound.
case ">":
return Optional.of(
constructBinaryDataSkippingPredicate(
">", schemaHelper.getMaxColumn(leftCol), rightLit));
">", schemaHelper.getMaxColumn(leftCol, collationIdentifier), rightLit, collationIdentifier));

// Match any file whose max is larger than or equal to the requested lower bound.
case ">=":
return Optional.of(
constructBinaryDataSkippingPredicate(
">=", schemaHelper.getMaxColumn(leftCol), rightLit));
">=", schemaHelper.getMaxColumn(leftCol, collationIdentifier), rightLit, collationIdentifier));
case "IS NOT DISTINCT FROM":
return constructDataSkippingFilter(rewriteEqualNullSafe(leftCol, rightLit), schemaHelper);
return constructDataSkippingFilter(rewriteEqualNullSafe(leftCol, rightLit, collationIdentifier), schemaHelper);
default:
throw new IllegalArgumentException(
String.format("Unsupported comparator expression %s", comparator));
Expand All @@ -336,11 +338,19 @@ private static Optional<DataSkippingPredicate> constructComparatorDataSkippingFi
* Literal}.
*/
private static DataSkippingPredicate constructBinaryDataSkippingPredicate(
String exprName, Tuple2<Column, Optional<Expression>> colExpr, Literal lit) {
String exprName, Tuple2<Column, Optional<Expression>> colExpr, Literal lit, Optional<CollationIdentifier> collationIdentifier) {
Column column = colExpr._1;
Expression adjColExpr = colExpr._2.isPresent() ? colExpr._2.get() : column;
return new DataSkippingPredicate(
exprName, Arrays.asList(adjColExpr, lit), Collections.singleton(column));
if (collationIdentifier.isPresent()) {
return new DataSkippingPredicate(
exprName,
Arrays.asList(adjColExpr, lit),
collationIdentifier.get(),
Collections.singleton(column));
} else {
return new DataSkippingPredicate(
exprName, Arrays.asList(adjColExpr, lit), Collections.singleton(column));
}
}

private static final Map<String, String> REVERSE_COMPARATORS =
Expand All @@ -356,15 +366,17 @@ private static DataSkippingPredicate constructBinaryDataSkippingPredicate(
};

private static Predicate reverseComparatorFilter(Predicate predicate) {
return new Predicate(
return createPredicate(
REVERSE_COMPARATORS.get(predicate.getName().toUpperCase(Locale.ROOT)),
getRight(predicate),
getLeft(predicate));
getLeft(predicate),
predicate.getCollationIdentifier());
}

/** Construct the skipping predicate for a NOT expression child if possible */
private static Optional<DataSkippingPredicate> constructNotDataSkippingFilters(
Predicate childPredicate, StatsSchemaHelper schemaHelper) {
Optional<CollationIdentifier> collationIdentifier = childPredicate.getCollationIdentifier();
switch (childPredicate.getName().toUpperCase(Locale.ROOT)) {
// Use deMorgan's law to push the NOT past the AND. This is safe even with SQL
// tri-valued logic (see below), and is desirable because we cannot generally push
Expand Down Expand Up @@ -423,29 +435,29 @@ private static Optional<DataSkippingPredicate> constructNotDataSkippingFilters(
new DataSkippingPredicate(
"OR",
constructBinaryDataSkippingPredicate(
"<", schemaHelper.getMinColumn(leftColumn), rightLiteral),
"<", schemaHelper.getMinColumn(leftColumn, collationIdentifier), rightLiteral, collationIdentifier),
constructBinaryDataSkippingPredicate(
">", schemaHelper.getMaxColumn(leftColumn), rightLiteral)));
">", schemaHelper.getMaxColumn(leftColumn, collationIdentifier), rightLiteral, collationIdentifier)));
});
case "<":
return constructDataSkippingFilter(
new Predicate(">=", childPredicate.getChildren()), schemaHelper);
createPredicate(">=", childPredicate.getChildren(), collationIdentifier), schemaHelper);
case "<=":
return constructDataSkippingFilter(
new Predicate(">", childPredicate.getChildren()), schemaHelper);
createPredicate(">", childPredicate.getChildren(), collationIdentifier), schemaHelper);
case ">":
return constructDataSkippingFilter(
new Predicate("<=", childPredicate.getChildren()), schemaHelper);
createPredicate("<=", childPredicate.getChildren(), collationIdentifier), schemaHelper);
case ">=":
return constructDataSkippingFilter(
new Predicate("<", childPredicate.getChildren()), schemaHelper);
createPredicate("<", childPredicate.getChildren(), collationIdentifier), schemaHelper);
case "IS NOT DISTINCT FROM":
return constructDataSkippingFiltersForNotEqual(
childPredicate,
schemaHelper,
(leftColumn, rightLiteral) ->
constructDataSkippingFilter(
new Predicate("NOT", rewriteEqualNullSafe(leftColumn, rightLiteral)),
new Predicate("NOT", rewriteEqualNullSafe(leftColumn, rightLiteral, collationIdentifier)),
schemaHelper));
case "NOT":
// Remove redundant pairs of NOT
Expand Down Expand Up @@ -525,12 +537,12 @@ private static String[] appendArray(String[] arr, String appendElem) {
* Rewrite `EqualNullSafe(a, NotNullLiteral)` as `And(IsNotNull(a), EqualTo(a, NotNullLiteral))`
* and rewrite `EqualNullSafe(a, null)` as `IsNull(a)`
*/
private static Predicate rewriteEqualNullSafe(Column leftCol, Literal rightLit) {
private static Predicate rewriteEqualNullSafe(Column leftCol, Literal rightLit, Optional<CollationIdentifier> collationIdentifier) {
if (rightLit.getValue() == null) {
return new Predicate("IS_NULL", leftCol);
}
return new Predicate(
"AND", new Predicate("IS_NOT_NULL", leftCol), new Predicate("=", leftCol, rightLit));
"AND", new Predicate("IS_NOT_NULL", leftCol), createPredicate("=", leftCol, rightLit, collationIdentifier));
}

/** Helper method for building DataSkippingPredicate for NOT =/IS NOT DISTINCT FROM */
Expand All @@ -544,15 +556,16 @@ private static Optional<DataSkippingPredicate> constructDataSkippingFiltersForNo
"Expects predicate to be = or IS NOT DISTINCT FROM");
Expression leftChild = getLeft(equalPredicate);
Expression rightChild = getRight(equalPredicate);
Optional<CollationIdentifier> collationIdentifier = equalPredicate.getCollationIdentifier();
if (rightChild instanceof Column && leftChild instanceof Literal) {
return constructDataSkippingFilter(
new Predicate("NOT", new Predicate(equalPredicate.getName(), rightChild, leftChild)),
new Predicate("NOT", createPredicate(equalPredicate.getName(), rightChild, leftChild, collationIdentifier)),
schemaHelper);
}
if (leftChild instanceof Column && rightChild instanceof Literal) {
Column leftCol = (Column) leftChild;
Literal rightLit = (Literal) rightChild;
if (schemaHelper.isSkippingEligibleMinMaxColumn(leftCol)
if (schemaHelper.isSkippingEligibleMinMaxColumn(leftCol, collationIdentifier.isPresent())
&& schemaHelper.isSkippingEligibleLiteral(rightLit)) {
return buildDataSkippingPredicateFunc.apply(leftCol, rightLit);
}
Expand Down
Loading
Loading