diff --git a/kernel/examples/common/src/lib.rs b/kernel/examples/common/src/lib.rs index b5976c6a8..7023f2632 100644 --- a/kernel/examples/common/src/lib.rs +++ b/kernel/examples/common/src/lib.rs @@ -7,7 +7,7 @@ use delta_kernel::{ arrow::array::RecordBatch, engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}, scan::Scan, - schema::Schema, + schema::MetadataColumnSpec, DeltaResult, SnapshotRef, }; @@ -61,9 +61,18 @@ pub struct ScanArgs { #[arg(long)] pub schema_only: bool, - /// Comma separated list of columns to select - #[arg(long, value_delimiter=',', num_args(0..))] - pub columns: Option<Vec<String>>, + /// Comma-separated list of columns to select. Must be passed as a single string; leading and + /// trailing whitespace around each column name is trimmed + #[arg(long)] + pub columns: Option<String>, + + /// Include a _metadata.row_index field + #[arg(long)] + pub with_row_index: bool, + + /// Include a _metadata.row_id field if row-tracking is enabled + #[arg(long)] + pub with_row_id: bool, } pub trait ParseWithExamples { @@ -180,27 +189,26 @@ pub fn get_scan(snapshot: SnapshotRef, args: &ScanArgs) -> DeltaResult DeltaResult<_> { - let table_schema = snapshot.schema(); - let selected_fields = cols.iter().map(|col| { - table_schema - .field(col) - .cloned() - .ok_or(delta_kernel::Error::Generic(format!( - "Table has no such column: {col}" - ))) - }); - Schema::try_from_results(selected_fields).map(Arc::new) - }) - .transpose()?; + let mut scan_schema = snapshot.schema(); + if let Some(cols) = args.columns.as_ref() { + let cols: Vec<&str> = cols.split(",").map(str::trim).collect(); + scan_schema = scan_schema.project_as_struct(&cols)?.into(); + } + + if args.with_row_index { + scan_schema = scan_schema + .add_metadata_column("_metadata.row_index", MetadataColumnSpec::RowIndex)? + .into(); + } + + if args.with_row_id { + scan_schema = scan_schema + .add_metadata_column("_metadata.row_id", MetadataColumnSpec::RowId)? + .into(); + } + Ok(Some( - snapshot - .scan_builder() - .with_schema_opt(read_schema_opt) - .build()?, + snapshot.scan_builder().with_schema(scan_schema).build()?, )) } diff --git a/kernel/src/scan/field_classifiers.rs b/kernel/src/scan/field_classifiers.rs index 686f266f1..81611a46c 100644 --- a/kernel/src/scan/field_classifiers.rs +++ b/kernel/src/scan/field_classifiers.rs @@ -6,8 +6,10 @@ use crate::table_changes::{ }; use crate::transforms::FieldTransformSpec; -/// Trait for classifying fields during StateInfo construction. -/// Allows different scan types (regular, CDF) to customize field handling. +/// Trait for classifying fields during StateInfo construction. Allows different scan types +/// (regular, CDF) to customize field handling. Note that the default field handling occurs +/// in [`StateInfo::try_new`](crate::scan::state_info::StateInfo::try_new). A +/// `TransformFieldClassifier` can be used to override the behavior implemented in that method. pub(crate) trait TransformFieldClassifier { /// Classify a field and return its transform spec. /// Returns None if the field is physical (should be read from parquet). @@ -16,32 +18,19 @@ pub(crate) trait TransformFieldClassifier { &self, field: &StructField, field_index: usize, - partition_columns: &[String], last_physical_field: &Option<String>, ) -> Option<FieldTransformSpec>; } -/// Regular scan field classifier for standard Delta table scans. -/// Handles partition columns as metadata-derived fields.
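For engine authors following the kernel/examples/common change above, the same pattern works outside the example CLI. A minimal sketch, assuming a `SnapshotRef` obtained elsewhere and placeholder column names ("id", "value"); it mirrors the calls made in `get_scan` above and is not part of this change:

```rust
use delta_kernel::scan::Scan;
use delta_kernel::schema::MetadataColumnSpec;
use delta_kernel::{DeltaResult, SnapshotRef};

/// Project two (hypothetical) data columns and append a row-index metadata column,
/// the same thing the example's --columns/--with_row_index flags do.
fn build_scan_with_row_index(snapshot: SnapshotRef) -> DeltaResult<Scan> {
    let mut scan_schema = snapshot.schema();
    // Placeholder column names; in the example these come from --columns.
    let cols: Vec<&str> = vec!["id", "value"];
    scan_schema = scan_schema.project_as_struct(&cols)?.into();
    scan_schema = scan_schema
        .add_metadata_column("_metadata.row_index", MetadataColumnSpec::RowIndex)?
        .into();
    snapshot.scan_builder().with_schema(scan_schema).build()
}
```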
-pub(crate) struct ScanTransformFieldClassifier; -impl TransformFieldClassifier for ScanTransformFieldClassifier { +// Empty classifier, always returns None +impl TransformFieldClassifier for () { fn classify_field( &self, - field: &StructField, - field_index: usize, - partition_columns: &[String], - last_physical_field: &Option, + _: &StructField, + _: usize, + _: &Option, ) -> Option { - if partition_columns.contains(field.name()) { - // Partition column: needs transform to inject metadata - Some(FieldTransformSpec::MetadataDerivedColumn { - field_index, - insert_after: last_physical_field.clone(), - }) - } else { - // Regular physical field - no transform needed - None - } + None } } @@ -53,7 +42,6 @@ impl TransformFieldClassifier for CdfTransformFieldClassifier { &self, field: &StructField, field_index: usize, - partition_columns: &[String], last_physical_field: &Option, ) -> Option { match field.name().as_str() { @@ -70,13 +58,7 @@ impl TransformFieldClassifier for CdfTransformFieldClassifier { insert_after: last_physical_field.clone(), }) } - // Defer to default classifier for partition columns and physical fields - _ => ScanTransformFieldClassifier.classify_field( - field, - field_index, - partition_columns, - last_physical_field, - ), + _ => None, } } } diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 2f98488ce..66fdec709 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -3,7 +3,8 @@ use std::collections::{HashMap, HashSet}; use std::sync::{Arc, LazyLock}; use super::data_skipping::DataSkippingFilter; -use super::{PhysicalPredicate, ScanMetadata, StateInfo}; +use super::state_info::StateInfo; +use super::{PhysicalPredicate, ScanMetadata}; use crate::actions::deletion_vector::DeletionVectorDescriptor; use crate::actions::get_log_add_schema; use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; @@ -105,8 +106,9 @@ impl AddRemoveDedupVisitor<'_> { const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns - const REMOVE_PATH_INDEX: usize = 5; // Position of "remove.path" in getters - const REMOVE_DV_START_INDEX: usize = 6; // Start position of remove deletion vector columns + const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters + const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters + const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns fn new( seen: &mut HashSet, @@ -195,10 +197,19 @@ impl AddRemoveDedupVisitor<'_> { if self.deduplicator.check_and_record_seen(file_key) || !is_add { return Ok(false); } + let base_row_id: Option = + getters[Self::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?; let transform = self .transform_spec .as_ref() - .map(|transform| get_transform_expr(transform, partition_values, &self.physical_schema)) + .map(|transform| { + get_transform_expr( + transform, + partition_values, + &self.physical_schema, + base_row_id, + ) + }) .transpose()?; if transform.is_some() { // fill in any needed `None`s for previous rows @@ -215,6 +226,7 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { static NAMES_AND_TYPES: LazyLock = LazyLock::new(|| { const STRING: DataType = DataType::STRING; const INTEGER: DataType = DataType::INTEGER; + const LONG: DataType = DataType::LONG; let ss_map: DataType = MapType::new(STRING, STRING, 
true).into(); let types_and_names = vec![ (STRING, column_name!("add.path")), @@ -222,6 +234,7 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { (STRING, column_name!("add.deletionVector.storageType")), (STRING, column_name!("add.deletionVector.pathOrInlineDv")), (INTEGER, column_name!("add.deletionVector.offset")), + (LONG, column_name!("add.baseRowId")), (STRING, column_name!("remove.path")), (STRING, column_name!("remove.deletionVector.storageType")), (STRING, column_name!("remove.deletionVector.pathOrInlineDv")), @@ -236,13 +249,13 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files // only serve as tombstones for vacuum jobs. So we only need to examine the adds here. - (&names[..5], &types[..5]) + (&names[..6], &types[..6]) } } fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { let is_log_batch = self.deduplicator.is_log_batch(); - let expected_getters = if is_log_batch { 9 } else { 5 }; + let expected_getters = if is_log_batch { 10 } else { 6 }; require!( getters.len() == expected_getters, Error::InternalError(format!( @@ -266,8 +279,10 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { pub(crate) static SCAN_ROW_SCHEMA: LazyLock> = LazyLock::new(|| { // Note that fields projected out of a nullable struct must be nullable let partition_values = MapType::new(DataType::STRING, DataType::STRING, true); - let file_constant_values = - StructType::new_unchecked([StructField::nullable("partitionValues", partition_values)]); + let file_constant_values = StructType::new_unchecked([ + StructField::nullable("partitionValues", partition_values), + StructField::nullable("baseRowId", DataType::LONG), + ]); Arc::new(StructType::new_unchecked([ StructField::nullable("path", DataType::STRING), StructField::nullable("size", DataType::LONG), @@ -290,9 +305,10 @@ fn get_add_transform_expr() -> ExpressionRef { column_expr_ref!("add.modificationTime"), column_expr_ref!("add.stats"), column_expr_ref!("add.deletionVector"), - Arc::new(Expression::Struct(vec![column_expr_ref!( - "add.partitionValues" - )])), + Arc::new(Expression::Struct(vec![ + column_expr_ref!("add.partitionValues"), + column_expr_ref!("add.baseRowId"), + ])), ])) }); EXPR.clone() @@ -311,6 +327,7 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef { column_expr_ref!("modificationTime"), column_expr_ref!("stats"), column_expr_ref!("deletionVector"), + column_expr_ref!("fileConstantValues.baseRowId"), ], ))])) }); @@ -377,15 +394,19 @@ mod tests { use std::{collections::HashMap, sync::Arc}; use crate::actions::get_log_schema; - use crate::expressions::Scalar; + use crate::expressions::{BinaryExpressionOp, Scalar, VariadicExpressionOp}; use crate::log_replay::ActionsBatch; use crate::scan::state::{DvInfo, Stats}; + use crate::scan::state_info::tests::{ + assert_transform_spec, get_simple_state_info, get_state_info, + }; + use crate::scan::state_info::StateInfo; use crate::scan::test_utils::{ - add_batch_simple, add_batch_with_partition_col, add_batch_with_remove, - run_with_validate_callback, + add_batch_for_row_id, add_batch_simple, add_batch_with_partition_col, + add_batch_with_remove, run_with_validate_callback, }; - use crate::scan::{PhysicalPredicate, StateInfo}; - use crate::table_features::ColumnMappingMode; + use crate::scan::PhysicalPredicate; + use crate::schema::MetadataColumnSpec; use crate::Expression as Expr; use crate::{ engine::sync::SyncEngine, @@ -473,15 +494,8 @@ mod 
tests { StructField::new("value", DataType::INTEGER, true), StructField::new("date", DataType::DATE, true), ])); - let partition_cols = ["date".to_string()]; - let state_info = StateInfo::try_new( - schema.clone(), - &partition_cols, - ColumnMappingMode::None, - None, - crate::scan::field_classifiers::ScanTransformFieldClassifier, - ) - .unwrap(); + let partition_cols = vec!["date".to_string()]; + let state_info = get_simple_state_info(schema, partition_cols).unwrap(); let batch = vec![add_batch_with_partition_col()]; let iter = scan_action_iter( &SyncEngine::new(), @@ -525,4 +539,77 @@ mod tests { validate_transform(transforms[3].as_ref(), 17510); } } + + #[test] + fn test_row_id_transform() { + let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( + "value", + DataType::INTEGER, + true, + )])); + let state_info = get_state_info( + schema.clone(), + vec![], + None, + [ + ("delta.enableRowTracking", "true"), + ( + "delta.rowTracking.materializedRowIdColumnName", + "row_id_col", + ), + ] + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + vec![("row_id", MetadataColumnSpec::RowId)], + ) + .unwrap(); + + let transform_spec = state_info.transform_spec.as_ref().unwrap(); + assert_transform_spec( + transform_spec, + false, + "row_id_col", + "row_indexes_for_row_id_0", + ); + + let batch = vec![add_batch_for_row_id(get_log_schema().clone())]; + let iter = scan_action_iter( + &SyncEngine::new(), + batch + .into_iter() + .map(|batch| Ok(ActionsBatch::new(batch as _, true))), + Arc::new(state_info), + ); + + for res in iter { + let scan_metadata = res.unwrap(); + let transforms = scan_metadata.scan_file_transforms; + assert_eq!(transforms.len(), 1, "Should have 1 transform"); + if let Some(Expr::Transform(transform_expr)) = transforms[0].as_ref().map(Arc::as_ref) { + assert!(transform_expr.input_path.is_none()); + let row_id_transform = transform_expr + .field_transforms + .get("row_id_col") + .expect("Should have row_id_col transform"); + assert!(row_id_transform.is_replace); + assert_eq!(row_id_transform.exprs.len(), 1); + let expr = &row_id_transform.exprs[0]; + let expeceted_expr = Arc::new(Expr::variadic( + VariadicExpressionOp::Coalesce, + vec![ + Expr::column(["row_id_col"]), + Expr::binary( + BinaryExpressionOp::Plus, + Expr::literal(42i64), + Expr::column(["row_indexes_for_row_id_0"]), + ), + ], + )); + assert_eq!(expr, &expeceted_expr); + } else { + panic!("Should have been a transform expression"); + } + } + } } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 7de93e05f..d5bd40ac3 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -9,7 +9,6 @@ use itertools::Itertools; use tracing::debug; use url::Url; -use self::field_classifiers::{ScanTransformFieldClassifier, TransformFieldClassifier}; use self::log_replay::get_scan_metadata_transform_expr; use crate::actions::deletion_vector::{ deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor, @@ -23,15 +22,12 @@ use crate::listed_log_files::ListedLogFiles; use crate::log_replay::{ActionsBatch, HasSelectionVector}; use crate::log_segment::LogSegment; use crate::scan::state::{DvInfo, Stats}; -use crate::schema::ToSchema as _; +use crate::scan::state_info::StateInfo; use crate::schema::{ ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField, - StructType, + ToSchema as _, }; -use crate::snapshot::SnapshotRef; -use crate::table_features::ColumnMappingMode; -use crate::transforms::TransformSpec; -use 
crate::{DeltaResult, Engine, EngineData, Error, FileMeta, Version}; +use crate::{DeltaResult, Engine, EngineData, Error, FileMeta, SnapshotRef, Version}; use self::log_replay::scan_action_iter; @@ -39,6 +35,7 @@ pub(crate) mod data_skipping; pub(crate) mod field_classifiers; pub mod log_replay; pub mod state; +pub(crate) mod state_info; // safety: we define get_log_schema() and _know_ it contains ADD_NAME and REMOVE_NAME #[allow(clippy::unwrap_used)] @@ -118,19 +115,12 @@ impl ScanBuilder { pub fn build(self) -> DeltaResult { // if no schema is provided, use snapshot's entire schema (e.g. SELECT *) let logical_schema = self.schema.unwrap_or_else(|| self.snapshot.schema()); - let partition_columns = self - .snapshot - .table_configuration() - .metadata() - .partition_columns(); - let column_mapping_mode = self.snapshot.table_configuration().column_mapping_mode(); let state_info = StateInfo::try_new( logical_schema, - partition_columns, - column_mapping_mode, + self.snapshot.table_configuration(), self.predicate, - ScanTransformFieldClassifier, + (), // No classifer, default is for scans )?; Ok(Scan { @@ -508,6 +498,7 @@ impl Scan { StructField::nullable("modificationTime", DataType::LONG), StructField::nullable("stats", DataType::STRING), StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()), + StructField::nullable("baseRowId", DataType::LONG), ]), )]) }); @@ -739,7 +730,8 @@ impl Scan { /// cardinality: long, /// }, /// fileConstantValues: { -/// partitionValues: map +/// partitionValues: map, +/// baseRowId: long /// } /// } /// ``` @@ -747,111 +739,6 @@ pub fn scan_row_schema() -> SchemaRef { log_replay::SCAN_ROW_SCHEMA.clone() } -/// All the state needed to process a scan. -#[derive(Debug)] -pub(crate) struct StateInfo { - /// The logical schema for this scan - pub(crate) logical_schema: SchemaRef, - /// The physical schema to read from parquet files - pub(crate) physical_schema: SchemaRef, - /// The physical predicate for data skipping - pub(crate) physical_predicate: PhysicalPredicate, - /// Transform specification for converting physical to logical data - pub(crate) transform_spec: Option>, -} - -impl StateInfo { - /// Create StateInfo with a custom field classifier for different scan types. - /// Get the state needed to process a scan. 
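The effect of threading `baseRowId` through the scan row is easiest to see with concrete numbers: the transform introduced later in this diff (transforms.rs) computes a row id as COALESCE(materialized row id, baseRowId + row index). A plain-Rust illustration of that rule (not kernel API), assuming a file whose add action carries baseRowId = 42:

```rust
/// COALESCE(materialized_row_id, base_row_id + row_index), written out directly.
fn row_id(materialized: Option<i64>, base_row_id: i64, row_index: i64) -> i64 {
    materialized.unwrap_or(base_row_id + row_index)
}

fn main() {
    // Freshly written rows have no materialized id: they get 42 + index.
    assert_eq!(row_id(None, 42, 0), 42);
    assert_eq!(row_id(None, 42, 3), 45);
    // Rows that carry a materialized row id keep it.
    assert_eq!(row_id(Some(7), 42, 3), 7);
}
```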
- /// - /// `logical_schema` - The logical schema of the scan output, which includes partition columns - /// `partition_columns` - List of column names that are partition columns in the table - /// `column_mapping_mode` - The column mapping mode used by the table for physical to logical mapping - /// `predicate` - Optional predicate to filter data during the scan - /// `classifier` - The classifier to use for different scan types - pub(crate) fn try_new( - logical_schema: SchemaRef, - partition_columns: &[String], - column_mapping_mode: ColumnMappingMode, - predicate: Option, - classifier: C, - ) -> DeltaResult { - let mut read_fields = Vec::with_capacity(logical_schema.num_fields()); - let mut read_field_names = HashSet::with_capacity(logical_schema.num_fields()); - let mut transform_spec = Vec::new(); - let mut last_physical_field: Option = None; - - // Loop over all selected fields and build both the physical schema and transform spec - for (index, logical_field) in logical_schema.fields().enumerate() { - let transform = classifier.classify_field( - logical_field, - index, - partition_columns, - &last_physical_field, - ); - - if let Some(spec) = transform { - // Field needs transformation - not in physical schema - transform_spec.push(spec); - } else { - // Physical field - should be read from parquet - // Validate metadata column doesn't conflict with partition columns - if logical_field.is_metadata_column() - && partition_columns.contains(logical_field.name()) - { - return Err(Error::Schema(format!( - "Metadata column names must not match partition columns: {}", - logical_field.name() - ))); - } - - // Add to physical schema - let physical_field = logical_field.make_physical(column_mapping_mode); - debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n"); - let physical_name = physical_field.name.clone(); - - if !logical_field.is_metadata_column() { - read_field_names.insert(physical_name.clone()); - } - - last_physical_field = Some(physical_name); - read_fields.push(physical_field); - } - } - - // This iteration runs in O(3) time since each metadata column can appear at most once in the schema - for metadata_column in logical_schema.metadata_columns() { - if read_field_names.contains(metadata_column.name()) { - return Err(Error::Schema(format!( - "Metadata column names must not match physical columns: {}", - metadata_column.name() - ))); - } - } - - let physical_schema = Arc::new(StructType::try_new(read_fields)?); - - let physical_predicate = match predicate { - Some(pred) => PhysicalPredicate::try_new(&pred, &logical_schema)?, - None => PhysicalPredicate::None, - }; - - let transform_spec = - if !transform_spec.is_empty() || column_mapping_mode != ColumnMappingMode::None { - Some(Arc::new(transform_spec)) - } else { - None - }; - - Ok(StateInfo { - logical_schema, - physical_schema, - physical_predicate, - transform_spec, - }) - } -} - pub fn selection_vector( engine: &dyn Engine, descriptor: &DeletionVectorDescriptor, @@ -866,6 +753,8 @@ pub fn selection_vector( #[cfg(test)] pub(crate) mod test_utils { use crate::arrow::array::StringArray; + use crate::scan::state_info::StateInfo; + use crate::schema::StructType; use crate::utils::test_utils::string_array_to_engine_data; use itertools::Itertools; use std::sync::Arc; @@ -883,7 +772,7 @@ pub(crate) mod test_utils { }; use super::state::ScanCallback; - use super::{PhysicalPredicate, StateInfo}; + use super::PhysicalPredicate; use crate::transforms::TransformSpec; // Generates a batch of sidecar actions with the given 
paths. @@ -932,6 +821,21 @@ pub(crate) mod test_utils { ArrowEngineData::try_from_engine_data(parsed).unwrap() } + // Generates a batch with an add action. + // The schema is provided as null columns affect equality checks. + pub(crate) fn add_batch_for_row_id(output_schema: SchemaRef) -> Box { + let handler = SyncJsonHandler {}; + let json_strings: StringArray = vec![ + r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","baseRowId": 42, "tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#, + r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none", "delta.enableRowTracking": "true", "delta.rowTracking.materializedRowIdColumnName":"row_id_col"},"createdTime":1677811175819}}"#, + ] + .into(); + let parsed = handler + .parse_json(string_array_to_engine_data(json_strings), output_schema) + .unwrap(); + ArrowEngineData::try_from_engine_data(parsed).unwrap() + } + // An add batch with a removed file parsed with the schema provided pub(crate) fn add_batch_with_remove(output_schema: SchemaRef) -> Box { let handler = SyncJsonHandler {}; @@ -976,8 +880,8 @@ pub(crate) mod test_utils { context: T, validate_callback: ScanCallback, ) { - let logical_schema = logical_schema - .unwrap_or_else(|| Arc::new(crate::schema::StructType::new_unchecked(vec![]))); + let logical_schema = + logical_schema.unwrap_or_else(|| Arc::new(StructType::new_unchecked(vec![]))); let state_info = Arc::new(StateInfo { logical_schema: logical_schema.clone(), physical_schema: logical_schema, @@ -1017,8 +921,7 @@ mod tests { use crate::engine::arrow_data::ArrowEngineData; use crate::engine::sync::SyncEngine; use crate::expressions::{column_expr, column_pred, Expression as Expr, Predicate as Pred}; - use crate::schema::{ColumnMetadataKey, PrimitiveType}; - use crate::transforms::FieldTransformSpec; + use crate::schema::{ColumnMetadataKey, PrimitiveType, StructType}; use crate::Snapshot; use super::*; @@ -1509,187 +1412,4 @@ mod tests { ); Ok(()) } - - #[test] - fn test_state_info_no_partition_columns() { - // Test case: No partition columns, no column mapping - let schema = Arc::new(StructType::new_unchecked(vec![ - StructField::nullable("id", DataType::STRING), - StructField::nullable("value", DataType::LONG), - ])); - - let state_info = StateInfo::try_new( - schema.clone(), - &[], // No partition columns - ColumnMappingMode::None, - None, // No predicate - ScanTransformFieldClassifier, - ) - .unwrap(); - - // Should have no transform spec (no partitions, no column mapping) - assert!(state_info.transform_spec.is_none()); - - // Physical schema should match logical schema - assert_eq!(state_info.logical_schema, schema); - assert_eq!(state_info.physical_schema.fields().len(), 2); - - // No predicate - assert_eq!(state_info.physical_predicate, PhysicalPredicate::None); - } - - 
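The JSON literal in `add_batch_for_row_id` is long, but only a few pieces matter for row-id handling: the add action's `baseRowId` (42 here) and the two row-tracking keys in the metaData configuration. The tests below build that configuration inline; a small helper of the kind sketched here (hypothetical, same key names) makes the requirement explicit — row-id requests fail unless both keys are present:

```rust
use std::collections::HashMap;

/// Table properties required before a scan may request MetadataColumnSpec::RowId:
/// row tracking must be enabled and the materialized row-id column must be named.
fn row_tracking_config(row_id_col: &str) -> HashMap<String, String> {
    [
        ("delta.enableRowTracking", "true"),
        ("delta.rowTracking.materializedRowIdColumnName", row_id_col),
    ]
    .iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect()
}
```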
#[test] - fn test_state_info_with_partition_columns() { - // Test case: With partition columns - let schema = Arc::new(StructType::new_unchecked(vec![ - StructField::nullable("id", DataType::STRING), - StructField::nullable("date", DataType::DATE), // Partition column - StructField::nullable("value", DataType::LONG), - ])); - - let state_info = StateInfo::try_new( - schema.clone(), - &["date".to_string()], // date is a partition column - ColumnMappingMode::None, - None, - ScanTransformFieldClassifier, - ) - .unwrap(); - - // Should have a transform spec for the partition column - assert!(state_info.transform_spec.is_some()); - let transform_spec = state_info.transform_spec.as_ref().unwrap(); - assert_eq!(transform_spec.len(), 1); - - // Check the transform spec for the partition column - match &transform_spec[0] { - FieldTransformSpec::MetadataDerivedColumn { - field_index, - insert_after, - } => { - assert_eq!(*field_index, 1); // Index of "date" in logical schema - assert_eq!(insert_after, &Some("id".to_string())); // After "id" which is physical - } - _ => panic!("Expected MetadataDerivedColumn transform"), - } - - // Physical schema should not include partition column - assert_eq!(state_info.logical_schema, schema); - assert_eq!(state_info.physical_schema.fields().len(), 2); // Only id and value - } - - #[test] - fn test_state_info_multiple_partition_columns() { - // Test case: Multiple partition columns interspersed with regular columns - let schema = Arc::new(StructType::new_unchecked(vec![ - StructField::nullable("col1", DataType::STRING), - StructField::nullable("part1", DataType::STRING), // Partition - StructField::nullable("col2", DataType::LONG), - StructField::nullable("part2", DataType::INTEGER), // Partition - ])); - - let state_info = StateInfo::try_new( - schema.clone(), - &["part1".to_string(), "part2".to_string()], - ColumnMappingMode::None, - None, - ScanTransformFieldClassifier, - ) - .unwrap(); - - // Should have transforms for both partition columns - assert!(state_info.transform_spec.is_some()); - let transform_spec = state_info.transform_spec.as_ref().unwrap(); - assert_eq!(transform_spec.len(), 2); - - // Check first partition column transform - match &transform_spec[0] { - FieldTransformSpec::MetadataDerivedColumn { - field_index, - insert_after, - } => { - assert_eq!(*field_index, 1); // Index of "part1" - assert_eq!(insert_after, &Some("col1".to_string())); - } - _ => panic!("Expected MetadataDerivedColumn transform"), - } - - // Check second partition column transform - match &transform_spec[1] { - FieldTransformSpec::MetadataDerivedColumn { - field_index, - insert_after, - } => { - assert_eq!(*field_index, 3); // Index of "part2" - assert_eq!(insert_after, &Some("col2".to_string())); - } - _ => panic!("Expected MetadataDerivedColumn transform"), - } - - // Physical schema should only have non-partition columns - assert_eq!(state_info.physical_schema.fields().len(), 2); // col1 and col2 - } - - #[test] - fn test_state_info_with_predicate() { - // Test case: With a valid predicate - let schema = Arc::new(StructType::new_unchecked(vec![ - StructField::nullable("id", DataType::STRING), - StructField::nullable("value", DataType::LONG), - ])); - - let predicate = Arc::new(column_expr!("value").gt(Expr::literal(10i64))); - - let state_info = StateInfo::try_new( - schema.clone(), - &[], - ColumnMappingMode::None, - Some(predicate), - ScanTransformFieldClassifier, - ) - .unwrap(); - - // Should have a physical predicate - match &state_info.physical_predicate { - 
PhysicalPredicate::Some(_pred, schema) => { - // Physical predicate exists - assert_eq!(schema.fields().len(), 1); // Only "value" is referenced - } - _ => panic!("Expected PhysicalPredicate::Some"), - } - } - - #[test] - fn test_state_info_partition_at_beginning() { - // Test case: Partition column at the beginning - let schema = Arc::new(StructType::new_unchecked(vec![ - StructField::nullable("date", DataType::DATE), // Partition column - StructField::nullable("id", DataType::STRING), - StructField::nullable("value", DataType::LONG), - ])); - - let state_info = StateInfo::try_new( - schema.clone(), - &["date".to_string()], - ColumnMappingMode::None, - None, - ScanTransformFieldClassifier, - ) - .unwrap(); - - // Should have a transform spec for the partition column - let transform_spec = state_info.transform_spec.as_ref().unwrap(); - assert_eq!(transform_spec.len(), 1); - - match &transform_spec[0] { - FieldTransformSpec::MetadataDerivedColumn { - field_index, - insert_after, - } => { - assert_eq!(*field_index, 0); // Index of "date" - assert_eq!(insert_after, &None); // No physical field before it, so prepend - } - _ => panic!("Expected MetadataDerivedColumn transform"), - } - } } diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index 90a3422f8..752135323 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -179,7 +179,7 @@ impl RowVisitor for ScanFileVisitor<'_, T> { } fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { require!( - getters.len() == 10, + getters.len() == 11, Error::InternalError(format!( "Wrong number of ScanFileVisitor getters: {}", getters.len() diff --git a/kernel/src/scan/state_info.rs b/kernel/src/scan/state_info.rs new file mode 100644 index 000000000..8e5a4731e --- /dev/null +++ b/kernel/src/scan/state_info.rs @@ -0,0 +1,645 @@ +//! StateInfo handles the state that we use through log-replay in order to correctly construct all +//! the physical->logical transforms needed for each add file + +use std::collections::HashSet; +use std::sync::Arc; + +use tracing::debug; + +use crate::scan::field_classifiers::TransformFieldClassifier; +use crate::scan::PhysicalPredicate; +use crate::schema::{DataType, MetadataColumnSpec, SchemaRef, StructType}; +use crate::table_configuration::TableConfiguration; +use crate::table_features::ColumnMappingMode; +use crate::transforms::{FieldTransformSpec, TransformSpec}; +use crate::{DeltaResult, Error, PredicateRef, StructField}; + +/// All the state needed to process a scan. +#[derive(Debug)] +pub(crate) struct StateInfo { + /// The logical schema for this scan + pub(crate) logical_schema: SchemaRef, + /// The physical schema to read from parquet files + pub(crate) physical_schema: SchemaRef, + /// The physical predicate for data skipping + pub(crate) physical_predicate: PhysicalPredicate, + /// Transform specification for converting physical to logical data + pub(crate) transform_spec: Option>, +} + +/// Validating the metadata columns also extracts information needed to properly construct the full +/// `StateInfo`. We use this struct to group this information so it can be cleanly passed back from +/// `validate_metadata_columns` +#[derive(Default)] +struct MetadataInfo<'a> { + /// What are the names of the requested metadata fields + metadata_field_names: HashSet<&'a String>, + /// The name of the column that's selecting row indexes if that's been requested or None if they + /// are not requested. We remember this if it's been requested explicitly. 
this is so we can + /// reference this column and not re-add it as a requested column if we're _also_ requesting + /// row-ids. + selected_row_index_col_name: Option<&'a String>, + /// the materializedRowIdColumnName extracted from the table config if row ids are requested, or + /// None if they are not requested + materialized_row_id_column_name: Option<&'a String>, +} + +/// This validates that we have sensible metadata columns, and that the requested metadata is +/// supported by the table. Also computes and returns any extra info needed to build the transform +/// for the requested columns. +// Runs in O(supported_number_of_metadata_columns) time since each metadata +// column can appear at most once in the schema +fn validate_metadata_columns<'a>( + logical_schema: &'a SchemaRef, + table_configuration: &'a TableConfiguration, +) -> DeltaResult> { + let mut metadata_info = MetadataInfo::default(); + let partition_columns = table_configuration.metadata().partition_columns(); + for metadata_column in logical_schema.metadata_columns() { + // Ensure we don't have a metadata column with same name as a partition column + if partition_columns.contains(metadata_column.name()) { + return Err(Error::Schema(format!( + "Metadata column names must not match partition columns: {}", + metadata_column.name() + ))); + } + match metadata_column.get_metadata_column_spec() { + Some(MetadataColumnSpec::RowIndex) => { + metadata_info.selected_row_index_col_name = Some(metadata_column.name()); + } + Some(MetadataColumnSpec::RowId) => { + if table_configuration.table_properties().enable_row_tracking != Some(true) { + return Err(Error::unsupported("Row ids are not enabled on this table")); + } + let row_id_col = table_configuration + .metadata() + .configuration() + .get("delta.rowTracking.materializedRowIdColumnName") + .ok_or(Error::generic("No delta.rowTracking.materializedRowIdColumnName key found in metadata configuration"))?; + metadata_info.materialized_row_id_column_name = Some(row_id_col); + } + Some(MetadataColumnSpec::RowCommitVersion) => {} + None => {} + } + metadata_info + .metadata_field_names + .insert(metadata_column.name()); + } + Ok(metadata_info) +} + +impl StateInfo { + /// Create StateInfo with a custom field classifier for different scan types. + /// Get the state needed to process a scan. + /// + /// `logical_schema` - The logical schema of the scan output, which includes partition columns + /// `table_configuration` - The TableConfiguration for this table + /// `predicate` - Optional predicate to filter data during the scan + /// `classifier` - The classifier to use for different scan types. 
Use `()` if no special handling is needed + pub(crate) fn try_new<C: TransformFieldClassifier>( + logical_schema: SchemaRef, + table_configuration: &TableConfiguration, + predicate: Option<PredicateRef>, + classifier: C, + ) -> DeltaResult<Self> { + let partition_columns = table_configuration.metadata().partition_columns(); + let column_mapping_mode = table_configuration.column_mapping_mode(); + let mut read_fields = Vec::with_capacity(logical_schema.num_fields()); + let mut transform_spec = Vec::new(); + let mut last_physical_field: Option<String> = None; + + let metadata_info = validate_metadata_columns(&logical_schema, table_configuration)?; + + // Loop over all selected fields and build both the physical schema and transform spec + for (index, logical_field) in logical_schema.fields().enumerate() { + if let Some(spec) = + classifier.classify_field(logical_field, index, &last_physical_field) + { + // Classifier has handled this field via a transformation, just push it and move on + transform_spec.push(spec); + } else if partition_columns.contains(logical_field.name()) { + // Push the transform for this partition column + transform_spec.push(FieldTransformSpec::MetadataDerivedColumn { + field_index: index, + insert_after: last_physical_field.clone(), + }); + } else { + // Regular field or a metadata column; figure out which and handle it + match logical_field.get_metadata_column_spec() { + Some(MetadataColumnSpec::RowId) => { + let index_column_name = match metadata_info.selected_row_index_col_name { + Some(index_column_name) => index_column_name.to_string(), + None => { + // The index column isn't being explicitly requested, so add it to + // `read_fields` so the parquet_reader will generate it, and add a + // transform to drop it before returning logical data + + // Ensure we have a column name that isn't already in our schema + let index_column_name = (0..) 
+ .map(|i| format!("row_indexes_for_row_id_{}", i)) + .find(|name| logical_schema.field(name).is_none()) + .ok_or(Error::generic( + "Couldn't generate row index column name", + ))?; + read_fields.push(StructField::create_metadata_column( + &index_column_name, + MetadataColumnSpec::RowIndex, + )); + transform_spec.push(FieldTransformSpec::StaticDrop { + field_name: index_column_name.clone(), + }); + index_column_name + } + }; + let Some(row_id_col_name) = metadata_info.materialized_row_id_column_name + else { + return Err(Error::internal_error( + "Should always return a materialized_row_id_column_name if selecting row ids" + )); + }; + + read_fields.push(StructField::nullable(row_id_col_name, DataType::LONG)); + transform_spec.push(FieldTransformSpec::GenerateRowId { + field_name: row_id_col_name.to_string(), + row_index_field_name: index_column_name, + }); + } + Some(MetadataColumnSpec::RowCommitVersion) => { + return Err(Error::unsupported("Row commit versions not supported")); + } + Some(MetadataColumnSpec::RowIndex) | None => { + // note that RowIndex is handled in the parquet reader so we just add it as + // if it's a normal physical column + let physical_field = logical_field.make_physical(column_mapping_mode); + debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n"); + let physical_name = physical_field.name.clone(); + + if !logical_field.is_metadata_column() + && metadata_info.metadata_field_names.contains(&physical_name) + { + return Err(Error::Schema(format!( + "Metadata column names must not match physical columns, but logical column '{}' has physical name '{}'", + logical_field.name(), physical_name, + ))); + } + last_physical_field = Some(physical_name); + read_fields.push(physical_field); + } + } + } + } + + let physical_schema = Arc::new(StructType::try_new(read_fields)?); + + let physical_predicate = match predicate { + Some(pred) => PhysicalPredicate::try_new(&pred, &logical_schema)?, + None => PhysicalPredicate::None, + }; + + let transform_spec = + if !transform_spec.is_empty() || column_mapping_mode != ColumnMappingMode::None { + Some(Arc::new(transform_spec)) + } else { + None + }; + + Ok(StateInfo { + logical_schema, + physical_schema, + physical_predicate, + transform_spec, + }) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use std::{collections::HashMap, sync::Arc}; + + use url::Url; + + use crate::actions::{Metadata, Protocol}; + use crate::expressions::{column_expr, Expression as Expr}; + use crate::schema::{ColumnMetadataKey, MetadataValue}; + use crate::utils::test_utils::assert_result_error_with_message; + + use super::*; + + // get a state info with no predicate or extra metadata + pub(crate) fn get_simple_state_info( + schema: SchemaRef, + partition_columns: Vec, + ) -> DeltaResult { + get_state_info(schema, partition_columns, None, HashMap::new(), vec![]) + } + + pub(crate) fn get_state_info( + schema: SchemaRef, + partition_columns: Vec, + predicate: Option, + metadata_configuration: HashMap, + metadata_cols: Vec<(&str, MetadataColumnSpec)>, + ) -> DeltaResult { + let metadata = Metadata::try_new( + None, + None, + schema.as_ref().clone(), + partition_columns, + 10, + metadata_configuration, + )?; + let no_features: Option> = None; // needed for type annotation + let protocol = Protocol::try_new(2, 2, no_features.clone(), no_features)?; + let table_configuration = TableConfiguration::try_new( + metadata, + protocol, + Url::parse("s3://my-table").unwrap(), + 1, + )?; + + let mut schema = schema; + for (name, spec) in 
metadata_cols.into_iter() { + schema = Arc::new( + schema + .add_metadata_column(name, spec) + .expect("Couldn't add metadata col"), + ); + } + + StateInfo::try_new(schema.clone(), &table_configuration, predicate, ()) + } + + pub(crate) fn assert_transform_spec( + transform_spec: &TransformSpec, + requested_row_indexes: bool, + expected_row_id_name: &str, + expected_row_index_name: &str, + ) { + // if we requested row indexes, there's only one transform for the row id col, otherwise the + // first transform drops the row index column, and the second one adds the row ids + let expected_transform_count = if requested_row_indexes { 1 } else { 2 }; + let generate_offset = if requested_row_indexes { 0 } else { 1 }; + + assert_eq!(transform_spec.len(), expected_transform_count); + + if !requested_row_indexes { + // ensure we have a drop transform if we didn't request row indexes + match &transform_spec[0] { + FieldTransformSpec::StaticDrop { field_name } => { + assert_eq!(field_name, expected_row_index_name); + } + _ => panic!("Expected StaticDrop transform"), + } + } + + match &transform_spec[generate_offset] { + FieldTransformSpec::GenerateRowId { + field_name, + row_index_field_name, + } => { + assert_eq!(field_name, expected_row_id_name); + assert_eq!(row_index_field_name, expected_row_index_name); + } + _ => panic!("Expected GenerateRowId transform"), + } + } + + #[test] + fn no_partition_columns() { + // Test case: No partition columns, no column mapping + let schema = Arc::new(StructType::new_unchecked(vec![ + StructField::nullable("id", DataType::STRING), + StructField::nullable("value", DataType::LONG), + ])); + + let state_info = get_simple_state_info(schema.clone(), vec![]).unwrap(); + + // Should have no transform spec (no partitions, no column mapping) + assert!(state_info.transform_spec.is_none()); + + // Physical schema should match logical schema + assert_eq!(state_info.logical_schema, schema); + assert_eq!(state_info.physical_schema.fields().len(), 2); + + // No predicate + assert_eq!(state_info.physical_predicate, PhysicalPredicate::None); + } + + #[test] + fn with_partition_columns() { + // Test case: With partition columns + let schema = Arc::new(StructType::new_unchecked(vec![ + StructField::nullable("id", DataType::STRING), + StructField::nullable("date", DataType::DATE), // Partition column + StructField::nullable("value", DataType::LONG), + ])); + + let state_info = get_simple_state_info( + schema.clone(), + vec!["date".to_string()], // date is a partition column + ) + .unwrap(); + + // Should have a transform spec for the partition column + assert!(state_info.transform_spec.is_some()); + let transform_spec = state_info.transform_spec.as_ref().unwrap(); + assert_eq!(transform_spec.len(), 1); + + // Check the transform spec for the partition column + match &transform_spec[0] { + FieldTransformSpec::MetadataDerivedColumn { + field_index, + insert_after, + } => { + assert_eq!(*field_index, 1); // Index of "date" in logical schema + assert_eq!(insert_after, &Some("id".to_string())); // After "id" which is physical + } + _ => panic!("Expected MetadataDerivedColumn transform"), + } + + // Physical schema should not include partition column + assert_eq!(state_info.logical_schema, schema); + assert_eq!(state_info.physical_schema.fields().len(), 2); // Only id and value + } + + #[test] + fn multiple_partition_columns() { + // Test case: Multiple partition columns interspersed with regular columns + let schema = Arc::new(StructType::new_unchecked(vec![ + 
StructField::nullable("col1", DataType::STRING), + StructField::nullable("part1", DataType::STRING), // Partition + StructField::nullable("col2", DataType::LONG), + StructField::nullable("part2", DataType::INTEGER), // Partition + ])); + + let state_info = get_simple_state_info( + schema.clone(), + vec!["part1".to_string(), "part2".to_string()], + ) + .unwrap(); + + // Should have transforms for both partition columns + assert!(state_info.transform_spec.is_some()); + let transform_spec = state_info.transform_spec.as_ref().unwrap(); + assert_eq!(transform_spec.len(), 2); + + // Check first partition column transform + match &transform_spec[0] { + FieldTransformSpec::MetadataDerivedColumn { + field_index, + insert_after, + } => { + assert_eq!(*field_index, 1); // Index of "part1" + assert_eq!(insert_after, &Some("col1".to_string())); + } + _ => panic!("Expected MetadataDerivedColumn transform"), + } + + // Check second partition column transform + match &transform_spec[1] { + FieldTransformSpec::MetadataDerivedColumn { + field_index, + insert_after, + } => { + assert_eq!(*field_index, 3); // Index of "part2" + assert_eq!(insert_after, &Some("col2".to_string())); + } + _ => panic!("Expected MetadataDerivedColumn transform"), + } + + // Physical schema should only have non-partition columns + assert_eq!(state_info.physical_schema.fields().len(), 2); // col1 and col2 + } + + #[test] + fn with_predicate() { + // Test case: With a valid predicate + let schema = Arc::new(StructType::new_unchecked(vec![ + StructField::nullable("id", DataType::STRING), + StructField::nullable("value", DataType::LONG), + ])); + + let predicate = Arc::new(column_expr!("value").gt(Expr::literal(10i64))); + + let state_info = get_state_info( + schema.clone(), + vec![], // no partition columns + Some(predicate), + HashMap::new(), // no extra metadata + vec![], // no metadata + ) + .unwrap(); + + // Should have a physical predicate + match &state_info.physical_predicate { + PhysicalPredicate::Some(_pred, schema) => { + // Physical predicate exists + assert_eq!(schema.fields().len(), 1); // Only "value" is referenced + } + _ => panic!("Expected PhysicalPredicate::Some"), + } + } + + #[test] + fn partition_at_beginning() { + // Test case: Partition column at the beginning + let schema = Arc::new(StructType::new_unchecked(vec![ + StructField::nullable("date", DataType::DATE), // Partition column + StructField::nullable("id", DataType::STRING), + StructField::nullable("value", DataType::LONG), + ])); + + let state_info = get_simple_state_info(schema.clone(), vec!["date".to_string()]).unwrap(); + + // Should have a transform spec for the partition column + let transform_spec = state_info.transform_spec.as_ref().unwrap(); + assert_eq!(transform_spec.len(), 1); + + match &transform_spec[0] { + FieldTransformSpec::MetadataDerivedColumn { + field_index, + insert_after, + } => { + assert_eq!(*field_index, 0); // Index of "date" + assert_eq!(insert_after, &None); // No physical field before it, so prepend + } + _ => panic!("Expected MetadataDerivedColumn transform"), + } + } + + fn get_string_map(slice: &[(&str, &str)]) -> HashMap { + slice + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn request_row_ids() { + let schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable( + "id", + DataType::STRING, + )])); + + let state_info = get_state_info( + schema.clone(), + vec![], + None, + get_string_map(&[ + ("delta.enableRowTracking", "true"), + ( + 
"delta.rowTracking.materializedRowIdColumnName", + "some_row_id_col", + ), + ]), + vec![("row_id", MetadataColumnSpec::RowId)], + ) + .unwrap(); + + // Should have a transform spec for the row_id column + let transform_spec = state_info.transform_spec.as_ref().unwrap(); + assert_transform_spec( + transform_spec, + false, // we did not request row indexes + "some_row_id_col", + "row_indexes_for_row_id_0", + ); + } + + #[test] + fn request_row_ids_conflicting_row_index_col_name() { + let schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable( + "row_indexes_for_row_id_0", // this will conflict with the first generated name for row indexes + DataType::STRING, + )])); + + let state_info = get_state_info( + schema.clone(), + vec![], + None, + get_string_map(&[ + ("delta.enableRowTracking", "true"), + ( + "delta.rowTracking.materializedRowIdColumnName", + "some_row_id_col", + ), + ]), + vec![("row_id", MetadataColumnSpec::RowId)], + ) + .unwrap(); + + // Should have a transform spec for the row_id column + let transform_spec = state_info.transform_spec.as_ref().unwrap(); + assert_transform_spec( + transform_spec, + false, // we did not request row indexes + "some_row_id_col", + "row_indexes_for_row_id_1", // ensure we didn't conflict with the col in the schema + ); + } + + #[test] + fn request_row_ids_and_indexes() { + let schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable( + "id", + DataType::STRING, + )])); + + let state_info = get_state_info( + schema.clone(), + vec![], + None, + get_string_map(&[ + ("delta.enableRowTracking", "true"), + ( + "delta.rowTracking.materializedRowIdColumnName", + "some_row_id_col", + ), + ]), + vec![ + ("row_id", MetadataColumnSpec::RowId), + ("row_index", MetadataColumnSpec::RowIndex), + ], + ) + .unwrap(); + + // Should have a transform spec for the row_id column + let transform_spec = state_info.transform_spec.as_ref().unwrap(); + assert_transform_spec( + transform_spec, + true, // we did request row indexes + "some_row_id_col", + "row_index", + ); + } + + #[test] + fn invalid_rowtracking_config() { + let schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable( + "id", + DataType::STRING, + )])); + + for (metadata_config, metadata_cols, expected_error) in [ + (HashMap::new(), vec![("row_id", MetadataColumnSpec::RowId)], "Unsupported: Row ids are not enabled on this table"), + ( + get_string_map(&[("delta.enableRowTracking", "true")]), + vec![("row_id", MetadataColumnSpec::RowId)], + "Generic delta kernel error: No delta.rowTracking.materializedRowIdColumnName key found in metadata configuration", + ), + ] { + let res = get_state_info(schema.clone(), vec![], None, metadata_config, metadata_cols); + assert_result_error_with_message(res, expected_error); + } + } + + #[test] + fn metadata_column_matches_partition_column() { + let schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable( + "id", + DataType::STRING, + )])); + let res = get_state_info( + schema.clone(), + vec!["part_col".to_string()], + None, + HashMap::new(), + vec![("part_col", MetadataColumnSpec::RowId)], + ); + assert_result_error_with_message( + res, + "Schema error: Metadata column names must not match partition columns: part_col", + ); + } + + #[test] + fn metadata_column_matches_read_field() { + let schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable( + "id", + DataType::STRING, + ) + .with_metadata(HashMap::::from([ + ( + ColumnMetadataKey::ColumnMappingId.as_ref().to_string(), + 1.into(), + ), + ( + 
ColumnMetadataKey::ColumnMappingPhysicalName + .as_ref() + .to_string(), + "other".into(), + ), + ]))])); + let res = get_state_info( + schema.clone(), + vec![], + None, + get_string_map(&[("delta.columnMapping.mode", "name")]), + vec![("other", MetadataColumnSpec::RowIndex)], + ); + assert_result_error_with_message( + res, + "Schema error: Metadata column names must not match physical columns, but logical column 'id' has physical name 'other'" + ); + } +} diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index c5898e0eb..9172dc883 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -240,13 +240,6 @@ impl TableChanges { pub fn table_root(&self) -> &Url { &self.table_root } - /// The partition columns that will be read. - pub(crate) fn partition_columns(&self) -> &Vec { - self.end_snapshot - .table_configuration() - .metadata() - .partition_columns() - } /// Create a [`TableChangesScanBuilder`] for an `Arc`. pub fn scan_builder(self: Arc) -> TableChangesScanBuilder { diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs index 3c9b3a62d..dd38f8015 100644 --- a/kernel/src/table_changes/physical_to_logical.rs +++ b/kernel/src/table_changes/physical_to_logical.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use crate::expressions::Scalar; -use crate::scan::StateInfo; +use crate::scan::state_info::StateInfo; use crate::schema::{DataType, SchemaRef, StructField, StructType}; use crate::transforms::{get_transform_expr, parse_partition_values}; use crate::{DeltaResult, Error, ExpressionRef}; @@ -79,6 +79,9 @@ pub(crate) fn scan_file_physical_schema( // added to overwrite any conflicting values. This behavior can be made more strict by changing // the parse_partition_values function to return an error for missing partition values, // and adding cdf values to the partition_values map + +// Note: Delta doesn't support row-tracking for CDF (see: +// https://docs.databricks.com/aws/en/delta/row-tracking#limitations) pub(crate) fn get_cdf_transform_expr( scan_file: &CdfScanFile, state_info: &StateInfo, @@ -111,9 +114,13 @@ pub(crate) fn get_cdf_transform_expr( let cdf_values = get_cdf_columns(&state_info.logical_schema, scan_file)?; partition_values.extend(cdf_values); - let expr = get_transform_expr(transform_spec, partition_values, physical_schema)?; - - Ok(Some(expr)) + get_transform_expr( + transform_spec, + partition_values, + physical_schema, + None, /* base_row_id */ + ) + .map(Some) } #[cfg(test)] @@ -121,7 +128,8 @@ mod tests { use super::*; use crate::expressions::Expression; use crate::scan::state::DvInfo; - use crate::scan::{PhysicalPredicate, StateInfo}; + use crate::scan::state_info::StateInfo; + use crate::scan::PhysicalPredicate; use crate::schema::{DataType, StructField, StructType}; use crate::transforms::FieldTransformSpec; use std::collections::HashMap; diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 56f19afd4..a7a59198b 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -7,7 +7,8 @@ use url::Url; use crate::actions::deletion_vector::split_vector; use crate::scan::field_classifiers::CdfTransformFieldClassifier; -use crate::scan::{PhysicalPredicate, ScanResult, StateInfo}; +use crate::scan::state_info::StateInfo; +use crate::scan::{PhysicalPredicate, ScanResult}; use crate::schema::SchemaRef; use crate::{DeltaResult, Engine, FileMeta, PredicateRef}; @@ -115,11 +116,7 @@ impl 
TableChangesScanBuilder { // Create StateInfo using CDF field classifier let state_info = StateInfo::try_new( logical_schema, - self.table_changes.partition_columns(), - self.table_changes - .end_snapshot - .table_configuration() - .column_mapping_mode(), + self.table_changes.end_snapshot.table_configuration(), self.predicate, CdfTransformFieldClassifier, )?; diff --git a/kernel/src/transforms.rs b/kernel/src/transforms.rs index d73a1e39f..571c86130 100644 --- a/kernel/src/transforms.rs +++ b/kernel/src/transforms.rs @@ -9,7 +9,9 @@ use std::sync::Arc; use itertools::Itertools; -use crate::expressions::{Expression, ExpressionRef, Scalar, Transform}; +use crate::expressions::{ + BinaryExpressionOp, Expression, ExpressionRef, Scalar, Transform, VariadicExpressionOp, +}; use crate::schema::{DataType, SchemaRef, StructType}; use crate::{DeltaResult, Error}; @@ -30,19 +32,18 @@ pub(crate) enum FieldTransformSpec { insert_after: Option, expr: ExpressionRef, }, - /// Replace the named input column with an expression - // NOTE: Row tracking will eventually need to replace the physical rowid column with a COALESCE - // to compute non-materialized row ids and row commit versions. - #[allow(unused)] - StaticReplace { - field_name: String, - expr: ExpressionRef, - }, /// Drops the named input column - // NOTE: Row tracking will need to drop metadata columns that were used to compute rowids, since + // NOTE: Row tracking needs to drop metadata columns that were used to compute rowids, since // they should not appear in the query's output. #[allow(unused)] StaticDrop { field_name: String }, + /// Generate the RowId column. + GenerateRowId { + /// column name which should end up containing the RowId + field_name: String, + /// column name which contains row indexes + row_index_field_name: String, + }, /// Insert a partition column after the named input column. /// The partition column is identified by its field index in the logical table schema. /// Its value varies from file to file and is obtained from file metadata. @@ -94,7 +95,7 @@ pub(crate) fn parse_partition_values( ), FieldTransformSpec::DynamicColumn { .. } | FieldTransformSpec::StaticInsert { .. } - | FieldTransformSpec::StaticReplace { .. } + | FieldTransformSpec::GenerateRowId { .. } | FieldTransformSpec::StaticDrop { .. 
} => None, }) .try_collect() @@ -109,6 +110,7 @@ pub(crate) fn get_transform_expr( transform_spec: &TransformSpec, mut metadata_values: HashMap, physical_schema: &StructType, + base_row_id: Option, ) -> DeltaResult { let mut transform = Transform::new_top_level(); @@ -118,10 +120,27 @@ pub(crate) fn get_transform_expr( StaticInsert { insert_after, expr } => { transform.with_inserted_field(insert_after.clone(), expr.clone()) } - StaticReplace { field_name, expr } => { - transform.with_replaced_field(field_name.clone(), expr.clone()) - } StaticDrop { field_name } => transform.with_dropped_field(field_name.clone()), + GenerateRowId { + field_name, + row_index_field_name, + } => { + let base_row_id = base_row_id.ok_or_else(|| { + Error::generic("Asked to generate RowIds, but no baseRowId found.") + })?; + let expr = Arc::new(Expression::variadic( + VariadicExpressionOp::Coalesce, + vec![ + Expression::column([field_name]), + Expression::binary( + BinaryExpressionOp::Plus, + Expression::literal(base_row_id), + Expression::column([row_index_field_name]), + ), + ], + )); + transform.with_replaced_field(field_name.clone(), expr) + } MetadataDerivedColumn { field_index, insert_after, @@ -314,7 +333,12 @@ mod tests { // Create a minimal physical schema for test let physical_schema = StructType::new_unchecked(vec![]); - let result = get_transform_expr(&transform_spec, partition_values, &physical_schema); + let result = get_transform_expr( + &transform_spec, + partition_values, + &physical_schema, + None, /* base_row_id */ + ); assert_result_error_with_message(result, "missing partition value"); } @@ -326,12 +350,8 @@ mod tests { insert_after: Some("col1".to_string()), expr: expr.clone(), }, - FieldTransformSpec::StaticReplace { - field_name: "col2".to_string(), - expr: expr.clone(), - }, FieldTransformSpec::StaticDrop { - field_name: "col3".to_string(), + field_name: "col2".to_string(), }, ]; let metadata_values = HashMap::new(); @@ -339,11 +359,15 @@ mod tests { // Create a physical schema with the relevant columns let physical_schema = StructType::new_unchecked(vec![ StructField::nullable("col1", DataType::STRING), - StructField::nullable("col2", DataType::INTEGER), - StructField::nullable("col3", DataType::LONG), + StructField::nullable("col2", DataType::LONG), ]); - let result = - get_transform_expr(&transform_spec, metadata_values, &physical_schema).unwrap(); + let result = get_transform_expr( + &transform_spec, + metadata_values, + &physical_schema, + None, /* base_row_id */ + ) + .unwrap(); let Expression::Transform(transform) = result.as_ref() else { panic!("Expected Transform expression"); @@ -359,20 +383,10 @@ mod tests { }; assert_eq!(scalar, &Scalar::Integer(42)); - // Verify StaticReplace: should replace col2 with the expression + // Verify StaticDrop: should drop col2 (empty expressions and is_replace = true) assert!(transform.field_transforms.contains_key("col2")); assert!(transform.field_transforms["col2"].is_replace); - assert_eq!(transform.field_transforms["col2"].exprs.len(), 1); - let Expression::Literal(scalar) = transform.field_transforms["col2"].exprs[0].as_ref() - else { - panic!("Expected literal expression for replace"); - }; - assert_eq!(scalar, &Scalar::Integer(42)); - - // Verify StaticDrop: should drop col3 (empty expressions and is_replace = true) - assert!(transform.field_transforms.contains_key("col3")); - assert!(transform.field_transforms["col3"].is_replace); - assert!(transform.field_transforms["col3"].exprs.is_empty()); + 
assert!(transform.field_transforms["col2"].exprs.is_empty()); } #[test] @@ -390,7 +404,12 @@ mod tests { ]); let metadata_values = HashMap::new(); - let result = get_transform_expr(&transform_spec, metadata_values, &physical_schema); + let result = get_transform_expr( + &transform_spec, + metadata_values, + &physical_schema, + None, /* base_row_id */ + ); let transform_expr = result.expect("Transform expression should be created successfully"); let Expression::Transform(transform) = transform_expr.as_ref() else { @@ -433,7 +452,12 @@ mod tests { ), ); - let result = get_transform_expr(&transform_spec, metadata_values, &physical_schema); + let result = get_transform_expr( + &transform_spec, + metadata_values, + &physical_schema, + None, /* base_row_id */ + ); let transform_expr = result.expect("Transform expression should be created successfully"); let Expression::Transform(transform) = transform_expr.as_ref() else { @@ -465,7 +489,12 @@ mod tests { let mut metadata_values = HashMap::new(); metadata_values.insert(1, ("year".to_string(), Scalar::Integer(2024))); - let result = get_transform_expr(&transform_spec, metadata_values, &physical_schema); + let result = get_transform_expr( + &transform_spec, + metadata_values, + &physical_schema, + None, /* base_row_id */ + ); let transform_expr = result.expect("Transform expression should be created successfully"); let Expression::Transform(transform) = transform_expr.as_ref() else { @@ -500,7 +529,86 @@ mod tests { let metadata_values = HashMap::new(); // Should fail with missing data error - let result = get_transform_expr(&transform_spec, metadata_values, &physical_schema); + let result = get_transform_expr( + &transform_spec, + metadata_values, + &physical_schema, + None, /* base_row_id */ + ); assert_result_error_with_message(result, "missing partition value for dynamic column"); } + + #[test] + fn get_transform_expr_generate_row_ids() { + let transform_spec = vec![FieldTransformSpec::GenerateRowId { + field_name: "row_id_col".to_string(), + row_index_field_name: "row_index_col".to_string(), + }]; + + // Physical schema contains row index col, but no row-id col + let physical_schema = StructType::new_unchecked(vec![ + StructField::nullable("id", DataType::STRING), + StructField::not_null("row_index_col", DataType::LONG), + ]); + let metadata_values = HashMap::new(); + + let result = get_transform_expr( + &transform_spec, + metadata_values, + &physical_schema, + Some(4), /* base_row_id */ + ); + let transform_expr = result.expect("Transform expression should be created successfully"); + + let Expression::Transform(transform) = transform_expr.as_ref() else { + panic!("Expected Transform expression"); + }; + + assert!(transform.input_path.is_none()); + let row_id_transform = transform + .field_transforms + .get("row_id_col") + .expect("Should have row_id_col transform"); + assert!(row_id_transform.is_replace); + + let expeceted_expr = Arc::new(Expression::variadic( + VariadicExpressionOp::Coalesce, + vec![ + Expression::column(["row_id_col"]), + Expression::binary( + BinaryExpressionOp::Plus, + Expression::literal(4i64), + Expression::column(["row_index_col"]), + ), + ], + )); + assert_eq!(row_id_transform.exprs.len(), 1); + let expr = &row_id_transform.exprs[0]; + assert_eq!(expr, &expeceted_expr); + } + + #[test] + fn get_transform_expr_generate_row_ids_no_base_id() { + let transform_spec = vec![FieldTransformSpec::GenerateRowId { + field_name: "row_id_col".to_string(), + row_index_field_name: "row_index_col".to_string(), + }]; + + // Physical 
schema contains row index col, but no row-id col + let physical_schema = StructType::new_unchecked(vec![ + StructField::nullable("id", DataType::STRING), + StructField::not_null("row_index_col", DataType::LONG), + ]); + let metadata_values = HashMap::new(); + + assert_result_error_with_message( + get_transform_expr( + &transform_spec, + metadata_values, + &physical_schema, + None, /* base_row_id */ + ), + "Asked to generate RowIds, but no baseRowId found", + ); + } } diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 91fb2f052..0a8c474a3 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -1500,44 +1500,34 @@ async fn test_unsupported_metadata_columns() -> Result<(), Box { - let error_msg = e.to_string(); - if error_msg.contains(error_text) && error_msg.contains("not supported") { - found_error = true; - break; - } - } - Ok(_) => { - panic!( - "Expected error for {} metadata column, but scan succeeded", - error_text - ); - } - } - } + + let scan_err = snapshot + .scan_builder() + .with_schema(schema) + .build() + .unwrap_err(); + let error_msg = scan_err.to_string(); assert!( - found_error, - "Expected error about {} not being supported", - error_text + error_msg.contains(error_text), + "Expected {error_msg} to contain {error_text}" ); }
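Once a scan built with a row-id metadata column is executed, the generated ids come back as an ordinary 64-bit integer column in the engine data, under whatever name was passed to `add_metadata_column`. A sketch of pulling them out of an Arrow batch, using the Arrow re-exports the example crate already imports ("row_id" is the name used in this diff's tests, not a fixed name):

```rust
use delta_kernel::arrow::array::{Array, Int64Array, RecordBatch};

/// Collect the generated row ids from one batch of scan output, if present.
fn row_ids(batch: &RecordBatch) -> Option<Vec<i64>> {
    let column = batch.column_by_name("row_id")?;
    let ids = column.as_any().downcast_ref::<Int64Array>()?;
    // Row ids should be non-null once the COALESCE transform has run;
    // flatten() skips any nulls defensively.
    Some(ids.iter().flatten().collect())
}
```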