11use std:: sync:: Arc ;
22
3- use arrow_flight:: sql:: client:: FlightSqlServiceClient ;
3+ use arrow_flight:: { error :: FlightError , sql:: client:: FlightSqlServiceClient } ;
44use async_trait:: async_trait;
55use datafusion:: arrow:: { datatypes:: SchemaRef , error:: ArrowError } ;
66use datafusion:: {
77 error:: { DataFusionError , Result } ,
8- physical_plan:: { stream:: RecordBatchStreamAdapter , SendableRecordBatchStream } ,
8+ physical_plan:: { stream:: RecordBatchStreamAdapter , PhysicalExpr , SendableRecordBatchStream } ,
99 sql:: unparser:: dialect:: { DefaultDialect , Dialect } ,
1010} ;
1111use datafusion_federation:: sql:: SQLExecutor ;
@@ -38,14 +38,14 @@ async fn make_flight_sql_stream(
3838 let flight_info = client
3939 . execute ( sql. to_string ( ) , None )
4040 . await
41- . map_err ( arrow_error_to_df ) ?;
41+ . map_err ( flight_error_to_df ) ?;
4242
4343 let mut flight_data_streams = Vec :: with_capacity ( flight_info. endpoint . len ( ) ) ;
4444 for endpoint in flight_info. endpoint {
4545 let ticket = endpoint. ticket . ok_or ( DataFusionError :: Execution (
4646 "FlightEndpoint missing ticket!" . to_string ( ) ,
4747 ) ) ?;
48- let flight_data = client. do_get ( ticket) . await ?;
48+ let flight_data = client. do_get ( ticket) . await . map_err ( flight_error_to_df ) ?;
4949 flight_data_streams. push ( flight_data) ;
5050 }
5151
@@ -66,7 +66,12 @@ impl SQLExecutor for FlightSQLExecutor {
6666 fn compute_context ( & self ) -> Option < String > {
6767 Some ( self . context . clone ( ) )
6868 }
69- fn execute ( & self , sql : & str , schema : SchemaRef ) -> Result < SendableRecordBatchStream > {
69+ fn execute (
70+ & self ,
71+ sql : & str ,
72+ schema : SchemaRef ,
73+ _filters : & [ Arc < dyn PhysicalExpr > ] ,
74+ ) -> Result < SendableRecordBatchStream > {
7075 let future_stream =
7176 make_flight_sql_stream ( sql. to_string ( ) , self . client . clone ( ) , Arc :: clone ( & schema) ) ;
7277 let stream = futures:: stream:: once ( future_stream) . try_flatten ( ) ;
@@ -90,7 +95,7 @@ impl SQLExecutor for FlightSQLExecutor {
9095 . clone ( )
9196 . execute ( sql, None )
9297 . await
93- . map_err ( arrow_error_to_df ) ?;
98+ . map_err ( flight_error_to_df ) ?;
9499 let schema = flight_info. try_decode_schema ( ) . map_err ( arrow_error_to_df) ?;
95100 Ok ( Arc :: new ( schema) )
96101 }
@@ -100,6 +105,10 @@ impl SQLExecutor for FlightSQLExecutor {
100105 }
101106}
102107
108+ fn flight_error_to_df ( err : FlightError ) -> DataFusionError {
109+ DataFusionError :: External ( format ! ( "flight error: {err:?}" ) . into ( ) )
110+ }
111+
103112fn arrow_error_to_df ( err : ArrowError ) -> DataFusionError {
104113 DataFusionError :: External ( format ! ( "arrow error: {err:?}" ) . into ( ) )
105114}
0 commit comments