40 changes: 22 additions & 18 deletions kernel/src/scan/log_replay.rs
@@ -12,8 +12,8 @@ use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateE
use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor};
use crate::scan::Scalar;
use crate::schema::ToSchema as _;
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec};
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, StructField, StructType};
use crate::transforms::{get_transform_expr, parse_partition_values};
use crate::utils::require;
use crate::{DeltaResult, Engine, Error, ExpressionEvaluator};

@@ -92,9 +92,7 @@ impl ScanLogReplayProcessor
struct AddRemoveDedupVisitor<'seen> {
deduplicator: FileActionDeduplicator<'seen>,
selection_vector: Vec<bool>,
logical_schema: SchemaRef,
physical_schema: SchemaRef,
transform_spec: Option<Arc<TransformSpec>>,
state_info: Arc<StateInfo>,
partition_filter: Option<PredicateRef>,
row_transform_exprs: Vec<Option<ExpressionRef>>,
}
@@ -111,9 +109,7 @@ impl AddRemoveDedupVisitor<'_> {
fn new(
seen: &mut HashSet<FileActionKey>,
selection_vector: Vec<bool>,
logical_schema: SchemaRef,
physical_schema: SchemaRef,
transform_spec: Option<Arc<TransformSpec>>,
state_info: Arc<StateInfo>,
partition_filter: Option<PredicateRef>,
is_log_batch: bool,
) -> AddRemoveDedupVisitor<'_> {
@@ -127,9 +123,7 @@ impl AddRemoveDedupVisitor<'_> {
Self::REMOVE_DV_START_INDEX,
),
selection_vector,
logical_schema,
physical_schema,
transform_spec,
state_info,
partition_filter,
row_transform_exprs: Vec::new(),
}
@@ -177,12 +171,16 @@ impl AddRemoveDedupVisitor<'_> {
// WARNING: It's not safe to partition-prune removes (just like it's not safe to data skip
// removes), because they are needed to suppress earlier incompatible adds we might
// encounter if the table's schema was replaced after the most recent checkpoint.
let partition_values = match &self.transform_spec {
let partition_values = match &self.state_info.transform_spec {
Some(transform) if is_add => {
let partition_values =
getters[Self::ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?;
let partition_values =
parse_partition_values(&self.logical_schema, transform, &partition_values)?;
let partition_values = parse_partition_values(
&self.state_info.logical_schema,
transform,
&partition_values,
self.state_info.column_mapping_mode,
)?;
if self.is_file_partition_pruned(&partition_values) {
return Ok(false);
}
@@ -196,9 +194,16 @@ impl AddRemoveDedupVisitor<'_> {
return Ok(false);
}
let transform = self
.state_info
.transform_spec
.as_ref()
.map(|transform| get_transform_expr(transform, partition_values, &self.physical_schema))
.map(|transform| {
get_transform_expr(
transform,
partition_values,
&self.state_info.physical_schema,
)
})
.transpose()?;
if transform.is_some() {
// fill in any needed `None`s for previous rows
@@ -334,9 +339,7 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
let mut visitor = AddRemoveDedupVisitor::new(
&mut self.seen_file_keys,
selection_vector,
self.state_info.logical_schema.clone(),
self.state_info.physical_schema.clone(),
self.state_info.transform_spec.clone(),
self.state_info.clone(),
self.partition_filter.clone(),
is_log_batch,
);
@@ -450,6 +453,7 @@ mod tests {
physical_schema: logical_schema.clone(),
physical_predicate: PhysicalPredicate::None,
transform_spec: None,
column_mapping_mode: ColumnMappingMode::None,
});
let iter = scan_action_iter(
&SyncEngine::new(),
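
Note (reviewer sketch, not part of the diff): the hunks above collapse the visitor's three separately cloned fields (`logical_schema`, `physical_schema`, `transform_spec`) into one shared `Arc<StateInfo>` handle. A minimal standalone sketch of that pattern, using hypothetical `StateInfo`/`Visitor` stand-ins rather than the kernel's real types:

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the kernel's StateInfo and AddRemoveDedupVisitor.
struct StateInfo {
    logical_schema: String,
    physical_schema: String,
    transform_spec: Option<String>,
}

struct Visitor {
    // One shared handle instead of three separately cloned fields.
    state_info: Arc<StateInfo>,
}

impl Visitor {
    fn new(state_info: Arc<StateInfo>) -> Self {
        Self { state_info }
    }

    fn describe(&self) -> String {
        // All scan state is reached through the shared Arc.
        format!(
            "logical={}, physical={}, transform={:?}",
            self.state_info.logical_schema,
            self.state_info.physical_schema,
            self.state_info.transform_spec
        )
    }
}

fn main() {
    let state = Arc::new(StateInfo {
        logical_schema: "logical".into(),
        physical_schema: "physical".into(),
        transform_spec: None,
    });
    // Cloning the Arc is a cheap pointer bump; the visitor shares the same StateInfo.
    let visitor = Visitor::new(Arc::clone(&state));
    println!("{}", visitor.describe());
}
```
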
16 changes: 13 additions & 3 deletions kernel/src/scan/mod.rs
@@ -158,6 +158,7 @@ impl PhysicalPredicate {
pub(crate) fn try_new(
predicate: &Predicate,
logical_schema: &Schema,
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<PhysicalPredicate> {
if can_statically_skip_all_files(predicate) {
return Ok(PhysicalPredicate::StaticSkipAll);
@@ -167,6 +168,7 @@ impl PhysicalPredicate {
column_mappings: HashMap::new(),
logical_path: vec![],
physical_path: vec![],
column_mapping_mode,
};
let schema_opt = get_referenced_fields.transform_struct(logical_schema);
let mut unresolved = get_referenced_fields.unresolved_references.into_iter();
@@ -217,6 +219,7 @@ struct GetReferencedFields<'a> {
column_mappings: HashMap<ColumnName, ColumnName>,
logical_path: Vec<String>,
physical_path: Vec<String>,
column_mapping_mode: ColumnMappingMode,
}
impl<'a> SchemaTransform<'a> for GetReferencedFields<'a> {
// Capture the path mapping for this leaf field
@@ -242,7 +245,7 @@ impl<'a> SchemaTransform<'a> for GetReferencedFields<'a> {
}

fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
let physical_name = field.physical_name();
let physical_name = field.physical_name(self.column_mapping_mode);
self.logical_path.push(field.name.clone());
self.physical_path.push(physical_name.to_string());
let field = self.recurse_into_struct_field(field);
@@ -756,6 +759,8 @@ pub(crate) struct StateInfo {
pub(crate) physical_predicate: PhysicalPredicate,
/// Transform specification for converting physical to logical data
pub(crate) transform_spec: Option<Arc<TransformSpec>>,
/// The column mapping mode for this scan
pub(crate) column_mapping_mode: ColumnMappingMode,
}

impl StateInfo {
@@ -830,7 +835,7 @@ impl StateInfo {
let physical_schema = Arc::new(StructType::try_new(read_fields)?);

let physical_predicate = match predicate {
Some(pred) => PhysicalPredicate::try_new(&pred, &logical_schema)?,
Some(pred) => PhysicalPredicate::try_new(&pred, &logical_schema, column_mapping_mode)?,
None => PhysicalPredicate::None,
};

@@ -846,6 +851,7 @@ impl StateInfo {
physical_schema,
physical_predicate,
transform_spec,
column_mapping_mode,
})
}
}
@@ -882,6 +888,7 @@ pub(crate) mod test_utils {

use super::state::ScanCallback;
use super::{PhysicalPredicate, StateInfo};
use crate::table_features::ColumnMappingMode;
use crate::transforms::TransformSpec;

// Generates a batch of sidecar actions with the given paths.
@@ -981,6 +988,7 @@ pub(crate) mod test_utils {
physical_schema: logical_schema,
physical_predicate: PhysicalPredicate::None,
transform_spec,
column_mapping_mode: ColumnMappingMode::None,
});
let iter = scan_action_iter(
&SyncEngine::new(),
@@ -1191,7 +1199,9 @@ mod tests {
];

for (predicate, expected) in test_cases {
let result = PhysicalPredicate::try_new(&predicate, &logical_schema).ok();
let result =
PhysicalPredicate::try_new(&predicate, &logical_schema, ColumnMappingMode::Name)
.ok();
assert_eq!(
result, expected,
"Failed for predicate: {predicate:#?}, expected {expected:#?}, got {result:#?}"
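
Note (reviewer sketch, not part of the diff): `GetReferencedFields` now carries the scan's `ColumnMappingMode`, so the logical-to-physical column map it builds while walking the schema resolves each field through `physical_name(mode)`. A rough standalone sketch of that path-tracking walk, with hypothetical `Field` types; the mode-aware name resolution itself is sketched after the `kernel/src/schema/mod.rs` hunks below:

```rust
use std::collections::HashMap;

// Hypothetical schema node: a leaf column or a struct of children, each with an
// optional column-mapping physical-name annotation.
enum Field {
    Leaf { name: String, physical: Option<String> },
    Struct { name: String, children: Vec<Field> },
}

impl Field {
    fn name(&self) -> &str {
        match self {
            Field::Leaf { name, .. } | Field::Struct { name, .. } => name,
        }
    }

    // Simplified resolution: fall back to the logical name when no annotation
    // is present (the real rules also depend on the column mapping mode).
    fn physical_name(&self) -> &str {
        match self {
            Field::Leaf { physical: Some(p), .. } => p,
            _ => self.name(),
        }
    }
}

/// Walk the schema, recording "logical dotted path -> physical dotted path" for
/// every leaf, mirroring how GetReferencedFields pushes onto its
/// logical_path / physical_path stacks while recursing.
fn collect_mappings(
    field: &Field,
    logical_path: &mut Vec<String>,
    physical_path: &mut Vec<String>,
    out: &mut HashMap<String, String>,
) {
    logical_path.push(field.name().to_string());
    physical_path.push(field.physical_name().to_string());
    match field {
        Field::Leaf { .. } => {
            out.insert(logical_path.join("."), physical_path.join("."));
        }
        Field::Struct { children, .. } => {
            for child in children {
                collect_mappings(child, logical_path, physical_path, out);
            }
        }
    }
    logical_path.pop();
    physical_path.pop();
}

fn main() {
    let schema = Field::Struct {
        name: "root".into(),
        children: vec![Field::Leaf {
            name: "user_id".into(),
            physical: Some("col-1234".into()),
        }],
    };
    let mut out = HashMap::new();
    collect_mappings(&schema, &mut Vec::new(), &mut Vec::new(), &mut out);
    assert_eq!(out["root.user_id"], "root.col-1234");
    println!("{out:?}");
}
```
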
27 changes: 18 additions & 9 deletions kernel/src/schema/mod.rs
@@ -314,17 +314,26 @@ impl StructField {

/// Get the physical name for this field as it should be read from parquet.
///
/// When `column_mapping_mode` is `None`, always returns the logical name (even if physical
/// name metadata is present). When mode is `Id` or `Name`, returns the physical name from
/// metadata if present, otherwise returns the logical name.
///
/// NOTE: Caller affirms that the schema was already validated by
/// [`crate::table_features::validate_schema_column_mapping`], to ensure that annotations are
/// always and only present when column mapping mode is enabled.
#[internal_api]
pub(crate) fn physical_name(&self) -> &str {
match self
.metadata
.get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
{
Some(MetadataValue::String(physical_name)) => physical_name,
_ => &self.name,
pub(crate) fn physical_name(&self, column_mapping_mode: ColumnMappingMode) -> &str {
match column_mapping_mode {
ColumnMappingMode::None => &self.name,
ColumnMappingMode::Id | ColumnMappingMode::Name => {
match self
.metadata
.get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
{
Some(MetadataValue::String(physical_name)) => physical_name,
_ => &self.name,
}
}
}
}

@@ -422,7 +431,7 @@ impl StructField {
.is_some_and(|x| matches!(x, MetadataValue::String(_))));
}
}
field.physical_name().to_owned()
field.physical_name(self.column_mapping_mode).to_owned()
}
};

@@ -1901,7 +1910,7 @@ mod tests {
assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4));
assert!(matches!(id_start, MetadataValue::Number(num) if *num == 2147483648i64));
assert_eq!(
field.physical_name(),
field.physical_name(mode),
"col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
);
let physical_field = field.make_physical(mode);
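
Note (reviewer sketch, not part of the diff): per the new doc comment, `physical_name(column_mapping_mode)` returns the logical name whenever the mode is `None`, even if a physical-name annotation is present, and prefers the annotation (falling back to the logical name) for `Id`/`Name`. A small self-contained sketch of that rule, using hypothetical stand-in types; the metadata key string is the usual Delta column-mapping annotation key:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the kernel's ColumnMappingMode and StructField.
#[derive(Clone, Copy)]
enum ColumnMappingMode {
    None,
    Id,
    Name,
}

const PHYSICAL_NAME_KEY: &str = "delta.columnMapping.physicalName";

struct StructField {
    name: String,
    metadata: HashMap<String, String>,
}

impl StructField {
    fn physical_name(&self, mode: ColumnMappingMode) -> &str {
        match mode {
            // Mode None ignores any annotation and reads by logical name.
            ColumnMappingMode::None => &self.name,
            // Id/Name use the annotated physical name when present.
            ColumnMappingMode::Id | ColumnMappingMode::Name => self
                .metadata
                .get(PHYSICAL_NAME_KEY)
                .map(String::as_str)
                .unwrap_or(&self.name),
        }
    }
}

fn main() {
    let mut metadata = HashMap::new();
    metadata.insert(PHYSICAL_NAME_KEY.to_string(), "col-5f422f40".to_string());
    let field = StructField { name: "user_id".into(), metadata };

    assert_eq!(field.physical_name(ColumnMappingMode::Name), "col-5f422f40");
    assert_eq!(field.physical_name(ColumnMappingMode::Id), "col-5f422f40");
    // Even with the annotation present, mode None keeps the logical name.
    assert_eq!(field.physical_name(ColumnMappingMode::None), "user_id");
    println!("ok");
}
```
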
11 changes: 6 additions & 5 deletions kernel/src/table_changes/log_replay/tests.rs
@@ -10,7 +10,7 @@ use crate::scan::state::DvInfo;
use crate::scan::PhysicalPredicate;
use crate::schema::{DataType, StructField, StructType};
use crate::table_changes::log_replay::LogReplayScanner;
use crate::table_features::ReaderFeature;
use crate::table_features::{ColumnMappingMode, ReaderFeature};
use crate::utils::test_utils::{assert_result_error_with_message, Action, LocalMockTable};
use crate::Predicate;
use crate::{DeltaResult, Engine, Error, Version};
@@ -539,10 +539,11 @@ async fn data_skipping_filter() {
Scalar::from(4),
);
let logical_schema = get_schema();
let predicate = match PhysicalPredicate::try_new(&predicate, &logical_schema) {
Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)),
other => panic!("Unexpected result: {other:?}"),
};
let predicate =
match PhysicalPredicate::try_new(&predicate, &logical_schema, ColumnMappingMode::None) {
Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)),
other => panic!("Unexpected result: {other:?}"),
};
let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
.unwrap()
.into_iter();
3 changes: 3 additions & 0 deletions kernel/src/table_changes/physical_to_logical.rs
@@ -97,6 +97,7 @@ pub(crate) fn get_cdf_transform_expr(
&state_info.logical_schema,
transform_spec,
&scan_file.partition_values,
state_info.column_mapping_mode,
)?;
partition_values.extend(parsed_values);

@@ -114,6 +115,7 @@ mod tests {
use crate::scan::state::DvInfo;
use crate::scan::{PhysicalPredicate, StateInfo};
use crate::schema::{DataType, StructField, StructType};
use crate::table_features::ColumnMappingMode;
use crate::transforms::FieldTransformSpec;
use std::collections::HashMap;
use std::sync::Arc;
@@ -163,6 +165,7 @@ mod tests {
physical_schema: physical_schema.into(),
physical_predicate: PhysicalPredicate::None,
transform_spec: Some(Arc::new(transform_spec)),
column_mapping_mode: ColumnMappingMode::None,
}
}

34 changes: 27 additions & 7 deletions kernel/src/transforms.rs
@@ -11,6 +11,7 @@ use itertools::Itertools;

use crate::expressions::{Expression, ExpressionRef, Scalar, Transform};
use crate::schema::{DataType, SchemaRef, StructType};
use crate::table_features::ColumnMappingMode;
use crate::{DeltaResult, Error};

/// A list of field transforms that describes a transform expression to be created at scan time.
@@ -69,13 +70,14 @@ pub(crate) fn parse_partition_value(
field_idx: usize,
logical_schema: &SchemaRef,
partition_values: &HashMap<String, String>,
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<(usize, (String, Scalar))> {
let Some(field) = logical_schema.field_at_index(field_idx) else {
return Err(Error::InternalError(format!(
"out of bounds partition column field index {field_idx}"
)));
};
let name = field.physical_name();
let name = field.physical_name(column_mapping_mode);
let partition_value = parse_partition_value_raw(partition_values.get(name), field.data_type())?;
Ok((field_idx, (name.to_string(), partition_value)))
}
@@ -85,13 +87,19 @@ pub(crate) fn parse_partition_values(
logical_schema: &SchemaRef,
transform_spec: &TransformSpec,
partition_values: &HashMap<String, String>,
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<HashMap<usize, (String, Scalar)>> {
transform_spec
.iter()
.filter_map(|field_transform| match field_transform {
FieldTransformSpec::MetadataDerivedColumn { field_index, .. } => Some(
parse_partition_value(*field_index, logical_schema, partition_values),
),
FieldTransformSpec::MetadataDerivedColumn { field_index, .. } => {
Some(parse_partition_value(
*field_index,
logical_schema,
partition_values,
column_mapping_mode,
))
}
FieldTransformSpec::DynamicColumn { .. }
| FieldTransformSpec::StaticInsert { .. }
| FieldTransformSpec::StaticReplace { .. }
@@ -202,7 +210,7 @@ mod tests {
)]));
let partition_values = HashMap::new();

let result = parse_partition_value(5, &schema, &partition_values);
let result = parse_partition_value(5, &schema, &partition_values, ColumnMappingMode::None);
assert_result_error_with_message(result, "out of bounds");
}

@@ -237,7 +245,13 @@ mod tests {
partition_values.insert("id".to_string(), "test".to_string());
partition_values.insert("_change_type".to_string(), "insert".to_string());

let result = parse_partition_values(&schema, &transform_spec, &partition_values).unwrap();
let result = parse_partition_values(
&schema,
&transform_spec,
&partition_values,
ColumnMappingMode::None,
)
.unwrap();
assert_eq!(result.len(), 2);
assert!(result.contains_key(&0));
assert!(result.contains_key(&1));
@@ -257,7 +271,13 @@ mod tests {
let transform_spec = vec![];
let partition_values = HashMap::new();

let result = parse_partition_values(&schema, &transform_spec, &partition_values).unwrap();
let result = parse_partition_values(
&schema,
&transform_spec,
&partition_values,
ColumnMappingMode::None,
)
.unwrap();
assert!(result.is_empty());
}

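
Note (reviewer sketch, not part of the diff): `parse_partition_value` now resolves the partition column through `physical_name(column_mapping_mode)` before looking it up in `add.partitionValues`, reflecting that this map is keyed by physical column names when column mapping is in effect. A rough standalone sketch of that lookup, with hypothetical types and a string-only value standing in for the kernel's scalar parsing:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the kernel's types.
#[derive(Clone, Copy)]
enum ColumnMappingMode {
    None,
    Name,
}

struct Field {
    logical_name: String,
    physical_name: Option<String>,
}

impl Field {
    fn physical_name(&self, mode: ColumnMappingMode) -> &str {
        match mode {
            ColumnMappingMode::None => &self.logical_name,
            ColumnMappingMode::Name => {
                self.physical_name.as_deref().unwrap_or(&self.logical_name)
            }
        }
    }
}

/// Look up a partition value by the field's *physical* name and return it keyed
/// back to that name; a missing entry stands in for a null partition value.
fn parse_partition_value(
    field: &Field,
    partition_values: &HashMap<String, String>,
    mode: ColumnMappingMode,
) -> (String, Option<String>) {
    let name = field.physical_name(mode);
    let value = partition_values.get(name).cloned();
    (name.to_string(), value)
}

fn main() {
    let field = Field {
        logical_name: "event_date".into(),
        physical_name: Some("col-9".into()),
    };
    // add.partitionValues as written by a column-mapped table: keyed physically.
    let mut partition_values = HashMap::new();
    partition_values.insert("col-9".to_string(), "2024-01-01".to_string());

    let (name, value) =
        parse_partition_value(&field, &partition_values, ColumnMappingMode::Name);
    assert_eq!(name, "col-9");
    assert_eq!(value.as_deref(), Some("2024-01-01"));

    // With mode None the lookup uses the logical name and finds nothing here.
    let (_, missing) = parse_partition_value(&field, &partition_values, ColumnMappingMode::None);
    assert!(missing.is_none());
    println!("ok");
}
```
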