Skip to content

Commit 626d92e

Browse files
committed
cleanup and improve code quality
1 parent 6af0137 commit 626d92e

4 files changed

Lines changed: 44 additions & 69 deletions

File tree

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateAddCols.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.wayang.api.sql.calcite.converter.functions;
1919

20-
import java.util.Arrays;
2120
import java.util.List;
2221

2322
import org.apache.calcite.rel.core.AggregateCall;
@@ -55,7 +54,6 @@ public Record apply(final Record record) {
5554
}
5655

5756
resValues[newRecordSize - 1] = 1;
58-
System.out.println("AddCols: returning res valueS: " + Arrays.toString(resValues) + ", vs rec: " + record);
5957
return new Record(resValues);
6058
}
6159
}

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,26 +76,19 @@ public Record apply(final Record record1, final Record record2) {
7676
assert (field1 instanceof Integer && field2 instanceof Integer)
7777
: "Expected to find integers for count but found: " + field1 + " and " + field2;
7878
final Object avg = Integer.class.cast(field1) + Integer.class.cast(field2);
79-
80-
resValues[counter] = avg;
8179

82-
if(!countDone) {
83-
resValues[l-1] = record1.getInt(l-1) + record2.getInt(l-1);
80+
resValues[counter] = avg;
81+
82+
if (!countDone) {
83+
resValues[l - 1] = record1.getInt(l - 1) + record2.getInt(l - 1);
8484
countDone = true;
8585
}
86-
87-
System.out.println("AggregateFunction: putting avg: " + avg + ", using fields: " + field1 + ", and: " + field2);
88-
System.out.println("AggregateFunction: resvalues[counter]: " + resValues[counter]);
89-
System.out.println("AggregateFunction resvalues after mutation: " + Arrays.toString(resValues));
9086
break;
9187
default:
9288
throw new IllegalStateException("Unsupported operation: " + aggregateCall.getAggregation().kind);
9389
}
9490
counter++;
9591
}
96-
97-
System.out.println("AggregateFunction: records: " + record1 + ", and: " + record2);
98-
System.out.println("AggregateFunction: returning resvalues: " + Arrays.toString(resValues));
9992
return new Record(resValues);
10093
}
10194

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateGetResult.java

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
import java.util.Arrays;
2222
import java.util.List;
2323
import java.util.Set;
24+
import java.util.stream.IntStream;
25+
import java.util.stream.Stream;
2426

2527
import org.apache.calcite.rel.core.AggregateCall;
28+
import org.apache.calcite.sql.SqlKind;
2629
import org.apache.wayang.basic.data.Record;
2730
import org.apache.wayang.core.function.FunctionDescriptor;
2831

