Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,12 @@ Operator visit(WayangJoin wayangRelNode) {
}

/**
* This method handles the {@link JoinOperator} creation, used in conjunction
* with:
* {@link #determineKeyExtractionDirection(Integer, Integer, WayangJoin)}
* This method handles the {@link JoinOperator} creation
*
* @param wayangRelNode
* @param leftKeyIndex
* @param rightKeyIndex
* @return a {@link JoinOperator} with {@link KeyExtractors} set
* @return
*/
protected JoinOperator<Record, Record, Record> getJoinOperator(final Integer[] leftKeyIndexes,
final Integer[] rightKeyIndexes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public Operator convert(final RelNode node) {
return new WayangProjectVisitor(this).visit((WayangProject) node);
} else if (node instanceof WayangFilter) {
return new WayangFilterVisitor(this).visit((WayangFilter) node);
} else if (node instanceof WayangSort) {
return new WayangSortVisitor(this).visit((WayangSort) node);
} else if (node instanceof WayangJoin && ((WayangJoin) node).getCondition().isA(SqlKind.AND)) {
return new WayangMultiConditionJoinVisitor(this).visit((WayangJoin) node);
} else if (node instanceof WayangJoin && WayangJoin.class.cast(node).getCondition().isAlwaysTrue()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.api.sql.calcite.converter;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;

import org.apache.wayang.api.sql.calcite.converter.functions.SortKeyExtractor;
import org.apache.wayang.api.sql.calcite.rel.WayangSort;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.operators.SortOperator;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.plan.wayangplan.Operator;

public class WayangSortVisitor extends WayangRelNodeVisitor<WayangSort> {

WayangSortVisitor(final WayangRelConverter wayangRelConverter) {
super(wayangRelConverter);
}

@Override
Operator visit(final WayangSort wayangRelNode) {
assert (wayangRelNode.getInputs().size() == 1)
: "Sorts must only have one input, but found: " + wayangRelNode.getInputs().size();

final Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput());

//TODO: implement fetch & offset for java
final RexNode fetch = wayangRelNode.fetch;
final RexLiteral offset = (RexLiteral) wayangRelNode.offset;

if (fetch != null || offset != null) throw new UnsupportedOperationException("Offset and fetch currently not supported, these appear via LIMIT statements in SQL");

final RelCollation collation = wayangRelNode.getCollation();

final List<Direction> collationDirections = collation.getFieldCollations().stream()
.map(fieldCol -> fieldCol.getDirection())
.collect(Collectors.toList());

final List<Integer> collationIndexes = collation.getFieldCollations().stream()
.map(fieldCol -> fieldCol.getFieldIndex())
.collect(Collectors.toList());

final TransformationDescriptor<Record, Record> td = new TransformationDescriptor<Record, Record>(
new SortKeyExtractor(
collationDirections,
collationIndexes),
Record.class, Record.class);

final SortOperator<Record, Record> sort = new SortOperator<Record, Record>(td);

childOp.connectTo(0, sort, 0);

return sort;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.api.sql.calcite.converter.functions;

import java.util.List;

import org.apache.calcite.rel.RelFieldCollation.Direction;

import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.function.FunctionDescriptor;

public class SortKeyExtractor implements FunctionDescriptor.SerializableFunction<Record, Record> {
final List<Direction> directions;
final List<Integer> collationIndexes;

public SortKeyExtractor(final List<Direction> collationDirections, final List<Integer> collationIndexes) {
this.directions = collationDirections;
this.collationIndexes = collationIndexes;
}

@Override
public Record apply(final Record record) {
return new Record(collationIndexes.stream().map(record::getField).toArray()) {
@Override
public int compareTo(final Record that) throws IllegalStateException {
assert (directions.size() == collationIndexes.size()) : "Mismatch between the amount of collation indexes and directions";

for (int i = 0; i < directions.size(); i++) {
final Direction direction = directions.get(i);
final Comparable thisField = (Comparable) this.getValues()[i];
final Comparable thatField = (Comparable) that.getValues()[i];

// == 0, < -1, > 1
final int comparison = direction.isDescending() ? -thisField.compareTo(thatField) : thisField.compareTo(thatField);

if (comparison != 0) return comparison;
}

return 0;
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.api.sql.calcite.rel;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rex.RexNode;
import org.apache.wayang.api.sql.calcite.convention.WayangConvention;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.List;

public class WayangSort extends Sort implements WayangRel {
public WayangSort(final RelOptCluster cluster,
final RelTraitSet traits,
final List<RelHint> hints,
final RelNode child,
final RelCollation collation,
@Nullable final RexNode offset,
@Nullable final RexNode fetch) {
super(cluster, traits, hints, child, collation, offset, fetch);
assert getConvention() instanceof WayangConvention;
}

@Override
public Sort copy(final RelTraitSet traitSet, final RelNode newInput, final RelCollation newCollation, @Nullable final RexNode offset,
@Nullable final RexNode fetch) {
return new WayangSort(getCluster(), traitSet, getHints(), newInput, newCollation, offset, fetch);
}

@Override
public String toString() {
return "WayangSort";
}
}
Loading
Loading