Skip to content

Commit 2e80208

Browse files
AdamGS authored and sgrebnov committed
Use layout file splits when DF re-partitions individual files (vortex-data#7591)
Instead of just splitting files arbitrarily, align the splits with the file's natural layout split boundaries to make better use of Vortex's internal pruning and other behaviors.

---------

Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent d694abd commit 2e80208

2 files changed

Lines changed: 174 additions & 59 deletions

File tree

vortex-datafusion/src/persistent/opener.rs

Lines changed: 167 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use datafusion_common::DataFusionError;
1010
use datafusion_common::Result as DFResult;
1111
use datafusion_common::ScalarValue;
1212
use datafusion_common::exec_datafusion_err;
13-
use datafusion_datasource::FileRange;
1413
use datafusion_datasource::PartitionedFile;
1514
use datafusion_datasource::TableSchema;
1615
use datafusion_datasource::file_stream::FileOpenFuture;
@@ -30,18 +29,21 @@ use futures::FutureExt;
3029
use futures::StreamExt;
3130
use futures::TryStreamExt;
3231
use futures::stream;
32+
use itertools::Itertools;
3333
use object_store::path::Path;
3434
use tracing::Instrument;
35-
use vortex::array::ArrayRef;
3635
use vortex::array::VortexSessionExecute;
3736
use vortex::array::arrow::ArrowArrayExecutor;
37+
use vortex::dtype::FieldMask;
3838
use vortex::error::VortexError;
39+
use vortex::error::VortexExpect;
3940
use vortex::file::OpenOptionsSessionExt;
4041
use vortex::io::InstrumentedReadAt;
4142
use vortex::layout::LayoutReader;
4243
use vortex::metrics::Label;
4344
use vortex::metrics::MetricsRegistry;
4445
use vortex::scan::ScanBuilder;
46+
use vortex::scan::SplitBy;
4547
use vortex::session::VortexSession;
4648
use vortex_utils::aliases::dash_map::DashMap;
4749
use vortex_utils::aliases::dash_map::Entry;
@@ -88,6 +90,8 @@ pub(crate) struct VortexOpener {
8890
/// To save on the overhead of reparsing FlatBuffers and rebuilding the layout tree, we cache
8991
/// a file reader the first time we read a file.
9092
pub layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
93+
/// Shared full-file natural split ranges keyed by file path.
94+
pub natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
9195
/// Whether the query has output ordering specified
9296
pub has_output_ordering: bool,
9397

@@ -124,7 +128,8 @@ impl FileOpener for VortexOpener {
124128
let unified_file_schema = self.table_schema.file_schema().clone();
125129
let batch_size = self.batch_size;
126130
let limit = self.limit;
127-
let layout_reader = self.layout_readers.clone();
131+
let layout_reader = Arc::clone(&self.layout_readers);
132+
let natural_split_ranges = Arc::clone(&self.natural_split_ranges);
128133
let has_output_ordering = self.has_output_ordering;
129134
let scan_concurrency = self.scan_concurrency;
130135

@@ -296,6 +301,12 @@ impl FileOpener for VortexOpener {
296301
}
297302
};
298303

304+
let natural_split_ranges = natural_split_ranges_for_file(
305+
natural_split_ranges.as_ref(),
306+
&file.object_meta.location,
307+
&layout_reader,
308+
)?;
309+
299310
let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
300311

301312
if let Some(extensions) = file.extensions
@@ -305,12 +316,22 @@ impl FileOpener for VortexOpener {
305316
}
306317

307318
if let Some(file_range) = file.range {
308-
scan_builder = apply_byte_range(
309-
file_range,
319+
let byte_range = Range {
320+
start: u64::try_from(file_range.start)
321+
.map_err(|_| exec_datafusion_err!("Vortex file range start is negative"))?,
322+
end: u64::try_from(file_range.end)
323+
.map_err(|_| exec_datafusion_err!("Vortex file range end is negative"))?,
324+
};
325+
326+
let Some(row_range) = split_aligned_row_range(
327+
byte_range,
310328
file.object_meta.size,
311-
vxf.row_count(),
312-
scan_builder,
313-
);
329+
natural_split_ranges.as_ref(),
330+
) else {
331+
return Ok(stream::empty().boxed());
332+
};
333+
334+
scan_builder = scan_builder.with_row_range(row_range);
314335
}
315336

316337
let filter = filter
@@ -415,31 +436,74 @@ impl FileOpener for VortexOpener {
415436
}
416437
}
417438

418-
/// If the file has a [`FileRange`], we translate it into a row range in the file for the scan.
419-
fn apply_byte_range(
420-
file_range: FileRange,
421-
total_size: u64,
422-
row_count: u64,
423-
scan_builder: ScanBuilder<ArrayRef>,
424-
) -> ScanBuilder<ArrayRef> {
425-
let row_range = byte_range_to_row_range(
426-
file_range.start as u64..file_range.end as u64,
427-
row_count,
428-
total_size,
429-
);
430-
431-
scan_builder.with_row_range(row_range)
439+
/// Return the natural split ranges for the file at `path`, computing and caching them on a miss.
///
/// On a cache hit the stored `Arc` is cloned and returned without touching `layout_reader`.
/// On a miss the ranges are computed from `layout_reader` *before* the map entry is taken, so
/// the computation does not run while the entry is held. If the entry turns out to be occupied
/// by then (e.g. another caller inserted in the meantime), the already-stored value is returned
/// and the freshly computed one is dropped; otherwise the computed value is inserted.
///
/// # Errors
///
/// Propagates any error from `compute_natural_split_ranges`.
///
/// NOTE(review): the cache is keyed by path only — presumably a path is never rewritten with
/// different contents while the cache is alive; confirm against the callers.
fn natural_split_ranges_for_file(
440+
natural_split_ranges: &DashMap<Path, Arc<[Range<u64>]>>,
441+
path: &Path,
442+
layout_reader: &Arc<dyn LayoutReader>,
443+
) -> DFResult<Arc<[Range<u64>]>> {
444+
if let Some(split_ranges) = natural_split_ranges.get(path) {
445+
return Ok(Arc::clone(split_ranges.value()));
446+
}
447+
448+
let split_ranges = compute_natural_split_ranges(layout_reader.as_ref())?;
449+
450+
match natural_split_ranges.entry(path.clone()) {
451+
Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
452+
Entry::Vacant(entry) => {
453+
entry.insert(Arc::clone(&split_ranges));
454+
Ok(split_ranges)
455+
}
456+
}
432457
}
433458

434-
fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u64) -> Range<u64> {
435-
let average_row = total_size / row_count;
436-
assert!(average_row > 0, "A row must always have at least one byte");
459+
/// Compute the layout-derived ("natural") row ranges covering the whole file.
///
/// Asks `SplitBy::Layout` for the split points of the row range `0..row_count` across all
/// fields (`FieldMask::All`), then pairs each split point with its successor to form
/// half-open row ranges `s..e`. Fewer than two split points therefore produce an empty slice.
///
/// # Errors
///
/// Returns a DataFusion execution error if the split computation fails.
///
/// NOTE(review): assumes the split points come back in ascending order and include both `0`
/// and `row_count`, so that the resulting ranges tile the file — confirm against
/// `SplitBy::splits`.
fn compute_natural_split_ranges(layout_reader: &dyn LayoutReader) -> DFResult<Arc<[Range<u64>]>> {
460+
let row_count = layout_reader.row_count();
461+
let row_range = 0..row_count;
462+
let split_points: Vec<_> = SplitBy::Layout
463+
.splits(layout_reader, &row_range, &[FieldMask::All])
464+
.map_err(|e| exec_datafusion_err!("Failed to compute Vortex natural splits: {e}"))?
465+
.into_iter()
466+
.tuple_windows()
467+
.map(|(s, e)| s..e)
468+
.collect::<Vec<_>>();
469+
470+
Ok(split_points.into())
471+
}
437472

438-
let start_row = byte_range.start / average_row;
439-
let end_row = byte_range.end / average_row;
473+
/// Translate a DataFusion byte range to the contiguous natural split ranges it owns.
///
/// A split is "owned" by `byte_range` when the byte offset projected from the split's midpoint
/// row (see `split_midpoint_to_byte`) falls inside the half-open `byte_range`. The total row
/// count is taken from the end of the last entry in `split_ranges`.
///
/// The returned row range runs from the start of the first owned split to the end of the last
/// owned split, so it always begins and ends on split boundaries.
///
/// Returns `None` when:
/// - `byte_range` is empty (`start >= end`),
/// - `split_ranges` is empty, or its last range ends at row 0,
/// - no split's projected midpoint lies inside `byte_range`.
///
/// NOTE(review): assumes `split_ranges` is sorted and contiguous, so that the owned splits form
/// a single run and disjoint byte ranges never claim the same split — confirm against the
/// producer of `split_ranges`.
474+
fn split_aligned_row_range(
475+
byte_range: Range<u64>,
476+
total_size: u64,
477+
split_ranges: &[Range<u64>],
478+
) -> Option<Range<u64>> {
479+
if byte_range.start >= byte_range.end {
480+
return None;
481+
}
440482

441-
// We take the min here as `end_row` might overshoot
442-
start_row..u64::min(row_count, end_row)
483+
let row_count = split_ranges.last().map(|split| split.end)?;
484+
if row_count == 0 {
485+
return None;
486+
}
487+
488+
let mut owned_splits = split_ranges.iter().filter(|split_range| {
489+
let midpoint_byte = split_midpoint_to_byte(split_range, row_count, total_size);
490+
byte_range.contains(&midpoint_byte)
491+
});
492+
493+
let first_split = owned_splits.next()?;
494+
let mut row_range = first_split.start..first_split.end;
495+
for split_range in owned_splits {
496+
row_range.end = split_range.end;
497+
}
498+
499+
Some(row_range)
500+
}
501+
502+
/// Project the midpoint row of `split_range` onto a byte offset within the file.
///
/// The midpoint row is `start + (end - start) / 2`, and the byte offset is
/// `midpoint_row * total_size / row_count` using integer division. The multiplication is done
/// in `u128` so the product of two `u64` values cannot overflow before the division.
///
/// `row_count` must be non-zero, since it is used as a divisor; `split_aligned_row_range`
/// returns early when it is zero.
///
/// NOTE(review): the final conversion back to `u64` goes through `vortex_expect`, which
/// presumably panics with the given message if the value does not fit — confirm.
fn split_midpoint_to_byte(split_range: &Range<u64>, row_count: u64, total_size: u64) -> u64 {
503+
let midpoint_row = split_range.start + (split_range.end - split_range.start) / 2;
504+
let midpoint_byte = (u128::from(midpoint_row) * u128::from(total_size)) / u128::from(row_count);
505+
506+
u64::try_from(midpoint_byte).vortex_expect("midpoint byte projection should fit into u64")
443507
}
444508

445509
#[cfg(test)]
@@ -491,43 +555,56 @@ mod tests {
491555
static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);
492556

493557
#[rstest]
494-
#[case(0..100, 100, 100, 0..100)]
495-
#[case(0..105, 100, 105, 0..100)]
496-
#[case(0..50, 100, 105, 0..50)]
497-
#[case(50..105, 100, 105, 50..100)]
498-
#[case(0..1, 4, 8, 0..0)]
499-
#[case(1..8, 4, 8, 0..4)]
500-
fn test_range_translation(
558+
#[case(0..3, 10, vec![0..2, 2..5, 5..10], Some(0..2))]
559+
#[case(3..7, 10, vec![0..2, 2..5, 5..10], Some(2..5))]
560+
#[case(1..8, 10, vec![0..1, 1..9, 9..10], Some(1..9))]
561+
#[case(1..4, 16, vec![0..1, 1..2, 2..3, 3..4], None)]
562+
fn test_split_aligned_row_range(
501563
#[case] byte_range: Range<u64>,
502-
#[case] row_count: u64,
503564
#[case] total_size: u64,
504-
#[case] expected: Range<u64>,
565+
#[case] split_ranges: Vec<Range<u64>>,
566+
#[case] expected: Option<Range<u64>>,
505567
) {
506568
assert_eq!(
507-
byte_range_to_row_range(byte_range, row_count, total_size),
569+
split_aligned_row_range(byte_range, total_size, &split_ranges),
508570
expected
509571
);
510572
}
511573

512574
#[test]
513-
fn test_consecutive_ranges() {
514-
let row_count = 100;
515-
let total_size = 429;
516-
let bytes_a = 0..143;
517-
let bytes_b = 143..286;
518-
let bytes_c = 286..429;
519-
520-
let rows_a = byte_range_to_row_range(bytes_a, row_count, total_size);
521-
let rows_b = byte_range_to_row_range(bytes_b, row_count, total_size);
522-
let rows_c = byte_range_to_row_range(bytes_c, row_count, total_size);
523-
524-
assert_eq!(rows_a.end - rows_a.start, 35);
525-
assert_eq!(rows_b.end - rows_b.start, 36);
526-
assert_eq!(rows_c.end - rows_c.start, 29);
527-
528-
assert_eq!(rows_a.start, 0);
529-
assert_eq!(rows_c.end, 100);
530-
for (left, right) in [rows_a, rows_b, rows_c].iter().tuple_windows() {
575+
fn test_split_aligned_ranges_cover_splits_exactly_once() {
576+
let split_ranges = vec![0..1, 1..4, 4..10, 10..13];
577+
let byte_ranges = [0..4, 4..8, 8..12, 12..16];
578+
579+
let assigned = byte_ranges
580+
.into_iter()
581+
.filter_map(|byte_range| split_aligned_row_range(byte_range, 16, &split_ranges))
582+
.collect::<Vec<_>>();
583+
584+
assert_eq!(assigned, vec![0..4, 4..10, 10..13]);
585+
assert_eq!(
586+
assigned
587+
.iter()
588+
.map(|range| range.end - range.start)
589+
.sum::<u64>(),
590+
13
591+
);
592+
593+
let split_starts = split_ranges
594+
.iter()
595+
.map(|range| range.start)
596+
.collect::<Vec<_>>();
597+
let split_ends = split_ranges
598+
.iter()
599+
.map(|range| range.end)
600+
.collect::<Vec<_>>();
601+
602+
for range in &assigned {
603+
assert!(split_starts.contains(&range.start));
604+
assert!(split_ends.contains(&range.end));
605+
}
606+
607+
for (left, right) in assigned.iter().tuple_windows() {
531608
assert_eq!(left.end, right.start);
532609
}
533610
}
@@ -568,6 +645,7 @@ mod tests {
568645
limit: None,
569646
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
570647
layout_readers: Default::default(),
648+
natural_split_ranges: Default::default(),
571649
has_output_ordering: false,
572650
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
573651
file_metadata_cache: None,
@@ -621,6 +699,33 @@ mod tests {
621699
Ok(())
622700
}
623701

702+
// Opening a zero-row Vortex file that carries a byte range covering the whole file must
// succeed and yield no record batches.
#[tokio::test]
703+
async fn test_open_empty_file() -> anyhow::Result<()> {
704+
use futures::TryStreamExt;
705+
706+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
707+
let data_batch = record_batch!(("a", Int32, Vec::<i32>::new())).unwrap();
708+
let file_path = "part=1/empty.vortex";
709+
let file_size =
710+
write_arrow_to_vortex(Arc::clone(&object_store), file_path, data_batch.clone()).await?;
711+
712+
let file_schema = data_batch.schema();
713+
// Parallel scans may attach a byte range even for empty files; the
714+
// opener must return early before attempting split-aligned translation.
715+
let file =
716+
PartitionedFile::new_with_range(file_path.to_string(), file_size, 0, file_size as i64);
717+
718+
let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema));
719+
720+
let opener = make_opener(object_store, table_schema, None);
721+
let stream = opener.open(file)?.await?;
722+
let data = stream.try_collect::<Vec<_>>().await?;
723+
724+
assert_eq!(data.len(), 0);
725+
726+
Ok(())
727+
}
728+
624729
#[rstest]
625730
#[tokio::test]
626731
async fn test_open_files_different_table_schema() -> anyhow::Result<()> {
@@ -662,6 +767,7 @@ mod tests {
662767
limit: None,
663768
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
664769
layout_readers: Default::default(),
770+
natural_split_ranges: Default::default(),
665771
has_output_ordering: false,
666772
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
667773
file_metadata_cache: None,
@@ -748,6 +854,7 @@ mod tests {
748854
limit: None,
749855
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
750856
layout_readers: Default::default(),
857+
natural_split_ranges: Default::default(),
751858
has_output_ordering: false,
752859
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
753860
file_metadata_cache: None,
@@ -902,6 +1009,7 @@ mod tests {
9021009
limit: None,
9031010
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
9041011
layout_readers: Default::default(),
1012+
natural_split_ranges: Default::default(),
9051013
has_output_ordering: false,
9061014
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
9071015
file_metadata_cache: None,
@@ -961,6 +1069,7 @@ mod tests {
9611069
limit: None,
9621070
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
9631071
layout_readers: Default::default(),
1072+
natural_split_ranges: Default::default(),
9641073
has_output_ordering: false,
9651074
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
9661075
file_metadata_cache: None,
@@ -1162,6 +1271,7 @@ mod tests {
11621271
limit: None,
11631272
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
11641273
layout_readers: Default::default(),
1274+
natural_split_ranges: Default::default(),
11651275
has_output_ordering: false,
11661276
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
11671277
file_metadata_cache: None,

vortex-datafusion/src/persistent/source.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use std::any::Any;
55
use std::fmt::Formatter;
6+
use std::ops::Range;
67
use std::sync::Arc;
78
use std::sync::Weak;
89

@@ -61,6 +62,8 @@ pub struct VortexSource {
6162
///
6263
/// Sharing the readers allows us to only read every layout once from the file, even across partitions.
6364
layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
65+
/// Shared full-file natural split ranges keyed by path.
66+
natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
6467
expression_convertor: Arc<dyn ExpressionConvertor>,
6568
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
6669
vx_metrics_registry: Arc<dyn MetricsRegistry>,
@@ -88,6 +91,7 @@ impl VortexSource {
8891
batch_size: None,
8992
_unused_df_metrics: Default::default(),
9093
layout_readers: Arc::new(DashMap::default()),
94+
natural_split_ranges: Arc::new(DashMap::default()),
9195
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
9296
vortex_reader_factory: None,
9397
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
@@ -186,8 +190,9 @@ impl FileSource for VortexSource {
186190
table_schema: self.table_schema.clone(),
187191
batch_size,
188192
limit: base_config.limit.map(|l| l as u64),
189-
metrics_registry: self.vx_metrics_registry.clone(),
190-
layout_readers: self.layout_readers.clone(),
193+
metrics_registry: Arc::clone(&self.vx_metrics_registry),
194+
layout_readers: Arc::clone(&self.layout_readers),
195+
natural_split_ranges: Arc::clone(&self.natural_split_ranges),
191196
has_output_ordering: !base_config.output_ordering.is_empty(),
192197
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
193198
file_metadata_cache: self.file_metadata_cache.clone(),

0 commit comments

Comments
 (0)