Skip to content

Commit 9229c42

Browse files
authored
Merge pull request #547 from mspruc/main
IS NOT NULL & IS NULL & <> for sql-api
2 parents 4ae6d73 + 71818cb commit 9229c42

3 files changed

Lines changed: 48 additions & 32 deletions

File tree

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangFilterVisitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,6 @@ Operator visit(final WayangFilter wayangRelNode) {
5454
public static final EnumSet<SqlKind> SUPPORTED_OPS = EnumSet.of(SqlKind.AND, SqlKind.OR, SqlKind.NOT,
5555
SqlKind.EQUALS, SqlKind.NOT_EQUALS,
5656
SqlKind.LESS_THAN, SqlKind.GREATER_THAN,
57-
SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL, SqlKind.LIKE);
57+
SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL, SqlKind.LIKE, SqlKind.IS_NOT_NULL, SqlKind.IS_NULL);
5858

5959
}

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/FilterEvaluateCondition.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,17 @@ public Boolean visitCall(final RexCall call) {
4646
"Cannot handle this filter predicate yet: " + kind + " during RexCall: " + call);
4747

4848
switch (kind) {
49-
// Since NOT captures only one operand we just get
50-
// the first
49+
case IS_NOT_NULL:
50+
assert (call.getOperands().size() == 1);
51+
return eval(record, kind, call.getOperands().get(0), null);
52+
case IS_NULL:
53+
assert (call.getOperands().size() == 1);
54+
return eval(record, kind, call.getOperands().get(0), null);
5155
case NOT:
5256
assert (call.getOperands().size() == 1) : "SqlKind.NOT should only have 1 operand in call got: "
5357
+ call.getOperands().size() + ", call: " + call;
58+
// Since NOT captures only one operand we just get
59+
// the first
5460
return !(call.getOperands().get(0).accept(this));
5561
case AND:
5662
return call.getOperands().stream().allMatch(operator -> operator.accept(this));
@@ -78,7 +84,9 @@ public boolean eval(final Record record, final SqlKind kind, final RexNode leftO
7884
case LESS_THAN:
7985
return isLessThan(field, rexLiteral);
8086
case EQUALS:
81-
return isEqualTo(field, rexLiteral);
87+
return isEqualTo(field, rexLiteral.getValueAs(field.getClass()));
88+
case NOT_EQUALS:
89+
return !isEqualTo(field, rexLiteral.getValueAs(field.getClass()));
8290
case GREATER_THAN_OR_EQUAL:
8391
return isGreaterThan(field, rexLiteral) || isEqualTo(field, rexLiteral);
8492
case LESS_THAN_OR_EQUAL:
@@ -99,9 +107,23 @@ public boolean eval(final Record record, final SqlKind kind, final RexNode leftO
99107
throw new IllegalStateException("Predicate not supported yet, kind: " + kind + " left field: "
100108
+ record.getField(leftIndex) + " right field: " + record.getField(rightIndex));
101109
}
110+
} else if (leftOperand instanceof RexInputRef && rightOperand == null) {
111+
final RexInputRef leftRexInputRef = (RexInputRef) leftOperand;
112+
final int leftIndex = leftRexInputRef.getIndex();
113+
final Object leftField = record.getField(leftIndex);
114+
115+
switch (kind) {
116+
case IS_NOT_NULL:
117+
return !isEqualTo(leftField, null);
118+
case IS_NULL:
119+
return isEqualTo(leftField, null);
120+
default:
121+
throw new IllegalStateException(
122+
"Predicate not supported yet, kind: " + kind + " left field: " + leftField);
123+
}
102124
} else {
103125
throw new IllegalStateException("Predicate not supported with types yet, predicate: " + kind + ", type1: "
104-
+ leftOperand.getClass() + ", type2: " + rightOperand.getClass());
126+
+ leftOperand + ", type2: " + rightOperand);
105127
}
106128
}
107129

@@ -115,21 +137,11 @@ private boolean isLessThan(final Object o, final RexLiteral rexLiteral) {
115137
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) < 0;
116138
}
117139

118-
private boolean isEqualTo(final Object o, final RexLiteral rexLiteral) {
119-
try {
120-
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) == 0;
121-
} catch (final Exception e) {
122-
throw new IllegalStateException("Predicate not supported yet");
123-
}
124-
}
125-
126140
private boolean isEqualTo(final Object o1, final Object o2) {
127-
System.out.println("comparing: " + o1 + " with " + o2);
128-
System.out.println("true: " + ((Comparable) o1).compareTo(o2));
129141
try {
130-
return ((Comparable) o1).compareTo(o2) == 0;
142+
return o1.equals(o2);
131143
} catch (final Exception e) {
132-
throw new IllegalStateException("Predicate not supported yet");
144+
throw new IllegalStateException("Predicate not supported yet, " + e);
133145
}
134146
}
135147
}

wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ public void javaMultiConditionJoin() throws Exception {
147147

148148
sqlContext.execute(wayangPlan);
149149

150-
final boolean checkEq = result.stream().allMatch(rec -> rec.equals(new Record("", "test2", "test2", "", "test2")));
150+
final boolean checkEq = result.stream()
151+
.allMatch(rec -> rec.equals(new Record("", "test2", "test2", "", "test2")));
151152

152153
assert (checkEq);
153154
}
@@ -194,7 +195,7 @@ public void aggregateCountInJava() throws Exception {
194195
assert (rec.getInt(1) == 3);
195196
}
196197

197-
// @Test
198+
@Test
198199
public void filterIsNull() throws Exception {
199200
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
200201

@@ -207,7 +208,7 @@ public void filterIsNull() throws Exception {
207208
assert (result.size() == 0);
208209
}
209210

210-
// @Test
211+
@Test
211212
public void filterIsNotValue() throws Exception {
212213
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
213214

@@ -221,9 +222,10 @@ public void filterIsNotValue() throws Exception {
221222
sqlContext.execute(wayangPlan);
222223

223224
assert (!result.stream().anyMatch(record -> record.getField(0).equals("test1")));
224-
}
225+
}
225226

226-
// @Test
227+
228+
@Test
227229
public void filterIsNotNull() throws Exception {
228230
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
229231

@@ -236,14 +238,14 @@ public void filterIsNotNull() throws Exception {
236238
sqlContext.execute(wayangPlan);
237239

238240
assert (!result.stream().anyMatch(record -> record.getField(0).equals(null)));
239-
}
241+
}
240242

241243
@Test
242244
public void javaReduceBy() throws Exception {
243245
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
244246

245247
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(
246-
sqlContext,
248+
sqlContext,
247249
"select exampleSmallA.COLA, count(*) from fs.exampleSmallA group by exampleSmallA.COLA");
248250

249251
final Collection<Record> result = t.field0;
@@ -276,12 +278,13 @@ public void javaCrossJoin() throws Exception {
276278
new Record("item1", "item2", "item1", "item2", "item3"),
277279
new Record("item1", "item2", "item1", "item2", "item3"),
278280
new Record("item1", "item2", "item1", "item2", "item3"),
279-
new Record("item1", "item2", "x" , "x" , "x"),
280-
new Record("item1", "item2", "x" , "x" , "x")
281-
);
281+
new Record("item1", "item2", "x", "x", "x"),
282+
new Record("item1", "item2", "x", "x", "x"));
282283

283-
final Map<Record, Integer> resultTally = result.stream().collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
284-
final Map<Record, Integer> shouldBeTally = shouldBe.stream().collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
284+
final Map<Record, Integer> resultTally = result.stream()
285+
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
286+
final Map<Record, Integer> shouldBeTally = shouldBe.stream()
287+
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
285288

286289
assert (resultTally.equals(shouldBeTally));
287290
}
@@ -353,7 +356,7 @@ public void exampleFilterTableRefToTableRef() throws Exception {
353356
final WayangPlan wayangPlan = t.field1;
354357
sqlContext.execute(wayangPlan);
355358

356-
assert (result.stream().findFirst().get().equals(new Record("test1","test1")));
359+
assert (result.stream().findFirst().get().equals(new Record("test1", "test1")));
357360
}
358361

359362
@Test
@@ -440,7 +443,8 @@ private SqlContext createSqlContext(final String tableResourceName)
440443
" \"type\": \"custom\",\r\n" + //
441444
" \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + //
442445
" \"operand\": {\r\n" + //
443-
" \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath() + "\"\r\n" + //
446+
" \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath()
447+
+ "\"\r\n" + //
444448
" }\r\n" + //
445449
" }\r\n" + //
446450
" ]\r\n" + //
@@ -450,7 +454,7 @@ private SqlContext createSqlContext(final String tableResourceName)
450454
" \r\n" + //
451455
" \r\n" + //
452456
"";
453-
457+
454458
final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel);
455459
final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON)
456460
.setProperties();

0 commit comments

Comments
 (0)