Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ Operator visit(final WayangFilter wayangRelNode) {
public static final EnumSet<SqlKind> SUPPORTED_OPS = EnumSet.of(SqlKind.AND, SqlKind.OR, SqlKind.NOT,
SqlKind.EQUALS, SqlKind.NOT_EQUALS,
SqlKind.LESS_THAN, SqlKind.GREATER_THAN,
SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL, SqlKind.LIKE);
SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL, SqlKind.LIKE, SqlKind.IS_NOT_NULL, SqlKind.IS_NULL);

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,17 @@ public Boolean visitCall(final RexCall call) {
"Cannot handle this filter predicate yet: " + kind + " during RexCall: " + call);

switch (kind) {
// Since NOT captures only one operand we just get
// the first
case IS_NOT_NULL:
assert (call.getOperands().size() == 1);
return eval(record, kind, call.getOperands().get(0), null);
case IS_NULL:
assert (call.getOperands().size() == 1);
return eval(record, kind, call.getOperands().get(0), null);
case NOT:
assert (call.getOperands().size() == 1) : "SqlKind.NOT should only have 1 operand in call got: "
+ call.getOperands().size() + ", call: " + call;
// Since NOT captures only one operand we just get
// the first
return !(call.getOperands().get(0).accept(this));
case AND:
return call.getOperands().stream().allMatch(operator -> operator.accept(this));
Expand Down Expand Up @@ -78,7 +84,9 @@ public boolean eval(final Record record, final SqlKind kind, final RexNode leftO
case LESS_THAN:
return isLessThan(field, rexLiteral);
case EQUALS:
return isEqualTo(field, rexLiteral);
return isEqualTo(field, rexLiteral.getValueAs(field.getClass()));
case NOT_EQUALS:
return !isEqualTo(field, rexLiteral.getValueAs(field.getClass()));
case GREATER_THAN_OR_EQUAL:
return isGreaterThan(field, rexLiteral) || isEqualTo(field, rexLiteral);
case LESS_THAN_OR_EQUAL:
Expand All @@ -99,9 +107,23 @@ public boolean eval(final Record record, final SqlKind kind, final RexNode leftO
throw new IllegalStateException("Predicate not supported yet, kind: " + kind + " left field: "
+ record.getField(leftIndex) + " right field: " + record.getField(rightIndex));
}
} else if (leftOperand instanceof RexInputRef && rightOperand == null) {
final RexInputRef leftRexInputRef = (RexInputRef) leftOperand;
final int leftIndex = leftRexInputRef.getIndex();
final Object leftField = record.getField(leftIndex);

switch (kind) {
case IS_NOT_NULL:
return !isEqualTo(leftField, null);
case IS_NULL:
return isEqualTo(leftField, null);
default:
throw new IllegalStateException(
"Predicate not supported yet, kind: " + kind + " left field: " + leftField);
}
} else {
throw new IllegalStateException("Predicate not supported with types yet, predicate: " + kind + ", type1: "
+ leftOperand.getClass() + ", type2: " + rightOperand.getClass());
+ leftOperand + ", type2: " + rightOperand);
}
}

Expand All @@ -115,21 +137,11 @@ private boolean isLessThan(final Object o, final RexLiteral rexLiteral) {
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) < 0;
}

private boolean isEqualTo(final Object o, final RexLiteral rexLiteral) {
try {
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) == 0;
} catch (final Exception e) {
throw new IllegalStateException("Predicate not supported yet");
}
}

private boolean isEqualTo(final Object o1, final Object o2) {
System.out.println("comparing: " + o1 + " with " + o2);
System.out.println("true: " + ((Comparable) o1).compareTo(o2));
try {
return ((Comparable) o1).compareTo(o2) == 0;
return o1.equals(o2);
} catch (final Exception e) {
throw new IllegalStateException("Predicate not supported yet");
throw new IllegalStateException("Predicate not supported yet, " + e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public void javaMultiConditionJoin() throws Exception {

sqlContext.execute(wayangPlan);

final boolean checkEq = result.stream().allMatch(rec -> rec.equals(new Record("", "test2", "test2", "", "test2")));
final boolean checkEq = result.stream()
.allMatch(rec -> rec.equals(new Record("", "test2", "test2", "", "test2")));

assert (checkEq);
}
Expand Down Expand Up @@ -194,7 +195,7 @@ public void aggregateCountInJava() throws Exception {
assert (rec.getInt(1) == 3);
}

// @Test
@Test
public void filterIsNull() throws Exception {
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");

Expand All @@ -207,7 +208,7 @@ public void filterIsNull() throws Exception {
assert (result.size() == 0);
}

// @Test
@Test
public void filterIsNotValue() throws Exception {
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");

Expand All @@ -221,9 +222,10 @@ public void filterIsNotValue() throws Exception {
sqlContext.execute(wayangPlan);

assert (!result.stream().anyMatch(record -> record.getField(0).equals("test1")));
}
}

// @Test

@Test
public void filterIsNotNull() throws Exception {
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");

Expand All @@ -236,14 +238,14 @@ public void filterIsNotNull() throws Exception {
sqlContext.execute(wayangPlan);

assert (!result.stream().anyMatch(record -> record.getField(0).equals(null)));
}
}

@Test
public void javaReduceBy() throws Exception {
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");

final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(
sqlContext,
sqlContext,
"select exampleSmallA.COLA, count(*) from fs.exampleSmallA group by exampleSmallA.COLA");

final Collection<Record> result = t.field0;
Expand Down Expand Up @@ -276,12 +278,13 @@ public void javaCrossJoin() throws Exception {
new Record("item1", "item2", "item1", "item2", "item3"),
new Record("item1", "item2", "item1", "item2", "item3"),
new Record("item1", "item2", "item1", "item2", "item3"),
new Record("item1", "item2", "x" , "x" , "x"),
new Record("item1", "item2", "x" , "x" , "x")
);
new Record("item1", "item2", "x", "x", "x"),
new Record("item1", "item2", "x", "x", "x"));

final Map<Record, Integer> resultTally = result.stream().collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
final Map<Record, Integer> shouldBeTally = shouldBe.stream().collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
final Map<Record, Integer> resultTally = result.stream()
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
final Map<Record, Integer> shouldBeTally = shouldBe.stream()
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));

assert (resultTally.equals(shouldBeTally));
}
Expand Down Expand Up @@ -353,7 +356,7 @@ public void exampleFilterTableRefToTableRef() throws Exception {
final WayangPlan wayangPlan = t.field1;
sqlContext.execute(wayangPlan);

assert (result.stream().findFirst().get().equals(new Record("test1","test1")));
assert (result.stream().findFirst().get().equals(new Record("test1", "test1")));
}

@Test
Expand Down Expand Up @@ -440,7 +443,8 @@ private SqlContext createSqlContext(final String tableResourceName)
" \"type\": \"custom\",\r\n" + //
" \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + //
" \"operand\": {\r\n" + //
" \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath() + "\"\r\n" + //
" \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath()
+ "\"\r\n" + //
" }\r\n" + //
" }\r\n" + //
" ]\r\n" + //
Expand All @@ -450,7 +454,7 @@ private SqlContext createSqlContext(final String tableResourceName)
" \r\n" + //
" \r\n" + //
"";

final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel);
final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON)
.setProperties();
Expand Down
Loading