Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import io.delta.kernel.internal.deletionvectors.DeletionVectorUtils;
import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray;
import io.delta.kernel.internal.rowtracking.MaterializedRowTrackingColumn;
import io.delta.kernel.internal.skipping.PartitionUtils;
import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode;
import io.delta.kernel.internal.util.PartitionUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.MetadataColumnSpec;
import io.delta.kernel.types.StructField;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import static io.delta.kernel.internal.DeltaErrors.partitionColumnMissingInData;
import static io.delta.kernel.internal.TransactionImpl.getStatisticsColumns;
import static io.delta.kernel.internal.data.TransactionStateRow.*;
import static io.delta.kernel.internal.skipping.PartitionUtils.getTargetDirectory;
import static io.delta.kernel.internal.skipping.PartitionUtils.validateAndSanitizePartitionValues;
import static io.delta.kernel.internal.util.ColumnMapping.blockIfColumnMappingEnabled;
import static io.delta.kernel.internal.util.PartitionUtils.getTargetDirectory;
import static io.delta.kernel.internal.util.PartitionUtils.validateAndSanitizePartitionValues;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.SchemaUtils.findColIndex;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.delta.kernel.exceptions;

/**
 * Kernel exception type for predicates that use a collation which cannot be handled.
 *
 * <p>The data-skipping code in {@code io.delta.kernel.internal} catches this exception while
 * obtaining the predicate evaluator and, when it is raised, returns the scan-file iterator as-is
 * instead of applying the data-skipping filter.
 *
 * <p>NOTE(review): the throw site is not visible from here; presumably the engine's expression
 * handler raises it — confirm against the engine implementation.
 */
