diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java index 3c4c36408..58050e29a 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java @@ -32,7 +32,8 @@ */ public abstract class FunctionDescriptor implements Serializable { - public FunctionDescriptor() {} + public FunctionDescriptor() { + } private LoadProfileEstimator loadProfileEstimator; @@ -51,11 +52,14 @@ public Optional getLoadProfileEstimator() { /** * Utility method to retrieve the selectivity of a {@link FunctionDescriptor} * - * @param functionDescriptor either a {@link PredicateDescriptor}, a {@link FlatMapDescriptor}, or a {@link MapPartitionsDescriptor} + * @param functionDescriptor either a {@link PredicateDescriptor}, a + * {@link FlatMapDescriptor}, or a + * {@link MapPartitionsDescriptor} * @return the selectivity */ public static Optional getSelectivity(FunctionDescriptor functionDescriptor) { - if (functionDescriptor == null) throw new NullPointerException(); + if (functionDescriptor == null) + throw new NullPointerException(); if (functionDescriptor instanceof PredicateDescriptor) { return ((PredicateDescriptor) functionDescriptor).getSelectivity(); } @@ -73,94 +77,169 @@ public static Optional getSelectivity(FunctionDescr * * @param cpuEstimator the {@link LoadEstimator} for the CPU load * @param ramEstimator the {@link LoadEstimator} for the RAM load - * @deprecated Use {@link #setLoadProfileEstimator(LoadProfileEstimator)} instead. + * @deprecated Use {@link #setLoadProfileEstimator(LoadProfileEstimator)} + * instead. */ public void setLoadEstimators(LoadEstimator cpuEstimator, LoadEstimator ramEstimator) { this.setLoadProfileEstimator(new NestableLoadProfileEstimator( cpuEstimator, - ramEstimator - )); + ramEstimator)); } /** - * Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks. + * Decorates the default {@link Function} with {@link Serializable}, which is + * required by some distributed frameworks. + * + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableFunction} + * instead. */ @FunctionalInterface + @Deprecated(forRemoval = true) public interface SerializableFunction extends Function, Serializable { } /** - * Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks. + * Decorates the default {@link BiFunction} with {@link Serializable}, which is + * required by some distributed frameworks. + * + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableBiFunction} + * instead. */ @FunctionalInterface - public interface SerializableBiFunction extends BiFunction, Serializable { + @Deprecated(forRemoval = true) + public interface SerializableBiFunction + extends BiFunction, Serializable { } - /** * Extends a {@link SerializableFunction} to an {@link ExtendedFunction}. + * + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.ExtendedSerializableFunction} + * instead. */ - public interface ExtendedSerializableFunction extends SerializableFunction, ExtendedFunction { + @Deprecated(forRemoval = true) + public interface ExtendedSerializableFunction + extends SerializableFunction, ExtendedFunction { } /** - * Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks. + * Decorates the default {@link BinaryOperator} with {@link Serializable}, which + * is required by some distributed frameworks. + * + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableBinaryOperator} + * instead. */ @FunctionalInterface + @Deprecated(forRemoval = true) public interface SerializableBinaryOperator extends BinaryOperator, Serializable { } /** * Extends a {@link SerializableBinaryOperator} to an {@link ExtendedFunction}. + * + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.ExtendedSerializableBinaryOperator} + * instead. */ - public interface ExtendedSerializableBinaryOperator extends SerializableBinaryOperator, ExtendedFunction { + @Deprecated(forRemoval = true) + public interface ExtendedSerializableBinaryOperator + extends SerializableBinaryOperator, ExtendedFunction { } + /** + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializablePredicate} + * instead. + */ @FunctionalInterface + @Deprecated(forRemoval = true) public interface SerializablePredicate extends Predicate, Serializable { - } + /** + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.ExtendedSerializablePredicate} + * instead. + */ + @Deprecated(forRemoval = true) public interface ExtendedSerializablePredicate extends SerializablePredicate, ExtendedFunction { - } /** - * Decorates the default {@link Consumer} with {@link Serializable}, which is required by some distributed frameworks. + * Decorates the default {@link Consumer} with {@link Serializable}, which is + * required by some distributed frameworks. + * + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableConsumer} + * instead. */ @FunctionalInterface + @Deprecated(forRemoval = true) public interface SerializableConsumer extends Consumer, Serializable { - } + /** * Extends a {@link SerializableConsumer} to an {@link ExtendedFunction}. + * + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.ExtendedSerializableConsumer} + * instead. */ - public interface ExtendedSerializableConsumer extends SerializableConsumer, ExtendedFunction{ - + @Deprecated(forRemoval = true) + public interface ExtendedSerializableConsumer extends SerializableConsumer, ExtendedFunction { } + /** + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableIntUnaryOperator} + * instead. + */ @FunctionalInterface + @Deprecated(forRemoval = true) public interface SerializableIntUnaryOperator extends IntUnaryOperator, Serializable { - } + /** + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableLongUnaryOperator} + * instead. + */ @FunctionalInterface + @Deprecated(forRemoval = true) public interface SerializableLongUnaryOperator extends LongUnaryOperator, Serializable { - } + /** + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableToLongBiFunction} + * instead. + */ @FunctionalInterface + @Deprecated(forRemoval = true) public interface SerializableToLongBiFunction extends ToLongBiFunction, Serializable { - } + /** + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableToDoubleBiFunction} + * instead. + */ @FunctionalInterface + @Deprecated(forRemoval = true) public interface SerializableToDoubleBiFunction extends ToDoubleBiFunction, Serializable { - } + /** + * @deprecated Use + * {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableToLongFunction} + * instead. + */ @FunctionalInterface + @Deprecated(forRemoval = true) public interface SerializableToLongFunction extends ToLongFunction, Serializable { - } } diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/PredicateDescriptor.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/PredicateDescriptor.java index 6b90e27c2..a6f98ddee 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/PredicateDescriptor.java +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/PredicateDescriptor.java @@ -101,7 +101,9 @@ public String getSqlImplementation() { * out how to express functions in a platform-independent way. * * @param sqlImplementation a SQL predicate applicable in a {@code WHERE} clause representing this predicate + * @deprecated to be removed, should be handled as a concrete implementation in a descriptor. */ + @Deprecated public PredicateDescriptor withSqlImplementation(String sqlImplementation) { this.sqlImplementation = sqlImplementation; return this; diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/SerializableFunctionInterface.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/SerializableFunctionInterface.java new file mode 100644 index 000000000..3a7f9e0ee --- /dev/null +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/SerializableFunctionInterface.java @@ -0,0 +1,98 @@ +package org.apache.wayang.core.function; + +import java.io.Serializable; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntUnaryOperator; +import java.util.function.LongUnaryOperator; +import java.util.function.Predicate; +import java.util.function.ToDoubleBiFunction; +import java.util.function.ToLongBiFunction; +import java.util.function.ToLongFunction; + +public interface SerializableFunctionInterface { + + /** + * Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks. + */ + @FunctionalInterface + public interface SerializableFunction extends Function, Serializable { + } + + /** + * Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks. + */ + @FunctionalInterface + public interface SerializableBiFunction extends BiFunction, Serializable { + } + + + /** + * Extends a {@link SerializableFunction} to an {@link ExtendedFunction}. + */ + public interface ExtendedSerializableFunction extends SerializableFunction, ExtendedFunction { + } + + /** + * Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks. + */ + @FunctionalInterface + public interface SerializableBinaryOperator extends BinaryOperator, Serializable { + } + + /** + * Extends a {@link SerializableBinaryOperator} to an {@link ExtendedFunction}. + */ + public interface ExtendedSerializableBinaryOperator extends SerializableBinaryOperator, ExtendedFunction { + } + + @FunctionalInterface + public interface SerializablePredicate extends Predicate, Serializable { + + } + + public interface ExtendedSerializablePredicate extends SerializablePredicate, ExtendedFunction { + + } + + /** + * Decorates the default {@link Consumer} with {@link Serializable}, which is required by some distributed frameworks. + */ + @FunctionalInterface + public interface SerializableConsumer extends Consumer, Serializable { + + } + /** + * Extends a {@link SerializableConsumer} to an {@link ExtendedFunction}. + */ + public interface ExtendedSerializableConsumer extends SerializableConsumer, ExtendedFunction{ + + } + + @FunctionalInterface + public interface SerializableIntUnaryOperator extends IntUnaryOperator, Serializable { + + } + + @FunctionalInterface + public interface SerializableLongUnaryOperator extends LongUnaryOperator, Serializable { + + } + + @FunctionalInterface + public interface SerializableToLongBiFunction extends ToLongBiFunction, Serializable { + + } + + @FunctionalInterface + public interface SerializableToDoubleBiFunction extends ToDoubleBiFunction, Serializable { + + } + + @FunctionalInterface + public interface SerializableToLongFunction extends ToLongFunction, Serializable { + + } +} \ No newline at end of file diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/TransformationDescriptor.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/TransformationDescriptor.java index 419370638..ff0fbc92d 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/TransformationDescriptor.java +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/TransformationDescriptor.java @@ -110,7 +110,9 @@ public Tuple getSqlImplementation() { * * @param tableName a SQL table name applicable in a {@code JOIN TABLE ON}. * @param sqlImplementation a SQL predicate applicable in a {@code WHERE} clause representing this predicate + * @deprecated to be removed, should be handled as a concrete implementation of a descriptor. */ + @Deprecated public TransformationDescriptor withSqlImplementation(String tableName, String sqlImplementation) { this.sqlImplementation = new Tuple(tableName, sqlImplementation); return this; diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/compiler/FunctionCompiler.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/compiler/FunctionCompiler.java index 01cc43390..42ea91da2 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/compiler/FunctionCompiler.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/compiler/FunctionCompiler.java @@ -23,7 +23,9 @@ /** * Compiles {@link FunctionDescriptor}s to SQL clauses. + * @deprecated to be removed. */ +@Deprecated public class FunctionCompiler { /** @@ -31,7 +33,9 @@ public class FunctionCompiler { * * @param descriptor describes the predicate * @return a compiled SQL {@code WHERE} clause + * @deprecated to be removed */ + @Deprecated public String compile(PredicateDescriptor descriptor) { final String sqlImplementation = descriptor.getSqlImplementation(); assert sqlImplementation != null; diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java index f7a9d7c5a..55853c3e3 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java @@ -204,13 +204,13 @@ protected static Tuple2 createSqlQuery(final E public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp, final Collection filterTasks, JdbcProjectionOperator projectionTask, final Collection> joinTasks) { - final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + final String tableName = tableOp.createSqlClause(); final Collection conditions = filterTasks.stream() - .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) + .map(op -> op.createSqlClause()) .collect(Collectors.toList()); - final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(); final Collection joins = joinTasks.stream() - .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) + .map(op -> op.createSqlClause()) .collect(Collectors.toList()); final StringBuilder sb = new StringBuilder(1000); diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcExecutionOperator.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcExecutionOperator.java index 570897aed..97a615cc8 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcExecutionOperator.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcExecutionOperator.java @@ -39,19 +39,28 @@ public interface JdbcExecutionOperator extends ExecutionOperator { * * @param compiler used to create SQL code * @return the SQL clause + * + * @deprecated to be removed, use {@link #createSqlClause()} instead. */ + @Deprecated String createSqlClause(Connection connection, FunctionCompiler compiler); + + /** + * Creates a string SQL clause for this operator. + */ + public String createSqlClause(); + @Override - JdbcPlatformTemplate getPlatform(); + public JdbcPlatformTemplate getPlatform(); @Override - default List getSupportedInputChannels(int index) { + public default List getSupportedInputChannels(int index) { return Collections.singletonList(this.getPlatform().getSqlQueryChannelDescriptor()); } @Override - default List getSupportedOutputChannels(int index) { + public default List getSupportedOutputChannels(int index) { return Collections.singletonList(this.getPlatform().getSqlQueryChannelDescriptor()); } diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcFilterOperator.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcFilterOperator.java index d28253ad4..26132b497 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcFilterOperator.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcFilterOperator.java @@ -43,11 +43,21 @@ public JdbcFilterOperator(FilterOperator that) { super(that); } + /** + * @deprecated to be removed + * use {@link #createSqlClause()} instead. + */ + @Deprecated @Override public String createSqlClause(Connection connection, FunctionCompiler compiler) { return compiler.compile(this.predicateDescriptor); } + public String createSqlClause() { + return this.predicateDescriptor.getSqlImplementation(); + } + + @Override public String getLoadProfileEstimatorConfigurationKey() { return String.format("wayang.%s.filter.load", this.getPlatform().getPlatformId()); diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcJoinOperator.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcJoinOperator.java index 311b4f8fa..1edb5d5dc 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcJoinOperator.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcJoinOperator.java @@ -64,6 +64,11 @@ public JdbcJoinOperator(JoinOperator that) { super(that); } + /** + * @deprecated to be removed + * use {@link #createSqlClause()} instead. + */ + @Deprecated @Override public String createSqlClause(Connection connection, FunctionCompiler compiler) { final Tuple left = this.keyDescriptor0.getSqlImplementation(); @@ -78,6 +83,20 @@ public String createSqlClause(Connection connection, FunctionCompiler compiler) + "=" + leftTableName + "." + leftKey; } + @Override + public String createSqlClause() { + final Tuple left = this.keyDescriptor0.getSqlImplementation(); + final Tuple right = this.keyDescriptor1.getSqlImplementation(); + final String leftTableName = left.field0; + final String leftKey = left.field1; + final String rightTableName = right.field0; + final String rightKey = right.field1; + + return "JOIN " + rightTableName + " ON " + + rightTableName + "." + rightKey + + "=" + leftTableName + "." + leftKey; + } + @Override public String getLoadProfileEstimatorConfigurationKey() { return String.format("wayang.%s.join.load", this.getPlatform().getPlatformId()); diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcProjectionOperator.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcProjectionOperator.java index c8615fa29..1859e61e5 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcProjectionOperator.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcProjectionOperator.java @@ -60,11 +60,21 @@ public JdbcProjectionOperator(MapOperator that) { } } + + /** + * @deprecated to be removed + * use {@link #createSqlClause()} instead. + */ + @Deprecated @Override public String createSqlClause(Connection connection, FunctionCompiler compiler) { return String.join(", ", this.getFunctionDescriptor().getFieldNames()); } + public String createSqlClause() { + return String.join(", ", this.getFunctionDescriptor().getFieldNames()); + } + @Override public ProjectionDescriptor getFunctionDescriptor() { return (ProjectionDescriptor) super.getFunctionDescriptor(); diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java index 2d546deec..19698ae36 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java @@ -53,11 +53,21 @@ public JdbcTableSource(JdbcTableSource that) { super(that); } + + /** + * @deprecated to be removed + * use {@link #createSqlClause()} instead. + */ + @Deprecated @Override public String createSqlClause(Connection connection, FunctionCompiler compiler) { return this.getTableName(); } + public String createSqlClause() { + return this.getTableName(); + } + @Override public String getLoadProfileEstimatorConfigurationKey() {