Skip to content

Commit fbcf73d

Browse files
[BugFix] Fix ranking window optimization without partition-by and order-by (backport StarRocks#67081) (StarRocks#67093)
Signed-off-by: zihe.liu <[email protected]> Co-authored-by: zihe.liu <[email protected]>
1 parent c595a2a commit fbcf73d

File tree

3 files changed

+192
-1
lines changed

3 files changed

+192
-1
lines changed

fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/PushDownLimitRankingWindowRule.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.starrocks.sql.optimizer.operator.OperatorType;
2626
import com.starrocks.sql.optimizer.operator.SortPhase;
2727
import com.starrocks.sql.optimizer.operator.TopNType;
28+
import com.starrocks.sql.optimizer.operator.logical.LogicalLimitOperator;
2829
import com.starrocks.sql.optimizer.operator.logical.LogicalProjectOperator;
2930
import com.starrocks.sql.optimizer.operator.logical.LogicalTopNOperator;
3031
import com.starrocks.sql.optimizer.operator.logical.LogicalWindowOperator;
@@ -187,6 +188,20 @@ public List<OptExpression> transform(OptExpression input, OptimizerContext conte
187188
long limitValue = topNOperator.getOffset() + topNOperator.getLimit();
188189
TopNType topNType = TopNType.parse(callOperator.getFnName());
189190

191+
if (partitionByColumns.isEmpty() && rankRelatedWindowOperator.getEnforceSortColumns().isEmpty()) {
192+
if (topNType != TopNType.ROW_NUMBER) {
193+
return Collections.emptyList();
194+
}
195+
196+
LogicalLimitOperator limitOp = LogicalLimitOperator.init(limitValue);
197+
198+
// topN -> project -> window -> limit
199+
OptExpression limitOpExpr = OptExpression.create(limitOp, grandChildExpr.getInputs());
200+
OptExpression newWindowOptExp = OptExpression.create(rankRelatedWindowOperator, limitOpExpr);
201+
OptExpression newProjectOptExp = OptExpression.create(projectOperator, newWindowOptExp);
202+
return Collections.singletonList(OptExpression.create(topNOperator, newProjectOptExp));
203+
}
204+
190205
// If partition by columns is not empty, then we cannot derive sort property from the SortNode
191206
// OutputPropertyDeriver will generate PhysicalPropertySet.EMPTY if sortPhase is SortPhase.PARTIAL
192207
final SortPhase sortPhase = partitionByColumns.isEmpty() ? SortPhase.FINAL : SortPhase.PARTIAL;

fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/PushDownPredicateRankingWindowRule.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.starrocks.sql.optimizer.operator.SortPhase;
2929
import com.starrocks.sql.optimizer.operator.TopNType;
3030
import com.starrocks.sql.optimizer.operator.logical.LogicalFilterOperator;
31+
import com.starrocks.sql.optimizer.operator.logical.LogicalLimitOperator;
3132
import com.starrocks.sql.optimizer.operator.logical.LogicalProjectOperator;
3233
import com.starrocks.sql.optimizer.operator.logical.LogicalTopNOperator;
3334
import com.starrocks.sql.optimizer.operator.logical.LogicalWindowOperator;
@@ -183,6 +184,19 @@ public List<OptExpression> transform(OptExpression input, OptimizerContext conte
183184

184185
TopNType topNType = TopNType.parse(callOperator.getFnName());
185186

187+
if (partitionByColumns.isEmpty() && rankRelatedWindowOperator.getEnforceSortColumns().isEmpty()) {
188+
if (topNType != TopNType.ROW_NUMBER) {
189+
return Collections.emptyList();
190+
}
191+
192+
LogicalLimitOperator limitOp = LogicalLimitOperator.init(limitValue);
193+
194+
// Filter -> window -> limit
195+
OptExpression limitOpExpr = OptExpression.create(limitOp, rankRelatedOptExpr.getInputs());
196+
OptExpression newWindowOptExp = OptExpression.create(rankRelatedWindowOperator, limitOpExpr);
197+
return Collections.singletonList(OptExpression.create(filterOperator, newWindowOptExp));
198+
}
199+
186200
// If partition by columns is not empty, then we cannot derive sort property from the SortNode
187201
// OutputPropertyDeriver will generate PhysicalPropertySet.EMPTY if sortPhase is SortPhase.PARTIAL
188202
final SortPhase sortPhase = partitionByColumns.isEmpty() ? SortPhase.FINAL : SortPhase.PARTIAL;

fe/fe-core/src/test/java/com/starrocks/sql/plan/WindowTest.java

Lines changed: 163 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public static void beforeClass() throws Exception {
5252
"\"compression\" = \"LZ4\" \n" +
5353
");");
5454
}
55-
55+
5656
@Test
5757
public void testLagWindowFunction() throws Exception {
5858
String sql = "select lag(id_datetime, 1, '2020-01-01') over(partition by t1c) from test_all_type;";
@@ -697,6 +697,168 @@ public void testRankingWindowWithPartitionLimitPushDown() throws Exception {
697697
FeConstants.runningUnitTest = false;
698698
}
699699

700+
@Test
701+
public void testLimitRankingWindowWithoutPartitionAndOrder() throws Exception {
702+
{
703+
String sql = "select * from (\n" +
704+
" select *, " +
705+
" row_number() over () as rk " +
706+
" from t0\n" +
707+
") sub_t0\n" +
708+
"order by rk limit 4;";
709+
String plan = getFragmentPlan(sql);
710+
assertContains(plan, " 3:TOP-N\n" +
711+
" | order by: <slot 4> 4: row_number() ASC\n" +
712+
" | offset: 0\n" +
713+
" | limit: 4\n" +
714+
" | \n" +
715+
" 2:ANALYTIC\n" +
716+
" | functions: [, row_number(), ]\n" +
717+
" | window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\n" +
718+
" | \n" +
719+
" 1:EXCHANGE\n" +
720+
" limit: 4", " STREAM DATA SINK\n" +
721+
" EXCHANGE ID: 01\n" +
722+
" UNPARTITIONED\n" +
723+
"\n" +
724+
" 0:OlapScanNode\n" +
725+
" TABLE: t0\n" +
726+
" PREAGGREGATION: ON\n" +
727+
" partitions=0/1\n" +
728+
" rollup: t0\n" +
729+
" tabletRatio=0/0\n" +
730+
" tabletList=\n" +
731+
" cardinality=1\n" +
732+
" avgRowSize=3.0\n" +
733+
" limit: 4");
734+
}
735+
736+
{
737+
String sql = "select * from (\n" +
738+
" select *, " +
739+
" rank() over () as rk " +
740+
" from t0\n" +
741+
") sub_t0\n" +
742+
"order by rk limit 4;";
743+
String plan = getFragmentPlan(sql);
744+
assertContains(plan, " 3:TOP-N\n" +
745+
" | order by: <slot 4> 4: rank() ASC\n" +
746+
" | offset: 0\n" +
747+
" | limit: 4\n" +
748+
" | \n" +
749+
" 2:ANALYTIC\n" +
750+
" | functions: [, rank(), ]\n" +
751+
" | \n" +
752+
" 1:EXCHANGE", " STREAM DATA SINK\n" +
753+
" EXCHANGE ID: 01\n" +
754+
" UNPARTITIONED\n" +
755+
"\n" +
756+
" 0:OlapScanNode");
757+
}
758+
759+
{
760+
String sql = "select * from (\n" +
761+
" select *, " +
762+
" dense_rank() over () as rk " +
763+
" from t0\n" +
764+
") sub_t0\n" +
765+
"order by rk limit 4;";
766+
String plan = getFragmentPlan(sql);
767+
assertContains(plan, " 3:TOP-N\n" +
768+
" | order by: <slot 4> 4: dense_rank() ASC\n" +
769+
" | offset: 0\n" +
770+
" | limit: 4\n" +
771+
" | \n" +
772+
" 2:ANALYTIC\n" +
773+
" | functions: [, dense_rank(), ]\n" +
774+
" | \n" +
775+
" 1:EXCHANGE", " STREAM DATA SINK\n" +
776+
" EXCHANGE ID: 01\n" +
777+
" UNPARTITIONED\n" +
778+
"\n" +
779+
" 0:OlapScanNode");
780+
}
781+
}
782+
783+
@Test
784+
public void testPredicateRankingWindowWithoutPartitionAndOrder() throws Exception {
785+
{
786+
String sql = "select * from (\n" +
787+
" select *, " +
788+
" row_number() over () as rk " +
789+
" from t0\n" +
790+
") sub_t0\n" +
791+
"where rk <= 4;";
792+
String plan = getFragmentPlan(sql);
793+
assertContains(plan, " 3:SELECT\n" +
794+
" | predicates: 4: row_number() <= 4\n" +
795+
" | \n" +
796+
" 2:ANALYTIC\n" +
797+
" | functions: [, row_number(), ]\n" +
798+
" | window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\n" +
799+
" | \n" +
800+
" 1:EXCHANGE\n" +
801+
" limit: 4",
802+
" STREAM DATA SINK\n" +
803+
" EXCHANGE ID: 01\n" +
804+
" UNPARTITIONED\n" +
805+
"\n" +
806+
" 0:OlapScanNode\n" +
807+
" TABLE: t0\n" +
808+
" PREAGGREGATION: ON\n" +
809+
" partitions=0/1\n" +
810+
" rollup: t0\n" +
811+
" tabletRatio=0/0\n" +
812+
" tabletList=\n" +
813+
" cardinality=1\n" +
814+
" avgRowSize=3.0\n" +
815+
" limit: 4");
816+
}
817+
818+
{
819+
String sql = "select * from (\n" +
820+
" select *, " +
821+
" rank() over () as rk " +
822+
" from t0\n" +
823+
") sub_t0\n" +
824+
"where rk <= 4;";
825+
String plan = getFragmentPlan(sql);
826+
assertContains(plan, " 3:SELECT\n" +
827+
" | predicates: 4: rank() <= 4\n" +
828+
" | \n" +
829+
" 2:ANALYTIC\n" +
830+
" | functions: [, rank(), ]\n" +
831+
" | \n" +
832+
" 1:EXCHANGE", " STREAM DATA SINK\n" +
833+
" EXCHANGE ID: 01\n" +
834+
" UNPARTITIONED\n" +
835+
"\n" +
836+
" 0:OlapScanNode");
837+
}
838+
839+
{
840+
String sql = "select * from (\n" +
841+
" select *, " +
842+
" dense_rank() over () as rk " +
843+
" from t0\n" +
844+
") sub_t0\n" +
845+
"where rk <= 4;";
846+
String plan = getFragmentPlan(sql);
847+
assertContains(plan, " 3:SELECT\n" +
848+
" | predicates: 4: dense_rank() <= 4\n" +
849+
" | \n" +
850+
" 2:ANALYTIC\n" +
851+
" | functions: [, dense_rank(), ]\n" +
852+
" | \n" +
853+
" 1:EXCHANGE",
854+
" STREAM DATA SINK\n" +
855+
" EXCHANGE ID: 01\n" +
856+
" UNPARTITIONED\n" +
857+
"\n" +
858+
" 0:OlapScanNode");
859+
}
860+
}
861+
700862
@Test
701863
public void testRankingWindowPreAggWithPartitionPredicatePushDown() throws Exception {
702864
FeConstants.runningUnitTest = true;

0 commit comments

Comments
 (0)