public class UnsupportedPredicateWithCollation extends KernelException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public class Predicate extends ScalarExpression {

/**
 * Creates a predicate with the given operator name and operands and no collation.
 *
 * @param name the predicate operator name
 * @param children the operand expressions of the predicate
 * @throws IllegalArgumentException if the operator is not one of the supported constant, unary or
 *     binary operators, or if the number of children does not match the operator's arity
 */
public Predicate(String name, List<Expression> children) {
super(name, children);
// Validation runs after super(...) because checkArguments looks the operator up via this.name
// (presumably initialised by the superclass constructor — confirm).
checkArguments(name, children);
// This constructor variant carries no collation.
collationIdentifier = Optional.empty();
}

Expand Down Expand Up @@ -184,8 +185,51 @@ public boolean equals(Object o) {
return this.hashCode() == o.hashCode();
}

/**
 * Validates that this predicate's operator is supported and that it received the number of
 * operands that the operator requires.
 *
 * <p>The operator lookup uses {@code this.name}; the {@code name} argument is only used when
 * reporting an unsupported operator.
 *
 * @param name the operator name as passed to the constructor (used in the error message)
 * @param children the operand expressions supplied to the predicate
 * @throws IllegalArgumentException if the operator is not a known constant, unary or binary
 *     operator, or if {@code children.size()} differs from the operator's arity
 */
private void checkArguments(String name, List<Expression> children) {
  // Resolve the arity in a single pass over the operator families; anything that falls through
  // all three families is an unsupported operator.
  final int requiredChildCount;
  if (CONSTANT_OPERATORS.contains(this.name)) {
    requiredChildCount = 0;
  } else if (UNARY_OPERATORS.contains(this.name)) {
    requiredChildCount = 1;
  } else if (BINARY_OPERATORS.contains(this.name)) {
    requiredChildCount = 2;
  } else {
    String unsupportedMessage =
        String.format(
            "Predicate operator '%s' is not supported. Supported operators are %s, %s and %s.",
            name, CONSTANT_OPERATORS, UNARY_OPERATORS, BINARY_OPERATORS);
    throw new IllegalArgumentException(unsupportedMessage);
  }

  int actualChildCount = children.size();
  if (actualChildCount == requiredChildCount) {
    return;
  }
  String arityMessage =
      String.format(
          "Invalid Predicate: operator '%s' requires %s children, but found %d.",
          this.name, requiredChildCount, actualChildCount);
  throw new IllegalArgumentException(arityMessage);
}

/** Operators that take no operands; {@code checkArguments} requires 0 children for these. */
private static final Set<String> CONSTANT_OPERATORS =
Stream.of("ALWAYS_TRUE", "ALWAYS_FALSE").collect(Collectors.toSet());

/** Operators that take one operand; {@code checkArguments} requires exactly 1 child for these. */
private static final Set<String> UNARY_OPERATORS =
Stream.of("NOT", "IS_NULL", "IS_NOT_NULL").collect(Collectors.toSet());

private static final Set<String> BINARY_OPERATORS =
Stream.of("<", "<=", ">", ">=", "=", "AND", "OR", "IS NOT DISTINCT FROM", "STARTS_WITH")
Stream.of(
"<",
"<=",
">",
">=",
"=",
"<>",
"AND",
"OR",
"IS NOT DISTINCT FROM",
"STARTS_WITH",
"LIKE")
.collect(Collectors.toSet());

/** Operators that support collation-based string comparison. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.delta.kernel.expressions.Column;
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.tablefeatures.TableFeature;
import io.delta.kernel.types.CollationIdentifier;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.TypeChange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
package io.delta.kernel.internal;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineException;
import static io.delta.kernel.internal.skipping.PartitionUtils.rewritePartitionPredicateOnCheckpointFileSchema;
import static io.delta.kernel.internal.skipping.PartitionUtils.rewritePartitionPredicateOnScanFileSchema;
import static io.delta.kernel.internal.skipping.StatsSchemaHelper.getStatsSchema;
import static io.delta.kernel.internal.util.PartitionUtils.rewritePartitionPredicateOnCheckpointFileSchema;
import static io.delta.kernel.internal.util.PartitionUtils.rewritePartitionPredicateOnScanFileSchema;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.Scan;
import io.delta.kernel.data.*;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.exceptions.UnsupportedPredicateWithCollation;
import io.delta.kernel.expressions.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
Expand All @@ -39,9 +41,11 @@
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.skipping.DataSkippingPredicate;
import io.delta.kernel.internal.skipping.DataSkippingUtils;
import io.delta.kernel.internal.skipping.PartitionUtils;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.metrics.ScanReport;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.CollationIdentifier;
import io.delta.kernel.types.MetadataColumnSpec;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
Expand Down Expand Up @@ -366,7 +370,8 @@ private CloseableIterator<FilteredColumnarBatch> applyDataSkipping(
// pruning it after is much simpler
StructType prunedStatsSchema =
DataSkippingUtils.pruneStatsSchema(
getStatsSchema(metadata.getDataSchema()), dataSkippingFilter.getReferencedCols());
getStatsSchema(metadata.getDataSchema(), dataSkippingFilter.getReferencedCollations()),
dataSkippingFilter.getReferencedCols());

// Skipping happens in two steps:
// 1. The predicate produces false for any file whose stats prove we can safely skip it. A
Expand All @@ -380,15 +385,20 @@ private CloseableIterator<FilteredColumnarBatch> applyDataSkipping(
"COALESCE", Arrays.asList(dataSkippingFilter, Literal.ofBoolean(true))),
AlwaysTrue.ALWAYS_TRUE);

PredicateEvaluator predicateEvaluator =
wrapEngineException(
() ->
engine
.getExpressionHandler()
.getPredicateEvaluator(prunedStatsSchema, filterToEval),
"Get the predicate evaluator for data skipping with schema=%s and filter=%s",
prunedStatsSchema,
filterToEval);
PredicateEvaluator predicateEvaluator;
try {
predicateEvaluator =
wrapEngineException(
() ->
engine
.getExpressionHandler()
.getPredicateEvaluator(prunedStatsSchema, filterToEval),
"Get the predicate evaluator for data skipping with schema=%s and filter=%s",
prunedStatsSchema,
filterToEval);
} catch (UnsupportedPredicateWithCollation e) {
return scanFileIter;
}

return scanFileIter.map(
filteredScanFileBatch -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.skipping.PartitionUtils.serializePartitionMap;
import static io.delta.kernel.internal.util.InternalUtils.relativizePath;
import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.VectorUtils.toJavaMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.skipping.PartitionUtils.serializePartitionMap;
import static io.delta.kernel.internal.util.InternalUtils.relativizePath;
import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.CollationIdentifier;
import java.util.*;

/** A {@link Predicate} with a set of columns referenced by the expression. */
Expand All @@ -26,6 +27,8 @@ public class DataSkippingPredicate extends Predicate {
/** Set of {@link Column}s referenced by the predicate or any of its child expressions */
private final Set<Column> referencedCols;

/**
 * Set of {@link CollationIdentifier}s referenced by the predicate; exposed through {@link
 * #getReferencedCollations()}.
 */
private final Set<CollationIdentifier> collationIdentifiers;

/**
* @param name the predicate name
* @param children list of expressions that are input to this predicate.
Expand All @@ -35,6 +38,17 @@ public class DataSkippingPredicate extends Predicate {
DataSkippingPredicate(String name, List<Expression> children, Set<Column> referencedCols) {
super(name, children);
// Read-only view over the caller's set (not a copy).
this.referencedCols = Collections.unmodifiableSet(referencedCols);
// No collation is supplied to this constructor, so the predicate references no collations.
this.collationIdentifiers = Collections.unmodifiableSet(new HashSet<>());
}

/**
 * Creates a data-skipping predicate that carries a collation.
 *
 * @param name the predicate name
 * @param children list of expressions that are input to this predicate
 * @param collationIdentifier collation forwarded to the {@link Predicate} constructor and
 *     recorded as the only collation referenced by this predicate
 * @param referencedCols set of columns referenced by this predicate or any of its children
 */
DataSkippingPredicate(
String name,
List<Expression> children,
CollationIdentifier collationIdentifier,
Set<Column> referencedCols) {
super(name, children, collationIdentifier);
// Read-only view over the caller's set (not a copy).
this.referencedCols = Collections.unmodifiableSet(referencedCols);
// Immutable one-element set holding the supplied collation.
this.collationIdentifiers = Collections.singleton(collationIdentifier);
}

/**
Expand All @@ -46,18 +60,45 @@ public class DataSkippingPredicate extends Predicate {
* @param right right input to this predicate
*/
DataSkippingPredicate(String name, DataSkippingPredicate left, DataSkippingPredicate right) {
this(
name,
Arrays.asList(left, right),
new HashSet<Column>() {
{
addAll(left.getReferencedCols());
addAll(right.getReferencedCols());
}
});
super(name, Arrays.asList(left, right));
this.referencedCols =
Collections.unmodifiableSet(
new HashSet<Column>() {
{
addAll(left.getReferencedCols());
addAll(right.getReferencedCols());
}
});
this.collationIdentifiers =
Collections.unmodifiableSet(
new HashSet<CollationIdentifier>() {
{
addAll(left.getReferencedCollations());
addAll(right.getReferencedCollations());
}
});
}

/**
 * @return the unmodifiable set of {@link Column}s referenced by this predicate or any of its
 *     child expressions
 */
public Set<Column> getReferencedCols() {
return referencedCols;
}

/**
 * @return the set of {@link CollationIdentifier}s referenced by this predicate: empty when the
 *     predicate was built without a collation, the single supplied collation when built with
 *     one, and the union of both operands' collations for a predicate combining two others
 */
public Set<CollationIdentifier> getReferencedCollations() {
return collationIdentifiers;
}

/**
 * Two data-skipping predicates are equal when the superclass considers them equal and they
 * reference the same columns and the same collations.
 */
@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  // Reject foreign types first, then defer to the superclass comparison.
  if (!(o instanceof DataSkippingPredicate) || !super.equals(o)) {
    return false;
  }
  DataSkippingPredicate other = (DataSkippingPredicate) o;
  boolean sameColumns = Objects.equals(referencedCols, other.referencedCols);
  return sameColumns && Objects.equals(collationIdentifiers, other.collationIdentifiers);
}

/**
 * Combines the superclass hash with the referenced columns and collations, in that order, so the
 * hash covers the same fields that {@link #equals(Object)} compares.
 */
@Override
public int hashCode() {
  Object[] hashedParts = {super.hashCode(), referencedCols, collationIdentifiers};
  return Arrays.hashCode(hashedParts);
}
}
Loading
Loading