Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 65 additions & 5 deletions kernel/src/table_changes/physical_to_logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ pub(crate) fn scan_file_physical_schema(

// Get the transform expression for a CDF scan file
//
// Returns None when no transformation is needed (identity transform), otherwise returns Some(expr).
//
// Note: parse_partition_values returns null values for missing partition columns,
// and CDF metadata columns (commit_timestamp, commit_version, change_type) are then
// added to overwrite any conflicting values. This behavior can be made more strict by changing
Expand All @@ -81,7 +83,7 @@ pub(crate) fn get_cdf_transform_expr(
scan_file: &CdfScanFile,
state_info: &StateInfo,
physical_schema: &StructType,
) -> DeltaResult<ExpressionRef> {
) -> DeltaResult<Option<ExpressionRef>> {
let mut partition_values = HashMap::new();

// Get the transform spec from StateInfo (if present)
Expand All @@ -92,6 +94,11 @@ pub(crate) fn get_cdf_transform_expr(
.map(|ts| ts.as_ref())
.unwrap_or(&empty_spec);

// Return None for identity transforms to avoid unnecessary expression evaluation
if transform_spec.is_empty() {
return Ok(None);
}

// Handle regular partition values using parse_partition_values
let parsed_values = parse_partition_values(
&state_info.logical_schema,
Expand All @@ -104,7 +111,9 @@ pub(crate) fn get_cdf_transform_expr(
let cdf_values = get_cdf_columns(&state_info.logical_schema, scan_file)?;
partition_values.extend(cdf_values);

get_transform_expr(transform_spec, partition_values, physical_schema)
let expr = get_transform_expr(transform_spec, partition_values, physical_schema)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it not make more sense to do the is_identity check inside get_transform_expr and have that return: DeltaResult<Option<ExpressionRef>>?

Copy link
Collaborator

@OussamaSaoudi OussamaSaoudi Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah i think it'd look like this:

if transform_spec.empty() {
    return Ok(None);
}
// Regular get_transform_expr code
let mut transform = Transform::new_top_level();

that way, we avoid any allocation of expression.

Copy link
Collaborator Author

@DrakeLin DrakeLin Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried doing this, but ColumnMapping breaks for normal read paths. We still need the Transform expression, even if it's the identity, for the evaluator to apply the logical schema and map column names in

pub fn transform_to_logical(

I moved the transform expr skipping up so we don't construct the Expression unnecessarily for CDFs

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, okay thanks. I guess an "identity" expression does communicate that we need some kind of transform, but it's at the schema level. I'll think a bit if we could express this better, but for this PR this is fine.


Ok(Some(expr))
}

#[cfg(test)]
Expand Down Expand Up @@ -192,7 +201,9 @@ mod tests {
let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema);
assert!(result.is_ok());

let expr = result.unwrap();
let expr_opt = result.unwrap();
assert!(expr_opt.is_some(), "Expected Some(expr) but got None");
let expr = expr_opt.unwrap();
let Expression::Transform(transform) = expr.as_ref() else {
panic!("Expected Transform expression");
};
Expand Down Expand Up @@ -236,7 +247,9 @@ mod tests {
let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema);
assert!(result.is_ok());

let expr = result.unwrap();
let expr_opt = result.unwrap();
assert!(expr_opt.is_some(), "Expected Some(expr) but got None");
let expr = expr_opt.unwrap();
let Expression::Transform(transform) = expr.as_ref() else {
panic!("Expected Transform expression");
};
Expand Down Expand Up @@ -283,7 +296,9 @@ mod tests {
let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema);
assert!(result.is_ok());

let expr = result.unwrap();
let expr_opt = result.unwrap();
assert!(expr_opt.is_some(), "Expected Some(expr) but got None");
let expr = expr_opt.unwrap();
let Expression::Transform(transform) = expr.as_ref() else {
panic!("Expected Transform expression");
};
Expand Down Expand Up @@ -340,4 +355,49 @@ mod tests {
let result = scan_file_physical_schema(&remove_file, &physical_schema);
assert_eq!(result.fields().len(), 2); // No change
}

#[test]
fn test_get_cdf_transform_expr_returns_none_for_identity() {
// When there's no transform spec and no CDF metadata columns in the schema,
// the function should return None (identity transform)
let scan_file = CdfScanFile {
path: "test/file.parquet".to_string(),
partition_values: HashMap::new(),
scan_type: CdfScanFileType::Add,
commit_version: 100,
commit_timestamp: 1000000000000,
dv_info: DvInfo::default(),
remove_dv: None,
};

// Create a simple schema without CDF metadata columns
let logical_schema = Arc::new(StructType::new_unchecked(vec![
StructField::nullable("id", DataType::STRING),
StructField::nullable("name", DataType::STRING),
]));

let physical_schema = StructType::new_unchecked(vec![
StructField::nullable("id", DataType::STRING),
StructField::nullable("name", DataType::STRING),
]);

// Empty transform spec - no transformations needed
let transform_spec = vec![];

let state_info = StateInfo {
logical_schema,
physical_schema: physical_schema.clone().into(),
physical_predicate: PhysicalPredicate::None,
transform_spec: Some(Arc::new(transform_spec)),
};

let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema);
assert!(result.is_ok());

let expr_opt = result.unwrap();
assert!(
expr_opt.is_none(),
"Expected None for identity transform but got Some(expr)"
);
}
}
22 changes: 15 additions & 7 deletions kernel/src/table_changes/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,14 @@ fn read_scan_file(
scan_file_physical_schema(&scan_file, state_info.physical_schema.as_ref());
let transform_expr = get_cdf_transform_expr(&scan_file, state_info, physical_schema.as_ref())?;

let phys_to_logical_eval = engine.evaluation_handler().new_expression_evaluator(
physical_schema.clone(),
transform_expr,
state_info.logical_schema.clone().into(),
);
// Only create an evaluator if transformation is needed
let phys_to_logical_eval = transform_expr.map(|expr| {
engine.evaluation_handler().new_expression_evaluator(
physical_schema.clone(),
expr,
state_info.logical_schema.clone().into(),
)
});
// Determine if the scan file was derived from a deletion vector pair
let is_dv_resolved_pair = scan_file.remove_dv.is_some();

Expand All @@ -261,8 +264,13 @@ fn read_scan_file(

let result = read_result_iter.map(move |batch| -> DeltaResult<_> {
let batch = batch?;
// to transform the physical data into the correct logical form
let logical = phys_to_logical_eval.evaluate(batch.as_ref());
// Transform the physical data into the correct logical form, or pass through unchanged
let logical = if let Some(ref eval) = phys_to_logical_eval {
eval.evaluate(batch.as_ref())
} else {
// No transformation needed - pass through the batch as-is
Ok(batch)
};
let len = logical.as_ref().map_or(0, |res| res.len());
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
// will cover the following results. we `take()` out of `selection_vector` to avoid
Expand Down
Loading