Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions vortex-datafusion/src/persistent/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,16 @@ pub struct VortexAccessPlan {
}

impl VortexAccessPlan {
/// Returns the selection, if one was set.
pub fn selection(&self) -> Option<&Selection> {
self.selection.as_ref()
}

/// Sets the row [`Selection`] to apply when the file is opened.
pub fn with_selection(mut self, selection: Selection) -> Self {
self.selection = Some(selection);
self
}
}

impl VortexAccessPlan {
/// Returns the selection, if one was set.
pub fn selection(&self) -> Option<&Selection> {
self.selection.as_ref()
}

/// Applies this access plan to a [`ScanBuilder`].
///
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl FileFormatFactory for VortexFormatFactory {
if let Some(key) = key.strip_prefix("format.") {
opts.set(key, value)?;
} else {
tracing::trace!("Ignoring options '{key}'");
tracing::trace!("Ignoring option '{key}'");
}
}

Expand Down
88 changes: 73 additions & 15 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use datafusion_datasource::PartitionedFile;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file_stream::FileOpenFuture;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::projection::ProjectionExprs;
Expand Down Expand Up @@ -129,7 +130,7 @@ impl FileOpener for VortexOpener {
let unified_file_schema = Arc::clone(self.table_schema.file_schema());
let batch_size = self.batch_size;
let limit = self.limit;
let layout_reader = Arc::clone(&self.layout_readers);
let layout_readers = Arc::clone(&self.layout_readers);
let natural_split_ranges = Arc::clone(&self.natural_split_ranges);
let has_output_ordering = self.has_output_ordering;
let scan_concurrency = self.scan_concurrency;
Expand Down Expand Up @@ -160,9 +161,7 @@ impl FileOpener for VortexOpener {
Ok(async move {
// Create FilePruner when we have a predicate and either dynamic expressions
// or file statistics available. The pruner can eliminate files without
// opening them based on:
// - Partition column values (e.g., date=2024-01-01)
// - File-level statistics (min/max values per column)
// opening them based on File-level statistics (min/max values per column)
let mut file_pruner = file_pruning_predicate
.filter(|p| {
// Only create pruner if we have dynamic expressions or file statistics
Expand Down Expand Up @@ -192,22 +191,41 @@ impl FileOpener for VortexOpener {
.with_metrics_registry(Arc::clone(&metrics_registry))
.with_labels(labels);

if let Some(file_metadata_cache) = file_metadata_cache
&& let Some(entry) = file_metadata_cache.get(file.path())
&& entry.is_valid_for(&file.object_meta)
&& let Some(vortex_metadata) = entry
.file_metadata
.as_any()
.downcast_ref::<CachedVortexMetadata>()
{
open_opts = open_opts.with_footer(vortex_metadata.footer().clone());
let cached_footer = file_metadata_cache
.as_ref()
.and_then(|cache| cache.get(file.path()))
.filter(|entry| entry.is_valid_for(&file.object_meta))
.and_then(|entry| {
entry
.file_metadata
.as_any()
.downcast_ref::<CachedVortexMetadata>()
.map(|vortex_metadata| vortex_metadata.footer().clone())
});
let footer_cache_hit = cached_footer.is_some();

if let Some(footer) = cached_footer {
open_opts = open_opts.with_footer(footer);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now has a cache!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where?

}

let vxf = open_opts
.open_read(reader)
.await
.map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?;

// On a miss, cache the parsed footer so other partitions and later executions
// skip the footer fetch and parse. `infer_schema`/`infer_stats` also populate
// this cache, but only when planning goes through `VortexFormat`.
if !footer_cache_hit && let Some(cache) = &file_metadata_cache {
cache.put(
file.path(),
CachedFileMetadataEntry::new(
file.object_meta.clone(),
Arc::new(CachedVortexMetadata::new(&vxf)),
),
);
}

// Check if there are rows in this file. If not, we can save
// ourselves some work and return an empty stream.
if vxf.row_count() == 0 {
Expand Down Expand Up @@ -285,7 +303,7 @@ impl FileOpener for VortexOpener {
let projector = leftover_projection.make_projector(&stream_schema)?;

// We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once.
let layout_reader = match layout_reader.entry(file.object_meta.location.clone()) {
let layout_reader = match layout_readers.entry(file.object_meta.location.clone()) {
Entry::Occupied(mut occupied_entry) => {
if let Some(reader) = occupied_entry.get().upgrade() {
tracing::trace!("reusing layout reader for {}", occupied_entry.key());
Expand Down Expand Up @@ -352,7 +370,6 @@ impl FileOpener for VortexOpener {
// This will only fail if the user has not configured a suitable
// PhysicalExprAdapterFactory on the file source to handle rewriting the
// expression to handle missing/reordered columns in the Vortex file.

let (pushed, unpushed): (Vec<PhysicalExprRef>, Vec<PhysicalExprRef>) =
split_conjunction(&f)
.into_iter()
Expand Down Expand Up @@ -568,6 +585,7 @@ mod tests {
use datafusion::physical_expr::planner::logical2physical;
use datafusion::physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion::scalar::ScalarValue;
use datafusion_execution::cache::DefaultFilesMetadataCache;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions as df_expr;
use datafusion_physical_expr::projection::ProjectionExpr;
Expand Down Expand Up @@ -775,6 +793,46 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_open_populates_file_metadata_cache() -> anyhow::Result<()> {
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let file_path = "cached/file.vortex";
let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
let data_size =
write_arrow_to_vortex(Arc::clone(&object_store), file_path, batch.clone()).await?;

let file = PartitionedFile::new(file_path.to_string(), data_size);
let table_schema = TableSchema::from_file_schema(batch.schema());

let cache: Arc<dyn FileMetadataCache> =
Arc::new(DefaultFilesMetadataCache::new(64 * 1024 * 1024));
let mut opener = make_opener(Arc::clone(&object_store), table_schema, None);
opener.file_metadata_cache = Some(Arc::clone(&cache));

// The first open misses the cache and must write the parsed footer back.
let stream = opener.open(file.clone())?.await?;
stream.try_collect::<Vec<_>>().await?;

let entry = cache
.get(file.path())
.ok_or_else(|| anyhow::anyhow!("footer was not cached after open"))?;
assert!(entry.is_valid_for(&file.object_meta));
assert!(
entry
.file_metadata
.as_any()
.downcast_ref::<CachedVortexMetadata>()
.is_some()
);

// The second open hits the cache and still returns the same data.
let stream = opener.open(file.clone())?.await?;
let data = stream.try_collect::<Vec<_>>().await?;
assert_eq!(data.iter().map(|rb| rb.num_rows()).sum::<usize>(), 3);

Ok(())
}

#[rstest]
#[tokio::test]
async fn test_open_files_different_table_schema() -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl VortexReaderFactory for DefaultVortexReaderFactory {
) -> DFResult<Arc<dyn VortexReadAt>> {
Ok(Arc::new(ObjectStoreReadAt::new_with_allocator(
Arc::clone(&self.object_store),
file.path().as_ref().into(),
file.path().clone(),
session.handle(),
session.allocator(),
)) as _)
Expand Down
16 changes: 9 additions & 7 deletions vortex-datafusion/src/persistent/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,26 @@ impl FileSink for VortexSink {
object_store: Arc<dyn ObjectStore>,
) -> DFResult<u64> {
let mut file_write_tasks: JoinSet<DFResult<(Path, WriteSummary)>> = JoinSet::new();
let writer_schema = get_writer_schema(&self.config);
let dtype = self
.session
.arrow()
.from_arrow_schema(&writer_schema)
.map_err(|e| {
exec_datafusion_err!("Failed to derive Vortex DType from writer schema: {e}")
})?;

// TODO(adamg):
// 1. We can probably be better at signaling how much memory we're consuming (potentially when reading too), see ParquetSink::spawn_writer_tasks_and_join.
while let Some((path, rx)) = file_stream_rx.recv().await {
let session = self.session.clone();
let object_store = Arc::clone(&object_store);
let writer_schema = get_writer_schema(&self.config);
let dtype = session
.arrow()
.from_arrow_schema(&writer_schema)
.map_err(|e| {
exec_datafusion_err!("Failed to derive Vortex DType from writer schema: {e}")
})?;

// We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered,
// the demux task might deadlock itself.
let arrow_session = session.clone();
let import_schema = Arc::clone(&writer_schema);
let dtype = dtype.clone();
file_write_tasks.spawn(async move {
let stream = ReceiverStream::new(rx).map(move |rb| {
arrow_session
Expand Down
Loading