Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 167 additions & 57 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use datafusion_common::DataFusionError;
use datafusion_common::Result as DFResult;
use datafusion_common::ScalarValue;
use datafusion_common::exec_datafusion_err;
use datafusion_datasource::FileRange;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file_stream::FileOpenFuture;
Expand All @@ -30,18 +29,21 @@ use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream;
use itertools::Itertools;
use object_store::path::Path;
use tracing::Instrument;
use vortex::array::ArrayRef;
use vortex::array::VortexSessionExecute;
use vortex::array::arrow::ArrowArrayExecutor;
use vortex::dtype::FieldMask;
use vortex::error::VortexError;
use vortex::error::VortexExpect;
use vortex::file::OpenOptionsSessionExt;
use vortex::io::InstrumentedReadAt;
use vortex::layout::LayoutReader;
use vortex::metrics::Label;
use vortex::metrics::MetricsRegistry;
use vortex::scan::ScanBuilder;
use vortex::scan::SplitBy;
use vortex::session::VortexSession;
use vortex_utils::aliases::dash_map::DashMap;
use vortex_utils::aliases::dash_map::Entry;
Expand Down Expand Up @@ -88,6 +90,8 @@ pub(crate) struct VortexOpener {
/// To save on the overhead of reparsing FlatBuffers and rebuilding the layout tree, we cache
/// a file reader the first time we read a file.
pub layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
/// Shared full-file natural split ranges keyed by file path.
pub natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
/// Whether the query has output ordering specified
pub has_output_ordering: bool,

Expand Down Expand Up @@ -124,7 +128,8 @@ impl FileOpener for VortexOpener {
let unified_file_schema = self.table_schema.file_schema().clone();
let batch_size = self.batch_size;
let limit = self.limit;
let layout_reader = self.layout_readers.clone();
let layout_reader = Arc::clone(&self.layout_readers);
let natural_split_ranges = Arc::clone(&self.natural_split_ranges);
let has_output_ordering = self.has_output_ordering;
let scan_concurrency = self.scan_concurrency;

Expand Down Expand Up @@ -296,6 +301,12 @@ impl FileOpener for VortexOpener {
}
};

let natural_split_ranges = natural_split_ranges_for_file(
natural_split_ranges.as_ref(),
&file.object_meta.location,
&layout_reader,
)?;

let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);

if let Some(extensions) = file.extensions
Expand All @@ -305,12 +316,22 @@ impl FileOpener for VortexOpener {
}

if let Some(file_range) = file.range {
scan_builder = apply_byte_range(
file_range,
let byte_range = Range {
start: u64::try_from(file_range.start)
.map_err(|_| exec_datafusion_err!("Vortex file range start is negative"))?,
end: u64::try_from(file_range.end)
.map_err(|_| exec_datafusion_err!("Vortex file range end is negative"))?,
};

let Some(row_range) = split_aligned_row_range(
byte_range,
file.object_meta.size,
vxf.row_count(),
scan_builder,
);
natural_split_ranges.as_ref(),
) else {
return Ok(stream::empty().boxed());
};

scan_builder = scan_builder.with_row_range(row_range);
}

let filter = filter
Expand Down Expand Up @@ -415,31 +436,74 @@ impl FileOpener for VortexOpener {
}
}

