From 465784a766c86f365ffc7aa8b446c77fb042345d Mon Sep 17 00:00:00 2001
From: Lennart Behme <44500208+lbhm@users.noreply.github.com>
Date: Tue, 9 Sep 2025 11:47:25 +0200
Subject: [PATCH] feat!: Introduce RowIndex metadata column

Address review comments

Address 2nd review comments

Make row indexes optional
---
 kernel/src/engine/arrow_utils.rs              | 277 +++++++++++++++---
 kernel/src/engine/default/parquet.rs          |  29 +-
 .../src/engine/parquet_row_group_skipping.rs  |  31 +-
 kernel/src/engine/sync/parquet.rs             |  17 +-
 kernel/tests/read.rs                          | 194 +++++++++++-
 5 files changed, 491 insertions(+), 57 deletions(-)

diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs
index 3c120d3d7..12223dfe6 100644
--- a/kernel/src/engine/arrow_utils.rs
+++ b/kernel/src/engine/arrow_utils.rs
@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
 use std::io::{BufRead, BufReader};
+use std::ops::Range;
 use std::sync::{Arc, OnceLock};
 
 use crate::engine::arrow_conversion::{TryFromKernel as _, TryIntoArrow as _};
@@ -9,23 +10,24 @@ use crate::engine::ensure_data_types::DataTypeCompat;
 use crate::schema::{ColumnMetadataKey, MetadataValue};
 use crate::{
     engine::arrow_data::ArrowEngineData,
-    schema::{DataType, Schema, SchemaRef, StructField, StructType},
+    schema::{DataType, MetadataColumnSpec, Schema, SchemaRef, StructField, StructType},
     utils::require,
     DeltaResult, EngineData, Error,
 };
 
 use crate::arrow::array::{
     cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray, MapArray,
-    OffsetSizeTrait, RecordBatch, StringArray, StructArray,
+    OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
 };
 use crate::arrow::buffer::NullBuffer;
 use crate::arrow::compute::concat_batches;
 use crate::arrow::datatypes::{
     DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef,
-    Fields as ArrowFields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
+    Fields as ArrowFields, Int64Type, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
 };
 use crate::arrow::json::{LineDelimitedWriter, ReaderBuilder};
 use crate::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use crate::parquet::file::metadata::RowGroupMetaData;
 use crate::parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor};
 use delta_kernel_derive::internal_api;
 use itertools::Itertools;
@@ -98,17 +100,65 @@ pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
         .with_backtrace()
 }
 
+/// Prepares to enumerate row indexes of rows in a parquet file, accounting for row group skipping.
+pub(crate) struct RowIndexBuilder {
+    row_group_row_index_ranges: Vec<Range<i64>>,
+    row_group_ordinals: Option<Vec<usize>>,
+}
+
+impl RowIndexBuilder {
+    pub(crate) fn new(row_groups: &[RowGroupMetaData]) -> Self {
+        let mut row_group_row_index_ranges = vec![];
+        let mut offset = 0;
+        for row_group in row_groups {
+            let num_rows = row_group.num_rows();
+            row_group_row_index_ranges.push(offset..offset + num_rows);
+            offset += num_rows;
+        }
+        Self {
+            row_group_row_index_ranges,
+            row_group_ordinals: None,
+        }
+    }
+
+    /// Only produce row indexes for the row groups specified by the ordinals that survived row
+    /// group skipping. The ordinals must be in 0..num_row_groups.
+    pub(crate) fn select_row_groups(&mut self, ordinals: &[usize]) {
+        // NOTE: Don't apply the filtering until we actually build the iterator, because the
+        // filtering is not idempotent and `with_row_groups` could be called more than once.
+        self.row_group_ordinals = Some(ordinals.to_vec())
+    }
+}
+
+impl IntoIterator for RowIndexBuilder {
+    type Item = i64;
+    type IntoIter = std::iter::Flatten<std::vec::IntoIter<Range<i64>>>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        let starting_offsets = match self.row_group_ordinals {
+            Some(ordinals) => ordinals
+                .iter()
+                .map(|i| self.row_group_row_index_ranges[*i].clone())
+                .collect(),
+            None => self.row_group_row_index_ranges,
+        };
+        starting_offsets.into_iter().flatten()
+    }
+}
+
 /// Applies post-processing to data read from parquet files. This includes `reorder_struct_array` to
 /// ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns have
 /// accurate null masks that row visitors rely on for correctness.
+/// `row_indexes` is passed through to `reorder_struct_array`.
 pub(crate) fn fixup_parquet_read<T>(
     batch: RecordBatch,
     requested_ordering: &[ReorderIndex],
+    row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
 ) -> DeltaResult<T>
 where
     StructArray: Into<T>,
 {
-    let data = reorder_struct_array(batch.into(), requested_ordering)?;
+    let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes)?;
     let data = fix_nested_null_masks(data);
     Ok(data.into())
 }
@@ -236,6 +286,8 @@ pub(crate) enum ReorderIndexTransform {
     Identity,
     /// Data is missing, fill in with a null column
     Missing(ArrowFieldRef),
+    /// Row index column requested, compute it
+    RowIndex(ArrowFieldRef),
 }
 
 impl ReorderIndex {
@@ -259,12 +311,18 @@ impl ReorderIndex {
         ReorderIndex::new(index, ReorderIndexTransform::Missing(field))
     }
 
+    fn row_index(index: usize, field: ArrowFieldRef) -> Self {
+        ReorderIndex::new(index, ReorderIndexTransform::RowIndex(field))
+    }
+
     /// Check if this reordering requires a transformation anywhere. See comment below on
     /// [`ordering_needs_transform`] to understand why this is needed.
fn needs_transform(&self) -> bool { match self.transform { - // if we're casting or inserting null, we need to transform - ReorderIndexTransform::Cast(_) | ReorderIndexTransform::Missing(_) => true, + // if we're casting, inserting null, or generating row index, we need to transform + ReorderIndexTransform::Cast(_) + | ReorderIndexTransform::Missing(_) + | ReorderIndexTransform::RowIndex(_) => true, // if our nested ordering needs a transform, we need a transform ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children), // no transform needed @@ -508,27 +566,35 @@ fn get_indices( } if found_fields.len() != requested_schema.num_fields() { - // some fields are missing, but they might be nullable, need to insert them into the reorder_indices + // some fields are missing, but they might be nullable or metadata columns, need to insert them into the reorder_indices for (requested_position, field) in requested_schema.fields().enumerate() { if !found_fields.contains(field.name()) { - if let Some(metadata_spec) = field.get_metadata_column_spec() { - // We don't support reading any metadata columns yet - // TODO: Implement row index support for the Parquet reader - return Err(Error::Generic(format!( - "Metadata column {metadata_spec:?} is not supported by the default parquet reader" - ))); - } - if field.nullable { - debug!("Inserting missing and nullable field: {}", field.name()); - reorder_indices.push(ReorderIndex::missing( - requested_position, - Arc::new(field.try_into_arrow()?), - )); - } else { - return Err(Error::Generic(format!( - "Requested field not found in parquet schema, and field is not nullable: {}", - field.name() - ))); + match field.get_metadata_column_spec() { + Some(MetadataColumnSpec::RowIndex) => { + debug!("Inserting a row index column: {}", field.name()); + reorder_indices.push(ReorderIndex::row_index( + requested_position, + Arc::new(field.try_into_arrow()?), + )); + } + Some(metadata_spec) => { + return Err(Error::Generic(format!( + "Metadata column {metadata_spec:?} is not supported by the default parquet reader" + ))); + } + None if field.nullable => { + debug!("Inserting missing and nullable field: {}", field.name()); + reorder_indices.push(ReorderIndex::missing( + requested_position, + Arc::new(field.try_into_arrow()?), + )); + } + None => { + return Err(Error::Generic(format!( + "Requested field not found in parquet schema, and field is not nullable: {}", + field.name() + ))); + } } } } @@ -642,7 +708,7 @@ pub(crate) fn generate_mask( )) } -/// Check if an ordering requires transforming the data in any way. This is true if the indices are +/// Check if an ordering requires transforming the data in any way. This is true if the indices are /// NOT in ascending order (so we have to reorder things), or if we need to do any transformation on /// the data read from parquet. We check the ordering here, and also call /// `ReorderIndex::needs_transform` on each element to check for other transforms, and to check @@ -662,15 +728,29 @@ fn ordering_needs_transform(requested_ordering: &[ReorderIndex]) -> bool { .any(|ri| (ri[0].index >= ri[1].index) || ri[1].needs_transform()) } +/// Check if an ordering requires row index computation. +/// +/// The function only checks if a RowIndex transform is present at the top-level, since metadata +/// columns are not allowed to be nested. 
+pub(crate) fn ordering_needs_row_indexes(requested_ordering: &[ReorderIndex]) -> bool {
+    requested_ordering
+        .iter()
+        .any(|reorder_index| matches!(&reorder_index.transform, ReorderIndexTransform::RowIndex(_)))
+}
+
 // we use this as a placeholder for an array and its associated field. We can fill in a Vec of None
 // of this type and then set elements of the Vec to Some(FieldArrayOpt) for each column
 type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
 
 /// Reorder a RecordBatch to match `requested_ordering`. For each non-zero value in
-/// `requested_ordering`, the column at that index will be added in order to returned batch
+/// `requested_ordering`, the column at that index will be added in order to the returned batch.
+///
+/// If the requested ordering contains a [`ReorderIndexTransform::RowIndex`], `row_indexes`
+/// must not be `None` to append a row index column to the output.
 pub(crate) fn reorder_struct_array(
     input_data: StructArray,
     requested_ordering: &[ReorderIndex],
+    mut row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
 ) -> DeltaResult<StructArray> {
     debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
     if !ordering_needs_transform(requested_ordering) {
@@ -704,8 +784,11 @@ pub(crate) fn reorder_struct_array(
                     match input_cols[parquet_position].data_type() {
                         ArrowDataType::Struct(_) => {
                             let struct_array = input_cols[parquet_position].as_struct().clone();
-                            let result_array =
-                                Arc::new(reorder_struct_array(struct_array, children)?);
+                            let result_array = Arc::new(reorder_struct_array(
+                                struct_array,
+                                children,
+                                None, // Nested structures don't need row indexes since metadata columns can't be nested
+                            )?);
                             // create the new field specifying the correct order for the struct
                             let new_field = Arc::new(ArrowField::new_struct(
                                 input_field_name,
@@ -748,6 +831,23 @@ pub(crate) fn reorder_struct_array(
                     let field = field.clone(); // cheap Arc clone
                     final_fields_cols[reorder_index.index] = Some((field, null_array));
                 }
+                ReorderIndexTransform::RowIndex(field) => {
+                    let Some(ref mut row_index_iter) = row_indexes else {
+                        return Err(Error::generic(
+                            "Row index column requested but row index iterator not provided",
+                        ));
+                    };
+                    let row_index_array: PrimitiveArray<Int64Type> =
+                        row_index_iter.take(num_rows).collect();
+                    require!(
+                        row_index_array.len() == num_rows,
+                        Error::internal_error(
+                            "Row index iterator exhausted before reaching the end of the file"
+                        )
+                    );
+                    final_fields_cols[reorder_index.index] =
+                        Some((Arc::clone(field), Arc::new(row_index_array)));
+                }
             }
         }
         let num_cols = final_fields_cols.len();
@@ -773,7 +873,11 @@ fn reorder_list(
     let (list_field, offset_buffer, maybe_sa, null_buf) = list_array.into_parts();
     if let Some(struct_array) = maybe_sa.as_struct_opt() {
         let struct_array = struct_array.clone();
-        let result_array = Arc::new(reorder_struct_array(struct_array, children)?);
+        let result_array = Arc::new(reorder_struct_array(
+            struct_array,
+            children,
+            None, // Nested structures don't need row indexes since metadata columns can't be nested
+        )?);
         let new_list_field = Arc::new(ArrowField::new_struct(
             list_field.name(),
             result_array.fields().clone(),
@@ -804,7 +908,11 @@ fn reorder_map(
     children: &[ReorderIndex],
 ) -> DeltaResult<MapArray> {
     let (map_field, offset_buffer, struct_array, null_buf, ordered) = map_array.into_parts();
-    let result_array = reorder_struct_array(struct_array, children)?;
+    let result_array = reorder_struct_array(
+        struct_array,
+        children,
+        None, // Nested structures don't need row indexes since metadata columns can't be nested
+    )?;
     let result_fields =
result_array.fields(); let new_map_field = Arc::new(ArrowField::new_struct( map_field.name(), @@ -1576,6 +1684,105 @@ mod tests { assert_eq!(matched_fields[2].parquet_field.name(), "another_field"); } + #[test] + fn test_ordering_needs_row_indexes() { + // Test case 1: No row index needed + let ordering_no_row_index = vec![ + ReorderIndex::identity(0), + ReorderIndex::cast(1, ArrowDataType::Int64), + ReorderIndex::missing( + 2, + Arc::new(ArrowField::new("missing", ArrowDataType::Utf8, true)), + ), + ]; + assert!(!ordering_needs_row_indexes(&ordering_no_row_index)); + + // Test case 2: Row index needed at top level + let ordering_with_row_index = vec![ + ReorderIndex::identity(0), + ReorderIndex::row_index( + 1, + Arc::new(ArrowField::new("row_idx", ArrowDataType::Int64, false)), + ), + ]; + assert!(ordering_needs_row_indexes(&ordering_with_row_index)); + + // Test case 3: Empty ordering + assert!(!ordering_needs_row_indexes(&[])); + } + + #[test] + fn test_reorder_struct_array_missing_row_indexes() { + // Test that we get a proper error when row indexes are needed but not provided + let arry = make_struct_array(); + let reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::row_index( + 1, + Arc::new(ArrowField::new("row_idx", ArrowDataType::Int64, false)), + ), + ]; + + let result = reorder_struct_array(arry, &reorder, None); + assert_result_error_with_message( + result, + "Row index column requested but row index iterator not provided", + ); + } + + #[test] + fn test_reorder_struct_array_with_row_indexes() { + // Test that row indexes work when properly provided + let arry = make_struct_array(); + let reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::row_index( + 1, + Arc::new(ArrowField::new("row_idx", ArrowDataType::Int64, false)), + ), + ]; + + // Create a mock row index iterator + #[allow(clippy::single_range_in_vec_init)] + let mut row_indexes = vec![(0..4)].into_iter().flatten(); + + let ordered = reorder_struct_array(arry, &reorder, Some(&mut row_indexes)).unwrap(); + assert_eq!(ordered.column_names(), vec!["b", "row_idx"]); + + // Verify the row index column contains the expected values + let row_idx_col = ordered.column(1).as_primitive::(); + assert_eq!(row_idx_col.values(), &[0, 1, 2, 3]); + } + + #[test] + fn simple_row_index_field() { + let requested_schema = Arc::new(StructType::new_unchecked([ + StructField::not_null("i", DataType::INTEGER), + StructField::create_metadata_column("my_row_index", MetadataColumnSpec::RowIndex), + StructField::nullable("i2", DataType::INTEGER), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new("i2", ArrowDataType::Int32, true), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1]; + let mut arrow_row_index_field = + ArrowField::new("my_row_index", ArrowDataType::Int64, false); + arrow_row_index_field.set_metadata(HashMap::from([( + "delta.metadataSpec".to_string(), + "row_index".to_string(), + )])); + let expect_reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::identity(2), + ReorderIndex::row_index(1, Arc::new(arrow_row_index_field)), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + #[test] fn nested_indices() { column_mapping_cases().into_iter().for_each(|mode| { @@ -2156,7 +2363,7 @@ mod tests { fn simple_reorder_struct() { let arry = make_struct_array(); let reorder = 
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)]; - let ordered = reorder_struct_array(arry, &reorder).unwrap(); + let ordered = reorder_struct_array(arry, &reorder, None).unwrap(); assert_eq!(ordered.column_names(), vec!["c", "b"]); } @@ -2204,7 +2411,7 @@ mod tests { ], ), ]; - let ordered = reorder_struct_array(nested, &reorder).unwrap(); + let ordered = reorder_struct_array(nested, &reorder, None).unwrap(); assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]); let ordered_s2 = ordered.column(0).as_struct(); assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]); @@ -2251,7 +2458,7 @@ mod tests { 0, vec![ReorderIndex::identity(1), ReorderIndex::identity(0)], )]; - let ordered = reorder_struct_array(struct_array, &reorder).unwrap(); + let ordered = reorder_struct_array(struct_array, &reorder, None).unwrap(); let ordered_list_col = ordered.column(0).as_list::(); for i in 0..ordered_list_col.len() { let array_item = ordered_list_col.value(i); @@ -2317,7 +2524,7 @@ mod tests { ], ), ]; - let ordered = reorder_struct_array(struct_array, &reorder).unwrap(); + let ordered = reorder_struct_array(struct_array, &reorder, None).unwrap(); assert_eq!(ordered.column_names(), vec!["map", "i"]); if let ArrowDataType::Map(field, _) = ordered.column(0).data_type() { if let ArrowDataType::Struct(fields) = field.data_type() { diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 017fb5bfb..7609510e0 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -21,7 +21,10 @@ use super::file_stream::{FileOpenFuture, FileOpener, FileStream}; use super::UrlExt; use crate::engine::arrow_conversion::TryIntoArrow as _; use crate::engine::arrow_data::ArrowEngineData; -use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices}; +use crate::engine::arrow_utils::{ + fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes, + RowIndexBuilder, +}; use crate::engine::default::executor::TaskExecutor; use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping; use crate::schema::SchemaRef; @@ -323,16 +326,24 @@ impl FileOpener for ParquetOpener { builder = builder.with_projection(mask) } + // Only create RowIndexBuilder if row indexes are actually needed + let mut row_indexes = ordering_needs_row_indexes(&requested_ordering) + .then(|| RowIndexBuilder::new(builder.metadata().row_groups())); + + // Filter row groups and row indexes if a predicate is provided if let Some(ref predicate) = predicate { - builder = builder.with_row_group_filter(predicate); + builder = builder.with_row_group_filter(predicate, row_indexes.as_mut()); } if let Some(limit) = limit { builder = builder.with_limit(limit) } + let mut row_indexes = row_indexes.map(|rb| rb.into_iter()); let stream = builder.with_batch_size(batch_size).build()?; - let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering)); + let stream = stream.map(move |rbr| { + fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut()) + }); Ok(stream.boxed()) })) } @@ -391,8 +402,13 @@ impl FileOpener for PresignedUrlOpener { builder = builder.with_projection(mask) } + // Only create RowIndexBuilder if row indexes are actually needed + let mut row_indexes = ordering_needs_row_indexes(&requested_ordering) + .then(|| RowIndexBuilder::new(builder.metadata().row_groups())); + + // Filter row groups and row indexes if a predicate is provided if let Some(ref predicate) = predicate { - 
builder = builder.with_row_group_filter(predicate);
+                builder = builder.with_row_group_filter(predicate, row_indexes.as_mut());
             }
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
             }
@@ -400,8 +416,11 @@ impl FileOpener for PresignedUrlOpener {
             let reader = builder.with_batch_size(batch_size).build()?;
 
+            let mut row_indexes = row_indexes.map(|rb| rb.into_iter());
             let stream = futures::stream::iter(reader);
-            let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering));
+            let stream = stream.map(move |rbr| {
+                fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())
+            });
             Ok(stream.boxed())
         }))
     }
 }
diff --git a/kernel/src/engine/parquet_row_group_skipping.rs b/kernel/src/engine/parquet_row_group_skipping.rs
index fb965d5b5..bdef0e13f 100644
--- a/kernel/src/engine/parquet_row_group_skipping.rs
+++ b/kernel/src/engine/parquet_row_group_skipping.rs
@@ -1,4 +1,5 @@
 //! An implementation of parquet row group skipping using data skipping predicates over footer stats.
+use crate::engine::arrow_utils::RowIndexBuilder;
 use crate::expressions::{ColumnName, DecimalData, Predicate, Scalar};
 use crate::kernel_predicates::parquet_stats_skipping::ParquetStatsProvider;
 use crate::parquet::arrow::arrow_reader::ArrowReaderBuilder;
@@ -17,22 +18,36 @@ mod tests;
 pub(crate) trait ParquetRowGroupSkipping {
     /// Instructs the parquet reader to perform row group skipping, eliminating any row group whose
     /// stats prove that none of the group's rows can satisfy the given `predicate`.
-    fn with_row_group_filter(self, predicate: &Predicate) -> Self;
+    ///
+    /// If a [`RowIndexBuilder`] is provided, it will be updated to only include row indices of the
+    /// row groups that survived the filter.
+    fn with_row_group_filter(
+        self,
+        predicate: &Predicate,
+        row_indexes: Option<&mut RowIndexBuilder>,
+    ) -> Self;
 }
 
 impl<T> ParquetRowGroupSkipping for ArrowReaderBuilder<T> {
-    fn with_row_group_filter(self, predicate: &Predicate) -> Self {
-        let indices = self
+    fn with_row_group_filter(
+        self,
+        predicate: &Predicate,
+        row_indexes: Option<&mut RowIndexBuilder>,
+    ) -> Self {
+        let ordinals: Vec<_> = self
             .metadata()
             .row_groups()
             .iter()
             .enumerate()
-            .filter_map(|(index, row_group)| {
-                // If the group survives the filter, return Some(index) so filter_map keeps it.
-                RowGroupFilter::apply(row_group, predicate).then_some(index)
+            .filter_map(|(ordinal, row_group)| {
+                // If the group survives the filter, return Some(ordinal) so filter_map keeps it.
+ RowGroupFilter::apply(row_group, predicate).then_some(ordinal) }) .collect(); - debug!("with_row_group_filter({predicate:#?}) = {indices:?})"); - self.with_row_groups(indices) + debug!("with_row_group_filter({predicate:#?}) = {ordinals:?})"); + if let Some(row_indexes) = row_indexes { + row_indexes.select_row_groups(&ordinals); + } + self.with_row_groups(ordinals) } } diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index 65b7c0f73..65795571d 100644 --- a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -5,7 +5,10 @@ use crate::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatc use super::read_files; use crate::engine::arrow_data::ArrowEngineData; -use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices}; +use crate::engine::arrow_utils::{ + fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes, + RowIndexBuilder, +}; use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping; use crate::schema::SchemaRef; use crate::{DeltaResult, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef}; @@ -25,11 +28,19 @@ fn try_create_from_parquet( if let Some(mask) = generate_mask(&schema, parquet_schema, builder.parquet_schema(), &indices) { builder = builder.with_projection(mask); } + + // Only create RowIndexBuilder if row indexes are actually needed + let mut row_indexes = ordering_needs_row_indexes(&requested_ordering) + .then(|| RowIndexBuilder::new(builder.metadata().row_groups())); + + // Filter row groups and row indexes if a predicate is provided if let Some(predicate) = predicate { - builder = builder.with_row_group_filter(predicate.as_ref()); + builder = builder.with_row_group_filter(predicate.as_ref(), row_indexes.as_mut()); } + + let mut row_indexes = row_indexes.map(|rb| rb.into_iter()); let stream = builder.build()?; - Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering))) + Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut()))) } impl ParquetHandler for SyncParquetHandler { diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index cdd194d5d..817c85cbc 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -3,8 +3,10 @@ use std::path::PathBuf; use std::sync::Arc; use delta_kernel::actions::deletion_vector::split_vector; +use delta_kernel::arrow::array::AsArray as _; use delta_kernel::arrow::compute::{concat_batches, filter_record_batch}; -use delta_kernel::arrow::datatypes::Schema as ArrowSchema; +use delta_kernel::arrow::datatypes::{Int64Type, Schema as ArrowSchema}; +use delta_kernel::engine::arrow_conversion::TryFromKernel as _; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::expressions::{ @@ -13,23 +15,23 @@ use delta_kernel::expressions::{ use delta_kernel::parquet::file::properties::{EnabledStatistics, WriterProperties}; use delta_kernel::scan::state::{transform_to_logical, DvInfo, Stats}; use delta_kernel::scan::Scan; -use delta_kernel::schema::{DataType, Schema}; +use delta_kernel::schema::{DataType, MetadataColumnSpec, Schema, StructField, StructType}; use delta_kernel::{Engine, FileMeta, Snapshot}; + use itertools::Itertools; use object_store::{memory::InMemory, path::Path, ObjectStore}; use test_utils::{ actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch, - record_batch_to_bytes, 
record_batch_to_bytes_with_props, IntoArray, TestAction, METADATA, + load_test_data, read_scan, record_batch_to_bytes, record_batch_to_bytes_with_props, to_arrow, + IntoArray, TestAction, METADATA, }; use url::Url; mod common; -use delta_kernel::engine::arrow_conversion::TryFromKernel as _; -use test_utils::{load_test_data, read_scan, to_arrow}; - const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet"; const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet"; +const PARQUET_FILE3: &str = "part-00002-c506e79a-0bf8-4e2b-a42b-9731b2e490ff-c000.snappy.parquet"; #[tokio::test] async fn single_commit_two_add_files() -> Result<(), Box> { @@ -1350,3 +1352,183 @@ fn unshredded_variant_table() -> Result<(), Box> { let test_path = test_dir.path().join(test_name); read_table_data_str(test_path.to_str().unwrap(), None, None, expected) } + +#[tokio::test] +async fn test_row_index_metadata_column() -> Result<(), Box> { + // Setup up an in-memory table with different numbers of rows in each file + let batch1 = generate_batch(vec![ + ("id", vec![1i32, 2, 3, 4, 5].into_array()), + ("value", vec!["a", "b", "c", "d", "e"].into_array()), + ])?; + let batch2 = generate_batch(vec![ + ("id", vec![10i32, 20, 30].into_array()), + ("value", vec!["x", "y", "z"].into_array()), + ])?; + let batch3 = generate_batch(vec![ + ("id", vec![100i32, 200, 300, 400].into_array()), + ("value", vec!["p", "q", "r", "s"].into_array()), + ])?; + + let storage = Arc::new(InMemory::new()); + add_commit( + storage.as_ref(), + 0, + actions_to_string(vec![ + TestAction::Metadata, + TestAction::Add(PARQUET_FILE1.to_string()), + TestAction::Add(PARQUET_FILE2.to_string()), + TestAction::Add(PARQUET_FILE3.to_string()), + ]), + ) + .await?; + + storage + .put( + &Path::from(PARQUET_FILE1), + record_batch_to_bytes(&batch1).into(), + ) + .await?; + storage + .put( + &Path::from(PARQUET_FILE2), + record_batch_to_bytes(&batch2).into(), + ) + .await?; + storage + .put( + &Path::from(PARQUET_FILE3), + record_batch_to_bytes(&batch3).into(), + ) + .await?; + + let location = Url::parse("memory:///")?; + let engine = Arc::new(DefaultEngine::new( + storage.clone(), + Arc::new(TokioBackgroundExecutor::new()), + )); + + // Create a schema that includes a row index metadata column + let schema = Arc::new(StructType::try_new([ + StructField::nullable("id", DataType::INTEGER), + StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex), + StructField::nullable("value", DataType::STRING), + ])?); + + let snapshot = Snapshot::builder_for(location).build(engine.as_ref())?; + let scan = snapshot.scan_builder().with_schema(schema).build()?; + + let mut file_count = 0; + let expected_row_counts = [5, 3, 4]; + let stream = scan.execute(engine.clone())?; + + for scan_result in stream { + let data = scan_result?.raw_data?; + let batch = into_record_batch(data); + file_count += 1; + + // Verify the schema structure + assert_eq!(batch.num_columns(), 3, "Expected 3 columns in the batch"); + assert_eq!( + batch.schema().field(0).name(), + "id", + "First column should be 'id'" + ); + assert_eq!( + batch.schema().field(1).name(), + "row_index", + "Second column should be 'row_index'" + ); + assert_eq!( + batch.schema().field(2).name(), + "value", + "Third column should be 'value'" + ); + + // Each file should have row indexes starting from 0 (file-local indexing) + let row_index_array = batch.column(1).as_primitive::(); + let expected_values: Vec = (0..batch.num_rows() 
as i64).collect(); + assert_eq!( + row_index_array.values().to_vec(), + expected_values, + "Row index values incorrect for file {} (expected {} rows)", + file_count, + expected_row_counts[file_count - 1] + ); + } + + assert_eq!(file_count, 3, "Expected to scan 3 files"); + Ok(()) +} + +#[tokio::test] +async fn test_unsupported_metadata_columns() -> Result<(), Box> { + // Prepare an in-memory table with some data + let batch = generate_simple_batch()?; + let storage = Arc::new(InMemory::new()); + add_commit( + storage.as_ref(), + 0, + actions_to_string(vec![ + TestAction::Metadata, + TestAction::Add(PARQUET_FILE1.to_string()), + ]), + ) + .await?; + storage + .put( + &Path::from(PARQUET_FILE1), + record_batch_to_bytes(&batch).into(), + ) + .await?; + + let location = Url::parse("memory:///")?; + let engine = Arc::new(DefaultEngine::new( + storage.clone(), + Arc::new(TokioBackgroundExecutor::new()), + )); + + // Test that unsupported metadata columns fail with appropriate errors + let test_cases = [ + ("row_id", MetadataColumnSpec::RowId, "RowId"), + ( + "row_commit_version", + MetadataColumnSpec::RowCommitVersion, + "RowCommitVersion", + ), + ]; + for (column_name, metadata_spec, error_text) in test_cases { + let snapshot = Snapshot::builder_for(location.clone()).build(engine.as_ref())?; + let schema = Arc::new(StructType::try_new([ + StructField::nullable("id", DataType::INTEGER), + StructField::create_metadata_column(column_name, metadata_spec), + ])?); + let scan = snapshot.scan_builder().with_schema(schema).build()?; + let stream = scan.execute(engine.clone())?; + + let mut found_error = false; + for scan_result in stream { + match scan_result { + Err(e) => { + let error_msg = e.to_string(); + if error_msg.contains(error_text) && error_msg.contains("not supported") { + found_error = true; + break; + } + } + Ok(_) => { + panic!( + "Expected error for {} metadata column, but scan succeeded", + error_text + ); + } + } + } + assert!( + found_error, + "Expected error about {} not being supported", + error_text + ); + } + + Ok(()) +}
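For readers following the row-index bookkeeping in the patch: RowIndexBuilder assigns each row group a contiguous range of file-local row indexes, and after row group skipping only the ranges of the surviving ordinals are flattened into the final iterator, so surviving rows keep their original file-local positions. The standalone Rust sketch below illustrates that behavior; the helper name `row_index_ranges` and the 5/3/4 row-group sizes are illustrative only and are not part of the patch or the kernel API.

use std::ops::Range;

// Illustrative stand-in for RowIndexBuilder: one file-local index range per row group.
fn row_index_ranges(row_group_sizes: &[i64]) -> Vec<Range<i64>> {
    let mut ranges = Vec::new();
    let mut offset = 0;
    for &num_rows in row_group_sizes {
        ranges.push(offset..offset + num_rows);
        offset += num_rows;
    }
    ranges
}

fn main() {
    // Three hypothetical row groups with 5, 3, and 4 rows.
    let ranges = row_index_ranges(&[5, 3, 4]);
    // Suppose row group skipping kept only ordinals 0 and 2 (the middle group was pruned).
    // The remaining rows keep their original file-local indexes: 0..5 and 8..12.
    let surviving_ordinals = [0usize, 2];
    let row_indexes: Vec<i64> = surviving_ordinals
        .iter()
        .flat_map(|&i| ranges[i].clone())
        .collect();
    assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 8, 9, 10, 11]);
}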