diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs
index 12223dfe6..adb41e75b 100644
--- a/kernel/src/engine/arrow_utils.rs
+++ b/kernel/src/engine/arrow_utils.rs
@@ -63,6 +63,7 @@ macro_rules! prim_array_cmp {
 pub(crate) use prim_array_cmp;
 
 type FieldIndex = usize;
+type FlattenedRangeIterator = std::iter::Flatten<std::vec::IntoIter<Range<i64>>>;
 
 /// contains information about a StructField matched to a parquet struct field
 ///
@@ -128,21 +129,36 @@ impl RowIndexBuilder {
         // filtering is not idempotent and `with_row_groups` could be called more than once.
         self.row_group_ordinals = Some(ordinals.to_vec())
     }
-}
-
-impl IntoIterator for RowIndexBuilder {
-    type Item = i64;
-    type IntoIter = std::iter::Flatten<std::vec::IntoIter<Range<i64>>>;
-    fn into_iter(self) -> Self::IntoIter {
+
+    /// Build an iterator of row indexes, filtering out row groups that were skipped.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if there are duplicate or out-of-bounds row group ordinals.
+    pub(crate) fn build(self) -> DeltaResult<FlattenedRangeIterator> {
         let starting_offsets = match self.row_group_ordinals {
-            Some(ordinals) => ordinals
-                .iter()
-                .map(|i| self.row_group_row_index_ranges[*i].clone())
-                .collect(),
+            Some(ordinals) => {
+                let mut seen_ordinals = HashSet::new();
+                ordinals
+                    .iter()
+                    .map(|&i| {
+                        // Verify that there are no duplicate or out-of-bounds ordinals
+                        if !seen_ordinals.insert(i) {
+                            return Err(Error::generic("Found duplicate row group ordinal"));
+                        }
+                        // Clone here to avoid moving the range out of the original vector
+                        // on each iteration
+                        self.row_group_row_index_ranges
+                            .get(i)
+                            .cloned()
+                            .ok_or_else(|| {
+                                Error::generic(format!("Row group ordinal {i} is out of bounds"))
+                            })
+                    })
+                    .try_collect()?
+            }
             None => self.row_group_row_index_ranges,
         };
-        starting_offsets.into_iter().flatten()
+        Ok(starting_offsets.into_iter().flatten())
     }
 }
 
@@ -153,7 +169,7 @@
 pub(crate) fn fixup_parquet_read<T>(
     batch: RecordBatch,
     requested_ordering: &[ReorderIndex],
-    row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
+    row_indexes: Option<&mut FlattenedRangeIterator>,
 ) -> DeltaResult<T>
 where
     StructArray: Into<T>,
@@ -750,7 +766,7 @@ type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
 pub(crate) fn reorder_struct_array(
     input_data: StructArray,
     requested_ordering: &[ReorderIndex],
-    mut row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
+    mut row_indexes: Option<&mut FlattenedRangeIterator>,
 ) -> DeltaResult<StructArray> {
     debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
     if !ordering_needs_transform(requested_ordering) {
@@ -1153,6 +1169,41 @@ mod tests {
         ])
     }
 
+    /// Helper function to create mock row group metadata for testing
+    fn create_mock_row_group(num_rows: i64) -> RowGroupMetaData {
+        use crate::parquet::basic::{Encoding, Type as PhysicalType};
+        use crate::parquet::file::metadata::ColumnChunkMetaData;
+        use crate::parquet::schema::types::Type;
+
+        // Create a minimal schema descriptor
+        let schema = Arc::new(SchemaDescriptor::new(Arc::new(
+            Type::group_type_builder("schema")
+                .with_fields(vec![Arc::new(
+                    Type::primitive_type_builder("test_col", PhysicalType::INT32)
+                        .build()
+                        .unwrap(),
+                )])
+                .build()
+                .unwrap(),
+        )));
+
+        // Create a minimal column chunk metadata
+        let column_chunk = ColumnChunkMetaData::builder(schema.column(0))
+            .set_encodings(vec![Encoding::PLAIN])
+            .set_total_compressed_size(100)
+            .set_total_uncompressed_size(100)
+            .set_num_values(num_rows)
+            .build()
+            .unwrap();
+
+        RowGroupMetaData::builder(schema)
+            .set_num_rows(num_rows)
+            .set_total_byte_size(100)
+            .set_column_metadata(vec![column_chunk])
+            .build()
+            .unwrap()
+    }
+
     #[test]
     fn test_json_parsing() {
         let requested_schema = Arc::new(ArrowSchema::new(vec![
@@ -1783,6 +1834,103 @@ mod tests {
         assert_eq!(reorder_indices, expect_reorder);
     }
 
+    #[test]
+    fn test_row_index_builder_no_skipping() {
+        let row_groups = vec![
+            create_mock_row_group(5), // 5 rows: indexes 0-4
+            create_mock_row_group(3), // 3 rows: indexes 5-7
+            create_mock_row_group(4), // 4 rows: indexes 8-11
+        ];
+
+        let builder = RowIndexBuilder::new(&row_groups);
+        let row_indexes: Vec<i64> = builder.build().unwrap().collect();
+
+        // Should produce consecutive indexes from 0 to 11
+        assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+    }
+
+    #[test]
+    fn test_row_index_builder_with_skipping() {
+        let row_groups = vec![
+            create_mock_row_group(5), // 5 rows: indexes 0-4
+            create_mock_row_group(3), // 3 rows: indexes 5-7 (will be skipped)
+            create_mock_row_group(4), // 4 rows: indexes 8-11
+            create_mock_row_group(2), // 2 rows: indexes 12-13 (will be skipped)
+        ];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[0, 2]);
+
+        let row_indexes: Vec<i64> = builder.build().unwrap().collect();
+
+        // Should produce indexes from row groups 0 and 2: [0-4] and [8-11]
+        assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 8, 9, 10, 11]);
+    }
+
+    #[test]
+    fn test_row_index_builder_single_row_group() {
+        let row_groups = vec![create_mock_row_group(7)];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[0]);
+
+        let row_indexes: Vec<i64> = builder.build().unwrap().collect();
+
+        assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6]);
+    }
+
+    #[test]
+    fn test_row_index_builder_empty_selection() {
+        let row_groups = vec![create_mock_row_group(3), create_mock_row_group(2)];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[]);
+
+        let row_indexes: Vec<i64> = builder.build().unwrap().collect();
+
+        // Should produce no indexes
+        assert_eq!(row_indexes, Vec::<i64>::new());
+    }
+
+    #[test]
+    fn test_row_index_builder_out_of_order_selection() {
+        let row_groups = vec![
+            create_mock_row_group(2), // 2 rows: indexes 0-1
+            create_mock_row_group(3), // 3 rows: indexes 2-4
+            create_mock_row_group(1), // 1 row: index 5
+        ];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[2, 0]);
+
+        let row_indexes: Vec<i64> = builder.build().unwrap().collect();
+
+        // Should produce indexes in the order specified: group 2 first, then group 0
+        assert_eq!(row_indexes, vec![5, 0, 1]);
+    }
+
+    #[test]
+    fn test_row_index_builder_out_of_bounds_row_group_ordinals() {
+        let row_groups = vec![create_mock_row_group(2)];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[1]);
+
+        let result = builder.build();
+        assert_result_error_with_message(result, "Row group ordinal 1 is out of bounds");
+    }
+
+    #[test]
+    fn test_row_index_builder_duplicate_row_group_ordinals() {
+        let row_groups = vec![create_mock_row_group(2), create_mock_row_group(3)];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[1, 1]);
+
+        let result = builder.build();
+        assert_result_error_with_message(result, "Found duplicate row group ordinal");
+    }
+
     #[test]
     fn nested_indices() {
         column_mapping_cases().into_iter().for_each(|mode| {
diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index 7609510e0..fa77b3294 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -338,7 +338,7 @@ impl FileOpener for ParquetOpener {
                 builder = builder.with_limit(limit)
             }
 
-            let mut row_indexes = row_indexes.map(|rb| rb.into_iter());
+            let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
 
             let stream = builder.with_batch_size(batch_size).build()?;
             let stream = stream.map(move |rbr| {
@@ -416,7 +416,7 @@ impl FileOpener for PresignedUrlOpener {
 
             let reader = builder.with_batch_size(batch_size).build()?;
 
-            let mut row_indexes = row_indexes.map(|rb| rb.into_iter());
+            let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
             let stream = futures::stream::iter(reader);
             let stream = stream.map(move |rbr| {
                 fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())
diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs
index 65795571d..d170a2334 100644
--- a/kernel/src/engine/sync/parquet.rs
+++ b/kernel/src/engine/sync/parquet.rs
@@ -38,7 +38,7 @@ fn try_create_from_parquet(
         builder = builder.with_row_group_filter(predicate.as_ref(), row_indexes.as_mut());
     }
 
-    let mut row_indexes = row_indexes.map(|rb| rb.into_iter());
+    let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
     let stream = builder.build()?;
     Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())))
 }
diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs
index 817c85cbc..91fb2f052 100644
--- a/kernel/tests/read.rs
+++ b/kernel/tests/read.rs
@@ -3,9 +3,10 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use delta_kernel::actions::deletion_vector::split_vector;
-use delta_kernel::arrow::array::AsArray as _;
+use delta_kernel::arrow::array::{AsArray as _, BooleanArray};
 use delta_kernel::arrow::compute::{concat_batches, filter_record_batch};
 use delta_kernel::arrow::datatypes::{Int64Type, Schema as ArrowSchema};
+use delta_kernel::arrow::record_batch::RecordBatch;
 use delta_kernel::engine::arrow_conversion::TryFromKernel as _;
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
@@ -14,7 +15,7 @@ use delta_kernel::expressions::{
 };
 use delta_kernel::parquet::file::properties::{EnabledStatistics, WriterProperties};
 use delta_kernel::scan::state::{transform_to_logical, DvInfo, Stats};
-use delta_kernel::scan::Scan;
+use delta_kernel::scan::{Scan, ScanResult};
 use delta_kernel::schema::{DataType, MetadataColumnSpec, Schema, StructField, StructType};
 use delta_kernel::{Engine, FileMeta, Snapshot};
 
@@ -33,6 +34,23 @@ const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c00
 const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet";
 const PARQUET_FILE3: &str = "part-00002-c506e79a-0bf8-4e2b-a42b-9731b2e490ff-c000.snappy.parquet";
 
+/// Helper function to extract filtered data from a scan result, respecting row masks
+fn extract_record_batch(
+    scan_result: ScanResult,
+) -> Result<RecordBatch, Box<dyn std::error::Error>> {
+    let mask = scan_result.full_mask();
+    let record_batch = into_record_batch(scan_result.raw_data?);
+
+    if let Some(mask) = mask {
+        Ok(filter_record_batch(
+            &record_batch,
+            &BooleanArray::from(mask),
+        )?)
+    } else {
+        Ok(record_batch)
+    }
+}
+
 #[tokio::test]
 async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>> {
     let batch = generate_simple_batch()?;
@@ -1382,24 +1400,18 @@ async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Err
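
Reviewer note (not part of the patch): a minimal sketch of the call pattern this change establishes, using only names that appear in the diff; the surrounding setup (`row_groups` metadata, a decoded `batch`, `requested_ordering`) is assumed from the opener code above.

```rust
// Sketch, not the shipped code: mirrors how the openers now thread row indexes.
let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[0, 2]); // keep only row groups 0 and 2

// `build()` replaces the old `IntoIterator` impl and is fallible: it rejects
// duplicate or out-of-bounds row group ordinals up front, where the old
// `self.row_group_row_index_ranges[*i]` indexing would panic on a bad ordinal
// and silently double-count a repeated one.
let mut row_indexes = builder.build()?;

// As before, the flattened iterator is consumed batch-by-batch during fixup.
let batch: RecordBatch =
    fixup_parquet_read(batch, &requested_ordering, Some(&mut row_indexes))?;
```

The `.map(|rb| rb.build()).transpose()?` pattern in the three openers is the `Option`-wrapped form of the same flow: `Option<RowIndexBuilder>` becomes `Result<Option<FlattenedRangeIterator>>`, so a bad ordinal now surfaces as an error at open time rather than a panic mid-stream.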