@@ -11,7 +11,7 @@ use datafusion::{
1111 config:: ConfigOptions ,
1212 datasource:: source_as_provider,
1313 error:: Result ,
14- logical_expr:: { Expr , Extension , LogicalPlan , Projection , TableScan , TableSource } ,
14+ logical_expr:: { Expr , Extension , LogicalPlan , Projection , Sort , TableScan , TableSource } ,
1515 optimizer:: analyzer:: AnalyzerRule ,
1616 sql:: TableReference ,
1717} ;
@@ -460,6 +460,27 @@ fn wrap_projection(plan: LogicalPlan) -> Result<LogicalPlan> {
460460 // TODO: minimize requested columns
461461 match plan {
462462 LogicalPlan :: Projection ( _) => Ok ( plan) ,
463+ // Do NOT put a Projection on top of a Sort. The SQL Unparser
464+ // translates a Projection-over-Sort into a subquery:
465+ //
466+ // SELECT col1, col2 FROM (SELECT col1, col2 FROM t ORDER BY col1)
467+ //
468+ // which buries the ORDER BY inside the subquery. SQL does not
469+ // guarantee that subquery ordering propagates to the outer query, so
470+ // multi-batch results can arrive in the wrong order.
471+ //
472+ // Instead, push the Projection below the Sort so the Unparser
473+ // generates a top-level ORDER BY:
474+ //
475+ // SELECT col1, col2 FROM t ORDER BY col1
476+ LogicalPlan :: Sort ( sort) => {
477+ let wrapped_input = wrap_projection ( Arc :: unwrap_or_clone ( sort. input ) ) ?;
478+ Ok ( LogicalPlan :: Sort ( Sort {
479+ expr : sort. expr ,
480+ input : Arc :: new ( wrapped_input) ,
481+ fetch : sort. fetch ,
482+ } ) )
483+ }
463484 _ => {
464485 let expr = plan
465486 . schema ( )
@@ -529,3 +550,74 @@ pub fn get_table_source(
529550 // Return original FederatedTableSource
530551 Ok ( Some ( Arc :: clone ( & wrapper. source ) ) )
531552}
553+
554+ #[ cfg( test) ]
555+ mod tests {
556+ use super :: * ;
557+ use datafusion:: arrow:: datatypes:: { DataType , Field , Schema } ;
558+ use datafusion:: common:: DFSchema ;
559+ use datafusion:: logical_expr:: { EmptyRelation , LogicalPlan , Sort , SortExpr } ;
560+ use datafusion:: prelude:: col;
561+
562+ fn make_empty_plan ( fields : Vec < Field > ) -> LogicalPlan {
563+ let schema = Arc :: new ( Schema :: new ( fields) ) ;
564+ let df_schema = Arc :: new ( DFSchema :: try_from ( schema. as_ref ( ) . clone ( ) ) . unwrap ( ) ) ;
565+ LogicalPlan :: EmptyRelation ( EmptyRelation {
566+ produce_one_row : false ,
567+ schema : df_schema,
568+ } )
569+ }
570+
571+ /// `wrap_projection` on a plain non-Sort plan adds a Projection at the top.
572+ #[ test]
573+ fn wrap_projection_adds_projection_over_non_sort ( ) {
574+ let plan = make_empty_plan ( vec ! [ Field :: new( "a" , DataType :: Int64 , false ) ] ) ;
575+ let wrapped = wrap_projection ( plan) . unwrap ( ) ;
576+ assert ! (
577+ matches!( wrapped, LogicalPlan :: Projection ( _) ) ,
578+ "expected Projection at top, got: {}" ,
579+ wrapped. display_indent( )
580+ ) ;
581+ }
582+
583+ /// `wrap_projection` on a Sort must NOT put a Projection on top of the
584+ /// Sort — that would push ORDER BY into a SQL subquery and break ordering
585+ /// when the remote engine returns multiple batches.
586+ ///
587+ /// Instead the Projection must be pushed *below* the Sort, so the SQL
588+ /// Unparser can emit a top-level ORDER BY.
589+ #[ test]
590+ fn wrap_projection_does_not_bury_sort_in_subquery ( ) {
591+ let leaf = make_empty_plan ( vec ! [
592+ Field :: new( "id" , DataType :: Int64 , false ) ,
593+ Field :: new( "name" , DataType :: Utf8 , false ) ,
594+ ] ) ;
595+ let sort_node = LogicalPlan :: Sort ( Sort {
596+ expr : vec ! [ SortExpr {
597+ expr: col( "id" ) ,
598+ asc: true ,
599+ nulls_first: false ,
600+ } ] ,
601+ input : Arc :: new ( leaf) ,
602+ fetch : None ,
603+ } ) ;
604+
605+ let wrapped = wrap_projection ( sort_node) . unwrap ( ) ;
606+
607+ // The top-level node must still be a Sort, not a Projection.
608+ assert ! (
609+ matches!( wrapped, LogicalPlan :: Sort ( _) ) ,
610+ "wrap_projection must not bury Sort under Projection; got: {}" ,
611+ wrapped. display_indent( )
612+ ) ;
613+
614+ // The Sort's direct input must now be a Projection.
615+ if let LogicalPlan :: Sort ( Sort { input, .. } ) = & wrapped {
616+ assert ! (
617+ matches!( input. as_ref( ) , LogicalPlan :: Projection ( _) ) ,
618+ "expected Projection as Sort's input after wrap_projection; got: {}" ,
619+ input. display_indent( )
620+ ) ;
621+ }
622+ }
623+ }
0 commit comments