Skip to content

Commit 01ddfdf

Browse files
committed
fix a bug where tableA join tableB failed due to index sizes
1 parent 71818cb commit 01ddfdf

2 files changed

Lines changed: 42 additions & 15 deletions

File tree

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangJoinVisitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ Operator visit(final WayangJoin wayangRelNode) {
6767
// offset of the index in the right child
6868
final int offset = wayangRelNode.getInput(0).getRowType().getFieldCount();
6969

70-
final int leftKeyIndex = keys.get(0);
71-
final int rightKeyIndex = keys.get(1) - offset;
70+
final int leftKeyIndex = keys.get(0) < keys.get(1) ? keys.get(0) : keys.get(0) - offset;
71+
final int rightKeyIndex = keys.get(0) < keys.get(1) ? keys.get(1) - offset : keys.get(1);
7272

7373
final JoinOperator<Record, Record, Object> join = new JoinOperator<>(
7474
new TransformationDescriptor<>(new JoinKeyExtractor(leftKeyIndex), Record.class, Object.class),

wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ public Tuple2<Collection<Record>, WayangPlan> buildCollectorAndWayangPlan(final
114114
@Test
115115
public void javaJoinTest() throws Exception {
116116
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
117-
// SELECT acc.location, count(*) FROM postgres.site
118117
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
119118
"SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEA = exampleRefToRef.NAMEA");
120119
final Collection<Record> result = t.field0;
@@ -127,14 +126,12 @@ public void javaJoinTest() throws Exception {
127126

128127
sqlContext.execute(wayangPlan);
129128

130-
final Record rec = result.stream().findFirst().get();
131-
assert (rec.equals(new Record("test1", "test1", "test2", "test1", "test1")));
129+
assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2", "test1", "test1"))));
132130
}
133131

134132
@Test
135133
public void javaMultiConditionJoin() throws Exception {
136134
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
137-
// SELECT acc.location, count(*) FROM postgres.site
138135
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
139136
"SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEB = exampleRefToRef.NAMEB AND largeLeftTableIndex.NAMEC = exampleRefToRef.NAMEB");
140137
final Collection<Record> result = t.field0;
@@ -156,7 +153,6 @@ public void javaMultiConditionJoin() throws Exception {
156153
@Test
157154
public void aggregateCountInJavaWithIntegers() throws Exception {
158155
final SqlContext sqlContext = this.createSqlContext("/data/exampleInt.csv");
159-
// SELECT acc.location, count(*) FROM postgres.site
160156
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
161157
"SELECT exampleInt.NAMEC, COUNT(*) FROM fs.exampleInt GROUP BY NAMEC");
162158
final Collection<Record> result = t.field0;
@@ -177,7 +173,6 @@ public void aggregateCountInJavaWithIntegers() throws Exception {
177173
@Test
178174
public void aggregateCountInJava() throws Exception {
179175
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
180-
// SELECT acc.location, count(*) FROM postgres.site
181176
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
182177
"SELECT largeLeftTableIndex.NAMEC, COUNT(*) FROM fs.largeLeftTableIndex GROUP BY NAMEC");
183178
final Collection<Record> result = t.field0;
@@ -222,8 +217,7 @@ public void filterIsNotValue() throws Exception {
222217
sqlContext.execute(wayangPlan);
223218

224219
assert (!result.stream().anyMatch(record -> record.getField(0).equals("test1")));
225-
}
226-
220+
}
227221

228222
@Test
229223
public void filterIsNotNull() throws Exception {
@@ -257,7 +251,7 @@ public void javaReduceBy() throws Exception {
257251

258252
sqlContext.execute(wayangPlan);
259253

260-
assert (result.stream().findFirst().get().equals(new Record("item1", 2)));
254+
assert (result.stream().anyMatch(rec -> rec.equals(new Record("item1", 2))));
261255
}
262256

263257
@Test
@@ -316,10 +310,10 @@ public void filterWithLike() throws Exception {
316310
final WayangPlan wayangPlan = t.field1;
317311
sqlContext.execute(wayangPlan);
318312

319-
assert (result.stream().findFirst().get().equals(new Record("test1", "test1", "test2")));
313+
assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2"))));
320314
}
321315

322-
// @Test
316+
@Test
323317
public void joinWithLargeLeftTableIndexCorrect() throws Exception {
324318
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
325319

@@ -330,18 +324,51 @@ public void joinWithLargeLeftTableIndexCorrect() throws Exception {
330324
final Collection<Record> result = t.field0;
331325
final WayangPlan wayangPlan = t.field1;
332326
sqlContext.execute(wayangPlan);
327+
328+
final List<Record> shouldBe = List.of(
329+
new Record("test1", "test1", "test2", "test1", "test1", "test2"),
330+
new Record("test2", "" , "test2", "" , "test2", "test2"),
331+
new Record("" , "test2", "test2", "test2", "" , "test2")
332+
);
333+
334+
final Map<Record, Integer> resultTally = result.stream()
335+
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
336+
final Map<Record, Integer> shouldBeTally = shouldBe.stream()
337+
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
338+
339+
assert (resultTally.equals(shouldBeTally));
333340
}
334341

335-
// @Test
342+
// Imagine case: l = {item1, item2}, r = {item3,item4}, j = {item1, item2,
343+
// item3, item4} join on =($1,$3) would be =(item2, item4) in the join set
344+
// however from the r set we need to factor in the
345+
// offset, $3 -> 3 - l.size() = $1, r($1) = "item4" we cannot naively assume
346+
// that it is always ordered as =(lRef,rRef), lRef < rRef.
347+
// it may also be =($3,$1)
348+
@Test
336349
public void joinWithLargeLeftTableIndexMirrorAlias() throws Exception {
337350
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
338351

339352
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
340353
"SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " //
341354
);
355+
342356
final Collection<Record> result = t.field0;
343357
final WayangPlan wayangPlan = t.field1;
344358
sqlContext.execute(wayangPlan);
359+
360+
final List<Record> shouldBe = List.of(
361+
new Record("test1", "test1", "test2", "test1", "test1", "test2"),
362+
new Record("test2", "" , "test2", "" , "test2", "test2"),
363+
new Record("" , "test2", "test2", "test2", "" , "test2")
364+
);
365+
366+
final Map<Record, Integer> resultTally = result.stream()
367+
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
368+
final Map<Record, Integer> shouldBeTally = shouldBe.stream()
369+
.collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
370+
371+
assert (resultTally.equals(shouldBeTally));
345372
}
346373

347374
@Test
@@ -356,7 +383,7 @@ public void exampleFilterTableRefToTableRef() throws Exception {
356383
final WayangPlan wayangPlan = t.field1;
357384
sqlContext.execute(wayangPlan);
358385

359-
assert (result.stream().findFirst().get().equals(new Record("test1", "test1")));
386+
assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1"))));
360387
}
361388

362389
@Test

0 commit comments

Comments
 (0)