22// SPDX-FileCopyrightText: Copyright the Vortex contributors
33
44use std:: ops:: Range ;
5- use std:: sync:: Arc ;
6- use std:: sync:: Weak ;
5+ use std:: sync:: { Arc , Weak } ;
76
87use arrow_schema:: Schema ;
98use datafusion_common:: DataFusionError ;
@@ -43,8 +42,7 @@ use vortex::metrics::Label;
4342use vortex:: metrics:: MetricsRegistry ;
4443use vortex:: scan:: ScanBuilder ;
4544use vortex:: session:: VortexSession ;
46- use vortex_utils:: aliases:: dash_map:: DashMap ;
47- use vortex_utils:: aliases:: dash_map:: Entry ;
45+ use vortex_utils:: aliases:: dash_map:: { DashMap , Entry } ;
4846
4947use crate :: VortexAccessPlan ;
5048use crate :: convert:: exprs:: ExpressionConvertor ;
@@ -90,6 +88,7 @@ pub(crate) struct VortexOpener {
9088 pub layout_readers : Arc < DashMap < Path , Weak < dyn LayoutReader > > > ,
9189 /// Whether the query has output ordering specified
9290 pub has_output_ordering : bool ,
91+ }
9392
9493 pub expression_convertor: Arc <dyn ExpressionConvertor >,
9594 pub file_metadata_cache: Option <Arc <dyn FileMetadataCache >>,
@@ -99,7 +98,7 @@ pub(crate) struct VortexOpener {
9998}
10099
101100impl FileOpener for VortexOpener {
102- fn open ( & self , file : PartitionedFile ) -> DFResult < FileOpenFuture > {
101+ fn open ( & self , file_meta : FileMeta , file : PartitionedFile ) -> DFResult < FileOpenFuture > {
103102 let session = self . session . clone ( ) ;
104103 let metrics_registry = self . metrics_registry . clone ( ) ;
105104 let labels = vec ! [
@@ -158,7 +157,7 @@ impl FileOpener for VortexOpener {
158157 // - Partition column values (e.g., date=2024-01-01)
159158 // - File-level statistics (min/max values per column)
160159 let mut file_pruner = file_pruning_predicate
161- . filter ( |p | {
160+ . map ( |predicate | {
162161 // Only create pruner if we have dynamic expressions or file statistics
163162 // to work with. Static predicates without stats won't benefit from pruning.
164163 is_dynamic_physical_expr ( p) || file. has_statistics ( )
@@ -269,13 +268,13 @@ impl FileOpener for VortexOpener {
269268 let projector = leftover_projection. make_projector ( & stream_schema) ?;
270269
271270 // We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once.
272- let layout_reader = match layout_reader. entry ( file . object_meta . location . clone ( ) ) {
271+ let layout_reader = match layout_reader. entry ( file_meta . object_meta . location . clone ( ) ) {
273272 Entry :: Occupied ( mut occupied_entry) => {
274273 if let Some ( reader) = occupied_entry. get ( ) . upgrade ( ) {
275- tracing :: trace!( "reusing layout reader for {}" , occupied_entry. key( ) ) ;
274+ log :: trace!( "reusing layout reader for {}" , occupied_entry. key( ) ) ;
276275 reader
277276 } else {
278- tracing :: trace!( "creating layout reader for {}" , occupied_entry. key( ) ) ;
277+ log :: trace!( "creating layout reader for {}" , occupied_entry. key( ) ) ;
279278 let reader = vxf. layout_reader ( ) . map_err ( |e| {
280279 DataFusionError :: Execution ( format ! (
281280 "Failed to create layout reader: {e}"
@@ -286,7 +285,7 @@ impl FileOpener for VortexOpener {
286285 }
287286 }
288287 Entry :: Vacant ( vacant_entry) => {
289- tracing :: trace!( "creating layout reader for {}" , vacant_entry. key( ) ) ;
288+ log :: trace!( "creating layout reader for {}" , vacant_entry. key( ) ) ;
290289 let reader = vxf. layout_reader ( ) . map_err ( |e| {
291290 DataFusionError :: Execution ( format ! ( "Failed to create layout reader: {e}" ) )
292291 } ) ?;
@@ -296,29 +295,22 @@ impl FileOpener for VortexOpener {
296295 }
297296 } ;
298297
299- let mut scan_builder = ScanBuilder :: new ( session. clone ( ) , layout_reader) ;
300-
301- if let Some ( extensions) = file. extensions
302- && let Some ( vortex_plan) = extensions. downcast_ref :: < VortexAccessPlan > ( )
303- {
304- scan_builder = vortex_plan. apply_to_builder ( scan_builder) ;
305- }
306-
307- if let Some ( file_range) = file. range {
298+ let mut scan_builder = ScanBuilder :: new ( session, layout_reader) ;
299+ if let Some ( file_range) = file_meta. range {
308300 scan_builder = apply_byte_range (
309301 file_range,
310- file . object_meta . size ,
302+ file_meta . object_meta . size ,
311303 vxf. row_count ( ) ,
312304 scan_builder,
313305 ) ;
314306 }
315307
316308 let filter = filter
317309 . and_then ( |f| {
318- // Verify that all filters we've accepted from DataFusion get pushed down.
319- // This will only fail if the user has not configured a suitable
320- // PhysicalExprAdapterFactory on the file source to handle rewriting the
321- // expression to handle missing/reordered columns in the Vortex file.
310+ let exprs = split_conjunction ( & f )
311+ . into_iter ( )
312+ . filter ( |expr| can_be_pushed_down ( expr , & predicate_file_schema ) )
313+ . collect :: < Vec < _ > > ( ) ;
322314
323315 let ( pushed, unpushed) : ( Vec < PhysicalExprRef > , Vec < PhysicalExprRef > ) =
324316 split_conjunction ( & f)
@@ -391,7 +383,7 @@ impl FileOpener for VortexOpener {
391383 . map_err ( move |e : VortexError | {
392384 DataFusionError :: External ( Box :: new ( e. with_context ( format ! (
393385 "Failed to read Vortex file: {}" ,
394- file . object_meta. location
386+ file_meta . object_meta. location
395387 ) ) ) )
396388 } )
397389 . try_flatten ( )
@@ -405,17 +397,21 @@ impl FileOpener for VortexOpener {
405397 . boxed ( ) ;
406398
407399 if let Some ( file_pruner) = file_pruner {
408- Ok ( PrunableStream :: new ( file_pruner, stream) . boxed ( ) )
400+ Ok ( Box :: pin ( EarlyStoppingStream :: new (
401+ stream,
402+ file_pruner,
403+ Count :: new ( ) ,
404+ ) ) )
409405 } else {
410- Ok ( stream)
406+ Ok ( Box :: pin ( stream) )
411407 }
412408 }
413409 . in_current_span ( )
414410 . boxed ( ) )
415411 }
416412}
417413
418- /// If the file has a [`FileRange`], we translate it into a row range in the file for the scan.
414+ /// If the file has a [`FileRange`](datafusion::datasource::listing::FileRange) , we translate it into a row range in the file for the scan.
419415fn apply_byte_range (
420416 file_range : FileRange ,
421417 total_size : u64 ,
@@ -447,7 +443,6 @@ mod tests {
447443 use std:: sync:: Arc ;
448444 use std:: sync:: LazyLock ;
449445
450- use arrow_schema:: Field ;
451446 use arrow_schema:: Fields ;
452447 use arrow_schema:: SchemaRef ;
453448 use datafusion:: arrow:: array:: DictionaryArray ;
@@ -458,7 +453,6 @@ mod tests {
458453 use datafusion:: arrow:: datatypes:: Schema ;
459454 use datafusion:: arrow:: datatypes:: UInt32Type ;
460455 use datafusion:: arrow:: util:: display:: FormatOptions ;
461- use datafusion:: arrow:: util:: pretty:: pretty_format_batches_with_options;
462456 use datafusion:: common:: record_batch;
463457 use datafusion:: logical_expr:: col;
464458 use datafusion:: logical_expr:: lit;
@@ -595,7 +589,7 @@ mod tests {
595589
596590 // filter matches partition value
597591 let filter = col ( "part" ) . eq ( lit ( 1 ) ) ;
598- let filter = logical2physical ( & filter, table_schema. table_schema ( ) ) ;
592+ let filter = logical2physical ( & filter, table_schema. as_ref ( ) ) ;
599593
600594 let opener = make_opener ( object_store. clone ( ) , table_schema. clone ( ) , Some ( filter) ) ;
601595 let stream = opener. open ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
@@ -604,11 +598,11 @@ mod tests {
604598 let num_batches = data. len ( ) ;
605599 let num_rows = data. iter ( ) . map ( |rb| rb. num_rows ( ) ) . sum :: < usize > ( ) ;
606600
607- assert_eq ! ( ( num_batches, num_rows) , ( 1 , 3 ) ) ;
601+ assert_eq ! ( ( num_batches, num_rows) , expected_result1 ) ;
608602
609603 // filter doesn't matches partition value
610604 let filter = col ( "part" ) . eq ( lit ( 2 ) ) ;
611- let filter = logical2physical ( & filter, table_schema. table_schema ( ) ) ;
605+ let filter = logical2physical ( & filter, table_schema. as_ref ( ) ) ;
612606
613607 let opener = make_opener ( object_store. clone ( ) , table_schema. clone ( ) , Some ( filter) ) ;
614608 let stream = opener. open ( file. clone ( ) ) . unwrap ( ) . await . unwrap ( ) ;
@@ -625,29 +619,18 @@ mod tests {
625619 #[ tokio:: test]
626620 async fn test_open_files_different_table_schema ( ) -> anyhow:: Result < ( ) > {
627621 let object_store = Arc :: new ( InMemory :: new ( ) ) as Arc < dyn ObjectStore > ;
622+ let file1_path = "/path/file1.vortex" ;
623+ let batch1 = record_batch ! ( ( "a" , Int32 , vec![ Some ( 1 ) , Some ( 2 ) , Some ( 3 ) ] ) ) . unwrap ( ) ;
624+ let data_size1 = write_arrow_to_vortex ( object_store. clone ( ) , file1_path, batch1) . await ?;
625+ let file1 = PartitionedFile :: new ( file1_path. to_string ( ) , data_size1) ;
628626
629- let file1 = {
630- let file1_path = "/path/file1.vortex" ;
631- let batch1 = record_batch ! ( ( "a" , Int32 , vec![ Some ( 1 ) , Some ( 2 ) , Some ( 3 ) ] ) ) . unwrap ( ) ;
632- let data_size1 =
633- write_arrow_to_vortex ( object_store. clone ( ) , file1_path, batch1) . await ?;
634- PartitionedFile :: new ( file1_path. to_string ( ) , data_size1)
635- } ;
636-
637- let file2 = {
638- let file2_path = "/path/file2.vortex" ;
639- let batch2 = record_batch ! ( ( "a" , Int16 , vec![ Some ( -1 ) , Some ( -2 ) , Some ( -3 ) ] ) ) . unwrap ( ) ;
640- let data_size2 =
641- write_arrow_to_vortex ( object_store. clone ( ) , file2_path, batch2) . await ?;
642- PartitionedFile :: new ( file2_path. to_string ( ) , data_size2)
643- } ;
627+ let file2_path = "/path/file2.vortex" ;
628+ let batch2 = record_batch ! ( ( "a" , Int16 , vec![ Some ( -1 ) , Some ( -2 ) , Some ( -3 ) ] ) ) . unwrap ( ) ;
629+ let data_size2 = write_arrow_to_vortex ( object_store. clone ( ) , file2_path, batch2) . await ?;
630+ let file2 = PartitionedFile :: new ( file1_path. to_string ( ) , data_size1) ;
644631
645632 // Table schema has can accommodate both files
646- let table_schema = TableSchema :: from_file_schema ( Arc :: new ( Schema :: new ( vec ! [ Field :: new(
647- "a" ,
648- DataType :: Int32 ,
649- true ,
650- ) ] ) ) ) ;
633+ let table_schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int32 , true ) ] ) ) ;
651634
652635 let make_opener = |filter| VortexOpener {
653636 partition : 1 ,
@@ -670,10 +653,12 @@ mod tests {
670653 } ;
671654
672655 let filter = col ( "a" ) . lt ( lit ( 100_i32 ) ) ;
673- let filter = logical2physical ( & filter, table_schema. table_schema ( ) ) ;
656+ let filter = logical2physical ( & filter, table_schema. as_ref ( ) ) ;
674657
675658 let opener1 = make_opener ( filter. clone ( ) ) ;
676- let stream = opener1. open ( file1) ?. await ?;
659+ let stream = opener1
660+ . open ( make_meta ( file1_path, data_size1) , file1) ?
661+ . await ?;
677662
678663 let format_opts = FormatOptions :: new ( ) . with_types_info ( true ) ;
679664
@@ -690,7 +675,9 @@ mod tests {
690675 " ) ;
691676
692677 let opener2 = make_opener ( filter. clone ( ) ) ;
693- let stream = opener2. open ( file2) ?. await ?;
678+ let stream = opener2
679+ . open ( make_meta ( file2_path, data_size2) , file2) ?
680+ . await ?;
694681
695682 let data = stream. try_collect :: < Vec < _ > > ( ) . await ?;
696683 assert_snapshot ! ( pretty_format_batches_with_options( & data, & format_opts) ?. to_string( ) , @r"
@@ -806,7 +793,7 @@ mod tests {
806793 let data_size = write_arrow_to_vortex ( object_store. clone ( ) , file_path, batch) . await ?;
807794
808795 // Table schema has an extra utf8 field.
809- let table_schema = TableSchema :: from_file_schema ( Arc :: new ( Schema :: new ( vec ! [ Field :: new(
796+ let table_schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
810797 "my_struct" ,
811798 DataType :: Struct ( Fields :: from( vec![
812799 Field :: new(
@@ -822,23 +809,25 @@ mod tests {
822809 Field :: new( "field3" , DataType :: Utf8 , true ) ,
823810 ] ) ) ,
824811 true ,
825- ) ] ) ) ) ;
812+ ) ] ) ) ;
826813
827- let opener = make_opener (
828- object_store. clone ( ) ,
829- table_schema. clone ( ) ,
830- // expression references my_struct column which has different fields in each
831- // field.
832- Some ( logical2physical (
814+ let opener = VortexOpener {
815+ session : SESSION . clone ( ) ,
816+ object_store : object_store. clone ( ) ,
817+ projection : None ,
818+ filter : Some ( logical2physical (
833819 & col ( "my_struct" ) . is_not_null ( ) ,
834- table_schema. table_schema ( ) ,
820+ & table_schema,
835821 ) ) ,
836822 ) ;
837823
838824 // The opener should be able to open the file with a filter on the
839825 // struct column.
840826 let data = opener
841- . open ( PartitionedFile :: new ( file_path. to_string ( ) , data_size) ) ?
827+ . open (
828+ make_meta ( file_path, data_size) ,
829+ PartitionedFile :: new ( file_path. to_string ( ) , data_size) ,
830+ ) ?
842831 . await ?
843832 . try_collect :: < Vec < _ > > ( )
844833 . await ?;
0 commit comments