@@ -37,41 +40,22 @@ public AggregateGetResult(final List<AggregateCall> aggregateCalls, final Set<In
3740

3841
@Override
3942
public Record apply(final Record record) {
40-
System.out.println("GetResult: rec: " + record);
41-
System.out.println("GetResult: aggCall: " + aggregateCallList);
42-
4343
final int recordSize = record.size();
44-
final int outputRecordSize = aggregateCallList.size() + groupingfields.size();
45-
final Object[] resValues = new Object[outputRecordSize];
46-
47-
int i = 0;
48-
int j = 0;
49-
50-
groupingfields.stream().forEach(System.out::println);
51-
52-
System.out.println("GetResult: grouping fields: " + groupingfields);
53-
for (i = 0; j < groupingfields.size(); i++) {
54-
if (groupingfields.contains(i)) {
55-
resValues[j] = record.getField(i);
56-
j++;
57-
}
58-
}
59-
60-
System.out.println("GetResult: resvalues post calc: " + Arrays.toString(resValues));
61-
62-
i = recordSize - aggregateCallList.size() - 1;
63-
for (final AggregateCall aggregateCall : aggregateCallList) {
64-
final String name = aggregateCall.getAggregation().getName();
65-
if (name.equals("AVG")) {
66-
System.out.println("GetResult: avg: " + record.getField(i) + " and " + record.getField(recordSize - 1));
67-
resValues[j] = record.getDouble(i) / record.getDouble(recordSize - 1);
68-
} else {
69-
resValues[j] = record.getField(i);
70-
}
71-
j++;
72-
i++;
73-
}
74-
75-
return new Record(resValues);
44+
final int aggregateCallOffset = recordSize - aggregateCallList.size() - 1;
45+
46+
final Object[] fields = groupingfields.stream()
47+
.map(record::getField)
48+
.toArray();
49+
50+
final Object[] aggregateCallFields = IntStream.range(0, aggregateCallList.size())
51+
.mapToObj(i -> aggregateCallList.get(i).getAggregation().getKind().equals(SqlKind.AVG)
52+
? record.getDouble(i + aggregateCallOffset) / record.getDouble(recordSize - 1)
53+
: record.getField(i + aggregateCallOffset))
54+
.toArray();
55+
56+
final Object[] combinedFields = Stream.concat(Arrays.stream(fields), Arrays.stream(aggregateCallFields))
57+
.toArray();
58+
59+
return new Record(combinedFields);
7660
}
7761
}

wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public Tuple2<Collection<Record>, WayangPlan> buildCollectorAndWayangPlan(final
125125
return new Tuple2<>(collector, wayangPlan);
126126
}
127127

