@@ -10,7 +10,6 @@ use datafusion_common::DataFusionError;
1010use datafusion_common:: Result as DFResult ;
1111use datafusion_common:: ScalarValue ;
1212use datafusion_common:: exec_datafusion_err;
13- use datafusion_datasource:: FileRange ;
1413use datafusion_datasource:: PartitionedFile ;
1514use datafusion_datasource:: TableSchema ;
1615use datafusion_datasource:: file_stream:: FileOpenFuture ;
@@ -30,18 +29,21 @@ use futures::FutureExt;
3029use futures:: StreamExt ;
3130use futures:: TryStreamExt ;
3231use futures:: stream;
32+ use itertools:: Itertools ;
3333use object_store:: path:: Path ;
3434use tracing:: Instrument ;
35- use vortex:: array:: ArrayRef ;
3635use vortex:: array:: VortexSessionExecute ;
3736use vortex:: array:: arrow:: ArrowArrayExecutor ;
37+ use vortex:: dtype:: FieldMask ;
3838use vortex:: error:: VortexError ;
39+ use vortex:: error:: VortexExpect ;
3940use vortex:: file:: OpenOptionsSessionExt ;
4041use vortex:: io:: InstrumentedReadAt ;
4142use vortex:: layout:: LayoutReader ;
4243use vortex:: metrics:: Label ;
4344use vortex:: metrics:: MetricsRegistry ;
4445use vortex:: scan:: ScanBuilder ;
46+ use vortex:: scan:: SplitBy ;
4547use vortex:: session:: VortexSession ;
4648use vortex_utils:: aliases:: dash_map:: DashMap ;
4749use vortex_utils:: aliases:: dash_map:: Entry ;
@@ -88,6 +90,8 @@ pub(crate) struct VortexOpener {
8890 /// To save on the overhead of reparsing FlatBuffers and rebuilding the layout tree, we cache
8991 /// a file reader the first time we read a file.
9092 pub layout_readers : Arc < DashMap < Path , Weak < dyn LayoutReader > > > ,
93+ /// Shared full-file natural split ranges keyed by file path.
94+ pub natural_split_ranges : Arc < DashMap < Path , Arc < [ Range < u64 > ] > > > ,
9195 /// Whether the query has output ordering specified
9296 pub has_output_ordering : bool ,
9397
@@ -124,7 +128,8 @@ impl FileOpener for VortexOpener {
124128 let unified_file_schema = self . table_schema . file_schema ( ) . clone ( ) ;
125129 let batch_size = self . batch_size ;
126130 let limit = self . limit ;
127- let layout_reader = self . layout_readers . clone ( ) ;
131+ let layout_reader = Arc :: clone ( & self . layout_readers ) ;
132+ let natural_split_ranges = Arc :: clone ( & self . natural_split_ranges ) ;
128133 let has_output_ordering = self . has_output_ordering ;
129134 let scan_concurrency = self . scan_concurrency ;
130135
@@ -296,6 +301,12 @@ impl FileOpener for VortexOpener {
296301 }
297302 } ;
298303
304+ let natural_split_ranges = natural_split_ranges_for_file (
305+ natural_split_ranges. as_ref ( ) ,
306+ & file. object_meta . location ,
307+ & layout_reader,
308+ ) ?;
309+
299310 let mut scan_builder = ScanBuilder :: new ( session. clone ( ) , layout_reader) ;
300311
301312 if let Some ( extensions) = file. extensions
@@ -305,12 +316,22 @@ impl FileOpener for VortexOpener {
305316 }
306317
307318 if let Some ( file_range) = file. range {
308- scan_builder = apply_byte_range (
309- file_range,
319+ let byte_range = Range {
320+ start : u64:: try_from ( file_range. start )
321+ . map_err ( |_| exec_datafusion_err ! ( "Vortex file range start is negative" ) ) ?,
322+ end : u64:: try_from ( file_range. end )
323+ . map_err ( |_| exec_datafusion_err ! ( "Vortex file range end is negative" ) ) ?,
324+ } ;
325+
326+ let Some ( row_range) = split_aligned_row_range (
327+ byte_range,
310328 file. object_meta . size ,
311- vxf. row_count ( ) ,
312- scan_builder,
313- ) ;
329+ natural_split_ranges. as_ref ( ) ,
330+ ) else {
331+ return Ok ( stream:: empty ( ) . boxed ( ) ) ;
332+ } ;
333+
334+ scan_builder = scan_builder. with_row_range ( row_range) ;
314335 }
315336
316337 let filter = filter
@@ -415,31 +436,74 @@ impl FileOpener for VortexOpener {
415436 }
416437}
417438
418- /// If the file has a [`FileRange`], we translate it into a row range in the file for the scan.
419- fn apply_byte_range (
420- file_range : FileRange ,
421- total_size : u64 ,
422- row_count : u64 ,
423- scan_builder : ScanBuilder < ArrayRef > ,
424- ) -> ScanBuilder < ArrayRef > {
425- let row_range = byte_range_to_row_range (
426- file_range. start as u64 ..file_range. end as u64 ,
427- row_count,
428- total_size,
429- ) ;
430-
431- scan_builder. with_row_range ( row_range)
439+ fn natural_split_ranges_for_file (
440+ natural_split_ranges : & DashMap < Path , Arc < [ Range < u64 > ] > > ,
441+ path : & Path ,
442+ layout_reader : & Arc < dyn LayoutReader > ,
443+ ) -> DFResult < Arc < [ Range < u64 > ] > > {
444+ if let Some ( split_ranges) = natural_split_ranges. get ( path) {
445+ return Ok ( Arc :: clone ( split_ranges. value ( ) ) ) ;
446+ }
447+
448+ let split_ranges = compute_natural_split_ranges ( layout_reader. as_ref ( ) ) ?;
449+
450+ match natural_split_ranges. entry ( path. clone ( ) ) {
451+ Entry :: Occupied ( entry) => Ok ( Arc :: clone ( entry. get ( ) ) ) ,
452+ Entry :: Vacant ( entry) => {
453+ entry. insert ( Arc :: clone ( & split_ranges) ) ;
454+ Ok ( split_ranges)
455+ }
456+ }
432457}
433458
434- fn byte_range_to_row_range ( byte_range : Range < u64 > , row_count : u64 , total_size : u64 ) -> Range < u64 > {
435- let average_row = total_size / row_count;
436- assert ! ( average_row > 0 , "A row must always have at least one byte" ) ;
459+ fn compute_natural_split_ranges ( layout_reader : & dyn LayoutReader ) -> DFResult < Arc < [ Range < u64 > ] > > {
460+ let row_count = layout_reader. row_count ( ) ;
461+ let row_range = 0 ..row_count;
462+ let split_points: Vec < _ > = SplitBy :: Layout
463+ . splits ( layout_reader, & row_range, & [ FieldMask :: All ] )
464+ . map_err ( |e| exec_datafusion_err ! ( "Failed to compute Vortex natural splits: {e}" ) ) ?
465+ . into_iter ( )
466+ . tuple_windows ( )
467+ . map ( |( s, e) | s..e)
468+ . collect :: < Vec < _ > > ( ) ;
469+
470+ Ok ( split_points. into ( ) )
471+ }
437472
438- let start_row = byte_range. start / average_row;
439- let end_row = byte_range. end / average_row;
473+ /// Translate a DataFusion byte range to the contiguous natural split ranges it owns.
474+ fn split_aligned_row_range (
475+ byte_range : Range < u64 > ,
476+ total_size : u64 ,
477+ split_ranges : & [ Range < u64 > ] ,
478+ ) -> Option < Range < u64 > > {
479+ if byte_range. start >= byte_range. end {
480+ return None ;
481+ }
440482
441- // We take the min here as `end_row` might overshoot
442- start_row..u64:: min ( row_count, end_row)
483+ let row_count = split_ranges. last ( ) . map ( |split| split. end ) ?;
484+ if row_count == 0 {
485+ return None ;
486+ }
487+
488+ let mut owned_splits = split_ranges. iter ( ) . filter ( |split_range| {
489+ let midpoint_byte = split_midpoint_to_byte ( split_range, row_count, total_size) ;
490+ byte_range. contains ( & midpoint_byte)
491+ } ) ;
492+
493+ let first_split = owned_splits. next ( ) ?;
494+ let mut row_range = first_split. start ..first_split. end ;
495+ for split_range in owned_splits {
496+ row_range. end = split_range. end ;
497+ }
498+
499+ Some ( row_range)
500+ }
501+
502+ fn split_midpoint_to_byte ( split_range : & Range < u64 > , row_count : u64 , total_size : u64 ) -> u64 {
503+ let midpoint_row = split_range. start + ( split_range. end - split_range. start ) / 2 ;
504+ let midpoint_byte = ( u128:: from ( midpoint_row) * u128:: from ( total_size) ) / u128:: from ( row_count) ;
505+
506+ u64:: try_from ( midpoint_byte) . vortex_expect ( "midpoint byte projection should fit into u64" )
443507}
444508
445509#[ cfg( test) ]
@@ -491,43 +555,56 @@ mod tests {
491555 static SESSION : LazyLock < VortexSession > = LazyLock :: new ( VortexSession :: default) ;
492556
493557 #[ rstest]
494- #[ case( 0 ..100 , 100 , 100 , 0 ..100 ) ]
495- #[ case( 0 ..105 , 100 , 105 , 0 ..100 ) ]
496- #[ case( 0 ..50 , 100 , 105 , 0 ..50 ) ]
497- #[ case( 50 ..105 , 100 , 105 , 50 ..100 ) ]
498- #[ case( 0 ..1 , 4 , 8 , 0 ..0 ) ]
499- #[ case( 1 ..8 , 4 , 8 , 0 ..4 ) ]
500- fn test_range_translation (
558+ #[ case( 0 ..3 , 10 , vec![ 0 ..2 , 2 ..5 , 5 ..10 ] , Some ( 0 ..2 ) ) ]
559+ #[ case( 3 ..7 , 10 , vec![ 0 ..2 , 2 ..5 , 5 ..10 ] , Some ( 2 ..5 ) ) ]
560+ #[ case( 1 ..8 , 10 , vec![ 0 ..1 , 1 ..9 , 9 ..10 ] , Some ( 1 ..9 ) ) ]
561+ #[ case( 1 ..4 , 16 , vec![ 0 ..1 , 1 ..2 , 2 ..3 , 3 ..4 ] , None ) ]
562+ fn test_split_aligned_row_range (
501563 #[ case] byte_range : Range < u64 > ,
502- #[ case] row_count : u64 ,
503564 #[ case] total_size : u64 ,
504- #[ case] expected : Range < u64 > ,
565+ #[ case] split_ranges : Vec < Range < u64 > > ,
566+ #[ case] expected : Option < Range < u64 > > ,
505567 ) {
506568 assert_eq ! (
507- byte_range_to_row_range ( byte_range, row_count , total_size ) ,
569+ split_aligned_row_range ( byte_range, total_size , & split_ranges ) ,
508570 expected
509571 ) ;
510572 }
511573
/// Four equal byte ranges over a 16-byte file must together own every split exactly once,
/// with each assigned row range aligned to split boundaries and adjacent to its neighbour.
#[test]
fn test_split_aligned_ranges_cover_splits_exactly_once() {
    let total_size = 16;
    let split_ranges = vec![0..1, 1..4, 4..10, 10..13];
    let byte_ranges = [0..4, 4..8, 8..12, 12..16];

    // Byte ranges that own no split drop out here.
    let assigned: Vec<Range<u64>> = byte_ranges
        .into_iter()
        .filter_map(|byte_range| split_aligned_row_range(byte_range, total_size, &split_ranges))
        .collect();

    assert_eq!(assigned, vec![0..4, 4..10, 10..13]);

    let covered_rows: u64 = assigned.iter().map(|owned| owned.end - owned.start).sum();
    assert_eq!(covered_rows, 13);

    // Every assigned range must begin and end on a natural split boundary.
    for owned in &assigned {
        assert!(split_ranges.iter().any(|split| split.start == owned.start));
        assert!(split_ranges.iter().any(|split| split.end == owned.end));
    }

    // Consecutive assignments must touch without gaps or overlap.
    for pair in assigned.windows(2) {
        assert_eq!(pair[0].end, pair[1].start);
    }
}
@@ -568,6 +645,7 @@ mod tests {
568645 limit : None ,
569646 metrics_registry : Arc :: new ( DefaultMetricsRegistry :: default ( ) ) ,
570647 layout_readers : Default :: default ( ) ,
648+ natural_split_ranges : Default :: default ( ) ,
571649 has_output_ordering : false ,
572650 expression_convertor : Arc :: new ( DefaultExpressionConvertor :: default ( ) ) ,
573651 file_metadata_cache : None ,
@@ -621,6 +699,33 @@ mod tests {
621699 Ok ( ( ) )
622700 }
623701
702+ #[ tokio:: test]
703+ async fn test_open_empty_file ( ) -> anyhow:: Result < ( ) > {
704+ use futures:: TryStreamExt ;
705+
706+ let object_store = Arc :: new ( InMemory :: new ( ) ) as Arc < dyn ObjectStore > ;
707+ let data_batch = record_batch ! ( ( "a" , Int32 , Vec :: <i32 >:: new( ) ) ) . unwrap ( ) ;
708+ let file_path = "part=1/empty.vortex" ;
709+ let file_size =
710+ write_arrow_to_vortex ( Arc :: clone ( & object_store) , file_path, data_batch. clone ( ) ) . await ?;
711+
712+ let file_schema = data_batch. schema ( ) ;
713+ // Parallel scans may attach a byte range even for empty files; the
714+ // opener must return early before attempting split-aligned translation.
715+ let file =
716+ PartitionedFile :: new_with_range ( file_path. to_string ( ) , file_size, 0 , file_size as i64 ) ;
717+
718+ let table_schema = TableSchema :: from_file_schema ( Arc :: clone ( & file_schema) ) ;
719+
720+ let opener = make_opener ( object_store, table_schema, None ) ;
721+ let stream = opener. open ( file) ?. await ?;
722+ let data = stream. try_collect :: < Vec < _ > > ( ) . await ?;
723+
724+ assert_eq ! ( data. len( ) , 0 ) ;
725+
726+ Ok ( ( ) )
727+ }
728+
624729 #[ rstest]
625730 #[ tokio:: test]
626731 async fn test_open_files_different_table_schema ( ) -> anyhow:: Result < ( ) > {
@@ -662,6 +767,7 @@ mod tests {
662767 limit : None ,
663768 metrics_registry : Arc :: new ( DefaultMetricsRegistry :: default ( ) ) ,
664769 layout_readers : Default :: default ( ) ,
770+ natural_split_ranges : Default :: default ( ) ,
665771 has_output_ordering : false ,
666772 expression_convertor : Arc :: new ( DefaultExpressionConvertor :: default ( ) ) ,
667773 file_metadata_cache : None ,
@@ -748,6 +854,7 @@ mod tests {
748854 limit : None ,
749855 metrics_registry : Arc :: new ( DefaultMetricsRegistry :: default ( ) ) ,
750856 layout_readers : Default :: default ( ) ,
857+ natural_split_ranges : Default :: default ( ) ,
751858 has_output_ordering : false ,
752859 expression_convertor : Arc :: new ( DefaultExpressionConvertor :: default ( ) ) ,
753860 file_metadata_cache : None ,
@@ -902,6 +1009,7 @@ mod tests {
9021009 limit : None ,
9031010 metrics_registry : Arc :: new ( DefaultMetricsRegistry :: default ( ) ) ,
9041011 layout_readers : Default :: default ( ) ,
1012+ natural_split_ranges : Default :: default ( ) ,
9051013 has_output_ordering : false ,
9061014 expression_convertor : Arc :: new ( DefaultExpressionConvertor :: default ( ) ) ,
9071015 file_metadata_cache : None ,
@@ -961,6 +1069,7 @@ mod tests {
9611069 limit : None ,
9621070 metrics_registry : Arc :: new ( DefaultMetricsRegistry :: default ( ) ) ,
9631071 layout_readers : Default :: default ( ) ,
1072+ natural_split_ranges : Default :: default ( ) ,
9641073 has_output_ordering : false ,
9651074 expression_convertor : Arc :: new ( DefaultExpressionConvertor :: default ( ) ) ,
9661075 file_metadata_cache : None ,
@@ -1162,6 +1271,7 @@ mod tests {
11621271 limit : None ,
11631272 metrics_registry : Arc :: new ( DefaultMetricsRegistry :: default ( ) ) ,
11641273 layout_readers : Default :: default ( ) ,
1274+ natural_split_ranges : Default :: default ( ) ,
11651275 has_output_ordering : false ,
11661276 expression_convertor : Arc :: new ( DefaultExpressionConvertor :: default ( ) ) ,
11671277 file_metadata_cache : None ,
0 commit comments