/// If the file has a [`FileRange`], we translate it into a row range in the file for the scan.
fn apply_byte_range(
file_range: FileRange,
total_size: u64,
row_count: u64,
scan_builder: ScanBuilder<ArrayRef>,
) -> ScanBuilder<ArrayRef> {
let row_range = byte_range_to_row_range(
file_range.start as u64..file_range.end as u64,
row_count,
total_size,
);

scan_builder.with_row_range(row_range)
/// Returns the natural split ranges for `path`, computing and caching them on first use.
///
/// The shared map is consulted first. On a miss the ranges are computed from
/// `layout_reader` and then published through the map's entry API; if a value for the
/// same path is already present by then, that stored value is returned and the freshly
/// computed one is dropped.
///
/// # Errors
///
/// Propagates any error returned by `compute_natural_split_ranges`.
fn natural_split_ranges_for_file(
    natural_split_ranges: &DashMap<Path, Arc<[Range<u64>]>>,
    path: &Path,
    layout_reader: &Arc<dyn LayoutReader>,
) -> DFResult<Arc<[Range<u64>]>> {
    // Fast path: the ranges for this file were already published.
    if let Some(cached) = natural_split_ranges.get(path) {
        return Ok(Arc::clone(cached.value()));
    }

    // The computation happens before the entry is taken.
    let computed = compute_natural_split_ranges(layout_reader.as_ref())?;

    let shared = match natural_split_ranges.entry(path.clone()) {
        // A value is already stored for this path: prefer it over ours.
        Entry::Occupied(existing) => Arc::clone(existing.get()),
        Entry::Vacant(slot) => {
            slot.insert(Arc::clone(&computed));
            computed
        }
    };

    Ok(shared)
}

fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u64) -> Range<u64> {
let average_row = total_size / row_count;
assert!(average_row > 0, "A row must always have at least one byte");
/// Computes the layout-driven ("natural") row ranges for the rows exposed by `layout_reader`.
///
/// Requests the `SplitBy::Layout` splits over `0..row_count` with a `FieldMask::All`
/// mask, then turns every pair of consecutive split points into a half-open row range.
///
/// # Errors
///
/// Returns a DataFusion execution error when the splits cannot be computed.
fn compute_natural_split_ranges(layout_reader: &dyn LayoutReader) -> DFResult<Arc<[Range<u64>]>> {
    let full_range = 0..layout_reader.row_count();

    let boundaries = SplitBy::Layout
        .splits(layout_reader, &full_range, &[FieldMask::All])
        .map_err(|e| exec_datafusion_err!("Failed to compute Vortex natural splits: {e}"))?;

    // Adjacent boundaries (b0, b1), (b1, b2), ... become the ranges b0..b1, b1..b2, ...
    let ranges: Vec<Range<u64>> = boundaries
        .into_iter()
        .tuple_windows()
        .map(|(lo, hi)| lo..hi)
        .collect();

    Ok(Arc::from(ranges))
}

let start_row = byte_range.start / average_row;
let end_row = byte_range.end / average_row;
/// Translate a DataFusion byte range to the contiguous natural split ranges it owns.
///
/// A split is owned by `byte_range` when the byte offset projected from the split's
/// midpoint row (via `split_midpoint_to_byte`) lies inside `byte_range`. The returned row
/// range runs from the start of the first owned split to the end of the last owned split.
///
/// Returns `None` when `byte_range` is empty or inverted, when `split_ranges` is empty or
/// its last range ends at row 0, or when no split midpoint lands inside `byte_range`.
fn split_aligned_row_range(
byte_range: Range<u64>,
total_size: u64,
split_ranges: &[Range<u64>],
) -> Option<Range<u64>> {
if byte_range.start >= byte_range.end {
return None;
}

// We take the min here as `end_row` might overshoot
start_row..u64::min(row_count, end_row)
// The end of the last split is used as the total row count for the projection.
let row_count = split_ranges.last().map(|split| split.end)?;
if row_count == 0 {
return None;
}

// Lazily select the splits whose projected midpoint byte falls inside `byte_range`.
let mut owned_splits = split_ranges.iter().filter(|split_range| {
let midpoint_byte = split_midpoint_to_byte(split_range, row_count, total_size);
byte_range.contains(&midpoint_byte)
});

// No owned split at all means this byte range reads nothing.
let first_split = owned_splits.next()?;
let mut row_range = first_split.start..first_split.end;
// Extend the range so it ends where the last owned split ends.
for split_range in owned_splits {
row_range.end = split_range.end;
}

Some(row_range)
}

/// Projects the midpoint row of `split_range` onto a byte offset.
///
/// The midpoint row is `start + (end - start) / 2`; it is scaled by
/// `total_size / row_count` using 128-bit arithmetic so the intermediate product cannot
/// overflow, and the quotient is rounded down.
///
/// # Panics
///
/// Panics if `row_count` is zero (division by zero). The final narrowing goes through
/// `vortex_expect`, which presumably panics if the result does not fit into `u64`.
fn split_midpoint_to_byte(split_range: &Range<u64>, row_count: u64, total_size: u64) -> u64 {
    let half_len = (split_range.end - split_range.start) / 2;
    let mid_row = u128::from(split_range.start + half_len);

    let scaled = mid_row * u128::from(total_size);
    let projected = scaled / u128::from(row_count);

    u64::try_from(projected).vortex_expect("midpoint byte projection should fit into u64")
}

#[cfg(test)]
Expand Down Expand Up @@ -491,43 +555,56 @@ mod tests {
static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);

#[rstest]
#[case(0..100, 100, 100, 0..100)]
#[case(0..105, 100, 105, 0..100)]
#[case(0..50, 100, 105, 0..50)]
#[case(50..105, 100, 105, 50..100)]
#[case(0..1, 4, 8, 0..0)]
#[case(1..8, 4, 8, 0..4)]
fn test_range_translation(
// Cases for `split_aligned_row_range`: (byte_range, total_size, split_ranges, expected).
#[case(0..3, 10, vec![0..2, 2..5, 5..10], Some(0..2))]
#[case(3..7, 10, vec![0..2, 2..5, 5..10], Some(2..5))]
#[case(1..8, 10, vec![0..1, 1..9, 9..10], Some(1..9))]
// Split midpoints project to bytes 0, 4, 8 and 12, none of which lies in 1..4.
#[case(1..4, 16, vec![0..1, 1..2, 2..3, 3..4], None)]
fn test_split_aligned_row_range(
#[case] byte_range: Range<u64>,
#[case] row_count: u64,
#[case] total_size: u64,
#[case] expected: Range<u64>,
#[case] split_ranges: Vec<Range<u64>>,
#[case] expected: Option<Range<u64>>,
) {
assert_eq!(
byte_range_to_row_range(byte_range, row_count, total_size),
split_aligned_row_range(byte_range, total_size, &split_ranges),
expected
);
}

#[test]
fn test_consecutive_ranges() {
let row_count = 100;
let total_size = 429;
let bytes_a = 0..143;
let bytes_b = 143..286;
let bytes_c = 286..429;

let rows_a = byte_range_to_row_range(bytes_a, row_count, total_size);
let rows_b = byte_range_to_row_range(bytes_b, row_count, total_size);
let rows_c = byte_range_to_row_range(bytes_c, row_count, total_size);

assert_eq!(rows_a.end - rows_a.start, 35);
assert_eq!(rows_b.end - rows_b.start, 36);
assert_eq!(rows_c.end - rows_c.start, 29);

assert_eq!(rows_a.start, 0);
assert_eq!(rows_c.end, 100);
for (left, right) in [rows_a, rows_b, rows_c].iter().tuple_windows() {
fn test_split_aligned_ranges_cover_splits_exactly_once() {
// Four consecutive byte ranges over a 16-byte file whose splits cover 13 rows.
let split_ranges = vec![0..1, 1..4, 4..10, 10..13];
let byte_ranges = [0..4, 4..8, 8..12, 12..16];

// Byte range 4..8 owns no split (the 4..10 split's midpoint projects to byte 8),
// so `filter_map` drops it and only three row ranges remain.
let assigned = byte_ranges
.into_iter()
.filter_map(|byte_range| split_aligned_row_range(byte_range, 16, &split_ranges))
.collect::<Vec<_>>();

assert_eq!(assigned, vec![0..4, 4..10, 10..13]);
// The assigned ranges together cover all 13 rows.
assert_eq!(
assigned
.iter()
.map(|range| range.end - range.start)
.sum::<u64>(),
13
);

let split_starts = split_ranges
.iter()
.map(|range| range.start)
.collect::<Vec<_>>();
let split_ends = split_ranges
.iter()
.map(|range| range.end)
.collect::<Vec<_>>();

// Every assigned range must begin and end on a natural split boundary.
for range in &assigned {
assert!(split_starts.contains(&range.start));
assert!(split_ends.contains(&range.end));
}

// Neighbouring assigned ranges must touch: no gaps and no overlap.
for (left, right) in assigned.iter().tuple_windows() {
assert_eq!(left.end, right.start);
}
}
Expand Down Expand Up @@ -568,6 +645,7 @@ mod tests {
limit: None,
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
layout_readers: Default::default(),
natural_split_ranges: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
Expand Down Expand Up @@ -621,6 +699,33 @@ mod tests {
Ok(())
}

/// Opening a zero-row Vortex file that carries a byte range must produce no batches.
#[tokio::test]
async fn test_open_empty_file() -> anyhow::Result<()> {
    use futures::TryStreamExt;

    let path = "part=1/empty.vortex";
    let empty_batch = record_batch!(("a", Int32, Vec::<i32>::new())).unwrap();
    let schema = empty_batch.schema();

    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
    let size = write_arrow_to_vortex(Arc::clone(&store), path, empty_batch.clone()).await?;

    // A byte range can be attached even when the file holds no rows (e.g. by parallel
    // scans), so the range here spans the whole file and the opener has to cope with
    // it without attempting a split-aligned translation.
    let ranged_file = PartitionedFile::new_with_range(path.to_string(), size, 0, size as i64);

    let opener = make_opener(
        store,
        TableSchema::from_file_schema(Arc::clone(&schema)),
        None,
    );

    let batches = opener
        .open(ranged_file)?
        .await?
        .try_collect::<Vec<_>>()
        .await?;

    assert_eq!(batches.len(), 0);

    Ok(())
}

#[rstest]
#[tokio::test]
async fn test_open_files_different_table_schema() -> anyhow::Result<()> {
Expand Down Expand Up @@ -662,6 +767,7 @@ mod tests {
limit: None,
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
layout_readers: Default::default(),
natural_split_ranges: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
Expand Down Expand Up @@ -748,6 +854,7 @@ mod tests {
limit: None,
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
layout_readers: Default::default(),
natural_split_ranges: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
Expand Down Expand Up @@ -902,6 +1009,7 @@ mod tests {
limit: None,
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
layout_readers: Default::default(),
natural_split_ranges: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
Expand Down Expand Up @@ -961,6 +1069,7 @@ mod tests {
limit: None,
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
layout_readers: Default::default(),
natural_split_ranges: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
Expand Down Expand Up @@ -1162,6 +1271,7 @@ mod tests {
limit: None,
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
layout_readers: Default::default(),
natural_split_ranges: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: None,
Expand Down
9 changes: 7 additions & 2 deletions vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::any::Any;
use std::fmt::Formatter;
use std::ops::Range;
use std::sync::Arc;
use std::sync::Weak;

Expand Down Expand Up @@ -61,6 +62,8 @@ pub struct VortexSource {
///
/// Sharing the readers allows us to only read every layout once from the file, even across partitions.
layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
/// Shared full-file natural split ranges keyed by path.
natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
expression_convertor: Arc<dyn ExpressionConvertor>,
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
vx_metrics_registry: Arc<dyn MetricsRegistry>,
Expand Down Expand Up @@ -88,6 +91,7 @@ impl VortexSource {
batch_size: None,
_unused_df_metrics: Default::default(),
layout_readers: Arc::new(DashMap::default()),
natural_split_ranges: Arc::new(DashMap::default()),
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
vortex_reader_factory: None,
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
Expand Down Expand Up @@ -186,8 +190,9 @@ impl FileSource for VortexSource {
table_schema: self.table_schema.clone(),
batch_size,
limit: base_config.limit.map(|l| l as u64),
metrics_registry: self.vx_metrics_registry.clone(),
layout_readers: self.layout_readers.clone(),
metrics_registry: Arc::clone(&self.vx_metrics_registry),
layout_readers: Arc::clone(&self.layout_readers),
natural_split_ranges: Arc::clone(&self.natural_split_ranges),
has_output_ordering: !base_config.output_ordering.is_empty(),
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
file_metadata_cache: self.file_metadata_cache.clone(),
Expand Down
Loading