diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 2f98488ce..08f1c79c7 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -12,8 +12,8 @@ use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateE use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor}; use crate::scan::Scalar; use crate::schema::ToSchema as _; -use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType}; -use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec}; +use crate::schema::{ColumnNamesAndTypes, DataType, MapType, StructField, StructType}; +use crate::transforms::{get_transform_expr, parse_partition_values}; use crate::utils::require; use crate::{DeltaResult, Engine, Error, ExpressionEvaluator}; @@ -92,9 +92,7 @@ impl ScanLogReplayProcessor { struct AddRemoveDedupVisitor<'seen> { deduplicator: FileActionDeduplicator<'seen>, selection_vector: Vec, - logical_schema: SchemaRef, - physical_schema: SchemaRef, - transform_spec: Option>, + state_info: Arc, partition_filter: Option, row_transform_exprs: Vec>, } @@ -111,9 +109,7 @@ impl AddRemoveDedupVisitor<'_> { fn new( seen: &mut HashSet, selection_vector: Vec, - logical_schema: SchemaRef, - physical_schema: SchemaRef, - transform_spec: Option>, + state_info: Arc, partition_filter: Option, is_log_batch: bool, ) -> AddRemoveDedupVisitor<'_> { @@ -127,9 +123,7 @@ impl AddRemoveDedupVisitor<'_> { Self::REMOVE_DV_START_INDEX, ), selection_vector, - logical_schema, - physical_schema, - transform_spec, + state_info, partition_filter, row_transform_exprs: Vec::new(), } @@ -177,12 +171,16 @@ impl AddRemoveDedupVisitor<'_> { // WARNING: It's not safe to partition-prune removes (just like it's not safe to data skip // removes), because they are needed to suppress earlier incompatible adds we might // encounter if the table's schema was replaced after the most recent checkpoint. - let partition_values = match &self.transform_spec { + let partition_values = match &self.state_info.transform_spec { Some(transform) if is_add => { let partition_values = getters[Self::ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?; - let partition_values = - parse_partition_values(&self.logical_schema, transform, &partition_values)?; + let partition_values = parse_partition_values( + &self.state_info.logical_schema, + transform, + &partition_values, + self.state_info.column_mapping_mode, + )?; if self.is_file_partition_pruned(&partition_values) { return Ok(false); } @@ -196,9 +194,16 @@ impl AddRemoveDedupVisitor<'_> { return Ok(false); } let transform = self + .state_info .transform_spec .as_ref() - .map(|transform| get_transform_expr(transform, partition_values, &self.physical_schema)) + .map(|transform| { + get_transform_expr( + transform, + partition_values, + &self.state_info.physical_schema, + ) + }) .transpose()?; if transform.is_some() { // fill in any needed `None`s for previous rows @@ -334,9 +339,7 @@ impl LogReplayProcessor for ScanLogReplayProcessor { let mut visitor = AddRemoveDedupVisitor::new( &mut self.seen_file_keys, selection_vector, - self.state_info.logical_schema.clone(), - self.state_info.physical_schema.clone(), - self.state_info.transform_spec.clone(), + self.state_info.clone(), self.partition_filter.clone(), is_log_batch, ); @@ -450,6 +453,7 @@ mod tests { physical_schema: logical_schema.clone(), physical_predicate: PhysicalPredicate::None, transform_spec: None, + column_mapping_mode: ColumnMappingMode::None, }); let iter = scan_action_iter( &SyncEngine::new(), diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 02ddd9ded..bd1e79f08 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -158,6 +158,7 @@ impl PhysicalPredicate { pub(crate) fn try_new( predicate: &Predicate, logical_schema: &Schema, + column_mapping_mode: ColumnMappingMode, ) -> DeltaResult { if can_statically_skip_all_files(predicate) { return Ok(PhysicalPredicate::StaticSkipAll); @@ -167,6 +168,7 @@ impl PhysicalPredicate { column_mappings: HashMap::new(), logical_path: vec![], physical_path: vec![], + column_mapping_mode, }; let schema_opt = get_referenced_fields.transform_struct(logical_schema); let mut unresolved = get_referenced_fields.unresolved_references.into_iter(); @@ -217,6 +219,7 @@ struct GetReferencedFields<'a> { column_mappings: HashMap, logical_path: Vec, physical_path: Vec, + column_mapping_mode: ColumnMappingMode, } impl<'a> SchemaTransform<'a> for GetReferencedFields<'a> { // Capture the path mapping for this leaf field @@ -242,7 +245,7 @@ impl<'a> SchemaTransform<'a> for GetReferencedFields<'a> { } fn transform_struct_field(&mut self, field: &'a StructField) -> Option> { - let physical_name = field.physical_name(); + let physical_name = field.physical_name(self.column_mapping_mode); self.logical_path.push(field.name.clone()); self.physical_path.push(physical_name.to_string()); let field = self.recurse_into_struct_field(field); @@ -756,6 +759,8 @@ pub(crate) struct StateInfo { pub(crate) physical_predicate: PhysicalPredicate, /// Transform specification for converting physical to logical data pub(crate) transform_spec: Option>, + /// The column mapping mode for this scan + pub(crate) column_mapping_mode: ColumnMappingMode, } impl StateInfo { @@ -830,7 +835,7 @@ impl StateInfo { let physical_schema = Arc::new(StructType::try_new(read_fields)?); let physical_predicate = match predicate { - Some(pred) => PhysicalPredicate::try_new(&pred, &logical_schema)?, + Some(pred) => PhysicalPredicate::try_new(&pred, &logical_schema, column_mapping_mode)?, None => PhysicalPredicate::None, }; @@ -846,6 +851,7 @@ impl StateInfo { physical_schema, physical_predicate, transform_spec, + column_mapping_mode, }) } } @@ -882,6 +888,7 @@ pub(crate) mod test_utils { use super::state::ScanCallback; use super::{PhysicalPredicate, StateInfo}; + use crate::table_features::ColumnMappingMode; use crate::transforms::TransformSpec; // Generates a batch of sidecar actions with the given paths. @@ -981,6 +988,7 @@ pub(crate) mod test_utils { physical_schema: logical_schema, physical_predicate: PhysicalPredicate::None, transform_spec, + column_mapping_mode: ColumnMappingMode::None, }); let iter = scan_action_iter( &SyncEngine::new(), @@ -1191,7 +1199,9 @@ mod tests { ]; for (predicate, expected) in test_cases { - let result = PhysicalPredicate::try_new(&predicate, &logical_schema).ok(); + let result = + PhysicalPredicate::try_new(&predicate, &logical_schema, ColumnMappingMode::Name) + .ok(); assert_eq!( result, expected, "Failed for predicate: {predicate:#?}, expected {expected:#?}, got {result:#?}" diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs index 8c1c4302a..844856852 100644 --- a/kernel/src/schema/mod.rs +++ b/kernel/src/schema/mod.rs @@ -314,17 +314,26 @@ impl StructField { /// Get the physical name for this field as it should be read from parquet. /// + /// When `column_mapping_mode` is `None`, always returns the logical name (even if physical + /// name metadata is present). When mode is `Id` or `Name`, returns the physical name from + /// metadata if present, otherwise returns the logical name. + /// /// NOTE: Caller affirms that the schema was already validated by /// [`crate::table_features::validate_schema_column_mapping`], to ensure that annotations are /// always and only present when column mapping mode is enabled. #[internal_api] - pub(crate) fn physical_name(&self) -> &str { - match self - .metadata - .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()) - { - Some(MetadataValue::String(physical_name)) => physical_name, - _ => &self.name, + pub(crate) fn physical_name(&self, column_mapping_mode: ColumnMappingMode) -> &str { + match column_mapping_mode { + ColumnMappingMode::None => &self.name, + ColumnMappingMode::Id | ColumnMappingMode::Name => { + match self + .metadata + .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()) + { + Some(MetadataValue::String(physical_name)) => physical_name, + _ => &self.name, + } + } } } @@ -422,7 +431,7 @@ impl StructField { .is_some_and(|x| matches!(x, MetadataValue::String(_)))); } } - field.physical_name().to_owned() + field.physical_name(self.column_mapping_mode).to_owned() } }; @@ -1901,7 +1910,7 @@ mod tests { assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4)); assert!(matches!(id_start, MetadataValue::Number(num) if *num == 2147483648i64)); assert_eq!( - field.physical_name(), + field.physical_name(mode), "col-5f422f40-de70-45b2-88ab-1d5c90e94db1" ); let physical_field = field.make_physical(mode); diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index f1278504a..b9e2c3e90 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -10,7 +10,7 @@ use crate::scan::state::DvInfo; use crate::scan::PhysicalPredicate; use crate::schema::{DataType, StructField, StructType}; use crate::table_changes::log_replay::LogReplayScanner; -use crate::table_features::ReaderFeature; +use crate::table_features::{ColumnMappingMode, ReaderFeature}; use crate::utils::test_utils::{assert_result_error_with_message, Action, LocalMockTable}; use crate::Predicate; use crate::{DeltaResult, Engine, Error, Version}; @@ -539,10 +539,11 @@ async fn data_skipping_filter() { Scalar::from(4), ); let logical_schema = get_schema(); - let predicate = match PhysicalPredicate::try_new(&predicate, &logical_schema) { - Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)), - other => panic!("Unexpected result: {other:?}"), - }; + let predicate = + match PhysicalPredicate::try_new(&predicate, &logical_schema, ColumnMappingMode::None) { + Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)), + other => panic!("Unexpected result: {other:?}"), + }; let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) .unwrap() .into_iter(); diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs index 14d0a4bcd..5c9ed9d00 100644 --- a/kernel/src/table_changes/physical_to_logical.rs +++ b/kernel/src/table_changes/physical_to_logical.rs @@ -97,6 +97,7 @@ pub(crate) fn get_cdf_transform_expr( &state_info.logical_schema, transform_spec, &scan_file.partition_values, + state_info.column_mapping_mode, )?; partition_values.extend(parsed_values); @@ -114,6 +115,7 @@ mod tests { use crate::scan::state::DvInfo; use crate::scan::{PhysicalPredicate, StateInfo}; use crate::schema::{DataType, StructField, StructType}; + use crate::table_features::ColumnMappingMode; use crate::transforms::FieldTransformSpec; use std::collections::HashMap; use std::sync::Arc; @@ -163,6 +165,7 @@ mod tests { physical_schema: physical_schema.into(), physical_predicate: PhysicalPredicate::None, transform_spec: Some(Arc::new(transform_spec)), + column_mapping_mode: ColumnMappingMode::None, } } diff --git a/kernel/src/transforms.rs b/kernel/src/transforms.rs index d73a1e39f..11053f968 100644 --- a/kernel/src/transforms.rs +++ b/kernel/src/transforms.rs @@ -11,6 +11,7 @@ use itertools::Itertools; use crate::expressions::{Expression, ExpressionRef, Scalar, Transform}; use crate::schema::{DataType, SchemaRef, StructType}; +use crate::table_features::ColumnMappingMode; use crate::{DeltaResult, Error}; /// A list of field transforms that describes a transform expression to be created at scan time. @@ -69,13 +70,14 @@ pub(crate) fn parse_partition_value( field_idx: usize, logical_schema: &SchemaRef, partition_values: &HashMap, + column_mapping_mode: ColumnMappingMode, ) -> DeltaResult<(usize, (String, Scalar))> { let Some(field) = logical_schema.field_at_index(field_idx) else { return Err(Error::InternalError(format!( "out of bounds partition column field index {field_idx}" ))); }; - let name = field.physical_name(); + let name = field.physical_name(column_mapping_mode); let partition_value = parse_partition_value_raw(partition_values.get(name), field.data_type())?; Ok((field_idx, (name.to_string(), partition_value))) } @@ -85,13 +87,19 @@ pub(crate) fn parse_partition_values( logical_schema: &SchemaRef, transform_spec: &TransformSpec, partition_values: &HashMap, + column_mapping_mode: ColumnMappingMode, ) -> DeltaResult> { transform_spec .iter() .filter_map(|field_transform| match field_transform { - FieldTransformSpec::MetadataDerivedColumn { field_index, .. } => Some( - parse_partition_value(*field_index, logical_schema, partition_values), - ), + FieldTransformSpec::MetadataDerivedColumn { field_index, .. } => { + Some(parse_partition_value( + *field_index, + logical_schema, + partition_values, + column_mapping_mode, + )) + } FieldTransformSpec::DynamicColumn { .. } | FieldTransformSpec::StaticInsert { .. } | FieldTransformSpec::StaticReplace { .. } @@ -202,7 +210,7 @@ mod tests { )])); let partition_values = HashMap::new(); - let result = parse_partition_value(5, &schema, &partition_values); + let result = parse_partition_value(5, &schema, &partition_values, ColumnMappingMode::None); assert_result_error_with_message(result, "out of bounds"); } @@ -237,7 +245,13 @@ mod tests { partition_values.insert("id".to_string(), "test".to_string()); partition_values.insert("_change_type".to_string(), "insert".to_string()); - let result = parse_partition_values(&schema, &transform_spec, &partition_values).unwrap(); + let result = parse_partition_values( + &schema, + &transform_spec, + &partition_values, + ColumnMappingMode::None, + ) + .unwrap(); assert_eq!(result.len(), 2); assert!(result.contains_key(&0)); assert!(result.contains_key(&1)); @@ -257,7 +271,13 @@ mod tests { let transform_spec = vec![]; let partition_values = HashMap::new(); - let result = parse_partition_values(&schema, &transform_spec, &partition_values).unwrap(); + let result = parse_partition_values( + &schema, + &transform_spec, + &partition_values, + ColumnMappingMode::None, + ) + .unwrap(); assert!(result.is_empty()); }