128-
//@Test
128+
@Test
129129
public void javaJoinTest() throws Exception {
130130
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
131131
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
@@ -144,7 +144,7 @@ public void javaJoinTest() throws Exception {
144144
.anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2", "test1", "test1"))));
145145
}
146146

147-
//@Test
147+
@Test
148148
public void javaMultiConditionJoin() throws Exception {
149149
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
150150
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
@@ -165,7 +165,7 @@ public void javaMultiConditionJoin() throws Exception {
165165
assert (checkEq);
166166
}
167167

168-
//@Test
168+
@Test
169169
public void aggregateCountInJavaWithIntegers() throws Exception {
170170
final SqlContext sqlContext = this.createSqlContext("/data/exampleInt.csv");
171171
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
@@ -185,7 +185,7 @@ public void aggregateCountInJavaWithIntegers() throws Exception {
185185
assert (rec.getInt(1) == 3);
186186
}
187187

188-
//@Test
188+
@Test
189189
public void aggregateCountInJava() throws Exception {
190190
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
191191
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
@@ -205,7 +205,7 @@ public void aggregateCountInJava() throws Exception {
205205
assert (rec.getInt(1) == 3);
206206
}
207207

208-
//@Test
208+
@Test
209209
public void filterIsNull() throws Exception {
210210
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
211211

@@ -227,14 +227,14 @@ public void javaAverage() throws Exception {
227227
);
228228
final Collection<Record> result = t.field0;
229229
final WayangPlan wayangPlan = t.field1;
230-
230+
231231
sqlContext.execute(wayangPlan);
232232

233233
assert (result.size() == 1);
234234
assert (result.stream().findFirst().get().getDouble(0) == 0.875f);
235235
}
236236

237-
//@Test
237+
@Test
238238
public void filterNotEqualsValue() throws Exception {
239239
final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv");
240240

@@ -250,7 +250,7 @@ public void filterNotEqualsValue() throws Exception {
250250
assert (!result.stream().anyMatch(record -> record.getField(0).equals("test1")));
251251
}
252252

253-
//@Test
253+
@Test
254254
public void filterIsNotNull() throws Exception {
255255
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
256256

@@ -265,7 +265,7 @@ public void filterIsNotNull() throws Exception {
265265
assert (!result.stream().anyMatch(record -> record.getField(0).equals(null)));
266266
}
267267

268-
//@Test
268+
@Test
269269
public void javaReduceBy() throws Exception {
270270
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
271271

@@ -285,7 +285,7 @@ public void javaReduceBy() throws Exception {
285285
assert (result.stream().anyMatch(rec -> rec.equals(new Record("item1", 2))));
286286
}
287287

288-
//@Test
288+
@Test
289289
public void javaCrossJoin() throws Exception {
290290
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
291291

@@ -314,7 +314,7 @@ public void javaCrossJoin() throws Exception {
314314
assert (resultTally.equals(shouldBeTally));
315315
}
316316

317-
//@Test
317+
@Test
318318
public void filterWithNotLike() throws Exception {
319319
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
320320

@@ -329,7 +329,7 @@ public void filterWithNotLike() throws Exception {
329329
assert (!result.stream().anyMatch(record -> record.getString(0).equals("test1")));
330330
}
331331

332-
//@Test
332+
@Test
333333
public void filterWithLike() throws Exception {
334334
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
335335

@@ -344,7 +344,7 @@ public void filterWithLike() throws Exception {
344344
assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2"))));
345345
}
346346

347-
//@Test
347+
@Test
348348
public void javaLimit() throws Exception {
349349
final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv");
350350

@@ -361,7 +361,7 @@ public void javaLimit() throws Exception {
361361
assert (result.get(0).equals(new Record(2, "a", "a", 2)));
362362
}
363363

364-
//@Test
364+
@Test
365365
public void javaSort() throws Exception {
366366
final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv");
367367

@@ -384,7 +384,7 @@ public void javaSort() throws Exception {
384384
assert (result.get(6).equals(new Record(0, "b", "b", 1)));
385385
}
386386

387-
//@Test
387+
@Test
388388
public void joinWithLargeLeftTableIndexCorrect() throws Exception {
389389
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
390390

@@ -415,7 +415,7 @@ public void joinWithLargeLeftTableIndexCorrect() throws Exception {
415415
// offset, $3 -> 3 - l.size() = $1, r($1) = "item4" we cannot naively assume
416416
// that it is always ordered as =(lRef,rRef), lRef < rRef.
417417
// it may also be =($3,$1)
418-
//@Test
418+
@Test
419419
public void joinWithLargeLeftTableIndexMirrorAlias() throws Exception {
420420
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
421421

@@ -440,7 +440,7 @@ public void joinWithLargeLeftTableIndexMirrorAlias() throws Exception {
440440
assert (resultTally.equals(shouldBeTally));
441441
}
442442

443-
// //@Test
443+
// @Test
444444
public void sparkFilter() throws Exception {
445445
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
446446

@@ -461,7 +461,7 @@ public void sparkFilter() throws Exception {
461461
}
462462

463463
// tests sql-apis ability to serialize projections and joins
464-
//@Test
464+
@Test
465465
public void sparkInnerJoin() throws Exception {
466466
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
467467

@@ -491,7 +491,7 @@ public void sparkInnerJoin() throws Exception {
491491
assert (resultTally.equals(shouldBeTally));
492492
}
493493

494-
// //@Test
494+
// @Test
495495
public void rexSerializationTest() throws Exception {
496496
// create filterPredicateImpl for serialisation
497497
final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
@@ -515,7 +515,7 @@ public void rexSerializationTest() throws Exception {
515515
assert (((FilterPredicateImpl) deserializedObject).test(new Record("test")));
516516
}
517517

518-
//@Test
518+
@Test
519519
public void exampleFilterTableRefToTableRef() throws Exception {
520520
final SqlContext sqlContext = createSqlContext("/data/exampleRefToRef.csv");
521521

@@ -530,7 +530,7 @@ public void exampleFilterTableRefToTableRef() throws Exception {
530530
assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1"))));
531531
}
532532

533-
//@Test
533+
@Test
534534
public void exampleMinWithStrings() throws Exception {
535535
final SqlContext sqlContext = createSqlContext("/data/exampleMin.csv");
536536

0 commit comments

Comments
 (0)