diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs
index 807d51d40..d643063c6 100644
--- a/kernel/src/engine/arrow_utils.rs
+++ b/kernel/src/engine/arrow_utils.rs
@@ -2,6 +2,7 @@
 use std::collections::HashSet;
 use std::io::{BufRead, BufReader};
+use std::ops::Range;
 use std::sync::Arc;
 
 use crate::engine::ensure_data_types::DataTypeCompat;
@@ -14,16 +15,18 @@ use crate::{
 use crate::arrow::array::{
     cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray,
-    OffsetSizeTrait, RecordBatch, StringArray, StructArray,
+    OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
 };
 use crate::arrow::buffer::NullBuffer;
 use crate::arrow::compute::concat_batches;
 use crate::arrow::datatypes::{
-    DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
+    DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields, Int64Type,
     SchemaRef as ArrowSchemaRef,
 };
 use crate::arrow::json::{LineDelimitedWriter, ReaderBuilder};
+use crate::parquet::file::metadata::RowGroupMetaData;
 use crate::parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor};
+use crate::schema::InternalMetadataColumn;
 use itertools::Itertools;
 use tracing::debug;
@@ -66,17 +69,63 @@ pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
         .with_backtrace()
 }
 
+/// Prepares to enumerate row indexes of rows in a parquet file, accounting for row group skipping.
+pub(crate) struct RowIndexBuilder {
+    row_group_starting_row_offsets: Vec<Range<i64>>,
+    row_group_ordinals: Option<Vec<usize>>,
+}
+impl RowIndexBuilder {
+    pub(crate) fn new(row_groups: &[RowGroupMetaData]) -> Self {
+        let mut row_group_starting_row_offsets = vec![];
+        let mut offset = 0;
+        for row_group in row_groups {
+            let num_rows = row_group.num_rows();
+            row_group_starting_row_offsets.push(offset..offset + num_rows);
+            offset += num_rows;
+        }
+        Self {
+            row_group_starting_row_offsets,
+            row_group_ordinals: None,
+        }
+    }
+
+    /// Only produce row indexes for the row groups specified by the ordinals that survived row
+    /// group skipping. The ordinals must be in 0..num_row_groups.
+    pub(crate) fn select_row_groups(&mut self, ordinals: &[usize]) {
+        // NOTE: Don't apply the filtering until we actually build the iterator, because the
+        // filtering is not idempotent and `with_row_groups` could be called more than once.
+        self.row_group_ordinals = Some(ordinals.to_vec())
+    }
+}
+
+impl IntoIterator for RowIndexBuilder {
+    type Item = i64;
+    type IntoIter = std::iter::Flatten<std::vec::IntoIter<Range<i64>>>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        let starting_offsets = match self.row_group_ordinals {
+            Some(ordinals) => ordinals
+                .iter()
+                .map(|i| self.row_group_starting_row_offsets[*i].clone())
+                .collect(),
+            None => self.row_group_starting_row_offsets,
+        };
+        starting_offsets.into_iter().flatten()
+    }
+}
+
 /// Applies post-processing to data read from parquet files. This includes `reorder_struct_array` to
 /// ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns have
 /// accurate null masks that row visitors rely on for correctness.
 pub(crate) fn fixup_parquet_read<T>(
     batch: RecordBatch,
     requested_ordering: &[ReorderIndex],
+    row_indexes: &mut impl Iterator<Item = i64>,
 ) -> DeltaResult<T>
 where
     StructArray: Into<T>,
 {
-    let data = reorder_struct_array(batch.into(), requested_ordering)?;
+    let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes)?;
     let data = fix_nested_null_masks(data);
     Ok(data.into())
 }
@@ -188,7 +237,7 @@ where
 #[derive(Debug, PartialEq)]
 pub(crate) struct ReorderIndex {
     pub(crate) index: usize,
-    transform: ReorderIndexTransform,
+    pub(crate) transform: ReorderIndexTransform,
 }
 
 #[derive(Debug, PartialEq)]
@@ -201,6 +250,8 @@ pub(crate) enum ReorderIndexTransform {
     Identity,
     /// Data is missing, fill in with a null column
     Missing(ArrowFieldRef),
+    /// Row index column requested, compute it
+    RowIndex(ArrowFieldRef),
 }
 
 impl ReorderIndex {
@@ -224,17 +275,8 @@ impl ReorderIndex {
         ReorderIndex::new(index, ReorderIndexTransform::Missing(field))
     }
 
-    /// Check if this reordering requires a transformation anywhere. See comment below on
-    /// [`ordering_needs_transform`] to understand why this is needed.
-    fn needs_transform(&self) -> bool {
-        match self.transform {
-            // if we're casting or inserting null, we need to transform
-            ReorderIndexTransform::Cast(_) | ReorderIndexTransform::Missing(_) => true,
-            // if our nested ordering needs a transform, we need a transform
-            ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children),
-            // no transform needed
-            ReorderIndexTransform::Identity => false,
-        }
+    fn row_index(index: usize, field: ArrowFieldRef) -> Self {
+        ReorderIndex::new(index, ReorderIndexTransform::RowIndex(field))
     }
 }
 
@@ -413,7 +455,13 @@ fn get_indices(
     // some fields are missing, but they might be nullable, need to insert them into the reorder_indices
     for (requested_position, field) in requested_schema.fields().enumerate() {
         if !found_fields.contains(field.name()) {
-            if field.nullable {
+            if let Ok(InternalMetadataColumn::RowIndex) = field.try_into() {
+                debug!("Inserting a row index column: {}", field.name());
+                reorder_indices.push(ReorderIndex::row_index(
+                    requested_position,
+                    Arc::new(field.try_into()?),
+                ));
+            } else if field.nullable {
                 debug!("Inserting missing and nullable field: {}", field.name());
                 reorder_indices.push(ReorderIndex::missing(
                     requested_position,
@@ -473,22 +521,19 @@ pub(crate) fn generate_mask(
 
 /// Check if an ordering requires transforming the data in any way. This is true if the indices are
 /// NOT in ascending order (so we have to reorder things), or if we need to do any transformation on
-/// the data read from parquet. We check the ordering here, and also call
-/// `ReorderIndex::needs_transform` on each element to check for other transforms, and to check
-/// `Nested` variants recursively.
+/// the data read from parquet. The check is recursive over `Nested` transforms.
 fn ordering_needs_transform(requested_ordering: &[ReorderIndex]) -> bool {
-    if requested_ordering.is_empty() {
-        return false;
-    }
-    // we have >=1 element. check that the first element doesn't need a transform
-    if requested_ordering[0].needs_transform() {
-        return true;
-    }
-    // Check for all elements if we need a transform. This is true if any elements are not in order
-    // (i.e. element[i].index < element[i+1].index), or any element needs a transform
-    requested_ordering
-        .windows(2)
-        .any(|ri| (ri[0].index >= ri[1].index) || ri[1].needs_transform())
+    let requested_ordering = &mut requested_ordering.iter().enumerate();
+    requested_ordering.any(|(i, ReorderIndex { index, transform })| {
+        use ReorderIndexTransform::*;
+        match (index, transform) {
+            (index, _) if *index != i => (),
+            (_, Cast(_) | Missing(_) | RowIndex(_)) => (),
+            (_, Nested(ref children)) if ordering_needs_transform(children) => (),
+            (_, Identity | Nested(_)) => return false, // fully ordered and trivial so far
+        }
+        true // out of order or non-trivial transform detected
+    })
 }
 
 // we use this as a placeholder for an array and its associated field. We can fill in a Vec of None
@@ -500,6 +545,7 @@ type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
 pub(crate) fn reorder_struct_array(
     input_data: StructArray,
     requested_ordering: &[ReorderIndex],
+    row_indexes: &mut impl Iterator<Item = i64>,
 ) -> DeltaResult<StructArray> {
     debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
     if !ordering_needs_transform(requested_ordering) {
@@ -532,8 +578,11 @@ pub(crate) fn reorder_struct_array(
                     match input_cols[parquet_position].data_type() {
                         ArrowDataType::Struct(_) => {
                             let struct_array = input_cols[parquet_position].as_struct().clone();
-                            let result_array =
-                                Arc::new(reorder_struct_array(struct_array, children)?);
+                            let result_array = Arc::new(reorder_struct_array(
+                                struct_array,
+                                children,
+                                row_indexes,
+                            )?);
                             // create the new field specifying the correct order for the struct
                             let new_field = Arc::new(ArrowField::new_struct(
                                 input_fields[parquet_position].name(),
@@ -578,6 +627,21 @@ pub(crate) fn reorder_struct_array(
                     let field = field.clone(); // cheap Arc clone
                     final_fields_cols[reorder_index.index] = Some((field, null_array));
                 }
+                ReorderIndexTransform::RowIndex(field) => {
+                    let row_index_array: PrimitiveArray<Int64Type> =
+                        row_indexes.by_ref().take(num_rows).collect();
+                    require!(
+                        row_index_array.len() == num_rows,
+                        Error::internal_error(format!(
+                            "Row index iterator exhausted after only {} of {} rows",
+                            row_index_array.len(),
+                            num_rows
+                        ))
+                    );
+                    let field = field.clone(); // cheap Arc clone
+                    final_fields_cols[reorder_index.index] =
+                        Some((field, Arc::new(row_index_array)));
+                }
             }
         }
         let num_cols = final_fields_cols.len();
@@ -603,7 +667,14 @@ fn reorder_list(
     let (list_field, offset_buffer, maybe_sa, null_buf) = list_array.into_parts();
     if let Some(struct_array) = maybe_sa.as_struct_opt() {
         let struct_array = struct_array.clone();
-        let result_array = Arc::new(reorder_struct_array(struct_array, children)?);
+        // WARNING: We cannot naively plumb through our caller's row index iterator, because each
+        // array element of a given row must replicate the row's index and empty arrays must drop
+        // that row's index. For now, just don't support it (Delta doesn't need the capability).
+        let result_array = Arc::new(reorder_struct_array(
+            struct_array,
+            children,
+            &mut std::iter::empty(),
+        )?);
         let new_list_field = Arc::new(ArrowField::new_struct(
             list_field.name(),
             result_array.fields().clone(),
@@ -944,6 +1015,31 @@ mod tests {
         assert_eq!(reorder_indices, expect_reorder);
     }
 
+    #[test]
+    fn simple_row_index_field() {
+        let row_index_field = InternalMetadataColumn::RowIndex.as_struct_field("my_row_index");
+        let arrow_row_index_field = ArrowField::try_from(&row_index_field).unwrap();
+        let requested_schema = Arc::new(StructType::new([
+            StructField::not_null("i", DataType::INTEGER),
+            row_index_field,
+            StructField::nullable("i2", DataType::INTEGER),
+        ]));
+        let parquet_schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("i", ArrowDataType::Int32, false),
+            ArrowField::new("i2", ArrowDataType::Int32, true),
+        ]));
+        let (mask_indices, reorder_indices) =
+            get_requested_indices(&requested_schema, &parquet_schema).unwrap();
+        let expect_mask = vec![0, 1];
+        let expect_reorder = vec![
+            ReorderIndex::identity(0),
+            ReorderIndex::identity(2),
+            ReorderIndex::row_index(1, Arc::new(arrow_row_index_field)),
+        ];
+        assert_eq!(mask_indices, expect_mask);
+        assert_eq!(reorder_indices, expect_reorder);
+    }
+
     #[test]
     fn nested_indices() {
         let requested_schema = Arc::new(StructType::new([
@@ -1336,7 +1432,7 @@ mod tests {
     fn simple_reorder_struct() {
         let arry = make_struct_array();
         let reorder = vec![ReorderIndex::identity(1), ReorderIndex::identity(0)];
-        let ordered = reorder_struct_array(arry, &reorder).unwrap();
+        let ordered = reorder_struct_array(arry, &reorder, &mut std::iter::empty()).unwrap();
         assert_eq!(ordered.column_names(), vec!["c", "b"]);
     }
 
@@ -1375,19 +1471,25 @@ mod tests {
             ReorderIndex::nested(
                 0,
                 vec![
-                    ReorderIndex::identity(0),
                     ReorderIndex::identity(1),
+                    ReorderIndex::identity(2),
                     ReorderIndex::missing(
-                        2,
+                        3,
                         Arc::new(ArrowField::new("s", ArrowDataType::Utf8, true)),
                     ),
+                    ReorderIndex::row_index(
+                        0,
+                        Arc::new(ArrowField::new("r", ArrowDataType::Int64, false)),
+                    ),
                 ],
             ),
         ];
-        let ordered = reorder_struct_array(nested, &reorder).unwrap();
+        let mut row_indexes = [0, 1, 2, 3, 4].into_iter();
+        let ordered = reorder_struct_array(nested, &reorder, &mut row_indexes).unwrap();
+        assert_eq!(row_indexes.next(), Some(4));
         assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]);
         let ordered_s2 = ordered.column(0).as_struct();
-        assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]);
+        assert_eq!(ordered_s2.column_names(), vec!["r", "b", "c", "s"]);
         let ordered_s1 = ordered.column(1).as_struct();
         assert_eq!(ordered_s1.column_names(), vec!["c", "b"]);
     }
@@ -1431,7 +1533,8 @@ mod tests {
             0,
             vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
         )];
-        let ordered = reorder_struct_array(struct_array, &reorder).unwrap();
+        let ordered =
+            reorder_struct_array(struct_array, &reorder, &mut std::iter::empty()).unwrap();
         let ordered_list_col = ordered.column(0).as_list::<i32>();
         for i in 0..ordered_list_col.len() {
             let array_item = ordered_list_col.value(i);
diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index ff2727565..387097c93 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -19,7 +19,9 @@ use uuid::Uuid;
 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
 use super::UrlExt;
 use crate::engine::arrow_data::ArrowEngineData;
-use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
+use crate::engine::arrow_utils::{
+    fixup_parquet_read, generate_mask, get_requested_indices, RowIndexBuilder,
+};
 use crate::engine::default::executor::TaskExecutor;
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
 use crate::schema::SchemaRef;
@@ -291,16 +293,18 @@ impl FileOpener for ParquetOpener {
                 builder = builder.with_projection(mask)
             }
 
+            let mut row_indexes = RowIndexBuilder::new(builder.metadata().row_groups());
             if let Some(ref predicate) = predicate {
-                builder = builder.with_row_group_filter(predicate);
+                builder = builder.with_row_group_filter(predicate, &mut row_indexes);
             }
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
             }
 
+            let mut row_indexes = row_indexes.into_iter();
             let stream = builder.with_batch_size(batch_size).build()?;
-
-            let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering));
+            let stream = stream
+                .map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering, &mut row_indexes));
 
             Ok(stream.boxed())
         }))
     }
@@ -359,8 +363,9 @@ impl FileOpener for PresignedUrlOpener {
                 builder = builder.with_projection(mask)
             }
 
+            let mut row_indexes = RowIndexBuilder::new(builder.metadata().row_groups());
             if let Some(ref predicate) = predicate {
-                builder = builder.with_row_group_filter(predicate);
+                builder = builder.with_row_group_filter(predicate, &mut row_indexes);
             }
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
             }
@@ -368,8 +373,10 @@ impl FileOpener for PresignedUrlOpener {
             let reader = builder.with_batch_size(batch_size).build()?;
 
+            let mut row_indexes = row_indexes.into_iter();
             let stream = futures::stream::iter(reader);
-            let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering));
+            let stream = stream
+                .map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering, &mut row_indexes));
 
             Ok(stream.boxed())
         }))
     }
@@ -438,6 +445,59 @@ mod tests {
         assert_eq!(data[0].num_rows(), 10);
     }
 
+    #[tokio::test]
+    async fn test_read_parquet_row_indexes() {
+        use crate::arrow::array::AsArray as _;
+        use crate::arrow::datatypes::Int64Type;
+
+        let store = Arc::new(LocalFileSystem::new());
+        let urls = [
+            "./tests/data/data-reader-timestamp_ntz/tsNtzPartition=__HIVE_DEFAULT_PARTITION__/part-00001-53fd3b3b-7773-459a-921c-bb64bf0bbd03.c000.snappy.parquet",
+            "./tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00000-6240e68e-2304-449a-a1e6-0e24866d3508.c000.snappy.parquet",
+            "./tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet",
+            "./tests/data/data-reader-timestamp_ntz/tsNtzPartition=2021-11-18 02%3A30%3A00.123456/part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet",
+        ].map(|p| {
+            //println!("p: {:?}", std::fs::canonicalize(PathBuf::from(p)).unwrap());
+            url::Url::from_file_path(std::fs::canonicalize(PathBuf::from(p)).unwrap()).unwrap()
+        });
+        let mut metas = vec![];
+        for url in urls {
+            println!("url: {}", url);
+            let location = Path::from_url_path(url.path()).unwrap();
+            let meta = store.head(&location).await.unwrap();
+            metas.push(FileMeta {
+                location: url.clone(),
+                last_modified: meta.last_modified.timestamp(),
+                size: meta.size,
+            });
+        }
+
+        let schema = Arc::new(crate::schema::StructType::new([
+            crate::schema::StructField::nullable("id", crate::schema::DataType::INTEGER),
+            crate::schema::InternalMetadataColumn::RowIndex.as_struct_field("row_index"),
+            crate::schema::StructField::nullable("tsNtz", crate::schema::DataType::TIMESTAMP_NTZ),
+        ]));
+        let handler =
+            DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
+        let data: Vec<RecordBatch> = handler
+            .read_parquet_files(&metas, schema, None)
+            .unwrap()
+            .map(into_record_batch)
+            .try_collect()
+            .unwrap();
+        assert_eq!(data.len(), 4);
+        let data: Vec<_> = data
+            .into_iter()
+            .map(|batch| {
+                let column = batch.column_by_name("row_index").unwrap();
+                column.as_primitive::<Int64Type>().values().to_vec()
+            })
+            .collect();
+        assert_eq!(data[0], &[0, 1, 2]);
+        assert_eq!(data[1], &[0]);
+        assert_eq!(data[2], &[0, 1]);
+        assert_eq!(data[3], &[0, 1, 2]);
+    }
+
     #[test]
     fn test_as_record_batch() {
         let location = Url::parse("file:///test_url").unwrap();
diff --git a/kernel/src/engine/parquet_row_group_skipping.rs b/kernel/src/engine/parquet_row_group_skipping.rs
index fb965d5b5..7ce0b3e59 100644
--- a/kernel/src/engine/parquet_row_group_skipping.rs
+++ b/kernel/src/engine/parquet_row_group_skipping.rs
@@ -1,4 +1,5 @@
 //! An implementation of parquet row group skipping using data skipping predicates over footer stats.
+use crate::engine::arrow_utils::RowIndexBuilder;
 use crate::expressions::{ColumnName, DecimalData, Predicate, Scalar};
 use crate::kernel_predicates::parquet_stats_skipping::ParquetStatsProvider;
 use crate::parquet::arrow::arrow_reader::ArrowReaderBuilder;
@@ -17,22 +18,31 @@ mod tests;
 pub(crate) trait ParquetRowGroupSkipping {
     /// Instructs the parquet reader to perform row group skipping, eliminating any row group whose
     /// stats prove that none of the group's rows can satisfy the given `predicate`.
-    fn with_row_group_filter(self, predicate: &Predicate) -> Self;
+    fn with_row_group_filter(
+        self,
+        predicate: &Predicate,
+        row_indexes: &mut RowIndexBuilder,
+    ) -> Self;
 }
 
 impl<T> ParquetRowGroupSkipping for ArrowReaderBuilder<T> {
-    fn with_row_group_filter(self, predicate: &Predicate) -> Self {
-        let indices = self
+    fn with_row_group_filter(
+        self,
+        predicate: &Predicate,
+        row_indexes: &mut RowIndexBuilder,
+    ) -> Self {
+        let ordinals: Vec<_> = self
             .metadata()
             .row_groups()
             .iter()
             .enumerate()
-            .filter_map(|(index, row_group)| {
-                // If the group survives the filter, return Some(index) so filter_map keeps it.
-                RowGroupFilter::apply(row_group, predicate).then_some(index)
+            .filter_map(|(ordinal, row_group)| {
+                // If the group survives the filter, return Some(ordinal) so filter_map keeps it.
+                RowGroupFilter::apply(row_group, predicate).then_some(ordinal)
             })
             .collect();
-        debug!("with_row_group_filter({predicate:#?}) = {indices:?})");
-        self.with_row_groups(indices)
+        debug!("with_row_group_filter({predicate:#?}) = {ordinals:?})");
+        row_indexes.select_row_groups(&ordinals);
+        self.with_row_groups(ordinals)
     }
 }
diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs
index 65b7c0f73..94e59e24f 100644
--- a/kernel/src/engine/sync/parquet.rs
+++ b/kernel/src/engine/sync/parquet.rs
@@ -5,7 +5,9 @@ use crate::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatc
 use super::read_files;
 use crate::engine::arrow_data::ArrowEngineData;
-use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
+use crate::engine::arrow_utils::{
+    fixup_parquet_read, generate_mask, get_requested_indices, RowIndexBuilder,
+};
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
 use crate::schema::SchemaRef;
 use crate::{DeltaResult, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef};
@@ -25,11 +27,13 @@ fn try_create_from_parquet(
     if let Some(mask) = generate_mask(&schema, parquet_schema, builder.parquet_schema(), &indices) {
         builder = builder.with_projection(mask);
     }
+    let mut row_indexes = RowIndexBuilder::new(builder.metadata().row_groups());
     if let Some(predicate) = predicate {
-        builder = builder.with_row_group_filter(predicate.as_ref());
+        builder = builder.with_row_group_filter(predicate.as_ref(), &mut row_indexes);
    }
+    let mut row_indexes = row_indexes.into_iter();
     let stream = builder.build()?;
-    Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering)))
+    Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering, &mut row_indexes)))
 }
 
 impl ParquetHandler for SyncParquetHandler {
diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs
index 6204d8620..79b38d32c 100644
--- a/kernel/src/schema/mod.rs
+++ b/kernel/src/schema/mod.rs
@@ -15,6 +15,8 @@ use crate::utils::require;
 use crate::{DeltaResult, Error};
 use delta_kernel_derive::internal_api;
 
+use strum::{AsRefStr, EnumString};
+
 pub(crate) mod compare;
 
 pub type Schema = StructType;
@@ -86,6 +88,56 @@ pub enum ColumnMetadataKey {
     Invariants,
 }
 
+/// When present in a [`StructField::metadata`], identifies which internal Delta metadata column the
+/// field represents.
+pub(crate) const INTERNAL_METADATA_COLUMN_KEY: &str = "delta.__internal__.metadata_column";
+
+#[derive(Clone, Copy, Debug, PartialEq, AsRefStr, EnumString)]
+#[allow(clippy::enum_variant_names)] // "all variants have the same prefix: 'Row'"
+pub(crate) enum InternalMetadataColumn {
+    /// The file-local row index of a row in a parquet file
+    #[strum(to_string = "delta.__internal__.metadata_column.row_index")]
+    RowIndex,
+    /// The logical row ID column of a data file
+    #[strum(to_string = "delta.__internal__.metadata_column.row_id")]
+    #[allow(unused)] // TODO: support row tracking
+    RowId,
+    /// The logical row commit version column of a data file
+    #[strum(to_string = "delta.__internal__.metadata_column.row_commit_version")]
+    #[allow(unused)] // TODO: support row tracking
+    RowCommitVersion,
+}
+
+impl InternalMetadataColumn {
+    #[cfg(test)]
+    pub(crate) fn as_struct_field(&self, name: impl Into<String>) -> StructField {
+        let (data_type, nullable) = match self {
+            Self::RowIndex => (DataType::LONG, false),
+            Self::RowId => (DataType::LONG, true),
+            Self::RowCommitVersion => (DataType::LONG, true),
+        };
+        StructField::new(name, data_type, nullable)
+            .with_metadata([(INTERNAL_METADATA_COLUMN_KEY, self.as_ref())])
+    }
+}
+
+impl TryFrom<&StructField> for InternalMetadataColumn {
+    type Error = Error;
+
+    fn try_from(field: &StructField) -> DeltaResult<Self> {
+        let metadata = field.metadata.get(INTERNAL_METADATA_COLUMN_KEY);
+        if let Some(MetadataValue::String(s)) = metadata {
+            if let Ok(column) = s.parse() {
+                return Ok(column);
+            }
+        }
+        Err(Error::internal_error(format!(
+            "Invalid metadata value for {}: {metadata:?}",
+            INTERNAL_METADATA_COLUMN_KEY
+        )))
+    }
+}
+
 impl AsRef<str> for ColumnMetadataKey {
     fn as_ref(&self) -> &str {
         match self {
@@ -1112,6 +1164,14 @@ mod tests {
                     "delta.columnMapping.id": 5,
                     "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
                 }
-            }
+            },
+            {
+                "name": "row_index",
+                "type": "long",
+                "nullable": false,
+                "metadata": {
+                    "delta.__internal__.metadata_column": "delta.__internal__.metadata_column.row_index"
+                }
+            }
             ]
         },
@@ -1155,6 +1215,10 @@ mod tests {
             stype.fields.get_index(0).unwrap().1.name,
             "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
        );
+        assert_eq!(
+            InternalMetadataColumn::try_from(stype.fields.get_index(1).unwrap().1).unwrap(),
+            InternalMetadataColumn::RowIndex
+        );
     }
 
     #[test]
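
Editor's note on the RowIndexBuilder introduced in arrow_utils.rs above: each row group contributes one contiguous range of file-local row offsets, and row group skipping simply selects which ranges survive before the iterator is flattened. The following is a minimal, self-contained sketch of that logic (plain Rust, hypothetical names and made-up row-group sizes; it does not use any delta-kernel or parquet types):

use std::ops::Range;

/// Mirrors the RowIndexBuilder idea: one starting-offset range per row group,
/// with an optional list of surviving row-group ordinals applied only when the
/// iterator is finally built.
struct RowIndexes {
    ranges: Vec<Range<i64>>,
    ordinals: Option<Vec<usize>>,
}

impl RowIndexes {
    fn new(row_group_sizes: &[i64]) -> Self {
        let mut ranges = vec![];
        let mut offset = 0;
        for &num_rows in row_group_sizes {
            ranges.push(offset..offset + num_rows);
            offset += num_rows;
        }
        Self { ranges, ordinals: None }
    }

    /// Remember which row groups survived skipping; the filter is applied lazily.
    fn select_row_groups(&mut self, ordinals: &[usize]) {
        self.ordinals = Some(ordinals.to_vec());
    }

    fn into_iter(self) -> impl Iterator<Item = i64> {
        let ranges = match self.ordinals {
            Some(ordinals) => ordinals.iter().map(|i| self.ranges[*i].clone()).collect(),
            None => self.ranges,
        };
        ranges.into_iter().flatten()
    }
}

fn main() {
    // Three row groups of 3 rows each; pretend skipping dropped the middle one.
    let mut builder = RowIndexes::new(&[3, 3, 3]);
    builder.select_row_groups(&[0, 2]);
    let indexes: Vec<i64> = builder.into_iter().collect();
    // Surviving rows keep their original file-local indexes, in file order.
    assert_eq!(indexes, vec![0, 1, 2, 6, 7, 8]);
}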
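A second sketch, illustrating how the `RowIndex` transform in `reorder_struct_array` consumes the shared iterator: each record batch takes exactly `num_rows` values, so consecutive batches from the same file receive consecutive row indexes. This is a standalone illustration with hypothetical batch sizes, not code from the PR:

fn main() {
    // A "file" with 7 rows, read back as two batches of 3 and 4 rows.
    let mut row_indexes = 0..7i64;
    let batch_sizes = [3usize, 4];
    for (n, num_rows) in batch_sizes.into_iter().enumerate() {
        // Equivalent of the RowIndex arm: take exactly `num_rows` indexes per batch,
        // leaving the rest of the iterator for the next batch.
        let column: Vec<i64> = row_indexes.by_ref().take(num_rows).collect();
        assert_eq!(column.len(), num_rows, "row index iterator exhausted early");
        println!("batch {n}: row_index column = {column:?}");
    }
}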