@@ -114,7 +114,6 @@ public Tuple2<Collection<Record>, WayangPlan> buildCollectorAndWayangPlan(final
114114 @ Test
115115 public void javaJoinTest () throws Exception {
116116 final SqlContext sqlContext = this .createSqlContext ("/data/largeLeftTableIndex.csv" );
117- // SELECT acc.location, count(*) FROM postgres.site
118117 final Tuple2 <Collection <Record >, WayangPlan > t = this .buildCollectorAndWayangPlan (sqlContext ,
119118 "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEA = exampleRefToRef.NAMEA" );
120119 final Collection <Record > result = t .field0 ;
@@ -127,14 +126,12 @@ public void javaJoinTest() throws Exception {
127126
128127 sqlContext .execute (wayangPlan );
129128
130- final Record rec = result .stream ().findFirst ().get ();
131- assert (rec .equals (new Record ("test1" , "test1" , "test2" , "test1" , "test1" )));
129+ assert (result .stream ().anyMatch (rec -> rec .equals (new Record ("test1" , "test1" , "test2" , "test1" , "test1" ))));
132130 }
133131
134132 @ Test
135133 public void javaMultiConditionJoin () throws Exception {
136134 final SqlContext sqlContext = this .createSqlContext ("/data/largeLeftTableIndex.csv" );
137- // SELECT acc.location, count(*) FROM postgres.site
138135 final Tuple2 <Collection <Record >, WayangPlan > t = this .buildCollectorAndWayangPlan (sqlContext ,
139136 "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEB = exampleRefToRef.NAMEB AND largeLeftTableIndex.NAMEC = exampleRefToRef.NAMEB" );
140137 final Collection <Record > result = t .field0 ;
@@ -156,7 +153,6 @@ public void javaMultiConditionJoin() throws Exception {
156153 @ Test
157154 public void aggregateCountInJavaWithIntegers () throws Exception {
158155 final SqlContext sqlContext = this .createSqlContext ("/data/exampleInt.csv" );
159- // SELECT acc.location, count(*) FROM postgres.site
160156 final Tuple2 <Collection <Record >, WayangPlan > t = this .buildCollectorAndWayangPlan (sqlContext ,
161157 "SELECT exampleInt.NAMEC, COUNT(*) FROM fs.exampleInt GROUP BY NAMEC" );
162158 final Collection <Record > result = t .field0 ;
@@ -177,7 +173,6 @@ public void aggregateCountInJavaWithIntegers() throws Exception {
177173 @ Test
178174 public void aggregateCountInJava () throws Exception {
179175 final SqlContext sqlContext = this .createSqlContext ("/data/largeLeftTableIndex.csv" );
180- // SELECT acc.location, count(*) FROM postgres.site
181176 final Tuple2 <Collection <Record >, WayangPlan > t = this .buildCollectorAndWayangPlan (sqlContext ,
182177 "SELECT largeLeftTableIndex.NAMEC, COUNT(*) FROM fs.largeLeftTableIndex GROUP BY NAMEC" );
183178 final Collection <Record > result = t .field0 ;
@@ -222,8 +217,7 @@ public void filterIsNotValue() throws Exception {
222217 sqlContext .execute (wayangPlan );
223218
224219 assert (!result .stream ().anyMatch (record -> record .getField (0 ).equals ("test1" )));
225- }
226-
220+ }
227221
228222 @ Test
229223 public void filterIsNotNull () throws Exception {
@@ -257,7 +251,7 @@ public void javaReduceBy() throws Exception {
257251
258252 sqlContext .execute (wayangPlan );
259253
260- assert (result .stream ().findFirst (). get (). equals (new Record ("item1" , 2 )));
254+ assert (result .stream ().anyMatch ( rec -> rec . equals (new Record ("item1" , 2 ) )));
261255 }
262256
263257 @ Test
@@ -316,10 +310,10 @@ public void filterWithLike() throws Exception {
316310 final WayangPlan wayangPlan = t .field1 ;
317311 sqlContext .execute (wayangPlan );
318312
319- assert (result .stream ().findFirst (). get (). equals (new Record ("test1" , "test1" , "test2" )));
313+ assert (result .stream ().anyMatch ( rec -> rec . equals (new Record ("test1" , "test1" , "test2" ) )));
320314 }
321315
322- // @Test
316+ @ Test
323317 public void joinWithLargeLeftTableIndexCorrect () throws Exception {
324318 final SqlContext sqlContext = createSqlContext ("/data/largeLeftTableIndex.csv" );
325319
@@ -330,18 +324,51 @@ public void joinWithLargeLeftTableIndexCorrect() throws Exception {
330324 final Collection <Record > result = t .field0 ;
331325 final WayangPlan wayangPlan = t .field1 ;
332326 sqlContext .execute (wayangPlan );
327+
328+ final List <Record > shouldBe = List .of (
329+ new Record ("test1" , "test1" , "test2" , "test1" , "test1" , "test2" ),
330+ new Record ("test2" , "" , "test2" , "" , "test2" , "test2" ),
331+ new Record ("" , "test2" , "test2" , "test2" , "" , "test2" )
332+ );
333+
334+ final Map <Record , Integer > resultTally = result .stream ()
335+ .collect (Collectors .toMap (rec -> rec , rec -> 1 , Integer ::sum ));
336+ final Map <Record , Integer > shouldBeTally = shouldBe .stream ()
337+ .collect (Collectors .toMap (rec -> rec , rec -> 1 , Integer ::sum ));
338+
339+ assert (resultTally .equals (shouldBeTally ));
333340 }
334341
335- // @Test
342+ // Imagine case: l = {item1, item2}, r = {item3,item4}, j = {item1, item2,
343+ // item3, item4} join on =($1,$3) would be =(item2, item4) in the join set
344+ // however from the r set we need to factor in the
345+ // offset, $3 -> 3 - l.size() = $1, r($1) = "item4" we cannot naively assume
346+ // that it is always ordered as =(lRef,rRef), lRef < rRef.
347+ // it may also be =($3,$1)
348+ @ Test
336349 public void joinWithLargeLeftTableIndexMirrorAlias () throws Exception {
337350 final SqlContext sqlContext = createSqlContext ("/data/largeLeftTableIndex.csv" );
338351
339352 final Tuple2 <Collection <Record >, WayangPlan > t = this .buildCollectorAndWayangPlan (sqlContext ,
340353 "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " //
341354 );
355+
342356 final Collection <Record > result = t .field0 ;
343357 final WayangPlan wayangPlan = t .field1 ;
344358 sqlContext .execute (wayangPlan );
359+
360+ final List <Record > shouldBe = List .of (
361+ new Record ("test1" , "test1" , "test2" , "test1" , "test1" , "test2" ),
362+ new Record ("test2" , "" , "test2" , "" , "test2" , "test2" ),
363+ new Record ("" , "test2" , "test2" , "test2" , "" , "test2" )
364+ );
365+
366+ final Map <Record , Integer > resultTally = result .stream ()
367+ .collect (Collectors .toMap (rec -> rec , rec -> 1 , Integer ::sum ));
368+ final Map <Record , Integer > shouldBeTally = shouldBe .stream ()
369+ .collect (Collectors .toMap (rec -> rec , rec -> 1 , Integer ::sum ));
370+
371+ assert (resultTally .equals (shouldBeTally ));
345372 }
346373
347374 @ Test
@@ -356,7 +383,7 @@ public void exampleFilterTableRefToTableRef() throws Exception {
356383 final WayangPlan wayangPlan = t .field1 ;
357384 sqlContext .execute (wayangPlan );
358385
359- assert (result .stream ().findFirst (). get (). equals (new Record ("test1" , "test1" )));
386+ assert (result .stream ().anyMatch ( rec -> rec . equals (new Record ("test1" , "test1" ) )));
360387 }
361388
362389 @ Test
0 commit comments