Skip to content

Commit 166022b

Browse files
committed
add filters w. where clauses for java platforms in sql-api
1 parent f09522e commit 166022b

2 files changed

Lines changed: 34 additions & 9 deletions

File tree

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/FilterEvaluateCondition.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@
2929
import org.apache.wayang.api.sql.calcite.converter.WayangFilterVisitor;
3030
import org.apache.wayang.basic.data.Record;
3131

32-
public class FilterEvaluateCondition extends RexVisitorImpl<Boolean> {
32+
public class FilterEvaluateCondition extends RexVisitorImpl<Boolean> {
3333
final Record record;
3434

3535
protected FilterEvaluateCondition(final boolean deep, final Record record) {
36-
super(deep);
37-
this.record = record;
38-
}
36+
super(deep);
37+
this.record = record;
38+
}
3939

4040
@Override
4141
public Boolean visitCall(final RexCall call) {
@@ -64,7 +64,6 @@ public Boolean visitCall(final RexCall call) {
6464

6565
public boolean eval(final Record record, final SqlKind kind, final RexNode leftOperand,
6666
final RexNode rightOperand) {
67-
6867
if (leftOperand instanceof RexInputRef && rightOperand instanceof RexLiteral) {
6968
final RexInputRef rexInputRef = (RexInputRef) leftOperand;
7069
final int index = rexInputRef.getIndex();
@@ -85,13 +84,25 @@ public boolean eval(final Record record, final SqlKind kind, final RexNode leftO
8584
case LESS_THAN_OR_EQUAL:
8685
return isLessThan(field, rexLiteral) || isEqualTo(field, rexLiteral);
8786
default:
88-
throw new IllegalStateException("Predicate not supported yet");
87+
throw new IllegalStateException("Predicate not supported yet: " + kind);
8988
}
89+
} else if (leftOperand instanceof RexInputRef && rightOperand instanceof RexInputRef) {
90+
final RexInputRef leftRexInputRef = (RexInputRef) leftOperand;
91+
final int leftIndex = leftRexInputRef.getIndex();
92+
final RexInputRef righRexInputRef = (RexInputRef) rightOperand;
93+
final int rightIndex = righRexInputRef.getIndex();
9094

95+
switch (kind) {
96+
case EQUALS:
97+
return isEqualTo(record.getField(leftIndex), record.getField(rightIndex));
98+
default:
99+
throw new IllegalStateException("Predicate not supported yet, kind: " + kind + " left field: "
100+
+ record.getField(leftIndex) + " right field: " + record.getField(rightIndex));
101+
}
91102
} else {
92-
throw new IllegalStateException("Predicate not supported yet");
103+
throw new IllegalStateException("Predicate not supported with types yet, predicate: " + kind + ", type1: "
104+
+ leftOperand.getClass() + ", type2: " + rightOperand.getClass());
93105
}
94-
95106
}
96107

97108
private boolean isGreaterThan(final Object o, final RexLiteral rexLiteral) {
@@ -111,4 +122,14 @@ private boolean isEqualTo(final Object o, final RexLiteral rexLiteral) {
111122
throw new IllegalStateException("Predicate not supported yet");
112123
}
113124
}
125+
126+
private boolean isEqualTo(final Object o1, final Object o2) {
127+
System.out.println("comparing: " + o1 + " with " + o2);
128+
System.out.println("true: " + ((Comparable) o1).compareTo(o2));
129+
try {
130+
return ((Comparable) o1).compareTo(o2) == 0;
131+
} catch (final Exception e) {
132+
throw new IllegalStateException("Predicate not supported yet");
133+
}
134+
}
114135
}

wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ public void joinWithLargeLeftTableIndexCorrect() throws Exception {
323323
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
324324
"SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON na.NAMEB = nb.NAMEA " //
325325
);
326+
326327
final Collection<Record> result = t.field0;
327328
final WayangPlan wayangPlan = t.field1;
328329
sqlContext.execute(wayangPlan);
@@ -340,16 +341,19 @@ public void joinWithLargeLeftTableIndexMirrorAlias() throws Exception {
340341
sqlContext.execute(wayangPlan);
341342
}
342343

343-
// @Test
344+
@Test
344345
public void exampleFilterTableRefToTableRef() throws Exception {
345346
final SqlContext sqlContext = createSqlContext("/data/exampleRefToRef.csv");
346347

347348
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
348349
"SELECT * FROM fs.exampleRefToRef WHERE exampleRefToRef.NAMEA = exampleRefToRef.NAMEB" //
349350
);
351+
350352
final Collection<Record> result = t.field0;
351353
final WayangPlan wayangPlan = t.field1;
352354
sqlContext.execute(wayangPlan);
355+
356+
assert (result.stream().findFirst().get().equals(new Record("test1","test1")));
353357
}
354358

355359
@Test

0 commit comments

Comments
 (0)