diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangMultiConditionJoinVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangMultiConditionJoinVisitor.java index c88faa9ac..dfc14462a 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangMultiConditionJoinVisitor.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangMultiConditionJoinVisitor.java @@ -134,14 +134,12 @@ Operator visit(WayangJoin wayangRelNode) { } /** - * This method handles the {@link JoinOperator} creation, used in conjunction - * with: - * {@link #determineKeyExtractionDirection(Integer, Integer, WayangJoin)} + * This method handles the {@link JoinOperator} creation * * @param wayangRelNode * @param leftKeyIndex * @param rightKeyIndex - * @return a {@link JoinOperator} with {@link KeyExtractors} set + * @return */ protected JoinOperator getJoinOperator(final Integer[] leftKeyIndexes, final Integer[] rightKeyIndexes, diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java index f6452da84..90eee252c 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java @@ -56,6 +56,8 @@ public Operator convert(final RelNode node) { return new WayangProjectVisitor(this).visit((WayangProject) node); } else if (node instanceof WayangFilter) { return new WayangFilterVisitor(this).visit((WayangFilter) node); + } else if (node instanceof WayangSort) { + return new WayangSortVisitor(this).visit((WayangSort) node); } else if (node instanceof WayangJoin && ((WayangJoin) node).getCondition().isA(SqlKind.AND)) { return new WayangMultiConditionJoinVisitor(this).visit((WayangJoin) node); } else if (node instanceof WayangJoin && WayangJoin.class.cast(node).getCondition().isAlwaysTrue()) { diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java new file mode 100644 index 000000000..5176f24b2 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.sql.calcite.converter; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelFieldCollation.Direction; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; + +import org.apache.wayang.api.sql.calcite.converter.functions.SortKeyExtractor; +import org.apache.wayang.api.sql.calcite.rel.WayangSort; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.SortOperator; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.plan.wayangplan.Operator; + +public class WayangSortVisitor extends WayangRelNodeVisitor { + + WayangSortVisitor(final WayangRelConverter wayangRelConverter) { + super(wayangRelConverter); + } + + @Override + Operator visit(final WayangSort wayangRelNode) { + assert (wayangRelNode.getInputs().size() == 1) + : "Sorts must only have one input, but found: " + wayangRelNode.getInputs().size(); + + final Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput()); + + //TODO: implement fetch & offset for java + final RexNode fetch = wayangRelNode.fetch; + final RexLiteral offset = (RexLiteral) wayangRelNode.offset; + + if (fetch != null || offset != null) throw new UnsupportedOperationException("Offset and fetch currently not supported, these appear via LIMIT statements in SQL"); + + final RelCollation collation = wayangRelNode.getCollation(); + + final List collationDirections = collation.getFieldCollations().stream() + .map(fieldCol -> fieldCol.getDirection()) + .collect(Collectors.toList()); + + final List collationIndexes = collation.getFieldCollations().stream() + .map(fieldCol -> fieldCol.getFieldIndex()) + .collect(Collectors.toList()); + + final TransformationDescriptor td = new TransformationDescriptor( + new SortKeyExtractor( + collationDirections, + collationIndexes), + Record.class, Record.class); + + final SortOperator sort = new SortOperator(td); + + childOp.connectTo(0, sort, 0); + + return sort; + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortKeyExtractor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortKeyExtractor.java new file mode 100644 index 000000000..440152f29 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortKeyExtractor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.sql.calcite.converter.functions; + +import java.util.List; + +import org.apache.calcite.rel.RelFieldCollation.Direction; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.function.FunctionDescriptor; + +public class SortKeyExtractor implements FunctionDescriptor.SerializableFunction { + final List directions; + final List collationIndexes; + + public SortKeyExtractor(final List collationDirections, final List collationIndexes) { + this.directions = collationDirections; + this.collationIndexes = collationIndexes; + } + + @Override + public Record apply(final Record record) { + return new Record(collationIndexes.stream().map(record::getField).toArray()) { + @Override + public int compareTo(final Record that) throws IllegalStateException { + assert (directions.size() == collationIndexes.size()) : "Mismatch between the amount of collation indexes and directions"; + + for (int i = 0; i < directions.size(); i++) { + final Direction direction = directions.get(i); + final Comparable thisField = (Comparable) this.getValues()[i]; + final Comparable thatField = (Comparable) that.getValues()[i]; + + // == 0, < -1, > 1 + final int comparison = direction.isDescending() ? -thisField.compareTo(thatField) : thisField.compareTo(thatField); + + if (comparison != 0) return comparison; + } + + return 0; + } + }; + } +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangSort.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangSort.java new file mode 100644 index 000000000..8ae0d2337 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangSort.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.sql.calcite.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rex.RexNode; +import org.apache.wayang.api.sql.calcite.convention.WayangConvention; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; + +public class WayangSort extends Sort implements WayangRel { + public WayangSort(final RelOptCluster cluster, + final RelTraitSet traits, + final List hints, + final RelNode child, + final RelCollation collation, + @Nullable final RexNode offset, + @Nullable final RexNode fetch) { + super(cluster, traits, hints, child, collation, offset, fetch); + assert getConvention() instanceof WayangConvention; + } + + @Override + public Sort copy(final RelTraitSet traitSet, final RelNode newInput, final RelCollation newCollation, @Nullable final RexNode offset, + @Nullable final RexNode fetch) { + return new WayangSort(getCluster(), traitSet, getHints(), newInput, newCollation, offset, fetch); + } + + @Override + public String toString() { + return "WayangSort"; + } +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java index 0aa221f48..2a7609abd 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangRules.java @@ -29,12 +29,14 @@ import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalTableScan; + import org.apache.wayang.api.sql.calcite.convention.WayangConvention; import org.apache.wayang.api.sql.calcite.rel.WayangFilter; import org.apache.wayang.api.sql.calcite.rel.WayangJoin; import org.apache.wayang.api.sql.calcite.rel.WayangProject; import org.apache.wayang.api.sql.calcite.rel.WayangTableScan; import org.apache.wayang.api.sql.calcite.rel.WayangAggregate; + import org.checkerframework.checker.nullness.qual.Nullable; import java.util.ArrayList; @@ -43,18 +45,17 @@ //TODO: split into multiple classes public class WayangRules { - private WayangRules(){ + private WayangRules() { } - public static final RelOptRule WAYANG_JOIN_RULE = new WayangJoinRule(WayangJoinRule.DEFAULT_CONFIG); public static final RelOptRule WAYANG_PROJECT_RULE = new WayangProjectRule(WayangProjectRule.DEFAULT_CONFIG); - public static final RelOptRule WAYANG_FILTER_RULE = new WayangFilterRule(WayangFilterRule.DEFAULT_CONFIG); + public static final RelOptRule WAYANG_FILTER_RULE = new WayangFilterRule(WayangFilterRule.DEFAULT_CONFIG); public static final RelOptRule WAYANG_TABLESCAN_RULE = new WayangTableScanRule(WayangTableScanRule.DEFAULT_CONFIG); - public static final RelOptRule WAYANG_TABLESCAN_ENUMERABLE_RULE = - new WayangTableScanRule(WayangTableScanRule.ENUMERABLE_CONFIG); - public static final RelOptRule WAYANG_AGGREGATE_RULE = new WayangAggregateRule(WayangAggregateRule.DEFAULT_CONFIG); - + public static final RelOptRule WAYANG_TABLESCAN_ENUMERABLE_RULE = new WayangTableScanRule( + WayangTableScanRule.ENUMERABLE_CONFIG); + public static final RelOptRule WAYANG_AGGREGATE_RULE = new WayangAggregateRule(WayangAggregateRule.DEFAULT_CONFIG); + public static final RelOptRule WAYANG_SORT_RULE = new WayangSortRule(WayangSortRule.DEFAULT_CONFIG); private static class WayangProjectRule extends ConverterRule { @@ -64,12 +65,11 @@ private static class WayangProjectRule extends ConverterRule { "WayangProjectRule") .withRuleFactory(WayangProjectRule::new); - - protected WayangProjectRule(Config config) { + protected WayangProjectRule(final Config config) { super(config); } - public RelNode convert(RelNode rel) { + public RelNode convert(final RelNode rel) { final LogicalProject project = (LogicalProject) rel; return new WayangProject( project.getCluster(), @@ -81,7 +81,6 @@ public RelNode convert(RelNode rel) { } } - private static class WayangFilterRule extends ConverterRule { public static final Config DEFAULT_CONFIG = Config.INSTANCE @@ -90,19 +89,17 @@ private static class WayangFilterRule extends ConverterRule { "WayangFilterRule") .withRuleFactory(WayangFilterRule::new); - - protected WayangFilterRule(Config config) { + protected WayangFilterRule(final Config config) { super(config); } @Override - public RelNode convert(RelNode rel) { + public RelNode convert(final RelNode rel) { final LogicalFilter filter = (LogicalFilter) rel; return new WayangFilter( rel.getCluster(), rel.getTraitSet().replace(WayangConvention.INSTANCE), - convert(filter.getInput(), filter.getInput().getTraitSet(). - replace(WayangConvention.INSTANCE)), + convert(filter.getInput(), filter.getInput().getTraitSet().replace(WayangConvention.INSTANCE)), filter.getCondition()); } @@ -122,28 +119,27 @@ private static class WayangTableScanRule extends ConverterRule { "WayangTableScanRule1") .withRuleFactory(WayangTableScanRule::new); - protected WayangTableScanRule(Config config) { + protected WayangTableScanRule(final Config config) { super(config); } @Override - public @Nullable RelNode convert(RelNode relNode) { + public @Nullable RelNode convert(final RelNode relNode) { - TableScan scan = (TableScan) relNode; + final TableScan scan = (TableScan) relNode; final RelOptTable relOptTable = scan.getTable(); /** * This is quick hack to prevent volcano from merging projects on to TableScans * TODO: a cleaner way to handle this */ - if(relOptTable.getRowType() == scan.getRowType()) { + if (relOptTable.getRowType() == scan.getRowType()) { return WayangTableScan.create(scan.getCluster(), relOptTable); } return null; } } - private static class WayangJoinRule extends ConverterRule { public static final Config DEFAULT_CONFIG = Config.INSTANCE @@ -151,16 +147,16 @@ private static class WayangJoinRule extends ConverterRule { WayangConvention.INSTANCE, "WayangJoinRule") .withRuleFactory(WayangJoinRule::new); - protected WayangJoinRule(Config config) { + protected WayangJoinRule(final Config config) { super(config); } @Override - public @Nullable RelNode convert(RelNode relNode) { - LogicalJoin join = (LogicalJoin) relNode; - List newInputs = new ArrayList<>(); - for(RelNode input : join.getInputs()) { - if(!(input.getConvention() instanceof WayangConvention)) { + public @Nullable RelNode convert(final RelNode relNode) { + final LogicalJoin join = (LogicalJoin) relNode; + final List newInputs = new ArrayList<>(); + for (RelNode input : join.getInputs()) { + if (!(input.getConvention() instanceof WayangConvention)) { input = convert(input, input.getTraitSet().replace(WayangConvention.INSTANCE)); } newInputs.add(input); @@ -173,10 +169,10 @@ protected WayangJoinRule(Config config) { newInputs.get(1), join.getCondition(), join.getVariablesSet(), - join.getJoinType() - ); + join.getJoinType()); } } + private static class WayangAggregateRule extends ConverterRule { public static final Config DEFAULT_CONFIG = Config.INSTANCE @@ -185,14 +181,15 @@ private static class WayangAggregateRule extends ConverterRule { "WayangAggregateRule") .withRuleFactory(WayangAggregateRule::new); - protected WayangAggregateRule(Config config) { + protected WayangAggregateRule(final Config config) { super(config); } @Override - public @Nullable RelNode convert(RelNode relNode) { - LogicalAggregate aggregate = (LogicalAggregate) relNode; - RelNode input = convert(aggregate.getInput(), aggregate.getInput().getTraitSet().replace(WayangConvention.INSTANCE)); + public @Nullable RelNode convert(final RelNode relNode) { + final LogicalAggregate aggregate = (LogicalAggregate) relNode; + final RelNode input = convert(aggregate.getInput(), + aggregate.getInput().getTraitSet().replace(WayangConvention.INSTANCE)); return new WayangAggregate( aggregate.getCluster(), @@ -201,10 +198,7 @@ protected WayangAggregateRule(Config config) { input, aggregate.getGroupSet(), aggregate.getGroupSets(), - aggregate.getAggCallList() - ); + aggregate.getAggCallList()); } } - - } diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangSortRule.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangSortRule.java new file mode 100644 index 000000000..7011b82fe --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangSortRule.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.sql.calcite.rules; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.logical.LogicalSort; + +import org.apache.wayang.api.sql.calcite.convention.WayangConvention; +import org.apache.wayang.api.sql.calcite.rel.WayangSort; + +/** + * Rule that converts {@link LogicalSort} to Wayang convention + * {@link WayangSort} + */ +public class WayangSortRule extends ConverterRule { + public static final Config DEFAULT_CONFIG = Config.INSTANCE + .withConversion(LogicalSort.class, Convention.NONE, WayangConvention.INSTANCE, + "WayangSortRule") + .withRuleFactory(WayangSortRule::new); + + protected WayangSortRule(final Config config) { + super(config); + } + + @Override + public RelNode convert(final RelNode rel) { + final LogicalSort sort = (LogicalSort) rel; + + final RelNode newInput = convert( + sort.getInput(), + sort.getInput() + .getTraitSet() + .replace(WayangConvention.INSTANCE)); + + return new WayangSort(sort.getCluster(), + sort.getTraitSet().replace(WayangConvention.INSTANCE), + sort.getHints(), + newInput, + sort.collation, + sort.fetch, + sort.offset); + } +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java index 9c10c3dfc..46df64ca8 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java @@ -56,7 +56,7 @@ public SqlContext() throws SQLException { this(new Configuration()); } - public SqlContext(Configuration configuration) throws SQLException { + public SqlContext(final Configuration configuration) throws SQLException { super(configuration.fork(String.format("SqlContext(%s)", configuration.getName()))); this.withPlugin(Java.basicPlugin()); @@ -66,58 +66,55 @@ public SqlContext(Configuration configuration) throws SQLException { calciteSchema = SchemaUtils.getSchema(configuration); } - public SqlContext(Configuration configuration, List plugins) throws SQLException { + public SqlContext(final Configuration configuration, final List plugins) throws SQLException { super(configuration.fork(String.format("SqlContext(%s)", configuration.getName()))); - for (Plugin plugin : plugins) { + for (final Plugin plugin : plugins) { this.withPlugin(plugin); } calciteSchema = SchemaUtils.getSchema(configuration); } - public Collection executeSql(String sql) throws SqlParseException { + public Collection executeSql(final String sql) throws SqlParseException { - Properties configProperties = Optimizer.ConfigProperties.getDefaults(); - RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); + final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); + final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); - Optimizer optimizer = Optimizer.create(calciteSchema, configProperties, + final Optimizer optimizer = Optimizer.create(calciteSchema, configProperties, relDataTypeFactory); - SqlNode sqlNode = optimizer.parseSql(sql); - SqlNode validatedSqlNode = optimizer.validate(sqlNode); - RelNode relNode = optimizer.convert(validatedSqlNode); + final SqlNode sqlNode = optimizer.parseSql(sql); + final SqlNode validatedSqlNode = optimizer.validate(sqlNode); + final RelNode relNode = optimizer.convert(validatedSqlNode); PrintUtils.print("After parsing sql query", relNode); - - RuleSet rules = RuleSets.ofList( + final RuleSet rules = RuleSets.ofList( WayangRules.WAYANG_TABLESCAN_RULE, WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, WayangRules.WAYANG_PROJECT_RULE, WayangRules.WAYANG_FILTER_RULE, WayangRules.WAYANG_JOIN_RULE, - WayangRules.WAYANG_AGGREGATE_RULE - ); - RelNode wayangRel = optimizer.optimize( + WayangRules.WAYANG_AGGREGATE_RULE, + WayangRules.WAYANG_SORT_RULE); + + final RelNode wayangRel = optimizer.optimize( relNode, relNode.getTraitSet().plus(WayangConvention.INSTANCE), - rules - ); + rules); PrintUtils.print("After translating logical intermediate plan", wayangRel); - - Collection collector = new ArrayList<>(); - WayangPlan wayangPlan = optimizer.convert(wayangRel, collector); + final Collection collector = new ArrayList<>(); + final WayangPlan wayangPlan = optimizer.convert(wayangRel, collector); this.execute(getJobName(), wayangPlan); return collector; } private static String getJobName() { - return "SQL["+jobId.incrementAndGet()+"]"; + return "SQL[" + jobId.incrementAndGet() + "]"; } - } diff --git a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java index 0a03339a1..37a93a305 100755 --- a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java +++ b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java @@ -17,7 +17,6 @@ package org.apache.wayang.api.sql; -import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.externalize.RelWriterImpl; @@ -43,10 +42,10 @@ import org.apache.wayang.api.sql.calcite.schema.WayangTable; import org.apache.wayang.api.sql.calcite.schema.WayangTableBuilder; import org.apache.wayang.api.sql.calcite.utils.ModelParser; +import org.apache.wayang.api.sql.calcite.utils.PrintUtils; import org.apache.wayang.api.sql.context.SqlContext; import org.apache.wayang.basic.data.Tuple2; import org.apache.wayang.core.api.Configuration; -import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction; import org.apache.wayang.core.function.FunctionDescriptor.SerializablePredicate; import org.apache.wayang.core.plan.wayangplan.Operator; import org.apache.wayang.core.plan.wayangplan.PlanTraversal; @@ -54,6 +53,7 @@ import org.apache.wayang.java.Java; import org.apache.wayang.spark.Spark; import org.apache.wayang.basic.data.Record; + import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; @@ -77,526 +77,586 @@ public class SqlToWayangRelTest { - /** - * Method for building {@link WayangPlan}s useful for testing, benchmarking and - * other - * usages where you want to handle the intermediate {@link WayangPlan} - * - * @param sql sql query string with the {@code ;} cut off - * @param udfJars - * @return a {@link WayangPlan} of a given sql string - * @throws SqlParseException - * @throws SQLException - */ - public Tuple2, WayangPlan> buildCollectorAndWayangPlan(final SqlContext context, - final String sql, final String... udfJars) throws SqlParseException, SQLException { - final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); - final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); - - final Optimizer optimizer = Optimizer.create( - SchemaUtils.getSchema(context.getConfiguration()), - configProperties, - relDataTypeFactory); - - final SqlNode sqlNode = optimizer.parseSql(sql); - final SqlNode validatedSqlNode = optimizer.validate(sqlNode); - final RelNode relNode = optimizer.convert(validatedSqlNode); - - final RuleSet rules = RuleSets.ofList( - CoreRules.FILTER_INTO_JOIN, - WayangRules.WAYANG_TABLESCAN_RULE, - WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, - WayangRules.WAYANG_PROJECT_RULE, - WayangRules.WAYANG_FILTER_RULE, - WayangRules.WAYANG_JOIN_RULE, - WayangRules.WAYANG_AGGREGATE_RULE); - - final RelNode wayangRel = optimizer.optimize( - relNode, - relNode.getTraitSet().plus(WayangConvention.INSTANCE), - rules); - - final Collection collector = new ArrayList<>(); - final WayangPlan wayangPlan = optimizer.convertWithConfig(wayangRel, context.getConfiguration(), - collector); - - return new Tuple2<>(collector, wayangPlan); - } - - @Test - public void javaJoinTest() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEA = exampleRefToRef.NAMEA"); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - // except reduce by - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); - - sqlContext.execute(wayangPlan); - - assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2", "test1", "test1")))); - } - - @Test - public void javaMultiConditionJoin() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEB = exampleRefToRef.NAMEB AND largeLeftTableIndex.NAMEC = exampleRefToRef.NAMEB"); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - // except reduce by - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); - - sqlContext.execute(wayangPlan); - - final boolean checkEq = result.stream() - .allMatch(rec -> rec.equals(new Record("", "test2", "test2", "", "test2"))); - - assert (checkEq); - } - - @Test - public void aggregateCountInJavaWithIntegers() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/exampleInt.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT exampleInt.NAMEC, COUNT(*) FROM fs.exampleInt GROUP BY NAMEC"); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - // except reduce by - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); - - sqlContext.execute(wayangPlan); - - final Record rec = result.stream().findFirst().get(); - assert (rec.size() == 2); - assert (rec.getInt(1) == 3); - } - - @Test - public void aggregateCountInJava() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT largeLeftTableIndex.NAMEC, COUNT(*) FROM fs.largeLeftTableIndex GROUP BY NAMEC"); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - // except reduce by - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); - - sqlContext.execute(wayangPlan); - - final Record rec = result.stream().findFirst().get(); - assert (rec.size() == 2); - assert (rec.getInt(1) == 3); - } - - @Test - public void filterIsNull() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA IS NULL)" // - ); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - assert (result.size() == 0); - } - - @Test - public void filterIsNotValue() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA <> 'test1')" // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - sqlContext.execute(wayangPlan); - - assert (!result.stream().anyMatch(record -> record.getField(0).equals("test1"))); - } + /** + * Method for building {@link WayangPlan}s useful for testing, benchmarking and + * other + * usages where you want to handle the intermediate {@link WayangPlan} + * + * @param sql sql query string with the {@code ;} cut off + * @param udfJars + * @return a {@link WayangPlan} of a given sql string + * @throws SqlParseException + * @throws SQLException + */ + public Tuple2, WayangPlan> buildCollectorAndWayangPlan(final SqlContext context, + final String sql, final String... udfJars) throws SqlParseException, SQLException { + final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); + final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); + + final Optimizer optimizer = Optimizer.create( + SchemaUtils.getSchema(context.getConfiguration()), + configProperties, + relDataTypeFactory); + + final SqlNode sqlNode = optimizer.parseSql(sql); + final SqlNode validatedSqlNode = optimizer.validate(sqlNode); + final RelNode relNode = optimizer.convert(validatedSqlNode); + + final RuleSet rules = RuleSets.ofList( + CoreRules.FILTER_INTO_JOIN, + WayangRules.WAYANG_TABLESCAN_RULE, + WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, + WayangRules.WAYANG_PROJECT_RULE, + WayangRules.WAYANG_FILTER_RULE, + WayangRules.WAYANG_JOIN_RULE, + WayangRules.WAYANG_AGGREGATE_RULE, + WayangRules.WAYANG_SORT_RULE); + + final RelNode wayangRel = optimizer.optimize( + relNode, + relNode.getTraitSet().plus(WayangConvention.INSTANCE), + rules); + + final Collection collector = new ArrayList<>(); + + final WayangPlan wayangPlan = optimizer.convertWithConfig(wayangRel, context.getConfiguration(), + collector); + + return new Tuple2<>(collector, wayangPlan); + } + + @Test + public void javaJoinTest() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEA = exampleRefToRef.NAMEA"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); + + sqlContext.execute(wayangPlan); + + assert (result.stream() + .anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2", "test1", "test1")))); + } + + @Test + public void javaMultiConditionJoin() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEB = exampleRefToRef.NAMEB AND largeLeftTableIndex.NAMEC = exampleRefToRef.NAMEB"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); + + sqlContext.execute(wayangPlan); + + final boolean checkEq = result.stream() + .allMatch(rec -> rec.equals(new Record("", "test2", "test2", "", "test2"))); + + assert (checkEq); + } + + @Test + public void aggregateCountInJavaWithIntegers() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/exampleInt.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT exampleInt.NAMEC, COUNT(*) FROM fs.exampleInt GROUP BY NAMEC"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); + + sqlContext.execute(wayangPlan); + + final Record rec = result.stream().findFirst().get(); + assert (rec.size() == 2); + assert (rec.getInt(1) == 3); + } + + @Test + public void aggregateCountInJava() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT largeLeftTableIndex.NAMEC, COUNT(*) FROM fs.largeLeftTableIndex GROUP BY NAMEC"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); + + sqlContext.execute(wayangPlan); + + final Record rec = result.stream().findFirst().get(); + assert (rec.size() == 2); + assert (rec.getInt(1) == 3); + } + + @Test + public void filterIsNull() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA IS NULL)" // + ); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + assert (result.size() == 0); + } + + @Test + public void filterIsNotValue() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA <> 'test1')" // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + sqlContext.execute(wayangPlan); + + assert (!result.stream().anyMatch(record -> record.getField(0).equals("test1"))); + } + + @Test + public void filterIsNotNull() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA IS NOT NULL)" // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + + assert (!result.stream().anyMatch(record -> record.getField(0).equals(null))); + } + + @Test + public void javaReduceBy() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( + sqlContext, + "select exampleSmallA.COLA, count(*) from fs.exampleSmallA group by exampleSmallA.COLA"); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); + + sqlContext.execute(wayangPlan); + + assert (result.stream().anyMatch(rec -> rec.equals(new Record("item1", 2)))); + } + + @Test + public void javaCrossJoin() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( + sqlContext, + "select * from fs.exampleSmallA cross join fs.exampleSmallB"); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + sqlContext.execute(wayangPlan); + + final List shouldBe = List.of( + new Record("item1", "item2", "item1", "item2", "item3"), + new Record("item1", "item2", "item1", "item2", "item3"), + new Record("item1", "item2", "item1", "item2", "item3"), + new Record("item1", "item2", "item1", "item2", "item3"), + new Record("item1", "item2", "x", "x", "x"), + new Record("item1", "item2", "x", "x", "x")); + + final Map resultTally = result.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Map shouldBeTally = shouldBe.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + + assert (resultTally.equals(shouldBeTally)); + } + + @Test + public void filterWithNotLike() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA NOT LIKE '_est1')" // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + + assert (!result.stream().anyMatch(record -> record.getString(0).equals("test1"))); + } + + @Test + public void filterWithLike() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE largeLeftTableIndex.NAMEA LIKE '_est1'" // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + + assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2")))); + } + + //@Test + public void javaLimit() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT col1, col2, col3, count(*) as total from fs.exampleSort group by col1, col2, col3 order by col1 desc, col2, col3 desc LIMIT 1"); + + final Collection r = t.field0; + final WayangPlan wayangPlan = t.field1; + + sqlContext.execute(wayangPlan); + + final List result = r.stream().collect(Collectors.toList()); + + assert (result.get(0).equals(new Record(2, "a", "a", 2))); + } + + @Test + public void javaSort() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT col1, col2, col3, count(*) as total from fs.exampleSort group by col1, col2, col3 order by col1 desc, col2, col3 desc"); + + final Collection r = t.field0; + final WayangPlan wayangPlan = t.field1; + + sqlContext.execute(wayangPlan); + + final List result = r.stream().collect(Collectors.toList()); + + assert (result.get(0).equals(new Record(2, "a", "a", 2))); + assert (result.get(1).equals(new Record(1, "a", "b", 1))); + assert (result.get(2).equals(new Record(1, "a", "a", 1))); + assert (result.get(3).equals(new Record(1, "b", "b", 1))); + assert (result.get(4).equals(new Record(0, "a", "b", 1))); + assert (result.get(5).equals(new Record(0, "a", "a", 1))); + assert (result.get(6).equals(new Record(0, "b", "b", 1))); + } + + @Test + public void joinWithLargeLeftTableIndexCorrect() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON na.NAMEB = nb.NAMEA " // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + + final List shouldBe = List.of( + new Record("test1", "test1", "test2", "test1", "test1", "test2"), + new Record("test2", "", "test2", "", "test2", "test2"), + new Record("", "test2", "test2", "test2", "", "test2")); + + final Map resultTally = result.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Map shouldBeTally = shouldBe.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + + assert (resultTally.equals(shouldBeTally)); + } + + // Imagine case: l = {item1, item2}, r = {item3,item4}, j = {item1, item2, + // item3, item4} join on =($1,$3) would be =(item2, item4) in the join set + // however from the r set we need to factor in the + // offset, $3 -> 3 - l.size() = $1, r($1) = "item4" we cannot naively assume + // that it is always ordered as =(lRef,rRef), lRef < rRef. + // it may also be =($3,$1) + @Test + public void joinWithLargeLeftTableIndexMirrorAlias() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - @Test - public void filterIsNotNull() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " // + ); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA IS NOT NULL)" // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - - assert (!result.stream().anyMatch(record -> record.getField(0).equals(null))); - } - - @Test - public void javaReduceBy() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( - sqlContext, - "select exampleSmallA.COLA, count(*) from fs.exampleSmallA group by exampleSmallA.COLA"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); - - sqlContext.execute(wayangPlan); - - assert (result.stream().anyMatch(rec -> rec.equals(new Record("item1", 2)))); - } - - @Test - public void javaCrossJoin() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( - sqlContext, - "select * from fs.exampleSmallA cross join fs.exampleSmallB"); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - sqlContext.execute(wayangPlan); - - final List shouldBe = List.of( - new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "x", "x", "x"), - new Record("item1", "item2", "x", "x", "x")); - - final Map resultTally = result.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Map shouldBeTally = shouldBe.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - - assert (resultTally.equals(shouldBeTally)); - } + final List shouldBe = List.of( + new Record("test1", "test1", "test2", "test1", "test1", "test2"), + new Record("test2", "", "test2", "", "test2", "test2"), + new Record("", "test2", "test2", "test2", "", "test2")); - @Test - public void filterWithNotLike() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA NOT LIKE '_est1')" // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - - assert (!result.stream().anyMatch(record -> record.getString(0).equals("test1"))); - } - - @Test - public void filterWithLike() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + final Map resultTally = result.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Map shouldBeTally = shouldBe.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE largeLeftTableIndex.NAMEA LIKE '_est1'" // - ); + assert (resultTally.equals(shouldBeTally)); + } + + // @Test + public void sparkFilter() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - - assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2")))); - } - - @Test - public void joinWithLargeLeftTableIndexCorrect() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON na.NAMEB = nb.NAMEA " // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - - final List shouldBe = List.of( - new Record("test1", "test1", "test2", "test1", "test1", "test2"), - new Record("test2", "" , "test2", "" , "test2", "test2"), - new Record("" , "test2", "test2", "test2", "" , "test2") - ); - - final Map resultTally = result.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Map shouldBeTally = shouldBe.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - - assert (resultTally.equals(shouldBeTally)); - } - - // Imagine case: l = {item1, item2}, r = {item3,item4}, j = {item1, item2, - // item3, item4} join on =($1,$3) would be =(item2, item4) in the join set - // however from the r set we need to factor in the - // offset, $3 -> 3 - l.size() = $1, r($1) = "item4" we cannot naively assume - // that it is always ordered as =(lRef,rRef), lRef < rRef. - // it may also be =($3,$1) - @Test - public void joinWithLargeLeftTableIndexMirrorAlias() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - - final List shouldBe = List.of( - new Record("test1", "test1", "test2", "test1", "test1", "test2"), - new Record("test2", "" , "test2", "" , "test2", "test2"), - new Record("" , "test2", "test2", "test2", "" , "test2") - ); - - final Map resultTally = result.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Map shouldBeTally = shouldBe.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - - assert (resultTally.equals(shouldBeTally)); - } - - // tests sql-apis ability to serialize projections and joins - @Test - public void sparkInnerJoin() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Spark.platform()); - }); - - sqlContext.execute(wayangPlan); - - final List shouldBe = List.of( - new Record("test1", "test1", "test2", "test1", "test1", "test2"), - new Record("test2", "" , "test2", "" , "test2", "test2"), - new Record("" , "test2", "test2", "test2", "" , "test2") - ); - - final Map resultTally = result.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Map shouldBeTally = shouldBe.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - - assert (resultTally.equals(shouldBeTally)); - } - - //@Test - public void rexSerializationTest() throws Exception { - // create filterPredicateImpl for serialisation - final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(); - final RexBuilder rb = new RexBuilder(typeFactory); - final RexNode leftOperand = rb.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0); - final RexNode rightOperand = rb.makeLiteral("test"); - final RexNode cond = rb.makeCall(SqlStdOperatorTable.EQUALS, leftOperand, rightOperand); - final SerializablePredicate fpImpl = new FilterPredicateImpl(cond); - - final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); - objectOutputStream.writeObject(fpImpl); - objectOutputStream.close(); - - final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( - byteArrayOutputStream.toByteArray()); - final ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); - final Object deserializedObject = objectInputStream.readObject(); - objectInputStream.close(); - - assert (((FilterPredicateImpl) deserializedObject).test(new Record("test"))); - } - - @Test - public void exampleFilterTableRefToTableRef() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/exampleRefToRef.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.exampleRefToRef WHERE exampleRefToRef.NAMEA = exampleRefToRef.NAMEB" // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - - assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1")))); - } - - @Test - public void exampleMinWithStrings() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/exampleMin.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT MIN(exampleMin.NAME) FROM fs.exampleMin" // - ); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - - assert (result.stream().findAny().get().getString(0).equals("AA")); - } - - public void test_simple_sql() throws Exception { - final WayangTable customer = WayangTableBuilder.build("customer") - .addField("id", SqlTypeName.INTEGER) - .addField("name", SqlTypeName.VARCHAR) - .addField("age", SqlTypeName.INTEGER) - .withRowCount(100) - .build(); - - final WayangTable orders = WayangTableBuilder.build("orders") - .addField("id", SqlTypeName.INTEGER) - .addField("cid", SqlTypeName.INTEGER) - .addField("price", SqlTypeName.DECIMAL) - .addField("quantity", SqlTypeName.INTEGER) - .withRowCount(100) - .build(); - - final WayangSchema wayangSchema = WayangSchemaBuilder.build("exSchema") - .addTable(customer) - .addTable(orders) - .build(); - - final Optimizer optimizer = Optimizer.create(wayangSchema); - - // String sql = "select c.name, c.age from customer c where (c.age < 40 or c.age - // > 60) and \'alex\' = c.name"; - // String sql = "select c.age from customer c"; - final String sql = "select c.name, c.age, o.price from customer c join orders o on c.id = o.cid where c.age > 40 " - + - "and o" + - ".price < 100"; - - final SqlNode sqlNode = optimizer.parseSql(sql); - final SqlNode validatedSqlNode = optimizer.validate(sqlNode); - final RelNode relNode = optimizer.convert(validatedSqlNode); - - print("After parsing", relNode); - - final RuleSet rules = RuleSets.ofList( - WayangRules.WAYANG_TABLESCAN_RULE, - WayangRules.WAYANG_PROJECT_RULE, - WayangRules.WAYANG_FILTER_RULE, - WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, - WayangRules.WAYANG_JOIN_RULE, - WayangRules.WAYANG_AGGREGATE_RULE); - - final RelNode wayangRel = optimizer.optimize( - relNode, - relNode.getTraitSet().plus(WayangConvention.INSTANCE), - rules); - - print("After rel to wayang conversion", wayangRel); - - // WayangPlan plan = optimizer.convert(wayangRel); - - // print("After Translating to WayangPlan", plan); - - } - - private SqlContext createSqlContext(final String tableResourceName) - throws IOException, ParseException, SQLException { - final String calciteModel = "{\r\n" + // - " \"calcite\": {\r\n" + // - " \"version\": \"1.0\",\r\n" + // - " \"defaultSchema\": \"wayang\",\r\n" + // - " \"schemas\": [\r\n" + // - " {\r\n" + // - " \"name\": \"fs\",\r\n" + // - " \"type\": \"custom\",\r\n" + // - " \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + // - " \"operand\": {\r\n" + // - " \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath() - + "\"\r\n" + // - " }\r\n" + // - " }\r\n" + // - " ]\r\n" + // - " },\r\n" + // - " \"separator\": \";\"\r\n" + // - " }\r\n" + // - " \r\n" + // - " \r\n" + // - ""; - - final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel); - final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON) - .setProperties(); - assert (configuration != null) - : "Could not get configuration with calcite model: " + calciteModel; - - final String dataPath = this.getClass().getResource(tableResourceName).getPath(); - assert (dataPath != null && dataPath != "") - : "Could not get table resource from path: " + tableResourceName; - - configuration.setProperty("wayang.fs.table.url", dataPath); - - configuration.setProperty( - "wayang.ml.executions.file", - "mle" + ".txt"); - - configuration.setProperty( - "wayang.ml.optimizations.file", - "mlo" + ".txt"); - - configuration.setProperty("wayang.ml.experience.enabled", "false"); - - return new SqlContext(configuration); - } - - private void print(final String header, final WayangPlan plan) { - final StringWriter sw = new StringWriter(); - sw.append(header).append(":").append("\n"); - - final Collection operators = PlanTraversal.upstream().traverse(plan.getSinks()) - .getTraversedNodes(); - operators.forEach(o -> sw.append(o.toString())); - - System.out.println(sw.toString()); - } - - private void print(final String header, final RelNode relTree) { - final StringWriter sw = new StringWriter(); - - sw.append(header).append(":").append("\n"); - - final RelWriterImpl relWriter = new RelWriterImpl(new PrintWriter(sw), SqlExplainLevel.ALL_ATTRIBUTES, - true); - - relTree.explain(relWriter); - - System.out.println(sw.toString()); - } + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex AS na WHERE na.NAMEA = 'test1'" // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Spark.platform()); + }); + + sqlContext.execute(wayangPlan); + + assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1")))); + } + + // tests sql-apis ability to serialize projections and joins + @Test + public void sparkInnerJoin() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Spark.platform()); + }); + + sqlContext.execute(wayangPlan); + + final List shouldBe = List.of( + new Record("test1", "test1", "test2", "test1", "test1", "test2"), + new Record("test2", "", "test2", "", "test2", "test2"), + new Record("", "test2", "test2", "test2", "", "test2")); + + final Map resultTally = result.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Map shouldBeTally = shouldBe.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + + assert (resultTally.equals(shouldBeTally)); + } + + // @Test + public void rexSerializationTest() throws Exception { + // create filterPredicateImpl for serialisation + final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(); + final RexBuilder rb = new RexBuilder(typeFactory); + final RexNode leftOperand = rb.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0); + final RexNode rightOperand = rb.makeLiteral("test"); + final RexNode cond = rb.makeCall(SqlStdOperatorTable.EQUALS, leftOperand, rightOperand); + final SerializablePredicate fpImpl = new FilterPredicateImpl(cond); + + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); + objectOutputStream.writeObject(fpImpl); + objectOutputStream.close(); + + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( + byteArrayOutputStream.toByteArray()); + final ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); + final Object deserializedObject = objectInputStream.readObject(); + objectInputStream.close(); + + assert (((FilterPredicateImpl) deserializedObject).test(new Record("test"))); + } + + @Test + public void exampleFilterTableRefToTableRef() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/exampleRefToRef.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.exampleRefToRef WHERE exampleRefToRef.NAMEA = exampleRefToRef.NAMEB" // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + + assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1")))); + } + + @Test + public void exampleMinWithStrings() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/exampleMin.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT MIN(exampleMin.NAME) FROM fs.exampleMin" // + ); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + + assert (result.stream().findAny().get().getString(0).equals("AA")); + } + + public void test_simple_sql() throws Exception { + final WayangTable customer = WayangTableBuilder.build("customer") + .addField("id", SqlTypeName.INTEGER) + .addField("name", SqlTypeName.VARCHAR) + .addField("age", SqlTypeName.INTEGER) + .withRowCount(100) + .build(); + + final WayangTable orders = WayangTableBuilder.build("orders") + .addField("id", SqlTypeName.INTEGER) + .addField("cid", SqlTypeName.INTEGER) + .addField("price", SqlTypeName.DECIMAL) + .addField("quantity", SqlTypeName.INTEGER) + .withRowCount(100) + .build(); + + final WayangSchema wayangSchema = WayangSchemaBuilder.build("exSchema") + .addTable(customer) + .addTable(orders) + .build(); + + final Optimizer optimizer = Optimizer.create(wayangSchema); + + // String sql = "select c.name, c.age from customer c where (c.age < 40 or c.age + // > 60) and \'alex\' = c.name"; + // String sql = "select c.age from customer c"; + final String sql = "select c.name, c.age, o.price from customer c join orders o on c.id = o.cid where c.age > 40 " + + + "and o" + + ".price < 100"; + + final SqlNode sqlNode = optimizer.parseSql(sql); + final SqlNode validatedSqlNode = optimizer.validate(sqlNode); + final RelNode relNode = optimizer.convert(validatedSqlNode); + + print("After parsing", relNode); + + final RuleSet rules = RuleSets.ofList( + WayangRules.WAYANG_TABLESCAN_RULE, + WayangRules.WAYANG_PROJECT_RULE, + WayangRules.WAYANG_FILTER_RULE, + WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, + WayangRules.WAYANG_JOIN_RULE, + WayangRules.WAYANG_AGGREGATE_RULE); + + final RelNode wayangRel = optimizer.optimize( + relNode, + relNode.getTraitSet().plus(WayangConvention.INSTANCE), + rules); + + print("After rel to wayang conversion", wayangRel); + + // WayangPlan plan = optimizer.convert(wayangRel); + + // print("After Translating to WayangPlan", plan); + + } + + private SqlContext createSqlContext(final String tableResourceName) + throws IOException, ParseException, SQLException { + final String calciteModel = "{\r\n" + // + " \"calcite\": {\r\n" + // + " \"version\": \"1.0\",\r\n" + // + " \"defaultSchema\": \"wayang\",\r\n" + // + " \"schemas\": [\r\n" + // + " {\r\n" + // + " \"name\": \"fs\",\r\n" + // + " \"type\": \"custom\",\r\n" + // + " \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + // + " \"operand\": {\r\n" + // + " \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath() + + "\"\r\n" + // + " }\r\n" + // + " }\r\n" + // + " ]\r\n" + // + " },\r\n" + // + " \"separator\": \";\"\r\n" + // + " }\r\n" + // + " \r\n" + // + " \r\n" + // + ""; + + final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel); + final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON) + .setProperties(); + assert (configuration != null) + : "Could not get configuration with calcite model: " + calciteModel; + + final String dataPath = this.getClass().getResource(tableResourceName).getPath(); + assert (dataPath != null && dataPath != "") + : "Could not get table resource from path: " + tableResourceName; + + configuration.setProperty("wayang.fs.table.url", dataPath); + + configuration.setProperty( + "wayang.ml.executions.file", + "mle" + ".txt"); + + configuration.setProperty( + "wayang.ml.optimizations.file", + "mlo" + ".txt"); + + configuration.setProperty("wayang.ml.experience.enabled", "false"); + + return new SqlContext(configuration); + } + + private void print(final String header, final WayangPlan plan) { + final StringWriter sw = new StringWriter(); + sw.append(header).append(":").append("\n"); + + final Collection operators = PlanTraversal.upstream().traverse(plan.getSinks()) + .getTraversedNodes(); + operators.forEach(o -> sw.append(o.toString())); + + System.out.println(sw.toString()); + } + + private void print(final String header, final RelNode relTree) { + final StringWriter sw = new StringWriter(); + + sw.append(header).append(":").append("\n"); + + final RelWriterImpl relWriter = new RelWriterImpl(new PrintWriter(sw), SqlExplainLevel.ALL_ATTRIBUTES, + true); + + relTree.explain(relWriter); + + System.out.println(sw.toString()); + } } diff --git a/wayang-api/wayang-api-sql/src/test/resources/data/exampleSort.csv b/wayang-api/wayang-api-sql/src/test/resources/data/exampleSort.csv new file mode 100644 index 000000000..836f09bbe --- /dev/null +++ b/wayang-api/wayang-api-sql/src/test/resources/data/exampleSort.csv @@ -0,0 +1,9 @@ +col1:int,col2:string,col3:string +0;a;a +0;b;b +0;a;b +2;a;a +2;a;a +1;a;a +1;b;b +1;a;b \ No newline at end of file diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java index 687aaa92c..00c76943a 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java @@ -27,17 +27,22 @@ import java.util.Objects; /** - * A Type that represents a record with a schema, might be replaced with something standard like JPA entity. + * A Type that represents a record with a schema, might be replaced with + * something standard like JPA entity. */ -public class Record implements Serializable, Copyable { +public class Record implements Serializable, Copyable, Comparable { private Object[] values; - public Record(Object... values) { + public Object[] getValues() { + return values; + } + + public Record(final Object... values) { this.values = values; } - public Record(List values) { + public Record(final List values) { this.values = values.toArray(); } @@ -47,10 +52,12 @@ public Record copy() { } @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || this.getClass() != o.getClass()) return false; - Record record2 = (Record) o; + public boolean equals(final Object o) { + if (this == o) + return true; + if (o == null || this.getClass() != o.getClass()) + return false; + final Record record2 = (Record) o; return Arrays.equals(this.values, record2.values); } @@ -64,7 +71,7 @@ public String toString() { return "Record" + Arrays.toString(this.values); } - public Object getField(int index) { + public Object getField(final int index) { return this.values[index]; } @@ -74,10 +81,10 @@ public Object getField(int index) { * @param index the index of the field * @return the {@code double} representation of the field */ - public double getDouble(int index) { - Object field = this.values[index]; + public double getDouble(final int index) { + final Object field = this.values[index]; return ReflectionUtils.toDouble(field); - } + } /** * Retrieve a field as a {@code long}. It must be castable as such. @@ -85,12 +92,16 @@ public double getDouble(int index) { * @param index the index of the field * @return the {@code long} representation of the field */ - public long getLong(int index) { - Object field = this.values[index]; - if (field instanceof Integer) return (Integer) field; - else if (field instanceof Long) return (Long) field; - else if (field instanceof Short) return (Short) field; - else if (field instanceof Byte) return (Byte) field; + public long getLong(final int index) { + final Object field = this.values[index]; + if (field instanceof Integer) + return (Integer) field; + else if (field instanceof Long) + return (Long) field; + else if (field instanceof Short) + return (Short) field; + else if (field instanceof Byte) + return (Byte) field; throw new IllegalStateException(String.format("%s cannot be retrieved as long.", field)); } @@ -100,11 +111,14 @@ public long getLong(int index) { * @param index the index of the field * @return the {@code int} representation of the field */ - public int getInt(int index) { - Object field = this.values[index]; - if (field instanceof Integer) return (Integer) field; - else if (field instanceof Short) return (Short) field; - else if (field instanceof Byte) return (Byte) field; + public int getInt(final int index) { + final Object field = this.values[index]; + if (field instanceof Integer) + return (Integer) field; + else if (field instanceof Short) + return (Short) field; + else if (field instanceof Byte) + return (Byte) field; throw new IllegalStateException(String.format("%s cannot be retrieved as int.", field)); } @@ -112,10 +126,12 @@ public int getInt(int index) { * Retrieve a field as a {@link String}. * * @param index the index of the field - * @return the field as a {@link String} (obtained via {@link Object#toString()}) or {@code null} if the field is {@code null} + * @return the field as a {@link String} (obtained via + * {@link Object#toString()}) or {@code null} if the field is + * {@code null} */ - public String getString(int index) { - Object field = this.values[index]; + public String getString(final int index) { + final Object field = this.values[index]; return field == null ? null : field.toString(); } @@ -125,7 +141,7 @@ public String getString(int index) { * @param index the index of the field * @param field the new value of the field to be set */ - public void setField(int index, Object field) { + public void setField(final int index, final Object field) { this.values[index] = field; } @@ -134,9 +150,9 @@ public void setField(int index, Object field) { * * @param field the field to add */ - public void addField(Object field) { - int size = this.size(); - Object[] newValues = Arrays.copyOf(this.values, size + 1); + public void addField(final Object field) { + final int size = this.size(); + final Object[] newValues = Arrays.copyOf(this.values, size + 1); newValues[size] = field; this.values = newValues; } @@ -150,4 +166,26 @@ public int size() { return this.values.length; } + /** + * Compares the fields of this record to the fields of another record. + * + * @param that another record not null + * @return + * @throws IllegalStateException if the two records do not have the same types in {@link #values} + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public int compareTo(final Record that) throws IllegalStateException { + for (int i = 0; i < values.length; i++) { + if (!this.values[i].getClass().equals(that.values[i].getClass())) + throw new IllegalStateException("Tried compare records with dissimilar classes had, this values: " + + this.values + ", that values: " + that.values + ", this item class: " + + this.values[i].getClass() + ", that item class: " + that.values[i].getClass()); + } + + final Comparable[] thisComparables = (Comparable[]) values; + final Comparable[] thatComparables = (Comparable[]) that.values; + + return Arrays.compare(thisComparables, thatComparables); + } } diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaSortOperator.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaSortOperator.java index 587c0a3ab..5eeaa33c9 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaSortOperator.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaSortOperator.java @@ -76,7 +76,7 @@ public Tuple, Collection> eval final Function keyExtractor = javaExecutor.getCompiler().compile(this.keyDescriptor); ((StreamChannel.Instance) outputs[0]).accept(((JavaChannelInstance) inputs[0]).provideStream() - .sorted((e1, e2) -> ((Comparable)keyExtractor.apply(e1)).compareTo(keyExtractor.apply(e2)))); + .sorted((e1, e2) -> ((Comparable) keyExtractor.apply(e1)).compareTo(keyExtractor.apply(e2)))); return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext); }