Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api.sql.calcite.converter.functions;

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;

import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;

/**
* AST of the {@link RexCall} arithmetic, composed into serializable nodes;
* {@link Call}, {@link InputRef}, {@link Literal}
*/
interface CallTreeFactory<Input, Output> extends Serializable {
public default Node<Output> fromRexNode(final RexNode node) {
if (node instanceof RexCall) {
return new Call<>((RexCall) node, this);
} else if (node instanceof RexInputRef) {
return new InputRef<>((RexInputRef) node);
} else if (node instanceof RexLiteral) {
return new Literal<>((RexLiteral) node);
} else {
throw new UnsupportedOperationException("Unsupported RexNode in filter condition: " + node);
}
}

/**
* Derives the java operator for a given {@link SqlKind}, and turns it into a
* serializable function
*
* @param kind {@link SqlKind} from {@link RexCall} SqlOperator
* @return a serializable function of +, -, * or /
* @throws UnsupportedOperationException on unrecognized {@link SqlKind}
*/
public SerializableFunction<List<Output>, Output> deriveOperation(SqlKind kind);
}

interface Node<Output> extends Serializable {
public Output evaluate(final Record record);
}

class Call<Input, Output> implements Node<Output> {
final List<Node<Output>> operands;
final SerializableFunction<List<Output>, Output> operation;

protected Call(final RexCall call, final CallTreeFactory<Input, Output> tree) {
operands = call.getOperands().stream().map(tree::fromRexNode).collect(Collectors.toList());
operation = tree.deriveOperation(call.getKind());
}

@Override
public Output evaluate(final Record record) {
return operation.apply(operands.stream().map(op -> op.evaluate(record)).collect(Collectors.toList()));
}
}

class Literal<Output> implements Node<Output> {
final Output value;

Literal(final RexLiteral literal) {
value = (Output) literal.getValue2();
}

@Override
public Output evaluate(final Record record) {
return value;
}
}

class InputRef<Output> implements Node<Output> {
private final int key;

InputRef(final RexInputRef inputRef) {
this.key = inputRef.getIndex();
}

@Override
public Output evaluate(final Record record) {
return (Output) record.getField(key);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,83 @@

package org.apache.wayang.api.sql.calcite.converter.functions;

import java.util.List;

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.runtime.SqlFunctions;
import org.apache.calcite.sql.SqlKind;

import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;

public class FilterPredicateImpl implements FunctionDescriptor.SerializablePredicate<Record> {

private final RexNode condition;
private final Node<Object> callTree;

public FilterPredicateImpl(final RexNode condition) {
this.condition = condition;
this.callTree = new FilterCallTreeFactory().fromRexNode(condition);
}

@Override
public boolean test(final Record record) {
return condition.accept(new FilterEvaluateCondition(true, record));
return (boolean) callTree.evaluate(record);
}

class FilterCallTreeFactory implements CallTreeFactory <List<Object>, Object> {
public SerializableFunction<List<Object>, Object> deriveOperation(final SqlKind kind) {
switch (kind) {
case NOT:
return input -> !(boolean) input.get(0);
case IS_NOT_NULL:
return input -> !isEqualTo(input.get(0), null);
case IS_NULL:
return input -> isEqualTo(input.get(0), null);
case LIKE:
return input -> like((String) input.get(0), (String) input.get(1));
case NOT_EQUALS:
return input -> !isEqualTo(input.get(0), input.get(1));
case EQUALS:
return input -> isEqualTo(input.get(0), input.get(1));
case GREATER_THAN:
return input -> isGreaterThan(input.get(0), input.get(1));
case LESS_THAN:
return input -> isLessThan(input.get(0), input.get(1));
case GREATER_THAN_OR_EQUAL:
return input -> isGreaterThan(input.get(0), input.get(1)) || isEqualTo(input.get(0), input.get(1));
case LESS_THAN_OR_EQUAL:
return input -> isLessThan(input.get(0), input.get(1)) || isEqualTo(input.get(0), input.get(1));
case AND:
return input -> input.stream().map(Boolean.class::cast).allMatch(Boolean::booleanValue);
case OR:
return input -> input.stream().map(Boolean.class::cast).anyMatch(Boolean::booleanValue);
default:
throw new UnsupportedOperationException("Kind not supported: " + kind);
}
}
}

private boolean like(final String s1, final String s2) {
final SqlFunctions.LikeFunction likeFunction = new SqlFunctions.LikeFunction();
final boolean isMatch = likeFunction.like(s1, s2);

return isMatch;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private boolean isGreaterThan(final Object o1, final Object o2) {
assert (o1 instanceof Comparable);
return ((Comparable) o1).compareTo(o2) > 0;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private boolean isLessThan(final Object o1, final Object o2) {
assert (o1 instanceof Comparable);
return ((Comparable) o1).compareTo(o2) < 0;
}

@SuppressWarnings("rawtypes")
private boolean isEqualTo(final Object o1, final Object o2) {
assert (o1 instanceof Comparable);
return ((Comparable) o1).equals(o2);
}
}
Loading
Loading