diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index cc9cd1333..4ec116793 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -74,7 +74,7 @@ pub unsafe extern "C" fn selection_vector_from_scan_metadata( fn selection_vector_from_scan_metadata_impl( scan_metadata: &ScanMetadata, ) -> DeltaResult { - Ok(scan_metadata.scan_files.selection_vector.clone().into()) + Ok(scan_metadata.scan_files.selection_vector().to_vec().into()) } /// Drops a scan. diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index e5afea11c..f52e4aa22 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -113,7 +113,7 @@ fn try_main() -> DeltaResult<()> { if cli.metadata { let (scan_metadata_batches, scan_metadata_rows) = scan_metadata - .map(|res| res.unwrap().scan_files.data.len()) + .map(|res| res.unwrap().scan_files.data().len()) .fold((0, 0), |(batches, rows), len| (batches + 1, rows + len)); println!("Scan metadata: {scan_metadata_batches} chunks, {scan_metadata_rows} files",); return Ok(()); diff --git a/kernel/src/action_reconciliation/log_replay.rs b/kernel/src/action_reconciliation/log_replay.rs index 2e6a32470..a0e923715 100644 --- a/kernel/src/action_reconciliation/log_replay.rs +++ b/kernel/src/action_reconciliation/log_replay.rs @@ -114,10 +114,7 @@ impl LogReplayProcessor for ActionReconciliationProcessor { self.seen_protocol = visitor.seen_protocol; self.seen_metadata = visitor.seen_metadata; - let filtered_data = FilteredEngineData { - data: actions, - selection_vector: visitor.selection_vector, - }; + let filtered_data = FilteredEngineData::try_new(actions, visitor.selection_vector)?; Ok(ActionReconciliationBatch { filtered_data, @@ -770,8 +767,11 @@ mod tests { // Verify results assert_eq!(results.len(), 2, "Expected two batches in results"); - assert_eq!(results[0].selection_vector, vec![true, true, true],); - assert_eq!(results[1].selection_vector, vec![false, false, false, true],); + assert_eq!(results[0].selection_vector(), &vec![true, true, true]); + assert_eq!( + results[1].selection_vector(), + &vec![false, false, false, true] + ); assert_eq!(actions_count, 4); assert_eq!(add_actions, 0); @@ -809,8 +809,8 @@ mod tests { // Verify results assert_eq!(results.len(), 2); // The third batch should be filtered out since there are no selected actions - assert_eq!(results[0].selection_vector, vec![true]); - assert_eq!(results[1].selection_vector, vec![false, true]); + assert_eq!(results[0].selection_vector(), &vec![true]); + assert_eq!(results[1].selection_vector(), &vec![false, true]); assert_eq!(actions_count, 2); assert_eq!(add_actions, 1); @@ -845,8 +845,8 @@ mod tests { // Verify results assert_eq!(results.len(), 2); - assert_eq!(results[0].selection_vector, vec![true, true]); - assert_eq!(results[1].selection_vector, vec![false, false, true]); + assert_eq!(results[0].selection_vector(), &vec![true, true]); + assert_eq!(results[1].selection_vector(), &vec![false, false, true]); assert_eq!(actions_count, 3); assert_eq!(add_actions, 2); @@ -927,13 +927,16 @@ mod tests { // First batch: protocol, metadata, and one recent txn (old_app filtered out) assert_eq!( - results[0].filtered_data.selection_vector, + results[0].filtered_data.selection_vector(), vec![true, true, false, true] ); assert_eq!(results[0].actions_count, 3); // Second batch: timeless_app kept, another_old filtered out - assert_eq!(results[1].filtered_data.selection_vector, vec![true, false]); + assert_eq!( + results[1].filtered_data.selection_vector(), + vec![true, false] + ); assert_eq!(results[1].actions_count, 1); Ok(()) diff --git a/kernel/src/checkpoint/mod.rs b/kernel/src/checkpoint/mod.rs index 50cb05714..68f7f5c43 100644 --- a/kernel/src/checkpoint/mod.rs +++ b/kernel/src/checkpoint/mod.rs @@ -336,9 +336,10 @@ impl CheckpointWriter { let last_checkpoint_path = LastCheckpointHint::path(&self.snapshot.log_segment().log_root)?; // Write the `_last_checkpoint` file to `table/_delta_log/_last_checkpoint` + let filtered_data = FilteredEngineData::with_all_rows_selected(data?); engine.json_handler().write_json_file( &last_checkpoint_path, - Box::new(std::iter::once(data)), + Box::new(std::iter::once(Ok(filtered_data))), true, )?; @@ -370,10 +371,7 @@ impl CheckpointWriter { &[Scalar::from(self.version)], )?; - let filtered_data = FilteredEngineData { - data: checkpoint_metadata_batch, - selection_vector: vec![true], // Include the action in the checkpoint - }; + let filtered_data = FilteredEngineData::with_all_rows_selected(checkpoint_metadata_batch); Ok(ActionReconciliationBatch { filtered_data, diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs index 554e42292..ea2ab10ef 100644 --- a/kernel/src/checkpoint/tests.rs +++ b/kernel/src/checkpoint/tests.rs @@ -9,6 +9,7 @@ use crate::arrow::datatypes::{DataType, Schema}; use crate::checkpoint::create_last_checkpoint_data; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; +use crate::log_replay::HasSelectionVector; use crate::schema::{DataType as KernelDataType, StructField, StructType}; use crate::utils::test_utils::Action; use crate::{DeltaResult, FileMeta, Snapshot}; @@ -77,13 +78,11 @@ fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { let writer = snapshot.checkpoint()?; let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?; - - // Check selection vector has one true value - assert_eq!(checkpoint_batch.filtered_data.selection_vector, vec![true]); + assert!(checkpoint_batch.filtered_data.has_selected_rows()); // Verify the underlying EngineData contains the expected CheckpointMetadata action - let arrow_engine_data = - ArrowEngineData::try_from_engine_data(checkpoint_batch.filtered_data.data)?; + let (underlying_data, _) = checkpoint_batch.filtered_data.into_parts(); + let arrow_engine_data = ArrowEngineData::try_from_engine_data(underlying_data)?; let record_batch = arrow_engine_data.record_batch(); // Build the expected RecordBatch @@ -314,11 +313,11 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { let mut data_iter = writer.checkpoint_data(&engine)?; // The first batch should be the metadata and protocol actions. let batch = data_iter.next().unwrap()?; - assert_eq!(batch.selection_vector, [true, true]); + assert_eq!(batch.selection_vector(), &[true, true]); // The second batch should include both the add action and the remove action let batch = data_iter.next().unwrap()?; - assert_eq!(batch.selection_vector, [true, true]); + assert_eq!(batch.selection_vector(), &[true, true]); // The third batch should not be included as the selection vector does not // contain any true values, as the file added is removed in a following commit. @@ -384,7 +383,7 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { let mut data_iter = writer.checkpoint_data(&engine)?; // The first batch should be the metadata and protocol actions. let batch = data_iter.next().unwrap()?; - assert_eq!(batch.selection_vector, [true, true]); + assert_eq!(batch.selection_vector(), &[true, true]); // No more data should exist because we only requested version 0 assert!(data_iter.next().is_none()); @@ -488,15 +487,17 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { let mut data_iter = writer.checkpoint_data(&engine)?; // The first batch should be the metadata and protocol actions. let batch = data_iter.next().unwrap()?; - assert_eq!(batch.selection_vector, [true, true]); + assert_eq!(batch.selection_vector(), &[true, true]); // The second batch should include both the add action and the remove action let batch = data_iter.next().unwrap()?; - assert_eq!(batch.selection_vector, [true, true]); + assert_eq!(batch.selection_vector(), &[true, true]); // The third batch should be the CheckpointMetaData action. let batch = data_iter.next().unwrap()?; - assert_eq!(batch.selection_vector, [true]); + // According to the new contract, with_all_rows_selected creates an empty selection vector + assert_eq!(batch.selection_vector(), &[] as &[bool]); + assert!(batch.has_selected_rows()); // No more data should exist assert!(data_iter.next().is_none()); diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index adb41e75b..21cb246f3 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -7,20 +7,22 @@ use std::sync::{Arc, OnceLock}; use crate::engine::arrow_conversion::{TryFromKernel as _, TryIntoArrow as _}; use crate::engine::ensure_data_types::DataTypeCompat; +use crate::engine_data::FilteredEngineData; use crate::schema::{ColumnMetadataKey, MetadataValue}; use crate::{ - engine::arrow_data::ArrowEngineData, + engine::arrow_data::{extract_record_batch, ArrowEngineData}, schema::{DataType, MetadataColumnSpec, Schema, SchemaRef, StructField, StructType}, utils::require, DeltaResult, EngineData, Error, }; use crate::arrow::array::{ - cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray, MapArray, - OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray, + cast::AsArray, make_array, new_null_array, Array as ArrowArray, BooleanArray, GenericListArray, + MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray, }; use crate::arrow::buffer::NullBuffer; use crate::arrow::compute::concat_batches; +use crate::arrow::compute::filter_record_batch; use crate::arrow::datatypes::{ DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields as ArrowFields, Int64Type, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, @@ -1076,13 +1078,30 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR // TODO (zach): this should stream data to the JSON writer and output an iterator. #[internal_api] pub(crate) fn to_json_bytes( - data: impl Iterator>> + Send, + data: impl Iterator> + Send, ) -> DeltaResult> { let mut writer = LineDelimitedWriter::new(Vec::new()); for chunk in data { - let arrow_data = ArrowEngineData::try_from_engine_data(chunk?)?; - let record_batch = arrow_data.record_batch(); - writer.write(record_batch)?; + let filtered_data = chunk?; + // Honor the new contract: if selection vector is shorter than the number of rows, + // then all rows not covered by the selection vector are assumed to be selected + let (underlying_data, mut selection_vector) = filtered_data.into_parts(); + let batch = extract_record_batch(&*underlying_data)?; + let num_rows = batch.num_rows(); + + if selection_vector.is_empty() { + // If selection vector is empty, write all rows per contract. + writer.write(batch)?; + } else { + // Extend the selection vector with `true` for uncovered rows + if selection_vector.len() < num_rows { + selection_vector.resize(num_rows, true); + } + + let filtered_batch = filter_record_batch(batch, &BooleanArray::from(selection_vector)) + .map_err(|e| Error::generic(format!("Failed to filter record batch: {e}")))?; + writer.write(&filtered_batch)? + }; } writer.finish()?; Ok(writer.into_inner()) @@ -2767,7 +2786,8 @@ mod tests { vec![Arc::new(StringArray::from(vec!["string1", "string2"]))], )?; let data: Box = Box::new(ArrowEngineData::new(data)); - let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?; + let filtered_data = FilteredEngineData::with_all_rows_selected(data); + let json = to_json_bytes(Box::new(std::iter::once(Ok(filtered_data))))?; assert_eq!( json, "{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes() @@ -2775,6 +2795,73 @@ mod tests { Ok(()) } + #[test] + fn test_to_json_bytes_filters_data() -> DeltaResult<()> { + // Create test data with 4 rows + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + ArrowDataType::Utf8, + true, + )])); + let record_batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(vec![ + "row0", "row1", "row2", "row3", + ]))], + )?; + + // Helper function to create EngineData from the same record batch + let create_engine_data = + || -> Box { Box::new(ArrowEngineData::new(record_batch.clone())) }; + + // Test case 1: All rows selected (should include all 4 rows) + let all_selected = + FilteredEngineData::try_new(create_engine_data(), vec![true, true, true, true])?; + let json_all = to_json_bytes(Box::new(std::iter::once(Ok(all_selected))))?; + assert_eq!( + json_all, + "{\"value\":\"row0\"}\n{\"value\":\"row1\"}\n{\"value\":\"row2\"}\n{\"value\":\"row3\"}\n".as_bytes() + ); + + // Test case 2: Only first and last rows selected (should include only 2 rows) + let partial_selected = + FilteredEngineData::try_new(create_engine_data(), vec![true, false, false, true])?; + let json_partial = to_json_bytes(Box::new(std::iter::once(Ok(partial_selected))))?; + assert_eq!( + json_partial, + "{\"value\":\"row0\"}\n{\"value\":\"row3\"}\n".as_bytes() + ); + + // Test case 3: Only middle rows selected (should include only 2 rows) + let middle_selected = + FilteredEngineData::try_new(create_engine_data(), vec![false, true, true, false])?; + let json_middle = to_json_bytes(Box::new(std::iter::once(Ok(middle_selected))))?; + assert_eq!( + json_middle, + "{\"value\":\"row1\"}\n{\"value\":\"row2\"}\n".as_bytes() + ); + + // Test case 4: No rows selected (should produce empty output) + let none_selected = + FilteredEngineData::try_new(create_engine_data(), vec![false, false, false, false])?; + let json_none = to_json_bytes(Box::new(std::iter::once(Ok(none_selected))))?; + assert_eq!(json_none, "".as_bytes()); + + // Test case 5: Only one row selected (should include only 1 row) + let one_selected = + FilteredEngineData::try_new(create_engine_data(), vec![false, true, false, false])?; + let json_one = to_json_bytes(Box::new(std::iter::once(Ok(one_selected))))?; + assert_eq!(json_one, "{\"value\":\"row1\"}\n".as_bytes()); + + // Test case 6: Only one row selected implicitly by short vector + let one_selected = + FilteredEngineData::try_new(create_engine_data(), vec![false, false, false])?; + let json_one = to_json_bytes(Box::new(std::iter::once(Ok(one_selected))))?; + assert_eq!(json_one, "{\"value\":\"row3\"}\n".as_bytes()); + + Ok(()) + } + #[test] fn test_arrow_broken_nested_null_masks() { use crate::arrow::datatypes::{DataType, Field, Schema}; diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index a86727184..a98fff491 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -21,6 +21,7 @@ use crate::engine::arrow_conversion::TryFromKernel as _; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_utils::parse_json as arrow_parse_json; use crate::engine::arrow_utils::to_json_bytes; +use crate::engine_data::FilteredEngineData; use crate::schema::SchemaRef; use crate::{ DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, JsonHandler, PredicateRef, @@ -136,7 +137,7 @@ impl JsonHandler for DefaultJsonHandler { fn write_json_file( &self, path: &Url, - data: Box>> + Send + '_>, + data: Box> + Send + '_>, overwrite: bool, ) -> DeltaResult<()> { let buffer = to_json_bytes(data)?; @@ -774,7 +775,9 @@ mod tests { // First write with no existing file let data = create_test_data(vec!["remi", "wilson"])?; - let result = handler.write_json_file(&path, Box::new(std::iter::once(Ok(data))), overwrite); + let filtered_data = Ok(FilteredEngineData::with_all_rows_selected(data)); + let result = + handler.write_json_file(&path, Box::new(std::iter::once(filtered_data)), overwrite); // Verify the first write is successful assert!(result.is_ok()); @@ -783,7 +786,9 @@ mod tests { // Second write with existing file let data = create_test_data(vec!["seb", "tia"])?; - let result = handler.write_json_file(&path, Box::new(std::iter::once(Ok(data))), overwrite); + let filtered_data = Ok(FilteredEngineData::with_all_rows_selected(data)); + let result = + handler.write_json_file(&path, Box::new(std::iter::once(filtered_data)), overwrite); if overwrite { // Verify the second write is successful diff --git a/kernel/src/engine/mod.rs b/kernel/src/engine/mod.rs index 6290e6310..8453b6560 100644 --- a/kernel/src/engine/mod.rs +++ b/kernel/src/engine/mod.rs @@ -37,6 +37,7 @@ mod tests { use crate::arrow::array::{RecordBatch, StringArray}; use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema}; use crate::engine::arrow_data::ArrowEngineData; + use crate::engine_data::FilteredEngineData; use crate::{Engine, EngineData}; use test_utils::delta_path_for_version; @@ -47,7 +48,11 @@ mod tests { engine_data: impl Fn() -> Box, ) { let json = engine.json_handler(); - let get_data = || Box::new(std::iter::once(Ok(engine_data()))); + let get_data = || { + let data = engine_data(); + let filtered_data = FilteredEngineData::with_all_rows_selected(data); + Box::new(std::iter::once(Ok(filtered_data))) + }; let expected_names: Vec = (1..4) .map(|i| delta_path_for_version(i, "json")) diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs index ac14e65b7..f6a50f968 100644 --- a/kernel/src/engine/sync/json.rs +++ b/kernel/src/engine/sync/json.rs @@ -9,6 +9,7 @@ use super::read_files; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_utils::parse_json as arrow_parse_json; use crate::engine::arrow_utils::to_json_bytes; +use crate::engine_data::FilteredEngineData; use crate::schema::SchemaRef; use crate::{ DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, JsonHandler, PredicateRef, @@ -52,7 +53,7 @@ impl JsonHandler for SyncJsonHandler { fn write_json_file( &self, path: &Url, - data: Box>> + Send + '_>, + data: Box> + Send + '_>, overwrite: bool, ) -> DeltaResult<()> { let path = path @@ -144,7 +145,9 @@ mod tests { // First write with no existing file let data = create_test_data(vec!["remi", "wilson"])?; - let result = handler.write_json_file(&url, Box::new(std::iter::once(Ok(data))), overwrite); + let filtered_data = Ok(FilteredEngineData::with_all_rows_selected(data)); + let result = + handler.write_json_file(&url, Box::new(std::iter::once(filtered_data)), overwrite); // Verify the first write is successful assert!(result.is_ok()); @@ -153,7 +156,9 @@ mod tests { // Second write with existing file let data = create_test_data(vec!["seb", "tia"])?; - let result = handler.write_json_file(&url, Box::new(std::iter::once(Ok(data))), overwrite); + let filtered_data = Ok(FilteredEngineData::with_all_rows_selected(data)); + let result = + handler.write_json_file(&url, Box::new(std::iter::once(filtered_data)), overwrite); if overwrite { // Verify the second write is successful diff --git a/kernel/src/engine_data.rs b/kernel/src/engine_data.rs index 18fb81813..7edb70b7c 100644 --- a/kernel/src/engine_data.rs +++ b/kernel/src/engine_data.rs @@ -12,19 +12,68 @@ use crate::{AsAny, DeltaResult, Error}; /// Engine data paired with a selection vector indicating which rows are logically selected. /// /// A value of `true` in the selection vector means the corresponding row is selected (i.e., not deleted), -/// while `false` means the row is logically deleted and should be ignored. +/// while `false` means the row is logically deleted and should be ignored. If the selection vector is shorter +/// then the number of rows in `data` then all rows not covered by the selection vector are assumed to be selected. /// /// Interpreting unselected (`false`) rows will result in incorrect/undefined behavior. pub struct FilteredEngineData { // The underlying engine data - pub data: Box, - // The selection vector where `true` marks rows to include in results - pub selection_vector: Vec, + data: Box, + // The selection vector where `true` marks rows to include in results. N.B. this selection + // vector may be less then `data.len()` and any gaps represent rows that are assumed to be selected. + selection_vector: Vec, +} + +impl FilteredEngineData { + pub fn try_new(data: Box, selection_vector: Vec) -> DeltaResult { + if selection_vector.len() > data.len() { + return Err(Error::InvalidSelectionVector(format!( + "Selection vector is larger than data length: {} > {}", + selection_vector.len(), + data.len() + ))); + } + Ok(Self { + data, + selection_vector, + }) + } + + /// Returns a reference to the underlying engine data. + pub fn data(&self) -> &dyn EngineData { + &*self.data + } + + /// Returns a reference to the selection vector. + pub fn selection_vector(&self) -> &[bool] { + &self.selection_vector + } + + /// Consumes the FilteredEngineData and returns the underlying data and selection vector. + pub fn into_parts(self) -> (Box, Vec) { + (self.data, self.selection_vector) + } + + /// Creates a new `FilteredEngineData` with all rows selected. + /// + /// This is a convenience method for the common case where you want to wrap + /// `EngineData` in `FilteredEngineData` without any filtering. + pub fn with_all_rows_selected(data: Box) -> Self { + Self { + data, + selection_vector: vec![], + } + } } impl HasSelectionVector for FilteredEngineData { /// Returns true if any row in the selection vector is marked as selected fn has_selected_rows(&self) -> bool { + // Per contract if selection is not as long as data then at least one row is selected. + if self.selection_vector.len() < self.data.len() { + return true; + } + self.selection_vector.contains(&true) } } @@ -315,3 +364,197 @@ pub trait EngineData: AsAny { columns: Vec, ) -> DeltaResult>; } + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array::{RecordBatch, StringArray}; + use crate::arrow::datatypes::{ + DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, + }; + use crate::engine::arrow_data::ArrowEngineData; + use std::sync::Arc; + + #[test] + fn test_with_all_rows_selected_empty_data() { + // Test with empty data + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + ArrowDataType::Utf8, + true, + )])); + let record_batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(Vec::::new()))], + ) + .unwrap(); + let data: Box = Box::new(ArrowEngineData::new(record_batch)); + + let filtered_data = FilteredEngineData::with_all_rows_selected(data); + + assert_eq!(filtered_data.selection_vector().len(), 0); + assert!(filtered_data.selection_vector().is_empty()); + assert_eq!(filtered_data.data().len(), 0); + } + + #[test] + fn test_with_all_rows_selected_single_row() { + // Test with single row + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + ArrowDataType::Utf8, + true, + )])); + let record_batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(vec!["single_row"]))], + ) + .unwrap(); + let data: Box = Box::new(ArrowEngineData::new(record_batch)); + + let filtered_data = FilteredEngineData::with_all_rows_selected(data); + + // According to the new contract, empty selection vector means all rows are selected + assert!(filtered_data.selection_vector().is_empty()); + assert_eq!(filtered_data.data().len(), 1); + assert!(filtered_data.has_selected_rows()); + } + + #[test] + fn test_with_all_rows_selected_multiple_rows() { + // Test with multiple rows + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + ArrowDataType::Utf8, + true, + )])); + let record_batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(vec![ + "row1", "row2", "row3", "row4", + ]))], + ) + .unwrap(); + let data: Box = Box::new(ArrowEngineData::new(record_batch)); + + let filtered_data = FilteredEngineData::with_all_rows_selected(data); + + // According to the new contract, empty selection vector means all rows are selected + assert!(filtered_data.selection_vector().is_empty()); + assert_eq!(filtered_data.data().len(), 4); + assert!(filtered_data.has_selected_rows()); + } + + #[test] + fn test_has_selected_rows_empty_data() { + // Test with empty data + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + ArrowDataType::Utf8, + true, + )])); + let record_batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(Vec::::new()))], + ) + .unwrap(); + let data: Box = Box::new(ArrowEngineData::new(record_batch)); + + let filtered_data = FilteredEngineData::try_new(data, vec![]).unwrap(); + + // Empty data should return false even with empty selection vector + assert!(!filtered_data.has_selected_rows()); + } + + #[test] + fn test_has_selected_rows_selection_vector_shorter_than_data() { + // Test with selection vector shorter than data length + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + ArrowDataType::Utf8, + true, + )])); + let record_batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(vec!["row1", "row2", "row3"]))], + ) + .unwrap(); + let data: Box = Box::new(ArrowEngineData::new(record_batch)); + + // Selection vector with only 2 elements for 3 rows of data + let filtered_data = FilteredEngineData::try_new(data, vec![false, false]).unwrap(); + + // Should return true because selection vector is shorter than data + assert!(filtered_data.has_selected_rows()); + } + + #[test] + fn test_has_selected_rows_selection_vector_same_length_all_false() { + // Test with selection vector same length as data, all false + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + ArrowDataType::Utf8, + true, + )])); + let record_batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(vec!["row1", "row2"]))], + ) + .unwrap(); + let data: Box = Box::new(ArrowEngineData::new(record_batch)); + + let filtered_data = FilteredEngineData::try_new(data, vec![false, false]).unwrap(); + + // Should return false because no rows are selected + assert!(!filtered_data.has_selected_rows()); + } + + #[test] + fn test_has_selected_rows_selection_vector_same_length_some_true() { + // Test with selection vector same length as data, some true + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + ArrowDataType::Utf8, + true, + )])); + let record_batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(vec!["row1", "row2", "row3"]))], + ) + .unwrap(); + let data: Box = Box::new(ArrowEngineData::new(record_batch)); + + let filtered_data = FilteredEngineData::try_new(data, vec![true, false, true]).unwrap(); + + // Should return true because some rows are selected + assert!(filtered_data.has_selected_rows()); + } + + #[test] + fn test_try_new_selection_vector_larger_than_data() { + // Test with selection vector larger than data length - should return error + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + ArrowDataType::Utf8, + true, + )])); + let record_batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(vec!["row1", "row2"]))], + ) + .unwrap(); + let data: Box = Box::new(ArrowEngineData::new(record_batch)); + + // Selection vector with 3 elements for 2 rows of data - should fail + let result = FilteredEngineData::try_new(data, vec![true, false, true]); + + // Should return an error + assert!(result.is_err()); + if let Err(e) = result { + assert!(e + .to_string() + .contains("Selection vector is larger than data length")); + assert!(e.to_string().contains("3 > 2")); + } + } +} diff --git a/kernel/src/error.rs b/kernel/src/error.rs index baf85757b..d42cd6de4 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -112,6 +112,10 @@ pub enum Error { #[error("Deletion Vector error: {0}")] DeletionVector(String), + /// A selection vector is larger than data length + #[error("Selection vector is larger than data length: {0}")] + InvalidSelectionVector(String), + /// A specified URL was invalid #[error("Invalid url: {0}")] InvalidUrl(#[from] url::ParseError), diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 228c3ed61..eca5ec0a6 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -149,6 +149,7 @@ pub mod history_manager; #[cfg(not(feature = "internal-api"))] pub(crate) mod history_manager; +pub use crate::engine_data::FilteredEngineData; pub use delta_kernel_derive; pub use engine_data::{EngineData, RowVisitor}; pub use error::{DeltaResult, Error}; @@ -602,7 +603,7 @@ pub trait JsonHandler: AsAny { fn write_json_file( &self, path: &Url, - data: Box>> + Send + '_>, + data: Box> + Send + '_>, overwrite: bool, ) -> DeltaResult<()>; } diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 3d37c3845..a82d123d2 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -336,11 +336,11 @@ impl LogReplayProcessor for ScanLogReplayProcessor { // TODO: Teach expression eval to respect the selection vector we just computed so carefully! let result = self.add_transform.evaluate(actions.as_ref())?; - Ok(ScanMetadata::new( + ScanMetadata::try_new( result, visitor.selection_vector, visitor.row_transform_exprs, - )) + ) } fn data_skipping_filter(&self) -> Option<&DataSkippingFilter> { diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 0c728edec..bed1c1b9a 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -341,24 +341,21 @@ pub struct ScanMetadata { } impl ScanMetadata { - fn new( + fn try_new( data: Box, selection_vector: Vec, scan_file_transforms: Vec>, - ) -> Self { - Self { - scan_files: FilteredEngineData { - data, - selection_vector, - }, + ) -> DeltaResult { + Ok(Self { + scan_files: FilteredEngineData::try_new(data, selection_vector)?, scan_file_transforms, - } + }) } } impl HasSelectionVector for ScanMetadata { fn has_selected_rows(&self) -> bool { - self.scan_files.selection_vector.contains(&true) + self.scan_files.selection_vector().contains(&true) } } @@ -979,7 +976,10 @@ pub(crate) mod test_utils { let mut batch_count = 0; for res in iter { let scan_metadata = res.unwrap(); - assert_eq!(scan_metadata.scan_files.selection_vector, expected_sel_vec); + assert_eq!( + scan_metadata.scan_files.selection_vector(), + expected_sel_vec + ); scan_metadata .visit_scan_files(context.clone(), validate_callback) .unwrap(); @@ -1251,12 +1251,12 @@ mod tests { .scan_metadata(engine.as_ref()) .unwrap() .map_ok(|ScanMetadata { scan_files, .. }| { - let batch: RecordBatch = ArrowEngineData::try_from_engine_data(scan_files.data) + let (underlying_data, selection_vector) = scan_files.into_parts(); + let batch: RecordBatch = ArrowEngineData::try_from_engine_data(underlying_data) .unwrap() .into(); let filtered_batch = - filter_record_batch(&batch, &BooleanArray::from(scan_files.selection_vector)) - .unwrap(); + filter_record_batch(&batch, &BooleanArray::from(selection_vector)).unwrap(); Box::new(ArrowEngineData::from(filtered_batch)) as Box }) .try_collect() @@ -1287,11 +1287,11 @@ mod tests { .scan_metadata(engine.as_ref()) .unwrap() .map_ok(|ScanMetadata { scan_files, .. }| { - let batch: RecordBatch = ArrowEngineData::try_from_engine_data(scan_files.data) + let (underlying_data, selection_vector) = scan_files.into_parts(); + let batch: RecordBatch = ArrowEngineData::try_from_engine_data(underlying_data) .unwrap() .into(); - filter_record_batch(&batch, &BooleanArray::from(scan_files.selection_vector)) - .unwrap() + filter_record_batch(&batch, &BooleanArray::from(selection_vector)).unwrap() }) .try_collect() .unwrap(); @@ -1311,11 +1311,11 @@ mod tests { .scan_metadata_from(engine.as_ref(), 0, files, None) .unwrap() .map_ok(|ScanMetadata { scan_files, .. }| { - let batch: RecordBatch = ArrowEngineData::try_from_engine_data(scan_files.data) + let (underlying_data, selection_vector) = scan_files.into_parts(); + let batch: RecordBatch = ArrowEngineData::try_from_engine_data(underlying_data) .unwrap() .into(); - filter_record_batch(&batch, &BooleanArray::from(scan_files.selection_vector)) - .unwrap() + filter_record_batch(&batch, &BooleanArray::from(selection_vector)).unwrap() }) .try_collect() .unwrap(); diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index a2ac1dbfd..90a3422f8 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -156,11 +156,11 @@ impl ScanMetadata { pub fn visit_scan_files(&self, context: T, callback: ScanCallback) -> DeltaResult { let mut visitor = ScanFileVisitor { callback, - selection_vector: &self.scan_files.selection_vector, + selection_vector: self.scan_files.selection_vector(), transforms: &self.scan_file_transforms, context, }; - visitor.visit_rows_of(self.scan_files.data.as_ref())?; + visitor.visit_rows_of(self.scan_files.data())?; Ok(visitor.context) } } diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 88d34b658..982f572f3 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -9,6 +9,7 @@ use crate::actions::{ as_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, }; +use crate::engine_data::FilteredEngineData; use crate::error::Error; use crate::expressions::{ArrayData, Transform, UnaryExpressionOp::ToJson}; use crate::path::ParsedLogPath; @@ -219,8 +220,13 @@ impl Transaction { .chain(set_transaction_actions) .chain(domain_metadata_actions); + // Convert EngineData to FilteredEngineData with all rows selected + let filtered_actions = actions + .map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected)); + let json_handler = engine.json_handler(); - match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) { + match json_handler.write_json_file(&commit_path.location, Box::new(filtered_actions), false) + { Ok(()) => Ok(CommitResult::Committed { version: commit_version, post_commit_stats: PostCommitStats { diff --git a/kernel/tests/log_compaction.rs b/kernel/tests/log_compaction.rs index d6bc12b7f..198a3879c 100644 --- a/kernel/tests/log_compaction.rs +++ b/kernel/tests/log_compaction.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use delta_kernel::engine::to_json_bytes; +use delta_kernel::engine_data::FilteredEngineData; use delta_kernel::schema::{DataType, StructField, StructType}; use delta_kernel::Snapshot; use test_utils::{create_table, engine_store_setup}; @@ -116,9 +117,11 @@ async fn action_reconciliation_round_trip() -> Result<(), Box