Skip to content

Commit 74ab1f1

Browse files
committed
[common] Support statistic-based Predicate interface
1 parent c843f96 commit 74ab1f1

31 files changed

+399
-71
lines changed

LICENSE

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,33 @@ Apache Kafka
359359
./fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java
360360

361361
Apache Paimon
362+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java
363+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/CompareUtils.java
364+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java
365+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java
366+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java
367+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/Equal.java
368+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/FieldRef.java
369+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/FunctionVisitor.java
370+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterOrEqual.java
371+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterThan.java
372+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/In.java
373+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNotNull.java
374+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNull.java
375+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafFunction.java
376+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafPredicate.java
377+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafUnaryFunction.java
378+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LessOrEqual.java
379+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LessThan.java
380+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/NotEqual.java
381+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/NotIn.java
382+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/NullFalseLeafBinaryFunction.java
383+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/Or.java
384+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/PartitionPredicateVisitor.java
385+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/Predicate.java
386+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateBuilder.java
387+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateVisitor.java
388+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/StartsWith.java
362389
./fluss-common/src/main/java/com/alibaba/fluss/row/encode/paimon/PaimonBinaryRowWriter.java
363390
./fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotScanner.java
364391

fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,21 @@ public boolean test(InternalRow row, List<Predicate> children) {
4646
return true;
4747
}
4848

49+
@Override
50+
public boolean test(
51+
long rowCount,
52+
InternalRow minValues,
53+
InternalRow maxValues,
54+
Long[] nullCounts,
55+
List<Predicate> children) {
56+
for (Predicate child : children) {
57+
if (!child.test(rowCount, minValues, maxValues, nullCounts)) {
58+
return false;
59+
}
60+
}
61+
return true;
62+
}
63+
4964
@Override
5065
public Optional<Predicate> negate(List<Predicate> children) {
5166
List<Predicate> negatedChildren = new ArrayList<>();

fluss-common/src/main/java/com/alibaba/fluss/predicate/CompareUtils.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.alibaba.fluss.predicate;
1919

20-
import com.alibaba.fluss.row.BinaryString;
2120
import com.alibaba.fluss.types.DataType;
2221

2322
import static java.lang.Math.min;
@@ -30,16 +29,9 @@
3029
public class CompareUtils {
3130
private CompareUtils() {}
3231

32+
@SuppressWarnings("unchecked")
3333
public static int compareLiteral(DataType type, Object v1, Object v2) {
3434
if (v1 instanceof Comparable) {
35-
// because BinaryString can not serialize so v1 or v2 may be BinaryString convert to
36-
// String for compare
37-
if (v1 instanceof BinaryString) {
38-
v1 = ((BinaryString) v1).toString();
39-
}
40-
if (v2 instanceof BinaryString) {
41-
v2 = ((BinaryString) v2).toString();
42-
}
4335
return ((Comparable<Object>) v1).compareTo(v2);
4436
} else if (v1 instanceof byte[]) {
4537
return compare((byte[]) v1, (byte[]) v2);

fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
*/
3535
public class CompoundPredicate implements Predicate {
3636

37+
private static final long serialVersionUID = 1L;
3738
private final Function function;
3839
private final List<Predicate> children;
3940

@@ -55,6 +56,12 @@ public boolean test(InternalRow row) {
5556
return function.test(row, children);
5657
}
5758

59+
@Override
60+
public boolean test(
61+
long rowCount, InternalRow minValues, InternalRow maxValues, Long[] nullCounts) {
62+
return function.test(rowCount, minValues, maxValues, nullCounts, children);
63+
}
64+
5865
@Override
5966
public Optional<Predicate> negate() {
6067
return function.negate(children);
@@ -87,8 +94,17 @@ public String toString() {
8794
/** Evaluate the predicate result based on multiple {@link Predicate}s. */
8895
public abstract static class Function implements Serializable {
8996

97+
private static final long serialVersionUID = 1L;
98+
9099
public abstract boolean test(InternalRow row, List<Predicate> children);
91100

101+
public abstract boolean test(
102+
long rowCount,
103+
InternalRow minValues,
104+
InternalRow maxValues,
105+
Long[] nullCounts,
106+
List<Predicate> children);
107+
92108
public abstract Optional<Predicate> negate(List<Predicate> children);
93109

94110
public abstract <T> T visit(FunctionVisitor<T> visitor, List<T> children);

fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
/** A {@link NullFalseLeafBinaryFunction} to evaluate {@code filter like '%abc%'}. */
3030
public class Contains extends NullFalseLeafBinaryFunction {
3131

32+
private static final long serialVersionUID = 1L;
33+
3234
public static final Contains INSTANCE = new Contains();
3335

3436
private Contains() {}

fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
*/
3333
public class EndsWith extends NullFalseLeafBinaryFunction {
3434

35+
private static final long serialVersionUID = 1L;
36+
3537
public static final EndsWith INSTANCE = new EndsWith();
3638

3739
private EndsWith() {}

fluss-common/src/main/java/com/alibaba/fluss/predicate/Equal.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
/** A {@link NullFalseLeafBinaryFunction} to eval equal. */
3232
public class Equal extends NullFalseLeafBinaryFunction {
3333

34+
private static final long serialVersionUID = 1L;
35+
3436
public static final Equal INSTANCE = new Equal();
3537

3638
private Equal() {}

fluss-common/src/main/java/com/alibaba/fluss/predicate/FieldRef.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
/** A reference to a field in an input. */
3030
public class FieldRef implements Serializable {
3131

32-
private static final long serialVersionUID = 4982103776651292199L;
32+
private static final long serialVersionUID = 1L;
3333

3434
private final int index;
3535
private final String name;

fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterOrEqual.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
/** A {@link NullFalseLeafBinaryFunction} to eval greater or equal. */
3232
public class GreaterOrEqual extends NullFalseLeafBinaryFunction {
3333

34+
private static final long serialVersionUID = 1L;
35+
3436
public static final GreaterOrEqual INSTANCE = new GreaterOrEqual();
3537

3638
private GreaterOrEqual() {}

fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterThan.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
/** A {@link LeafFunction} to eval greater. */
3232
public class GreaterThan extends NullFalseLeafBinaryFunction {
3333

34+
private static final long serialVersionUID = 1L;
35+
3436
public static final GreaterThan INSTANCE = new GreaterThan();
3537

3638
private GreaterThan() {}

0 commit comments

Comments
 (0)