Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.expressions;

import java.nio.ByteBuffer;
import org.apache.iceberg.geospatial.BoundingBox;

public class BoundGeospatialPredicate extends BoundPredicate<ByteBuffer> {
private final Literal<BoundingBox> literal;

BoundGeospatialPredicate(Operation op, BoundTerm<ByteBuffer> term, Literal<BoundingBox> literal) {
super(op, term);
this.literal = literal;
}

public Literal<BoundingBox> literal() {
return literal;
}

@Override
public boolean test(ByteBuffer value) {
throw new UnsupportedOperationException(
"Evaluation of spatial predicate \""
+ op()
+ "\" against geometry/geography value is not implemented.");
}

@Override
public boolean isGeospatialPredicate() {
return true;
}

@Override
public BoundGeospatialPredicate asGeospatialPredicate() {
return this;
}

@Override
public Expression negate() {
return new BoundGeospatialPredicate(op().negate(), term(), literal);
}

@Override
public boolean isEquivalentTo(Expression expr) {
if (!(expr instanceof BoundGeospatialPredicate)) {
return false;
}

BoundGeospatialPredicate other = (BoundGeospatialPredicate) expr;
return op() == other.op()
&& term().isEquivalentTo(other.term())
&& literal.value().equals(other.literal.value());
}

@Override
public String toString() {
switch (op()) {
case ST_INTERSECTS:
return term().toString() + " stIntersects " + literal.value();
case ST_DISJOINT:
return term().toString() + " stDisjoint " + literal.value();
default:
return "Invalid geospatial predicate: operation = " + op();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@ public boolean isSetPredicate() {
public BoundSetPredicate<T> asSetPredicate() {
throw new IllegalStateException("Not a set predicate: " + this);
}

public boolean isGeospatialPredicate() {
return false;
}

public BoundGeospatialPredicate asGeospatialPredicate() {
throw new IllegalStateException("Not a geospatial predicate: " + this);
}
}
21 changes: 21 additions & 0 deletions api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundVisitor;
import org.apache.iceberg.geospatial.BoundingBox;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.NaNUtil;

Expand Down Expand Up @@ -156,5 +157,25 @@ public <T> Boolean startsWith(Bound<T> valueExpr, Literal<T> lit) {
public <T> Boolean notStartsWith(Bound<T> valueExpr, Literal<T> lit) {
return !startsWith(valueExpr, lit);
}

@Override
public <T> Boolean stIntersects(Bound<T> valueExpr, Literal<BoundingBox> literal) {
// Evaluation of stIntersects against geometry/geography value is not supported. Spatial
// predicates only
// supports data skipping but not filtering individual records in iceberg-api. Readers should
// expect
// false-positives and run the actual spatial filters on their own.
return true;
}

@Override
public <T> Boolean stDisjoint(Bound<T> valueExpr, Literal<BoundingBox> literal) {
// Evaluation of stIntersects against geometry/geography value is not supported. Spatial
// predicates only
// supports data skipping but not filtering individual records in iceberg-api. Readers should
// expect
// false-positives and run the actual spatial filters on their own.
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ enum Operation {
OR,
STARTS_WITH,
NOT_STARTS_WITH,
ST_INTERSECTS,
ST_DISJOINT,
COUNT,
COUNT_STAR,
MAX,
Expand Down Expand Up @@ -90,6 +92,10 @@ public Operation negate() {
return Operation.NOT_STARTS_WITH;
case NOT_STARTS_WITH:
return Operation.STARTS_WITH;
case ST_INTERSECTS:
return Operation.ST_DISJOINT;
case ST_DISJOINT:
return Operation.ST_INTERSECTS;
default:
throw new IllegalArgumentException("No negation for operation: " + this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.expressions;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
Expand All @@ -34,6 +35,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.geospatial.BoundingBox;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.transforms.Transforms;
Expand Down Expand Up @@ -318,6 +320,10 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
.map(lit -> (T) sanitize(bound.term().type(), lit, now, today))
.iterator();
return new UnboundPredicate<>(pred.op(), unbind(pred.term()), iter);
} else if (pred.isGeospatialPredicate()) {
BoundGeospatialPredicate bound = (BoundGeospatialPredicate) pred;
return Expressions.geospatialPredicate(
pred.op(), unbind(bound.term()), BoundingBox.empty());
}

throw new UnsupportedOperationException("Cannot sanitize bound predicate type: " + pred.op());
Expand All @@ -343,6 +349,10 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
case NOT_STARTS_WITH:
return new UnboundPredicate<>(
pred.op(), pred.term(), (T) sanitize(pred.literal(), now, today));
case ST_INTERSECTS:
case ST_DISJOINT:
return Expressions.geospatialPredicate(
pred.op(), (UnboundTerm<ByteBuffer>) pred.term(), BoundingBox.empty());
case IN:
case NOT_IN:
Iterable<T> iter =
Expand Down Expand Up @@ -493,6 +503,10 @@ public <T> String predicate(UnboundPredicate<T> pred) {
return term + " STARTS WITH " + sanitize(pred.literal(), nowMicros, today);
case NOT_STARTS_WITH:
return term + " NOT STARTS WITH " + sanitize(pred.literal(), nowMicros, today);
case ST_INTERSECTS:
return term + " ST_INTERSECTS WITH (bounding-box)";
case ST_DISJOINT:
return term + " ST_DISJOINT WITH (bounding-box)";
default:
throw new UnsupportedOperationException(
"Cannot sanitize unsupported predicate type: " + pred.op());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;
import java.util.function.Supplier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.geospatial.BoundingBox;

/** Utils for traversing {@link Expression expressions}. */
public class ExpressionVisitors {
Expand Down Expand Up @@ -126,6 +127,16 @@ public <T> R notStartsWith(BoundReference<T> ref, Literal<T> lit) {
"notStartsWith expression is not supported by the visitor");
}

public <T> R stIntersects(BoundReference<T> ref, Literal<BoundingBox> lit) {
throw new UnsupportedOperationException(
"stIntersects expression is not supported by the visitor");
}

public <T> R stDisjoint(BoundReference<T> ref, Literal<BoundingBox> lit) {
throw new UnsupportedOperationException(
"stDisjoint expression is not supported by the visitor");
}

/**
* Handle a non-reference value in this visitor.
*
Expand Down Expand Up @@ -196,6 +207,19 @@ public <T> R predicate(BoundPredicate<T> pred) {
throw new IllegalStateException(
"Invalid operation for BoundSetPredicate: " + pred.op());
}

} else if (pred.isGeospatialPredicate()) {
switch (pred.op()) {
case ST_INTERSECTS:
return stIntersects(
(BoundReference<T>) pred.term(), pred.asGeospatialPredicate().literal());
case ST_DISJOINT:
return stDisjoint(
(BoundReference<T>) pred.term(), pred.asGeospatialPredicate().literal());
default:
throw new IllegalStateException(
"Invalid operation for BoundGeospatialPredicate: " + pred.op());
}
}

throw new IllegalStateException("Unsupported bound predicate: " + pred.getClass().getName());
Expand Down Expand Up @@ -266,6 +290,14 @@ public <T> R notStartsWith(Bound<T> expr, Literal<T> lit) {
throw new UnsupportedOperationException("Unsupported operation.");
}

public <T> R stIntersects(Bound<T> term, Literal<BoundingBox> literal) {
throw new UnsupportedOperationException("ST_INTERSECTS is not supported by the visitor");
}

public <T> R stDisjoint(Bound<T> term, Literal<BoundingBox> literal) {
throw new UnsupportedOperationException("ST_DISJOINT is not supported by the visitor");
}

@Override
public <T> R predicate(BoundPredicate<T> pred) {
if (pred.isLiteralPredicate()) {
Expand Down Expand Up @@ -317,8 +349,15 @@ public <T> R predicate(BoundPredicate<T> pred) {
throw new IllegalStateException(
"Invalid operation for BoundSetPredicate: " + pred.op());
}
}

} else if (pred.isGeospatialPredicate()) {
switch (pred.op()) {
case ST_INTERSECTS:
return stIntersects(pred.term(), pred.asGeospatialPredicate().literal());
case ST_DISJOINT:
return stDisjoint(pred.term(), pred.asGeospatialPredicate().literal());
}
}
throw new IllegalStateException("Unsupported bound predicate: " + pred.getClass().getName());
}

Expand Down Expand Up @@ -495,6 +534,13 @@ public <T> R predicate(BoundPredicate<T> pred) {
throw new IllegalStateException(
"Invalid operation for BoundSetPredicate: " + pred.op());
}
} else if (pred.isGeospatialPredicate()) {
switch (pred.op()) {
case ST_INTERSECTS:
return stIntersects(pred.term(), pred.asGeospatialPredicate().literal());
case ST_DISJOINT:
return stDisjoint(pred.term(), pred.asGeospatialPredicate().literal());
}
}

throw new IllegalStateException("Unsupported bound predicate: " + pred.getClass().getName());
Expand Down Expand Up @@ -555,6 +601,14 @@ public <T> R startsWith(BoundTerm<T> term, Literal<T> lit) {
public <T> R notStartsWith(BoundTerm<T> term, Literal<T> lit) {
return null;
}

public <T> R stIntersects(BoundTerm<T> term, Literal<BoundingBox> lit) {
return null;
}

public <T> R stDisjoint(BoundTerm<T> term, Literal<BoundingBox> lit) {
return null;
}
}

/**
Expand Down
61 changes: 61 additions & 0 deletions api/src/main/java/org/apache/iceberg/expressions/Expressions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.iceberg.expressions;

import java.nio.ByteBuffer;
import java.util.stream.Stream;
import org.apache.iceberg.expressions.Expression.Operation;
import org.apache.iceberg.geospatial.BoundingBox;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.transforms.Transform;
Expand Down Expand Up @@ -202,6 +204,44 @@ public static UnboundPredicate<String> notStartsWith(UnboundTerm<String> expr, S
return new UnboundPredicate<>(Expression.Operation.NOT_STARTS_WITH, expr, value);
}

public static UnboundPredicate<ByteBuffer> stIntersects(String name, BoundingBox value) {
return geospatialPredicate(Operation.ST_INTERSECTS, name, value);
}

public static UnboundPredicate<ByteBuffer> stIntersects(
String name, ByteBuffer min, ByteBuffer max) {
return geospatialPredicate(Operation.ST_INTERSECTS, name, min, max);
}

public static UnboundPredicate<ByteBuffer> stIntersects(
UnboundTerm<ByteBuffer> expr, BoundingBox value) {
return geospatialPredicate(Operation.ST_INTERSECTS, expr, value);
}

public static UnboundPredicate<ByteBuffer> stIntersects(
UnboundTerm<ByteBuffer> expr, ByteBuffer min, ByteBuffer max) {
return geospatialPredicate(Operation.ST_INTERSECTS, expr, min, max);
}

public static UnboundPredicate<ByteBuffer> stDisjoint(String name, BoundingBox value) {
return geospatialPredicate(Operation.ST_DISJOINT, name, value);
}

public static UnboundPredicate<ByteBuffer> stDisjoint(
String name, ByteBuffer min, ByteBuffer max) {
return geospatialPredicate(Operation.ST_DISJOINT, name, min, max);
}

public static UnboundPredicate<ByteBuffer> stDisjoint(
UnboundTerm<ByteBuffer> expr, BoundingBox value) {
return geospatialPredicate(Operation.ST_DISJOINT, expr, value);
}

public static UnboundPredicate<ByteBuffer> stDisjoint(
UnboundTerm<ByteBuffer> expr, ByteBuffer min, ByteBuffer max) {
return geospatialPredicate(Operation.ST_DISJOINT, expr, min, max);
}

public static <T> UnboundPredicate<T> in(String name, T... values) {
return predicate(Operation.IN, name, Lists.newArrayList(values));
}
Expand Down Expand Up @@ -280,6 +320,27 @@ public static <T> UnboundPredicate<T> predicate(Operation op, UnboundTerm<T> exp
return new UnboundPredicate<>(op, expr);
}

public static UnboundPredicate<ByteBuffer> geospatialPredicate(
Operation op, String name, BoundingBox value) {
return geospatialPredicate(
op, ref(name), value.min().toByteBuffer(), value.max().toByteBuffer());
}

public static UnboundPredicate<ByteBuffer> geospatialPredicate(
Operation op, UnboundTerm<ByteBuffer> expr, BoundingBox value) {
return geospatialPredicate(op, expr, value.min().toByteBuffer(), value.max().toByteBuffer());
}

public static UnboundPredicate<ByteBuffer> geospatialPredicate(
Operation op, String name, ByteBuffer min, ByteBuffer max) {
return geospatialPredicate(op, ref(name), min, max);
}

public static UnboundPredicate<ByteBuffer> geospatialPredicate(
Operation op, UnboundTerm<ByteBuffer> expr, ByteBuffer min, ByteBuffer max) {
return new UnboundPredicate<>(op, expr, Lists.newArrayList(min, max));
}

public static True alwaysTrue() {
return True.INSTANCE;
}
Expand Down
Loading