Skip to content

Commit e171259

Browse files
authored
Merge pull request #552 from mspruc/main
Sort operator for java platforms in the sql-api
2 parents af2f391 + 198ef99 commit e171259

12 files changed

Lines changed: 959 additions & 611 deletions

File tree

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangMultiConditionJoinVisitor.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,12 @@ Operator visit(WayangJoin wayangRelNode) {
134134
}
135135

136136
/**
137-
* This method handles the {@link JoinOperator} creation, used in conjunction
138-
* with:
139-
* {@link #determineKeyExtractionDirection(Integer, Integer, WayangJoin)}
137+
* This method handles the {@link JoinOperator} creation
140138
*
141139
* @param wayangRelNode
142140
* @param leftKeyIndex
143141
* @param rightKeyIndex
144-
* @return a {@link JoinOperator} with {@link KeyExtractors} set
142+
* @return
145143
*/
146144
protected JoinOperator<Record, Record, Record> getJoinOperator(final Integer[] leftKeyIndexes,
147145
final Integer[] rightKeyIndexes,

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public Operator convert(final RelNode node) {
5656
return new WayangProjectVisitor(this).visit((WayangProject) node);
5757
} else if (node instanceof WayangFilter) {
5858
return new WayangFilterVisitor(this).visit((WayangFilter) node);
59+
} else if (node instanceof WayangSort) {
60+
return new WayangSortVisitor(this).visit((WayangSort) node);
5961
} else if (node instanceof WayangJoin && ((WayangJoin) node).getCondition().isA(SqlKind.AND)) {
6062
return new WayangMultiConditionJoinVisitor(this).visit((WayangJoin) node);
6163
} else if (node instanceof WayangJoin && WayangJoin.class.cast(node).getCondition().isAlwaysTrue()) {
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.wayang.api.sql.calcite.converter;
19+
20+
import java.util.List;
21+
import java.util.stream.Collectors;
22+
23+
import org.apache.calcite.rel.RelCollation;
24+
import org.apache.calcite.rel.RelFieldCollation.Direction;
25+
import org.apache.calcite.rex.RexLiteral;
26+
import org.apache.calcite.rex.RexNode;
27+
28+
import org.apache.wayang.api.sql.calcite.converter.functions.SortKeyExtractor;
29+
import org.apache.wayang.api.sql.calcite.rel.WayangSort;
30+
import org.apache.wayang.basic.data.Record;
31+
import org.apache.wayang.basic.operators.SortOperator;
32+
import org.apache.wayang.core.function.TransformationDescriptor;
33+
import org.apache.wayang.core.plan.wayangplan.Operator;
34+
35+
public class WayangSortVisitor extends WayangRelNodeVisitor<WayangSort> {
36+
37+
WayangSortVisitor(final WayangRelConverter wayangRelConverter) {
38+
super(wayangRelConverter);
39+
}
40+
41+
@Override
42+
Operator visit(final WayangSort wayangRelNode) {
43+
assert (wayangRelNode.getInputs().size() == 1)
44+
: "Sorts must only have one input, but found: " + wayangRelNode.getInputs().size();
45+
46+
final Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput());
47+
48+
//TODO: implement fetch & offset for java
49+
final RexNode fetch = wayangRelNode.fetch;
50+
final RexLiteral offset = (RexLiteral) wayangRelNode.offset;
51+
52+
if (fetch != null || offset != null) throw new UnsupportedOperationException("Offset and fetch currently not supported, these appear via LIMIT statements in SQL");
53+
54+
final RelCollation collation = wayangRelNode.getCollation();
55+
56+
final List<Direction> collationDirections = collation.getFieldCollations().stream()
57+
.map(fieldCol -> fieldCol.getDirection())
58+
.collect(Collectors.toList());
59+
60+
final List<Integer> collationIndexes = collation.getFieldCollations().stream()
61+
.map(fieldCol -> fieldCol.getFieldIndex())
62+
.collect(Collectors.toList());
63+
64+
final TransformationDescriptor<Record, Record> td = new TransformationDescriptor<Record, Record>(
65+
new SortKeyExtractor(
66+
collationDirections,
67+
collationIndexes),
68+
Record.class, Record.class);
69+
70+
final SortOperator<Record, Record> sort = new SortOperator<Record, Record>(td);
71+
72+
childOp.connectTo(0, sort, 0);
73+
74+
return sort;
75+
}
76+
77+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.api.sql.calcite.converter.functions;
20+
21+
import java.util.List;
22+
23+
import org.apache.calcite.rel.RelFieldCollation.Direction;
24+
25+
import org.apache.wayang.basic.data.Record;
26+
import org.apache.wayang.core.function.FunctionDescriptor;
27+
28+
public class SortKeyExtractor implements FunctionDescriptor.SerializableFunction<Record, Record> {
29+
final List<Direction> directions;
30+
final List<Integer> collationIndexes;
31+
32+
public SortKeyExtractor(final List<Direction> collationDirections, final List<Integer> collationIndexes) {
33+
this.directions = collationDirections;
34+
this.collationIndexes = collationIndexes;
35+
}
36+
37+
@Override
38+
public Record apply(final Record record) {
39+
return new Record(collationIndexes.stream().map(record::getField).toArray()) {
40+
@Override
41+
public int compareTo(final Record that) throws IllegalStateException {
42+
assert (directions.size() == collationIndexes.size()) : "Mismatch between the amount of collation indexes and directions";
43+
44+
for (int i = 0; i < directions.size(); i++) {
45+
final Direction direction = directions.get(i);
46+
final Comparable thisField = (Comparable) this.getValues()[i];
47+
final Comparable thatField = (Comparable) that.getValues()[i];
48+
49+
// == 0, < -1, > 1
50+
final int comparison = direction.isDescending() ? -thisField.compareTo(thatField) : thisField.compareTo(thatField);
51+
52+
if (comparison != 0) return comparison;
53+
}
54+
55+
return 0;
56+
}
57+
};
58+
}
59+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.wayang.api.sql.calcite.rel;
19+
20+
import org.apache.calcite.plan.RelOptCluster;
21+
import org.apache.calcite.plan.RelTraitSet;
22+
import org.apache.calcite.rel.RelCollation;
23+
import org.apache.calcite.rel.RelNode;
24+
import org.apache.calcite.rel.core.Sort;
25+
import org.apache.calcite.rel.hint.RelHint;
26+
import org.apache.calcite.rex.RexNode;
27+
import org.apache.wayang.api.sql.calcite.convention.WayangConvention;
28+
import org.checkerframework.checker.nullness.qual.Nullable;
29+
30+
import java.util.List;
31+
32+
public class WayangSort extends Sort implements WayangRel {
33+
public WayangSort(final RelOptCluster cluster,
34+
final RelTraitSet traits,
35+
final List<RelHint> hints,
36+
final RelNode child,
37+
final RelCollation collation,
38+
@Nullable final RexNode offset,
39+
@Nullable final RexNode fetch) {
40+
super(cluster, traits, hints, child, collation, offset, fetch);
41+
assert getConvention() instanceof WayangConvention;
42+
}
43+
44+
@Override
45+
public Sort copy(final RelTraitSet traitSet, final RelNode newInput, final RelCollation newCollation, @Nullable final RexNode offset,
46+
@Nullable final RexNode fetch) {
47+
return new WayangSort(getCluster(), traitSet, getHints(), newInput, newCollation, offset, fetch);
48+
}
49+
50+
@Override
51+
public String toString() {
52+
return "WayangSort";
53+
}
54+
}

0 commit comments

Comments
 (0)