Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ Operator visit(final WayangJoin wayangRelNode) {
// offset of the index in the right child
final int offset = wayangRelNode.getInput(0).getRowType().getFieldCount();

final int leftKeyIndex = keys.get(0);
final int rightKeyIndex = keys.get(1) - offset;
final int leftKeyIndex = keys.get(0) < keys.get(1) ? keys.get(0) : keys.get(0) - offset;
final int rightKeyIndex = keys.get(0) < keys.get(1) ? keys.get(1) - offset : keys.get(1);

final JoinOperator<Record, Record, Object> join = new JoinOperator<>(
new TransformationDescriptor<>(new JoinKeyExtractor(leftKeyIndex), Record.class, Object.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public Tuple2<Collection<Record>, WayangPlan> buildCollectorAndWayangPlan(final
@Test
public void javaJoinTest() throws Exception {
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
// SELECT acc.location, count(*) FROM postgres.site
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
"SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEA = exampleRefToRef.NAMEA");
final Collection<Record> result = t.field0;
Expand All @@ -127,14 +126,12 @@ public void javaJoinTest() throws Exception {

sqlContext.execute(wayangPlan);

final Record rec = result.stream().findFirst().get();
assert (rec.equals(new Record("test1", "test1", "test2", "test1", "test1")));
assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2", "test1", "test1"))));
}

@Test
public void javaMultiConditionJoin() throws Exception {
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
// SELECT acc.location, count(*) FROM postgres.site
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
"SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEB = exampleRefToRef.NAMEB AND largeLeftTableIndex.NAMEC = exampleRefToRef.NAMEB");
final Collection<Record> result = t.field0;
Expand All @@ -156,7 +153,6 @@ public void javaMultiConditionJoin() throws Exception {
@Test
public void aggregateCountInJavaWithIntegers() throws Exception {
final SqlContext sqlContext = this.createSqlContext("/data/exampleInt.csv");
// SELECT acc.location, count(*) FROM postgres.site
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
"SELECT exampleInt.NAMEC, COUNT(*) FROM fs.exampleInt GROUP BY NAMEC");
final Collection<Record> result = t.field0;
Expand All @@ -177,7 +173,6 @@ public void aggregateCountInJavaWithIntegers() throws Exception {
@Test
public void aggregateCountInJava() throws Exception {
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
// SELECT acc.location, count(*) FROM postgres.site
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
"SELECT largeLeftTableIndex.NAMEC, COUNT(*) FROM fs.largeLeftTableIndex GROUP BY NAMEC");
final Collection<Record> result = t.field0;
Expand Down Expand Up @@ -222,8 +217,7 @@ public void filterIsNotValue() throws Exception {
sqlContext.execute(wayangPlan);

assert (!result.stream().anyMatch(record -> record.getField(0).equals("test1")));
}

}

@Test
public void filterIsNotNull() throws Exception {
Expand Down Expand Up @@ -257,7 +251,7 @@ public void javaReduceBy() throws Exception {

sqlContext.execute(wayangPlan);

assert (result.stream().findFirst().get().equals(new Record("item1", 2)));
assert (result.stream().anyMatch(rec -> rec.equals(new Record("item1", 2))));
}

@Test
Expand Down Expand Up @@ -316,10 +310,10 @@ public void filterWithLike() throws Exception {
final WayangPlan wayangPlan = t.field1;
sqlContext.execute(wayangPlan);

assert (result.stream().findFirst().get().equals(new Record("test1", "test1", "test2")));
assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2"))));
}

// @Test
@Test
public void joinWithLargeLeftTableIndexCorrect() throws Exception {
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");

Expand All @@ -330,18 +324,51 @@ public void joinWithLargeLeftTableIndexCorrect() throws Exception {
final Collection<Record> result = t.field0;
final WayangPlan wayangPlan = t.field1;
sqlContext.execute(wayangPlan);

final List<Record> shouldBe = List.of(
new Record("test1", "test1", "test2", "test1", "test1", "test2"),
new Record("test2", "" , "test2", "" , "test2", "test2"),
new Record("" , "test2", "test2", "test2", "" , "test2")
);

final Map<Record, Integer> resultTally = result.stream()
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
final Map<Record, Integer> shouldBeTally = shouldBe.stream()
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));

assert (resultTally.equals(shouldBeTally));
}

// @Test
// Imagine case: l = {item1, item2}, r = {item3,item4}, j = {item1, item2,
// item3, item4} join on =($1,$3) would be =(item2, item4) in the join set
// however from the r set we need to factor in the
// offset, $3 -> 3 - l.size() = $1, r($1) = "item4" we cannot naively assume
// that it is always ordered as =(lRef,rRef), lRef < rRef.
// it may also be =($3,$1)
@Test
public void joinWithLargeLeftTableIndexMirrorAlias() throws Exception {
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");

final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
"SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " //
);

final Collection<Record> result = t.field0;
final WayangPlan wayangPlan = t.field1;
sqlContext.execute(wayangPlan);

final List<Record> shouldBe = List.of(
new Record("test1", "test1", "test2", "test1", "test1", "test2"),
new Record("test2", "" , "test2", "" , "test2", "test2"),
new Record("" , "test2", "test2", "test2", "" , "test2")
);

final Map<Record, Integer> resultTally = result.stream()
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
final Map<Record, Integer> shouldBeTally = shouldBe.stream()
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));

assert (resultTally.equals(shouldBeTally));
}

@Test
Expand All @@ -356,7 +383,7 @@ public void exampleFilterTableRefToTableRef() throws Exception {
final WayangPlan wayangPlan = t.field1;
sqlContext.execute(wayangPlan);

assert (result.stream().findFirst().get().equals(new Record("test1", "test1")));
assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1"))));
}

@Test
Expand Down
Loading