Skip to content

Commit 3fa657b

Browse files
authored
Merge pull request #643 from mspruc/sql-api-testing
tests for sql-api to postgres implementation & open jdbcexecutor for testing
2 parents 81ad95d + 09b7d78 commit 3fa657b

File tree

10 files changed

+332
-216
lines changed

10 files changed

+332
-216
lines changed

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangProjectVisitor.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919

2020
package org.apache.wayang.api.sql.calcite.converter;
2121

22-
import org.apache.calcite.rel.core.Project;
2322
import org.apache.calcite.rex.RexNode;
2423

2524
import org.apache.wayang.api.sql.calcite.converter.functions.ProjectMapFuncImpl;
2625
import org.apache.wayang.api.sql.calcite.rel.WayangProject;
2726
import org.apache.wayang.basic.data.Record;
27+
import org.apache.wayang.basic.function.ProjectionDescriptor;
2828
import org.apache.wayang.basic.operators.MapOperator;
2929
import org.apache.wayang.core.plan.wayangplan.Operator;
30+
import org.apache.wayang.core.types.BasicDataUnitType;
3031

3132
import java.util.List;
3233

@@ -39,14 +40,15 @@ public class WayangProjectVisitor extends WayangRelNodeVisitor<WayangProject> {
3940
Operator visit(final WayangProject wayangRelNode) {
4041
final Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput(0));
4142

42-
/* Quick check */
43-
final List<RexNode> projects = ((Project) wayangRelNode).getProjects();
43+
final List<RexNode> projects = wayangRelNode.getProjects();
4444

45-
// TODO: create a map with specific dataset type
46-
final MapOperator<Record, Record> projection = new MapOperator<>(
45+
final ProjectionDescriptor<Record, Record> projectionDescriptor = new ProjectionDescriptor<>(
4746
new ProjectMapFuncImpl(projects),
48-
Record.class,
49-
Record.class);
47+
wayangRelNode.getRowType().getFieldNames(),
48+
BasicDataUnitType.createBasic(Record.class),
49+
BasicDataUnitType.createBasic(Record.class));
50+
51+
final MapOperator<Record, Record> projection = new MapOperator<>(projectionDescriptor);
5052

5153
childOp.connectTo(0, projection, 0);
5254

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import org.apache.wayang.api.sql.sources.fs.JavaCSVTableSource;
2727
import org.apache.wayang.core.plan.wayangplan.Operator;
2828
import org.apache.wayang.core.types.DataSetType;
29+
import org.apache.wayang.jdbc.operators.JdbcTableSource;
30+
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
2931
import org.apache.wayang.postgres.operators.PostgresTableSource;
3032
import org.apache.wayang.basic.data.Record;
33+
import org.apache.wayang.basic.operators.TableSource;
3134

3235
import java.util.List;
3336
import java.util.stream.Collectors;
@@ -49,9 +52,7 @@ Operator visit(final WayangTableScan wayangRelNode) {
4952

5053
if (tableSource.equals("postgres")) {
5154
return new PostgresTableSource(tableName, columnNames.toArray(String[]::new));
52-
}
53-
54-
if (tableSource.equals("fs")) {
55+
} else if (tableSource.equals("fs")) {
5556
final ModelParser modelParser;
5657
try {
5758
modelParser = this.wayangRelConverter.getConfiguration() == null
@@ -72,7 +73,18 @@ Operator visit(final WayangTableScan wayangRelNode) {
7273
final char separator = modelParser.getSchemaDelimiter(tableSource);
7374

7475
return new JavaCSVTableSource<>(url, DataSetType.createDefault(Record.class), fieldTypes, separator);
76+
} else if (wayangRelNode.getTable().getQualifiedName().size() == 1) {
77+
// we assume that it is coming from a test environement or in memory db.
78+
79+
return new JdbcTableSource(wayangRelNode.getTable().getQualifiedName().get(0), wayangRelNode.getRowType().getFieldNames().toArray(String[]::new)) {
80+
81+
@Override
82+
public JdbcPlatformTemplate getPlatform() {
83+
throw new UnsupportedOperationException("Unimplemented method 'getPlatform'");
84+
}
85+
};
7586
} else
76-
throw new RuntimeException("Source not supported");
87+
throw new RuntimeException(
88+
"Source not supported, got: " + tableSource + ", expected either postgres or filesystem (fs).");
7789
}
7890
}

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/CallTreeFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,14 @@ public default Node fromRexNode(final RexNode node) {
5858
* @return a serializable function of +, -, * or /
5959
* @throws UnsupportedOperationException on unrecognized {@link SqlKind}
6060
*/
61-
public SerializableFunction<List<Object>, Object> deriveOperation(SqlKind kind);
61+
public SerializableFunction<List<Object>, Object> deriveOperation(final SqlKind kind);
6262
}
6363

6464
interface Node extends Serializable {
6565
public Object evaluate(final Record rec);
6666
}
6767

68-
class Call implements Node {
68+
final class Call implements Node {
6969
private final List<Node> operands;
7070
final SerializableFunction<List<Object>, Object> operation;
7171

@@ -83,7 +83,7 @@ public Object evaluate(final Record rec) {
8383
}
8484
}
8585

86-
class Literal implements Node {
86+
final class Literal implements Node {
8787
final Serializable value;
8888

8989
Literal(final RexLiteral literal) {
@@ -109,7 +109,7 @@ public Object evaluate(final Record rec) {
109109
}
110110
}
111111

112-
class InputRef implements Node {
112+
final class InputRef implements Node {
113113
private final int key;
114114

115115
InputRef(final RexInputRef inputRef) {

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/optimizer/Optimizer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,11 +211,11 @@ public RelNode optimize(RelNode node, RelTraitSet requiredTraitSet, RuleSet rule
211211
);
212212
}
213213

214-
public WayangPlan convert(RelNode relNode) {
214+
public static WayangPlan convert(RelNode relNode) {
215215
return convert(relNode, new ArrayList<>());
216216
}
217217

218-
public WayangPlan convert(RelNode relNode, Collection<Record> collector) {
218+
public static WayangPlan convert(RelNode relNode, Collection<Record> collector) {
219219

220220
LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);
221221

@@ -225,8 +225,7 @@ public WayangPlan convert(RelNode relNode, Collection<Record> collector) {
225225
return new WayangPlan(sink);
226226
}
227227

228-
public WayangPlan convertWithConfig(RelNode relNode, Configuration configuration, Collection<Record> collector) {
229-
228+
public static WayangPlan convertWithConfig(RelNode relNode, Configuration configuration, Collection<Record> collector) {
230229
LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);
231230

232231
Operator op = new WayangRelConverter(configuration).convert(relNode);

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rel/WayangTableScan.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@
2727
import org.apache.calcite.rel.hint.RelHint;
2828
import org.apache.calcite.schema.Table;
2929
import org.apache.wayang.api.sql.calcite.convention.WayangConvention;
30-
import org.apache.wayang.api.sql.calcite.utils.ModelParser;
3130

3231
import java.util.List;
3332

3433
public class WayangTableScan extends TableScan implements WayangRel {
3534

35+
//TODO: fields are never queried, why?
3636
private final int[] fields;
3737

3838
public WayangTableScan(RelOptCluster cluster,
@@ -83,11 +83,15 @@ public String toString() {
8383
}
8484

8585
public String getQualifiedName() {
86-
return table.getQualifiedName().get(1);
86+
return table.getQualifiedName().size() == 1
87+
? table.getQualifiedName().get(0)
88+
: table.getQualifiedName().get(1);
8789
}
8890

8991
public String getTableName() {
90-
return table.getQualifiedName().get(1);
92+
return table.getQualifiedName().size() == 1
93+
? table.getQualifiedName().get(0)
94+
: table.getQualifiedName().get(1);
9195
}
9296

9397
public List<String> getColumnNames() {

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public static void main(final String[] args) throws Exception {
167167
PrintUtils.print("After translating logical intermediate plan", wayangRel);
168168

169169
final Collection<Record> collector = new ArrayList<>();
170-
final WayangPlan wayangPlan = optimizer.convertWithConfig(wayangRel, configuration, collector);
170+
final WayangPlan wayangPlan = Optimizer.convertWithConfig(wayangRel, configuration, collector);
171171
collector.add(new Record(wayangRel.getRowType().getFieldNames().toArray()));
172172
context.execute(getJobName(), wayangPlan);
173173

@@ -182,7 +182,6 @@ public static void main(final String[] args) throws Exception {
182182
}
183183

184184
public Collection<Record> executeSql(final String sql) throws SqlParseException {
185-
186185
final Properties configProperties = Optimizer.ConfigProperties.getDefaults();
187186
final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
188187

@@ -216,7 +215,7 @@ public Collection<Record> executeSql(final String sql) throws SqlParseException
216215
PrintUtils.print("After translating logical intermediate plan", wayangRel);
217216

218217
final Collection<Record> collector = new ArrayList<>();
219-
final WayangPlan wayangPlan = optimizer.convert(wayangRel, collector);
218+
final WayangPlan wayangPlan = Optimizer.convert(wayangRel, collector);
220219

221220
this.execute(getJobName(), wayangPlan);
222221

0 commit comments

Comments
 (0)