@@ -360,6 +360,9 @@ impl VirtualExecutionPlan {
360360/// `Limit` and `Distinct` (they call `derive_with_dialect_alias` when
361361/// `already_projected` is true). `SubqueryAlias` is included for safety.
362362fn sink_projection_below_sort ( plan : LogicalPlan ) -> Result < LogicalPlan > {
363+ if find_top_sort ( & plan) . is_none ( ) {
364+ return Ok ( plan) ;
365+ }
363366 let LogicalPlan :: Projection ( proj) = plan else {
364367 return Ok ( plan) ;
365368 } ;
@@ -1529,6 +1532,55 @@ mod tests {
15291532 Ok ( ( ) )
15301533 }
15311534
1535+ /// If there is no reachable `Sort` under the transparent wrappers,
1536+ /// `sink_projection_below_sort` must leave the plan unchanged.
1537+ ///
1538+ /// This guards against accidentally changing semantics for expressions
1539+ /// that can error or have side effects (and against changing DISTINCT
1540+ /// behaviour) in plans that are not actually ORDER BY queries.
1541+ #[ test]
1542+ fn sink_projection_below_sort_without_sort_leaves_plan_unchanged ( ) -> Result < ( ) , DataFusionError >
1543+ {
1544+ use datafusion:: common:: DFSchema ;
1545+ use datafusion:: logical_expr:: { Distinct , Limit , LogicalPlan , Projection , SubqueryAlias } ;
1546+ use datafusion:: prelude:: { col, lit} ;
1547+ use std:: sync:: Arc ;
1548+
1549+ let schema = Arc :: new ( Schema :: new ( vec ! [
1550+ Field :: new( "id" , DataType :: Int64 , false ) ,
1551+ Field :: new( "name" , DataType :: Utf8 , false ) ,
1552+ ] ) ) ;
1553+ let df_schema = Arc :: new ( DFSchema :: try_from ( schema. as_ref ( ) . clone ( ) ) ?) ;
1554+
1555+ let leaf = LogicalPlan :: EmptyRelation ( datafusion:: logical_expr:: EmptyRelation {
1556+ produce_one_row : false ,
1557+ schema : df_schema,
1558+ } ) ;
1559+
1560+ // Projection → Limit → Distinct → SubqueryAlias → EmptyRelation
1561+ // (no Sort anywhere).
1562+ let aliased = LogicalPlan :: SubqueryAlias ( SubqueryAlias :: try_new ( Arc :: new ( leaf) , "t" ) ?) ;
1563+ let distinct = LogicalPlan :: Distinct ( Distinct :: All ( Arc :: new ( aliased) ) ) ;
1564+ let limited = LogicalPlan :: Limit ( Limit {
1565+ skip : None ,
1566+ fetch : Some ( Box :: new ( lit ( 5i64 ) ) ) ,
1567+ input : Arc :: new ( distinct) ,
1568+ } ) ;
1569+ let original = LogicalPlan :: Projection ( Projection :: try_new (
1570+ vec ! [ col( "id" ) , col( "name" ) ] ,
1571+ Arc :: new ( limited) ,
1572+ ) ?) ;
1573+
1574+ let rewritten = sink_projection_below_sort ( original. clone ( ) ) ?;
1575+
1576+ assert_eq ! (
1577+ rewritten, original,
1578+ "plan without reachable Sort should remain unchanged"
1579+ ) ;
1580+
1581+ Ok ( ( ) )
1582+ }
1583+
15321584 /// When a `Projection` doesn't include the sort column, `sink_exprs_below_sort`
15331585 /// must produce `Projection → Sort → Projection` so the sort can see the column
15341586 /// while the outer projection restores the original output schema.
0 commit comments