Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
*/
public abstract class FunctionDescriptor implements Serializable {

public FunctionDescriptor() {}
public FunctionDescriptor() {
}

private LoadProfileEstimator loadProfileEstimator;

Expand All @@ -51,11 +52,14 @@ public Optional<LoadProfileEstimator> getLoadProfileEstimator() {
/**
* Utility method to retrieve the selectivity of a {@link FunctionDescriptor}
*
* @param functionDescriptor either a {@link PredicateDescriptor}, a {@link FlatMapDescriptor}, or a {@link MapPartitionsDescriptor}
* @param functionDescriptor either a {@link PredicateDescriptor}, a
* {@link FlatMapDescriptor}, or a
* {@link MapPartitionsDescriptor}
* @return the selectivity
*/
public static Optional<ProbabilisticDoubleInterval> getSelectivity(FunctionDescriptor functionDescriptor) {
if (functionDescriptor == null) throw new NullPointerException();
if (functionDescriptor == null)
throw new NullPointerException();
if (functionDescriptor instanceof PredicateDescriptor) {
return ((PredicateDescriptor<?>) functionDescriptor).getSelectivity();
}
Expand All @@ -73,94 +77,169 @@ public static Optional<ProbabilisticDoubleInterval> getSelectivity(FunctionDescr
*
* @param cpuEstimator the {@link LoadEstimator} for the CPU load
* @param ramEstimator the {@link LoadEstimator} for the RAM load
* @deprecated Use {@link #setLoadProfileEstimator(LoadProfileEstimator)} instead.
* @deprecated Use {@link #setLoadProfileEstimator(LoadProfileEstimator)}
* instead.
*/
public void setLoadEstimators(LoadEstimator cpuEstimator, LoadEstimator ramEstimator) {
this.setLoadProfileEstimator(new NestableLoadProfileEstimator(
cpuEstimator,
ramEstimator
));
ramEstimator));
}

/**
* Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks.
* Decorates the default {@link Function} with {@link Serializable}, which is
* required by some distributed frameworks.
*
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableFunction}
* instead.
*/
@FunctionalInterface
@Deprecated(forRemoval = true)
public interface SerializableFunction<Input, Output> extends Function<Input, Output>, Serializable {
}

/**
* Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks.
* Decorates the default {@link BiFunction} with {@link Serializable}, which is
* required by some distributed frameworks.
*
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableBiFunction}
* instead.
*/
@FunctionalInterface
public interface SerializableBiFunction<Input0, Input1, Output> extends BiFunction<Input0, Input1, Output>, Serializable {
@Deprecated(forRemoval = true)
public interface SerializableBiFunction<Input0, Input1, Output>
extends BiFunction<Input0, Input1, Output>, Serializable {
}


/**
* Extends a {@link SerializableFunction} to an {@link ExtendedFunction}.
*
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.ExtendedSerializableFunction}
* instead.
*/
public interface ExtendedSerializableFunction<Input, Output> extends SerializableFunction<Input, Output>, ExtendedFunction {
@Deprecated(forRemoval = true)
public interface ExtendedSerializableFunction<Input, Output>
extends SerializableFunction<Input, Output>, ExtendedFunction {
}

/**
* Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks.
* Decorates the default {@link BinaryOperator} with {@link Serializable}, which
* is required by some distributed frameworks.
*
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableBinaryOperator}
* instead.
*/
@FunctionalInterface
@Deprecated(forRemoval = true)
public interface SerializableBinaryOperator<Type> extends BinaryOperator<Type>, Serializable {
}

/**
* Extends a {@link SerializableBinaryOperator} to an {@link ExtendedFunction}.
*
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.ExtendedSerializableBinaryOperator}
* instead.
*/
public interface ExtendedSerializableBinaryOperator<Type> extends SerializableBinaryOperator<Type>, ExtendedFunction {
@Deprecated(forRemoval = true)
public interface ExtendedSerializableBinaryOperator<Type>
extends SerializableBinaryOperator<Type>, ExtendedFunction {
}

/**
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializablePredicate}
* instead.
*/
@FunctionalInterface
@Deprecated(forRemoval = true)
public interface SerializablePredicate<T> extends Predicate<T>, Serializable {

}

/**
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.ExtendedSerializablePredicate}
* instead.
*/
@Deprecated(forRemoval = true)
public interface ExtendedSerializablePredicate<T> extends SerializablePredicate<T>, ExtendedFunction {

}

/**
* Decorates the default {@link Consumer} with {@link Serializable}, which is required by some distributed frameworks.
* Decorates the default {@link Consumer} with {@link Serializable}, which is
* required by some distributed frameworks.
*
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableConsumer}
* instead.
*/
@FunctionalInterface
@Deprecated(forRemoval = true)
public interface SerializableConsumer<T> extends Consumer<T>, Serializable {

}

/**
* Extends a {@link SerializableConsumer} to an {@link ExtendedFunction}.
*
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.ExtendedSerializableConsumer}
* instead.
*/
public interface ExtendedSerializableConsumer<T> extends SerializableConsumer<T>, ExtendedFunction{

@Deprecated(forRemoval = true)
public interface ExtendedSerializableConsumer<T> extends SerializableConsumer<T>, ExtendedFunction {
}

/**
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableIntUnaryOperator}
* instead.
*/
@FunctionalInterface
@Deprecated(forRemoval = true)
public interface SerializableIntUnaryOperator extends IntUnaryOperator, Serializable {

}

/**
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableLongUnaryOperator}
* instead.
*/
@FunctionalInterface
@Deprecated(forRemoval = true)
public interface SerializableLongUnaryOperator extends LongUnaryOperator, Serializable {

}

/**
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableToLongBiFunction}
* instead.
*/
@FunctionalInterface
@Deprecated(forRemoval = true)
public interface SerializableToLongBiFunction<T, U> extends ToLongBiFunction<T, U>, Serializable {

}

/**
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableToDoubleBiFunction}
* instead.
*/
@FunctionalInterface
@Deprecated(forRemoval = true)
public interface SerializableToDoubleBiFunction<T, U> extends ToDoubleBiFunction<T, U>, Serializable {

}

/**
* @deprecated Use
* {@link org.apache.wayang.core.function.SerializableFunctionInterface.SerializableToLongFunction}
* instead.
*/
@FunctionalInterface
@Deprecated(forRemoval = true)
public interface SerializableToLongFunction<T> extends ToLongFunction<T>, Serializable {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ public String getSqlImplementation() {
* out how to express functions in a platform-independent way.
*
* @param sqlImplementation a SQL predicate applicable in a {@code WHERE} clause representing this predicate
* @deprecated to be removed, should be handled as a concrete implementation in a descriptor.
*/
@Deprecated
public PredicateDescriptor<Input> withSqlImplementation(String sqlImplementation) {
this.sqlImplementation = sqlImplementation;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.apache.wayang.core.function;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add license header.


import java.io.Serializable;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.function.ToDoubleBiFunction;
import java.util.function.ToLongBiFunction;
import java.util.function.ToLongFunction;

public interface SerializableFunctionInterface {

/**
* Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks.
*/
@FunctionalInterface
public interface SerializableFunction<Input, Output> extends Function<Input, Output>, Serializable {
}

/**
* Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks.
*/
@FunctionalInterface
public interface SerializableBiFunction<Input0, Input1, Output> extends BiFunction<Input0, Input1, Output>, Serializable {
}


/**
* Extends a {@link SerializableFunction} to an {@link ExtendedFunction}.
*/
public interface ExtendedSerializableFunction<Input, Output> extends SerializableFunction<Input, Output>, ExtendedFunction {
}

/**
* Decorates the default {@link Function} with {@link Serializable}, which is required by some distributed frameworks.
*/
@FunctionalInterface
public interface SerializableBinaryOperator<Type> extends BinaryOperator<Type>, Serializable {
}

/**
* Extends a {@link SerializableBinaryOperator} to an {@link ExtendedFunction}.
*/
public interface ExtendedSerializableBinaryOperator<Type> extends SerializableBinaryOperator<Type>, ExtendedFunction {
}

@FunctionalInterface
public interface SerializablePredicate<T> extends Predicate<T>, Serializable {

}

public interface ExtendedSerializablePredicate<T> extends SerializablePredicate<T>, ExtendedFunction {

}

/**
* Decorates the default {@link Consumer} with {@link Serializable}, which is required by some distributed frameworks.
*/
@FunctionalInterface
public interface SerializableConsumer<T> extends Consumer<T>, Serializable {

}
/**
* Extends a {@link SerializableConsumer} to an {@link ExtendedFunction}.
*/
public interface ExtendedSerializableConsumer<T> extends SerializableConsumer<T>, ExtendedFunction{

}

@FunctionalInterface
public interface SerializableIntUnaryOperator extends IntUnaryOperator, Serializable {

}

@FunctionalInterface
public interface SerializableLongUnaryOperator extends LongUnaryOperator, Serializable {

}

@FunctionalInterface
public interface SerializableToLongBiFunction<T, U> extends ToLongBiFunction<T, U>, Serializable {

}

@FunctionalInterface
public interface SerializableToDoubleBiFunction<T, U> extends ToDoubleBiFunction<T, U>, Serializable {

}

@FunctionalInterface
public interface SerializableToLongFunction<T> extends ToLongFunction<T>, Serializable {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ public Tuple<String, String> getSqlImplementation() {
*
* @param tableName a SQL table name applicable in a {@code JOIN TABLE ON}.
* @param sqlImplementation a SQL predicate applicable in a {@code WHERE} clause representing this predicate
* @deprecated to be removed, should be handled as a concrete implementation of a descriptor.
*/
@Deprecated
public TransformationDescriptor<Input, Output> withSqlImplementation(String tableName, String sqlImplementation) {
this.sqlImplementation = new Tuple<String, String>(tableName, sqlImplementation);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@

/**
* Compiles {@link FunctionDescriptor}s to SQL clauses.
* @deprecated to be removed.
*/
@Deprecated
public class FunctionCompiler {

/**
* Compile a predicate to a SQL {@code WHERE} clause.
*
* @param descriptor describes the predicate
* @return a compiled SQL {@code WHERE} clause
* @deprecated to be removed
*/
@Deprecated
public String compile(PredicateDescriptor descriptor) {
final String sqlImplementation = descriptor.getSqlImplementation();
assert sqlImplementation != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ protected static Tuple2<String, SqlQueryChannel.Instance> createSqlQuery(final E
public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp,
final Collection<JdbcFilterOperator> filterTasks, JdbcProjectionOperator projectionTask,
final Collection<JdbcJoinOperator<?>> joinTasks) {
final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
final String tableName = tableOp.createSqlClause();
final Collection<String> conditions = filterTasks.stream()
.map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
.map(op -> op.createSqlClause())
.collect(Collectors.toList());
final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause();
final Collection<String> joins = joinTasks.stream()
.map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
.map(op -> op.createSqlClause())
.collect(Collectors.toList());

final StringBuilder sb = new StringBuilder(1000);
Expand Down
Loading
Loading