Skip to content

Commit 5f60b98

Browse files
authored
Merge pull request #566 from mspruc/main
Fix up serialisation for filter predicates in sql-api
2 parents e405a4e + 87be8ed commit 5f60b98

5 files changed

Lines changed: 185 additions & 349 deletions

File tree

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.wayang.api.sql.calcite.converter.functions;
19+
20+
import java.io.Serializable;
21+
import java.util.List;
22+
import java.util.stream.Collectors;
23+
24+
import org.apache.calcite.rex.RexCall;
25+
import org.apache.calcite.rex.RexInputRef;
26+
import org.apache.calcite.rex.RexLiteral;
27+
import org.apache.calcite.rex.RexNode;
28+
import org.apache.calcite.sql.SqlKind;
29+
30+
import org.apache.wayang.basic.data.Record;
31+
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;
32+
33+
/**
34+
* AST of the {@link RexCall} arithmetic, composed into serializable nodes;
35+
* {@link Call}, {@link InputRef}, {@link Literal}
36+
*/
37+
interface CallTreeFactory<Input, Output> extends Serializable {
38+
public default Node<Output> fromRexNode(final RexNode node) {
39+
if (node instanceof RexCall) {
40+
return new Call<>((RexCall) node, this);
41+
} else if (node instanceof RexInputRef) {
42+
return new InputRef<>((RexInputRef) node);
43+
} else if (node instanceof RexLiteral) {
44+
return new Literal<>((RexLiteral) node);
45+
} else {
46+
throw new UnsupportedOperationException("Unsupported RexNode in filter condition: " + node);
47+
}
48+
}
49+
50+
/**
51+
* Derives the java operator for a given {@link SqlKind}, and turns it into a
52+
* serializable function
53+
*
54+
* @param kind {@link SqlKind} from {@link RexCall} SqlOperator
55+
* @return a serializable function of +, -, * or /
56+
* @throws UnsupportedOperationException on unrecognized {@link SqlKind}
57+
*/
58+
public SerializableFunction<List<Output>, Output> deriveOperation(SqlKind kind);
59+
}
60+
61+
interface Node<Output> extends Serializable {
62+
public Output evaluate(final Record record);
63+
}
64+
65+
class Call<Input, Output> implements Node<Output> {
66+
final List<Node<Output>> operands;
67+
final SerializableFunction<List<Output>, Output> operation;
68+
69+
protected Call(final RexCall call, final CallTreeFactory<Input, Output> tree) {
70+
operands = call.getOperands().stream().map(tree::fromRexNode).collect(Collectors.toList());
71+
operation = tree.deriveOperation(call.getKind());
72+
}
73+
74+
@Override
75+
public Output evaluate(final Record record) {
76+
return operation.apply(operands.stream().map(op -> op.evaluate(record)).collect(Collectors.toList()));
77+
}
78+
}
79+
80+
class Literal<Output> implements Node<Output> {
81+
final Output value;
82+
83+
Literal(final RexLiteral literal) {
84+
value = (Output) literal.getValue2();
85+
}
86+
87+
@Override
88+
public Output evaluate(final Record record) {
89+
return value;
90+
}
91+
}
92+
93+
class InputRef<Output> implements Node<Output> {
94+
private final int key;
95+
96+
InputRef(final RexInputRef inputRef) {
97+
this.key = inputRef.getIndex();
98+
}
99+
100+
@Override
101+
public Output evaluate(final Record record) {
102+
return (Output) record.getField(key);
103+
}
104+
}

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/FilterEvaluateCondition.java

Lines changed: 0 additions & 154 deletions
This file was deleted.

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/FilterPredicateImpl.java

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,83 @@
1818

1919
package org.apache.wayang.api.sql.calcite.converter.functions;
2020

21+
import java.util.List;
22+
2123
import org.apache.calcite.rex.RexNode;
24+
import org.apache.calcite.runtime.SqlFunctions;
25+
import org.apache.calcite.sql.SqlKind;
26+
2227
import org.apache.wayang.basic.data.Record;
2328
import org.apache.wayang.core.function.FunctionDescriptor;
29+
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;
2430

2531
public class FilterPredicateImpl implements FunctionDescriptor.SerializablePredicate<Record> {
26-
27-
private final RexNode condition;
32+
private final Node<Object> callTree;
2833

2934
public FilterPredicateImpl(final RexNode condition) {
30-
this.condition = condition;
35+
this.callTree = new FilterCallTreeFactory().fromRexNode(condition);
3136
}
3237

3338
@Override
3439
public boolean test(final Record record) {
35-
return condition.accept(new FilterEvaluateCondition(true, record));
40+
return (boolean) callTree.evaluate(record);
41+
}
42+
43+
class FilterCallTreeFactory implements CallTreeFactory <List<Object>, Object> {
44+
public SerializableFunction<List<Object>, Object> deriveOperation(final SqlKind kind) {
45+
switch (kind) {
46+
case NOT:
47+
return input -> !(boolean) input.get(0);
48+
case IS_NOT_NULL:
49+
return input -> !isEqualTo(input.get(0), null);
50+
case IS_NULL:
51+
return input -> isEqualTo(input.get(0), null);
52+
case LIKE:
53+
return input -> like((String) input.get(0), (String) input.get(1));
54+
case NOT_EQUALS:
55+
return input -> !isEqualTo(input.get(0), input.get(1));
56+
case EQUALS:
57+
return input -> isEqualTo(input.get(0), input.get(1));
58+
case GREATER_THAN:
59+
return input -> isGreaterThan(input.get(0), input.get(1));
60+
case LESS_THAN:
61+
return input -> isLessThan(input.get(0), input.get(1));
62+
case GREATER_THAN_OR_EQUAL:
63+
return input -> isGreaterThan(input.get(0), input.get(1)) || isEqualTo(input.get(0), input.get(1));
64+
case LESS_THAN_OR_EQUAL:
65+
return input -> isLessThan(input.get(0), input.get(1)) || isEqualTo(input.get(0), input.get(1));
66+
case AND:
67+
return input -> input.stream().map(Boolean.class::cast).allMatch(Boolean::booleanValue);
68+
case OR:
69+
return input -> input.stream().map(Boolean.class::cast).anyMatch(Boolean::booleanValue);
70+
default:
71+
throw new UnsupportedOperationException("Kind not supported: " + kind);
72+
}
73+
}
74+
}
75+
76+
private boolean like(final String s1, final String s2) {
77+
final SqlFunctions.LikeFunction likeFunction = new SqlFunctions.LikeFunction();
78+
final boolean isMatch = likeFunction.like(s1, s2);
79+
80+
return isMatch;
81+
}
82+
83+
@SuppressWarnings({ "rawtypes", "unchecked" })
84+
private boolean isGreaterThan(final Object o1, final Object o2) {
85+
assert (o1 instanceof Comparable);
86+
return ((Comparable) o1).compareTo(o2) > 0;
87+
}
88+
89+
@SuppressWarnings({ "rawtypes", "unchecked" })
90+
private boolean isLessThan(final Object o1, final Object o2) {
91+
assert (o1 instanceof Comparable);
92+
return ((Comparable) o1).compareTo(o2) < 0;
93+
}
94+
95+
@SuppressWarnings("rawtypes")
96+
private boolean isEqualTo(final Object o1, final Object o2) {
97+
assert (o1 instanceof Comparable);
98+
return ((Comparable) o1).equals(o2);
3699
}
37100
}

0 commit comments

Comments
 (0)