diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs
index 21cb246f3..1d95c57b3 100644
--- a/kernel/src/engine/arrow_utils.rs
+++ b/kernel/src/engine/arrow_utils.rs
@@ -1,10 +1,5 @@
 //! Some utilities for working with arrow data types
 
-use std::collections::{HashMap, HashSet};
-use std::io::{BufRead, BufReader};
-use std::ops::Range;
-use std::sync::{Arc, OnceLock};
-
 use crate::engine::arrow_conversion::{TryFromKernel as _, TryIntoArrow as _};
 use crate::engine::ensure_data_types::DataTypeCompat;
 use crate::engine_data::FilteredEngineData;
@@ -15,6 +10,10 @@ use crate::{
     utils::require, DeltaResult, EngineData, Error,
 };
+use std::collections::{HashMap, HashSet};
+use std::io::{BufRead, BufReader};
+use std::ops::Range;
+use std::sync::{Arc, OnceLock};
 
 use crate::arrow::array::{
     cast::AsArray, make_array, new_null_array, Array as ArrowArray, BooleanArray, GenericListArray,
@@ -1074,6 +1073,31 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
     Ok(concat_batches(&schema, output.iter())?)
 }
 
+pub(crate) fn filter_to_record_batch(
+    filtered_data: FilteredEngineData,
+) -> DeltaResult<RecordBatch> {
+    // Honor the new contract: if the selection vector is shorter than the number of rows,
+    // then all rows not covered by the selection vector are assumed to be selected
+    let (underlying_data, mut selection_vector) = filtered_data.into_parts();
+    let batch = extract_record_batch(&*underlying_data)?;
+    let num_rows = batch.num_rows();
+
+    let result_batch = if selection_vector.is_empty() {
+        // If the selection vector is empty, select all rows per the contract.
+        batch.clone()
+    } else {
+        // Extend the selection vector with `true` for uncovered rows
+        if selection_vector.len() < num_rows {
+            selection_vector.resize(num_rows, true);
+        }
+
+        filter_record_batch(batch, &BooleanArray::from(selection_vector))
+            .map_err(|e| Error::generic(format!("Failed to filter record batch: {e}")))?
+    };
+
+    Ok(result_batch)
+}
+
 /// serialize an arrow RecordBatch to a JSON string by appending to a buffer.
 // TODO (zach): this should stream data to the JSON writer and output an iterator.
 #[internal_api]
@@ -1082,26 +1106,8 @@ pub(crate) fn to_json_bytes(
 ) -> DeltaResult<Vec<u8>> {
     let mut writer = LineDelimitedWriter::new(Vec::new());
     for chunk in data {
-        let filtered_data = chunk?;
-        // Honor the new contract: if selection vector is shorter than the number of rows,
-        // then all rows not covered by the selection vector are assumed to be selected
-        let (underlying_data, mut selection_vector) = filtered_data.into_parts();
-        let batch = extract_record_batch(&*underlying_data)?;
-        let num_rows = batch.num_rows();
-
-        if selection_vector.is_empty() {
-            // If selection vector is empty, write all rows per contract.
-            writer.write(batch)?;
-        } else {
-            // Extend the selection vector with `true` for uncovered rows
-            if selection_vector.len() < num_rows {
-                selection_vector.resize(num_rows, true);
-            }
-
-            let filtered_batch = filter_record_batch(batch, &BooleanArray::from(selection_vector))
-                .map_err(|e| Error::generic(format!("Failed to filter record batch: {e}")))?;
-            writer.write(&filtered_batch)?
-        };
+        let batch = filter_to_record_batch(chunk?)?;
+        writer.write(&batch)?;
     }
     writer.finish()?;
     Ok(writer.into_inner())
diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index ac63877ec..12ab1adc4 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -22,15 +22,15 @@ use super::UrlExt;
 use crate::engine::arrow_conversion::TryIntoArrow as _;
 use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::{
-    fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
-    RowIndexBuilder,
+    filter_to_record_batch, fixup_parquet_read, generate_mask, get_requested_indices,
+    ordering_needs_row_indexes, RowIndexBuilder,
 };
 use crate::engine::default::executor::TaskExecutor;
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
 use crate::schema::SchemaRef;
 use crate::{
-    DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler,
-    PredicateRef,
+    DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, FilteredEngineData,
+    ParquetHandler, PredicateRef,
 };
 
 #[derive(Debug)]
@@ -242,6 +242,45 @@ impl ParquetHandler for DefaultParquetHandler {
             self.readahead,
         )
     }
+
+    fn write_parquet_file(
+        &self,
+        location: url::Url,
+        data: FilteredEngineData,
+    ) -> DeltaResult<FileMeta> {
+        // Convert FilteredEngineData to RecordBatch, applying selection filter
+        let batch = filter_to_record_batch(data)?;
+
+        // We buffer it in the application first, and then push everything to the object-store.
+        // The storage API does not seem to allow for streaming to the store, which is okay
+        // since often the object stores allocate their own buffers.
+        let mut buffer = Vec::new();
+
+        // Scope to ensure writer is dropped before we use buffer
+        {
+            let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)?;
+            writer.write(&batch)?;
+            writer.close()?; // writer must be closed to write footer
+        }
+
+        let store = self.store.clone();
+        let path = Path::from_url_path(location.path())?;
+
+        let size: u64 = buffer
+            .len()
+            .try_into()
+            .map_err(|_| Error::generic("unable to convert usize to u64"))?;
+
+        // Block on the async put operation
+        self.task_executor
+            .block_on(async move { store.put(&path, buffer.into()).await })?;
+
+        Ok(FileMeta {
+            location,
+            last_modified: 0,
+            size,
+        })
+    }
 }
 
 /// Implements [`FileOpener`] for a parquet file
@@ -427,7 +466,7 @@ mod tests {
     use std::path::PathBuf;
     use std::slice;
 
-    use crate::arrow::array::{Array, RecordBatch};
+    use crate::arrow::array::{Array, BooleanArray, RecordBatch};
     use crate::engine::arrow_conversion::TryIntoKernel as _;
     use crate::engine::arrow_data::ArrowEngineData;
 
@@ -638,4 +677,159 @@ mod tests {
             "Generic delta kernel error: Path must end with a trailing slash: memory:///data",
         );
     }
+
+    #[tokio::test]
+    async fn test_parquet_handler_trait_write() {
+        let store = Arc::new(InMemory::new());
+        let parquet_handler: Arc<dyn ParquetHandler> = Arc::new(DefaultParquetHandler::new(
+            store.clone(),
+            Arc::new(TokioBackgroundExecutor::new()),
+        ));
+
+        let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
+            RecordBatch::try_from_iter(vec![
+                (
+                    "x",
+                    Arc::new(Int64Array::from(vec![10, 20, 30])) as Arc<dyn Array>,
+                ),
+                (
+                    "y",
+                    Arc::new(Int64Array::from(vec![100, 200, 300])) as Arc<dyn Array>,
+                ),
+            ])
+            .unwrap(),
+        ));
+
+        // Wrap in FilteredEngineData with all rows selected
+        let filtered_data = crate::FilteredEngineData::with_all_rows_selected(engine_data);
+
+        // Test writing through the trait method
+        let file_url = Url::parse("memory:///test/data.parquet").unwrap();
+        parquet_handler
+            .write_parquet_file(file_url.clone(), filtered_data)
+            .unwrap();
+
+        // Verify we can read the file back
+        let path = Path::from_url_path(file_url.path()).unwrap();
+        let reader = ParquetObjectReader::new(store.clone(), path);
+        let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
+            .await
+            .unwrap()
+            .schema()
+            .clone();
+
+        let file_meta = FileMeta {
+            location: file_url,
+            last_modified: 0,
+            size: 0,
+        };
+
+        let data: Vec<RecordBatch> = parquet_handler
+            .read_parquet_files(
+                slice::from_ref(&file_meta),
+                Arc::new(physical_schema.try_into_kernel().unwrap()),
+                None,
+            )
+            .unwrap()
+            .map(into_record_batch)
+            .try_collect()
+            .unwrap();
+
+        assert_eq!(data.len(), 1);
+        assert_eq!(data[0].num_rows(), 3);
+        assert_eq!(data[0].num_columns(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_parquet_handler_trait_write_and_read_roundtrip() {
+        let store = Arc::new(InMemory::new());
+        let parquet_handler: Arc<dyn ParquetHandler> = Arc::new(DefaultParquetHandler::new(
+            store.clone(),
+            Arc::new(TokioBackgroundExecutor::new()),
+        ));
+
+        // Create test data with multiple types
+        let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
+            RecordBatch::try_from_iter(vec![
+                (
+                    "int_col",
+                    Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as Arc<dyn Array>,
+                ),
+                (
+                    "string_col",
+                    Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])) as Arc<dyn Array>,
+                ),
+                (
+                    "bool_col",
+                    Arc::new(BooleanArray::from(vec![true, false, true, false, true]))
+                        as Arc<dyn Array>,
+                ),
+            ])
+            .unwrap(),
+        ));
+
+        // Wrap in FilteredEngineData with all rows selected
+        let filtered_data = crate::FilteredEngineData::with_all_rows_selected(engine_data);
+
+        // Write the data
+        let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap();
+        parquet_handler
+            .write_parquet_file(file_url.clone(), filtered_data)
+            .unwrap();
+
+        // Read it back
+        let path = Path::from_url_path(file_url.path()).unwrap();
+        let reader = ParquetObjectReader::new(store.clone(), path);
+        let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
+            .await
+            .unwrap()
+            .schema()
+            .clone();
+
+        let file_meta = FileMeta {
+            location: file_url.clone(),
+            last_modified: 0,
+            size: 0,
+        };
+
+        let data: Vec<RecordBatch> = parquet_handler
+            .read_parquet_files(
+                slice::from_ref(&file_meta),
+                Arc::new(physical_schema.try_into_kernel().unwrap()),
+                None,
+            )
+            .unwrap()
+            .map(into_record_batch)
+            .try_collect()
+            .unwrap();
+
+        // Verify the data
+        assert_eq!(data.len(), 1);
+        assert_eq!(data[0].num_rows(), 5);
+        assert_eq!(data[0].num_columns(), 3);
+
+        // Verify column values
+        let int_col = data[0]
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(int_col.values(), &[1, 2, 3, 4, 5]);
+
+        let string_col = data[0]
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(string_col.value(0), "a");
+        assert_eq!(string_col.value(4), "e");
+
+        let bool_col = data[0]
+            .column(2)
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .unwrap();
+        assert!(bool_col.value(0));
+        assert!(!bool_col.value(1));
+    }
 }
diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs
index d170a2334..52d0453c3 100644
--- a/kernel/src/engine/sync/parquet.rs
+++ b/kernel/src/engine/sync/parquet.rs
@@ -1,13 +1,15 @@
-use std::fs::File;
-
 use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use crate::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};
+use crate::parquet::arrow::arrow_writer::ArrowWriter;
+use std::fs::File;
+use std::time::SystemTime;
+use url::Url;
 
 use super::read_files;
 use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::{
-    fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
-    RowIndexBuilder,
+    filter_to_record_batch, fixup_parquet_read, generate_mask, get_requested_indices,
+    ordering_needs_row_indexes, RowIndexBuilder,
 };
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
 use crate::schema::SchemaRef;
@@ -52,4 +54,214 @@ impl ParquetHandler for SyncParquetHandler {
     ) -> DeltaResult<FileDataReadResultIterator> {
         read_files(files, schema, predicate, try_create_from_parquet)
     }
+
+    fn write_parquet_file(
+        &self,
+        location: Url,
+        data: crate::FilteredEngineData,
+    ) -> DeltaResult<FileMeta> {
+        // Convert FilteredEngineData to RecordBatch, applying selection filter
+        let batch = filter_to_record_batch(data)?;
+
+        // Convert URL to file path
+        let path = location
+            .to_file_path()
+            .map_err(|_| crate::Error::generic(format!("Invalid file URL: {}", location)))?;
+        let mut file = File::create(&path)?;
+
+        let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None)?;
+        writer.write(&batch)?;
+        writer.close()?; // writer must be closed to write footer
+
+        let meta = file.metadata()?;
+        let last_modified = meta
+            .modified()?
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .map_err(|e| crate::Error::generic(format!("Invalid file timestamp: {}", e)))?
+            .as_millis() as i64;
+
+        Ok(FileMeta {
+            location,
+            last_modified,
+            size: meta.len(),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array::{Array, Int64Array, RecordBatch, StringArray};
+    use crate::engine::arrow_conversion::TryIntoKernel as _;
+    use std::sync::Arc;
+    use tempfile::tempdir;
+
+    #[test]
+    fn test_sync_write_parquet_file() {
+        let handler = SyncParquetHandler;
+        let temp_dir = tempdir().unwrap();
+        let file_path = temp_dir.path().join("test.parquet");
+        let url = Url::from_file_path(&file_path).unwrap();
+
+        // Create test data
+        let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
+            RecordBatch::try_from_iter(vec![
+                (
+                    "id",
+                    Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
+                ),
+                (
+                    "name",
+                    Arc::new(StringArray::from(vec!["a", "b", "c"])) as Arc<dyn Array>,
+                ),
+            ])
+            .unwrap(),
+        ));
+
+        // Wrap in FilteredEngineData with all rows selected
+        let filtered_data = crate::FilteredEngineData::with_all_rows_selected(engine_data);
+
+        // Write the file
+        handler
+            .write_parquet_file(url.clone(), filtered_data)
+            .unwrap();
+
+        // Verify the file exists
+        assert!(file_path.exists());
+
+        // Read it back to verify
+        let file = File::open(&file_path).unwrap();
+        let reader =
+            crate::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)
+                .unwrap();
+        let schema = reader.schema().clone();
+
+        let file_meta = FileMeta {
+            location: url,
+            last_modified: 0,
+            size: 0,
+        };
+
+        let mut result = handler
+            .read_parquet_files(
+                &[file_meta],
+                Arc::new(schema.try_into_kernel().unwrap()),
+                None,
+            )
+            .unwrap();
+
+        let engine_data = result.next().unwrap().unwrap();
+        let batch = ArrowEngineData::try_from_engine_data(engine_data).unwrap();
+        let record_batch = batch.record_batch();
+
+        // Verify shape
+        assert_eq!(record_batch.num_rows(), 3);
+        assert_eq!(record_batch.num_columns(), 2);
+
+        // Verify content - id column
+        let id_col = record_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(id_col.values(), &[1, 2, 3]);
+
+        // Verify content - name column
+        let name_col = record_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_col.value(0), "a");
+        assert_eq!(name_col.value(1), "b");
+        assert_eq!(name_col.value(2), "c");
+
+        assert!(result.next().is_none());
+    }
+
+    #[test]
+    fn test_sync_write_parquet_file_with_filter() {
+        let handler = SyncParquetHandler;
+        let temp_dir = tempdir().unwrap();
+        let file_path = temp_dir.path().join("test_filtered.parquet");
+        let url = Url::from_file_path(&file_path).unwrap();
+
+        // Create test data with 5 rows
+        let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
+            RecordBatch::try_from_iter(vec![
+                (
+                    "id",
+                    Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as Arc<dyn Array>,
+                ),
+                (
+                    "name",
+                    Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])) as Arc<dyn Array>,
+                ),
+            ])
+            .unwrap(),
+        ));
+
+        // Create a selection vector that filters out rows 1 and 3 (0-indexed).
+        // Keep rows: 0 (id=1, name=a), 2 (id=3, name=c), 4 (id=5, name=e)
+        let selection_vector = vec![true, false, true, false, true];
+        let filtered_data =
+            crate::FilteredEngineData::try_new(engine_data, selection_vector).unwrap();
+
+        // Write the file with the filter applied
+        handler
+            .write_parquet_file(url.clone(), filtered_data)
+            .unwrap();
+
+        // Verify the file exists
+        assert!(file_path.exists());
+
+        // Read it back to verify only the selected rows are present
+        let file = File::open(&file_path).unwrap();
+        let reader =
+            crate::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)
+                .unwrap();
+        let schema = reader.schema().clone();
+
+        let file_meta = FileMeta {
+            location: url,
+            last_modified: 0,
+            size: 0,
+        };
+
+        let mut result = handler
+            .read_parquet_files(
+                &[file_meta],
+                Arc::new(schema.try_into_kernel().unwrap()),
+                None,
+            )
+            .unwrap();
+
+        let engine_data = result.next().unwrap().unwrap();
+        let batch = ArrowEngineData::try_from_engine_data(engine_data).unwrap();
+        let record_batch = batch.record_batch();
+
+        // Verify shape - should only have 3 rows (filtered from 5)
+        assert_eq!(record_batch.num_rows(), 3);
+        assert_eq!(record_batch.num_columns(), 2);
+
+        // Verify content - id column should have values 1, 3, 5
+        let id_col = record_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(id_col.values(), &[1, 3, 5]);
+
+        // Verify content - name column should have values "a", "c", "e"
+        let name_col = record_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_col.value(0), "a");
+        assert_eq!(name_col.value(1), "c");
+        assert_eq!(name_col.value(2), "e");
+
+        assert!(result.next().is_none());
+    }
 }
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index 6f7635d7d..ee95e6d51 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -685,6 +685,25 @@ pub trait ParquetHandler: AsAny {
         physical_schema: SchemaRef,
         predicate: Option<PredicateRef>,
     ) -> DeltaResult<FileDataReadResultIterator>;
+
+    /// Write data to a Parquet file at the specified URL.
+    ///
+    /// This method writes the provided `data` to a Parquet file at the given `location`.
+    ///
+    /// # Parameters
+    ///
+    /// - `location` - The full URL where the Parquet file should be written
+    ///   (e.g., `s3://bucket/path/file.parquet`).
+    /// - `data` - The data to write to the Parquet file, as [`FilteredEngineData`].
+    ///
+    /// # Returns
+    ///
+    /// A [`DeltaResult`] containing the [`FileMeta`] of the written file.
+    fn write_parquet_file(
+        &self,
+        location: url::Url,
+        data: FilteredEngineData,
+    ) -> DeltaResult<FileMeta>;
 }
 
 /// The `Engine` trait encapsulates all the functionality an engine or connector needs to provide
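
Usage sketch (illustrative): the snippet below shows how engine code might call the new `ParquetHandler::write_parquet_file` method together with the `FilteredEngineData` selection-vector contract. It assumes `FilteredEngineData::try_new` and the crate re-exports named here are available to engines (they are used the same way in the tests above); the handler, data, and target URL are caller-supplied placeholders.

use delta_kernel::{DeltaResult, EngineData, FileMeta, FilteredEngineData, ParquetHandler};
use url::Url;

// Sketch only: write the rows selected by a (possibly short) selection vector.
// The handler, data, and target URL all come from the calling engine.
fn write_selected_rows(
    handler: &dyn ParquetHandler,
    data: Box<dyn EngineData>,
    target: Url,
) -> DeltaResult<FileMeta> {
    // Keep rows 0 and 2; per the contract, rows past the end of a short selection
    // vector are treated as selected, and an empty vector selects every row.
    let filtered = FilteredEngineData::try_new(data, vec![true, false, true])?;
    handler.write_parquet_file(target, filtered)
}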