Skip to content

Commit 7784fd1

Browse files
authored
Merge pull request #554 from mspruc/main
fetch, offset & limit for java platforms in sql-api
2 parents e171259 + cd54718 commit 7784fd1

4 files changed

Lines changed: 70 additions & 10 deletions

File tree

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222

2323
import org.apache.calcite.rel.RelCollation;
2424
import org.apache.calcite.rel.RelFieldCollation.Direction;
25+
import org.apache.calcite.rex.RexInputRef;
2526
import org.apache.calcite.rex.RexLiteral;
26-
import org.apache.calcite.rex.RexNode;
2727

28+
import org.apache.wayang.api.sql.calcite.converter.functions.SortFilter;
2829
import org.apache.wayang.api.sql.calcite.converter.functions.SortKeyExtractor;
2930
import org.apache.wayang.api.sql.calcite.rel.WayangSort;
3031
import org.apache.wayang.basic.data.Record;
32+
import org.apache.wayang.basic.operators.FilterOperator;
3133
import org.apache.wayang.basic.operators.SortOperator;
3234
import org.apache.wayang.core.function.TransformationDescriptor;
3335
import org.apache.wayang.core.plan.wayangplan.Operator;
@@ -45,12 +47,14 @@ Operator visit(final WayangSort wayangRelNode) {
4547

4648
final Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput());
4749

48-
//TODO: implement fetch & offset for java
49-
final RexNode fetch = wayangRelNode.fetch;
50-
final RexLiteral offset = (RexLiteral) wayangRelNode.offset;
50+
// TODO: implement fetch & offset for java
51+
final RexLiteral fetch = (RexLiteral) wayangRelNode.fetch;
52+
final RexInputRef offset = (RexInputRef) wayangRelNode.offset;
53+
54+
// if (fetch != null || offset != null) throw new
55+
// UnsupportedOperationException("Offset and fetch currently not supported,
56+
// these appear via LIMIT statements in SQL");
5157

52-
if (fetch != null || offset != null) throw new UnsupportedOperationException("Offset and fetch currently not supported, these appear via LIMIT statements in SQL");
53-
5458
final RelCollation collation = wayangRelNode.getCollation();
5559

5660
final List<Direction> collationDirections = collation.getFieldCollations().stream()
@@ -71,7 +75,16 @@ Operator visit(final WayangSort wayangRelNode) {
7175

7276
childOp.connectTo(0, sort, 0);
7377

74-
return sort;
78+
79+
final SortFilter sortFilter = new SortFilter(
80+
fetch != null ? RexLiteral.intValue(fetch) : Integer.MAX_VALUE,
81+
offset != null ? RexLiteral.intValue(offset) : 0);
82+
83+
final FilterOperator<Record> filter = new FilterOperator<Record>(sortFilter, Record.class);
84+
85+
sort.connectTo(0, filter, 0);
86+
87+
return filter;
7588
}
7689

7790
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.api.sql.calcite.converter.functions;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import org.apache.wayang.core.function.FunctionDescriptor;
23+
24+
public class SortFilter implements FunctionDescriptor.SerializablePredicate<Record> {
25+
final int offset;
26+
final int fetch;
27+
int increment;
28+
29+
/**
30+
* The filter for a calcite/sql sort operator
31+
* usually triggered by "LIMIT x", "OFFSET x", "FETCH x" statements
32+
* @param offset amount of records ignored before accepting
33+
* @param fetch amount of records accepted
34+
*/
35+
public SortFilter(final int fetch, final int offset) {
36+
this.fetch = fetch;
37+
this.offset = offset;
38+
}
39+
40+
@Override
41+
public boolean test(final Record record) {
42+
final boolean test = increment >= offset && increment <= fetch;
43+
increment++;
44+
45+
return test;
46+
}
47+
}

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangSortRule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public RelNode convert(final RelNode rel) {
5454
sort.getHints(),
5555
newInput,
5656
sort.collation,
57-
sort.fetch,
58-
sort.offset);
57+
sort.offset,
58+
sort.fetch);
5959
}
6060
}

wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ public void filterWithLike() throws Exception {
328328
assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2"))));
329329
}
330330

331-
//@Test
331+
@Test
332332
public void javaLimit() throws Exception {
333333
final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv");
334334

0 commit comments

Comments
 (0)