Merged
1 change: 1 addition & 0 deletions kernel/src/lib.rs
@@ -99,6 +99,7 @@ pub mod table_configuration;
 pub mod table_features;
 pub mod table_properties;
 pub mod transaction;
+pub(crate) mod transforms;
 
 mod row_tracking;
 
92 changes: 8 additions & 84 deletions kernel/src/scan/log_replay.rs
@@ -2,19 +2,18 @@ use std::clone::Clone;
 use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, LazyLock};
 
-use itertools::Itertools;
-
 use super::data_skipping::DataSkippingFilter;
-use super::{ScanMetadata, TransformSpec};
+use super::ScanMetadata;
 use crate::actions::deletion_vector::DeletionVectorDescriptor;
 use crate::actions::get_log_add_schema;
 use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
 use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef};
 use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _};
 use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor};
-use crate::scan::{FieldTransformSpec, Scalar};
+use crate::scan::Scalar;
 use crate::schema::ToSchema as _;
 use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
+use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec};
 use crate::utils::require;
 use crate::{DeltaResult, Engine, Error, ExpressionEvaluator};
 
@@ -122,82 +121,6 @@ impl AddRemoveDedupVisitor<'_> {
         }
     }
 
-    fn parse_partition_value(
-        &self,
-        field_idx: usize,
-        partition_values: &HashMap<String, String>,
-    ) -> DeltaResult<(usize, (String, Scalar))> {
-        let field = self.logical_schema.field_at_index(field_idx);
-        let Some(field) = field else {
-            return Err(Error::InternalError(format!(
-                "out of bounds partition column field index {field_idx}"
-            )));
-        };
-        let name = field.physical_name();
-        let partition_value =
-            super::parse_partition_value(partition_values.get(name), field.data_type())?;
-        Ok((field_idx, (name.to_string(), partition_value)))
-    }
-
-    fn parse_partition_values(
-        &self,
-        transform_spec: &TransformSpec,
-        partition_values: &HashMap<String, String>,
-    ) -> DeltaResult<HashMap<usize, (String, Scalar)>> {
-        transform_spec
-            .iter()
-            .filter_map(|field_transform| match field_transform {
-                FieldTransformSpec::PartitionColumn { field_index, .. } => {
-                    Some(self.parse_partition_value(*field_index, partition_values))
-                }
-                FieldTransformSpec::StaticInsert { .. }
-                | FieldTransformSpec::StaticReplace { .. }
-                | FieldTransformSpec::StaticDrop { .. } => None,
-            })
-            .try_collect()
-    }
-
-    /// Compute an expression that will transform from physical to logical for a given Add file action
-    ///
-    /// An empty `transform_spec` is valid and represents the case where only column mapping is needed
-    /// (e.g., no partition columns to inject). The resulting empty `Expression::Transform` will
-    /// pass all input fields through unchanged while applying the output schema for name mapping.
-    fn get_transform_expr(
-        &self,
-        transform_spec: &TransformSpec,
-        mut partition_values: HashMap<usize, (String, Scalar)>,
-    ) -> DeltaResult<ExpressionRef> {
-        let mut transform = crate::expressions::Transform::new_top_level();
-
-        for field_transform in transform_spec {
-            use FieldTransformSpec::*;
-            transform = match field_transform {
-                StaticInsert { insert_after, expr } => {
-                    transform.with_inserted_field(insert_after.clone(), expr.clone())
-                }
-                StaticReplace { field_name, expr } => {
-                    transform.with_replaced_field(field_name.clone(), expr.clone())
-                }
-                StaticDrop { field_name } => transform.with_dropped_field(field_name.clone()),
-                PartitionColumn {
-                    field_index,
-                    insert_after,
-                } => {
-                    let Some((_, partition_value)) = partition_values.remove(field_index) else {
-                        return Err(Error::InternalError(format!(
-                            "missing partition value for field index {field_index}"
-                        )));
-                    };
-
-                    let partition_value = Arc::new(partition_value.into());
-                    transform.with_inserted_field(insert_after.clone(), partition_value)
-                }
-            }
-        }
-
-        Ok(Arc::new(Expression::Transform(transform)))
-    }
-
     fn is_file_partition_pruned(
         &self,
         partition_values: &HashMap<usize, (String, Scalar)>,
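
For orientation, since the moved helpers now appear in this file only as call sites: a minimal standalone sketch of the per-file step they perform, using hypothetical simplified stand-ins (FieldTransform, per_file_transform) rather than the kernel's real TransformSpec and Expression types. Each Add action supplies its own partition values; each PartitionColumn entry in the spec is resolved to a literal placed at its recorded insertion point.

use std::collections::HashMap;

// Hypothetical stand-in for the kernel's FieldTransformSpec::PartitionColumn.
enum FieldTransform {
    PartitionColumn {
        field_index: usize,
        insert_after: Option<String>,
    },
}

// Resolve one file's partition values against the spec; mirrors the spirit of
// parse_partition_values followed by get_transform_expr, not their actual signatures.
fn per_file_transform(
    spec: &[FieldTransform],
    partition_values: &HashMap<usize, String>,
) -> Result<Vec<String>, String> {
    spec.iter()
        .map(|t| match t {
            FieldTransform::PartitionColumn { field_index, insert_after } => {
                let value = partition_values
                    .get(field_index)
                    .ok_or_else(|| format!("missing partition value for field index {field_index}"))?;
                Ok(format!("insert literal {value:?} after {insert_after:?}"))
            }
        })
        .collect()
}
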
@@ -244,7 +167,8 @@ impl AddRemoveDedupVisitor<'_> {
             Some(transform) if is_add => {
                 let partition_values =
                     getters[Self::ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?;
-                let partition_values = self.parse_partition_values(transform, &partition_values)?;
+                let partition_values =
+                    parse_partition_values(&self.logical_schema, transform, &partition_values)?;
                 if self.is_file_partition_pruned(&partition_values) {
                     return Ok(false);
                 }
@@ -260,7 +184,7 @@ impl AddRemoveDedupVisitor<'_> {
         let transform = self
             .transform_spec
             .as_ref()
-            .map(|transform| self.get_transform_expr(transform, partition_values))
+            .map(|transform| get_transform_expr(transform, partition_values))
             .transpose()?;
         if transform.is_some() {
             // fill in any needed `None`s for previous rows
@@ -448,7 +372,7 @@ mod tests {
         add_batch_simple, add_batch_with_partition_col, add_batch_with_remove,
         run_with_validate_callback,
     };
-    use crate::scan::{Scan, StateInfo};
+    use crate::scan::{get_transform_spec, StateInfo};
     use crate::table_features::ColumnMappingMode;
     use crate::Expression as Expr;
     use crate::{
@@ -536,7 +460,7 @@ mod tests {
         let partition_cols = ["date".to_string()];
         let state_info =
             StateInfo::try_new(schema.as_ref(), &partition_cols, ColumnMappingMode::None).unwrap();
-        let static_transform = Some(Arc::new(Scan::get_transform_spec(&state_info.all_fields)));
+        let static_transform = Some(Arc::new(get_transform_spec(&state_info.all_fields)));
         let batch = vec![add_batch_with_partition_col()];
         let iter = scan_action_iter(
             &SyncEngine::new(),
99 changes: 5 additions & 94 deletions kernel/src/scan/mod.rs
@@ -29,6 +29,7 @@ use crate::schema::{
 };
 use crate::snapshot::SnapshotRef;
 use crate::table_features::ColumnMappingMode;
+use crate::transforms::{get_transform_spec, ColumnType};
 use crate::{DeltaResult, Engine, EngineData, Error, FileMeta, Version};
 
 use self::log_replay::scan_action_iter;
@@ -307,21 +308,6 @@ impl ScanResult {
     }
 }
 
-/// Scan uses this to set up what kinds of top-level columns it is scanning. For `Selected` we just
-/// store the name of the column, as that's all that's needed during the actual query. For
-/// `Partition` we store an index into the logical schema for this query since later we need the
-/// data type as well to materialize the partition column.
-#[derive(PartialEq, Debug)]
-pub(crate) enum ColumnType {
-    // A column, selected from the data, as is
-    Selected(String),
-    // A partition column that needs to be added back in
-    Partition(usize),
-}
-
-/// A list of field transforms that describes a transform expression to be created at scan time.
-type TransformSpec = Vec<FieldTransformSpec>;
-
 /// utility method making it easy to get a transform for a particular row. If the requested row is
 /// outside the range of the passed slice returns `None`, otherwise returns the element at the index
 /// of the specified row
@@ -332,40 +318,6 @@ pub fn get_transform_for_row(
     transforms.get(row).cloned().flatten()
 }
 
-/// Transforms aren't computed all at once. So static ones can just go straight to `Expression`, but
-/// things like partition columns need to filled in. This enum holds an expression that's part of a
-/// [`TransformSpec`].
-pub(crate) enum FieldTransformSpec {
-    /// Insert the given expression after the named input column (None = prepend instead)
-    // NOTE: It's quite likely we will sometimes need to reorder columns for one reason or another,
-    // which would usually be expressed as a drop+insert pair of transforms.
-    #[allow(unused)]
-    StaticInsert {
-        insert_after: Option<String>,
-        expr: ExpressionRef,
-    },
-    /// Replace the named input column with an expression
-    // NOTE: Row tracking will eventually need to replace the physical rowid column with a COALESCE
-    // to compute non-materialized row ids and row commit versions.
-    #[allow(unused)]
-    StaticReplace {
-        field_name: String,
-        expr: ExpressionRef,
-    },
-    /// Drops the named input column
-    // NOTE: Row tracking will need to drop metadata columns that were used to compute rowids, since
-    // they should not appear in the query's output.
-    #[allow(unused)]
-    StaticDrop { field_name: String },
-    /// Inserts a partition column after the named input column. The partition column is identified
-    /// by its field index in the logical table schema (the column is not present in the physical
-    /// read schema). Its value varies from file to file and is obtained from file metadata.
-    PartitionColumn {
-        field_index: usize,
-        insert_after: Option<String>,
-    },
-}
-
 /// [`ScanMetadata`] contains (1) a batch of [`FilteredEngineData`] specifying data files to be scanned
 /// and (2) a vector of transforms (one transform per scan file) that must be applied to the data read
 /// from those files.
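
To make the removed variants concrete: an illustrative two-entry spec written against the shapes above (these types now live in crate::transforms; the helper column name "row_id_helper" is hypothetical).

let spec: TransformSpec = vec![
    // Drop a physical helper column that should not reach the query output.
    FieldTransformSpec::StaticDrop {
        field_name: "row_id_helper".to_string(),
    },
    // Inject the partition column at logical index 1 right after physical "id".
    FieldTransformSpec::PartitionColumn {
        field_index: 1,
        insert_after: Some("id".to_string()),
    },
];
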
@@ -474,35 +426,6 @@ impl Scan {
         }
     }
 
-    /// Computes the transform spec for this scan. Static (query-level) transforms can already be
-    /// turned into expressions now, but file-level transforms like partition values can only be
-    /// described now; they are converted to expressions during the scan, using file metadata.
-    ///
-    /// NOTE: Transforms are "sparse" in the sense that they only mention fields which actually
-    /// change (added, replaced, dropped); the transform implicitly captures all fields that pass
-    /// from input to output unchanged and in the same relative order.
-    fn get_transform_spec(all_fields: &[ColumnType]) -> TransformSpec {
-        let mut transform_spec = TransformSpec::new();
-        let mut last_physical_field: Option<&str> = None;
-
-        for field in all_fields {
-            match field {
-                ColumnType::Selected(physical_name) => {
-                    // Track physical field for calculating partition value insertion points.
-                    last_physical_field = Some(physical_name);
-                }
-                ColumnType::Partition(logical_idx) => {
-                    transform_spec.push(FieldTransformSpec::PartitionColumn {
-                        insert_after: last_physical_field.map(String::from),
-                        field_index: *logical_idx,
-                    });
-                }
-            }
-        }
-
-        transform_spec
-    }
-
     /// Get an iterator of [`ScanMetadata`]s that should be used to facilitate a scan. This handles
     /// log-replay, reconciling Add and Remove actions, and applying data skipping (if possible).
     /// Each item in the returned iterator is a struct of:
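
A concrete trace of the loop above, assuming the relocated function keeps this exact behavior: for a logical schema (id, date, value) where only date (logical index 1) is a partition column, the walk records "id" as the last physical field seen before the partition column, so the sparse spec holds a single entry and never mentions id or value.

let all_fields = vec![
    ColumnType::Selected("id".to_string()),
    ColumnType::Partition(1), // logical index of "date"
    ColumnType::Selected("value".to_string()),
];
let spec = get_transform_spec(&all_fields);
// spec == [PartitionColumn { field_index: 1, insert_after: Some("id") }]
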
@@ -672,7 +595,7 @@ impl Scan {
         // - Column mapping: Physical field names must be mapped to logical field names via output schema
         let static_transform = (self.have_partition_cols
             || self.snapshot.column_mapping_mode() != ColumnMappingMode::None)
-            .then(|| Arc::new(Scan::get_transform_spec(&self.all_fields)));
+            .then(|| Arc::new(get_transform_spec(&self.all_fields)));
         let physical_predicate = match self.physical_predicate.clone() {
             PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()),
             PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)),
@@ -841,19 +764,6 @@ pub fn scan_row_schema() -> SchemaRef {
     log_replay::SCAN_ROW_SCHEMA.clone()
 }
 
-pub(crate) fn parse_partition_value(
-    raw: Option<&String>,
-    data_type: &DataType,
-) -> DeltaResult<Scalar> {
-    match (raw, data_type.as_primitive_opt()) {
-        (Some(v), Some(primitive)) => primitive.parse_scalar(v),
-        (Some(_), None) => Err(Error::generic(format!(
-            "Unexpected partition column type: {data_type:?}"
-        ))),
-        _ => Ok(Scalar::Null(data_type.clone())),
-    }
-}
-
 /// All the state needed to process a scan.
 struct StateInfo {
     /// All fields referenced by the query.
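
The three match arms survive the move unchanged (renamed to parse_partition_value_raw, per the call sites below); a standalone sketch of the same decision table with simplified types and a hypothetical parse_raw name:

// Some + primitive parses the string, Some + non-primitive is an error,
// and a missing value becomes a typed NULL (modeled here as Ok(None)).
fn parse_raw(
    raw: Option<&str>,
    parse_primitive: Option<fn(&str) -> Result<i64, String>>,
) -> Result<Option<i64>, String> {
    match (raw, parse_primitive) {
        (Some(v), Some(parse)) => parse(v).map(Some),
        (Some(_), None) => Err("unexpected partition column type".to_string()),
        _ => Ok(None),
    }
}
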
@@ -960,7 +870,8 @@ pub(crate) mod test_utils {
         JsonHandler,
     };
 
-    use super::{state::ScanCallback, TransformSpec};
+    use super::state::ScanCallback;
+    use crate::transforms::TransformSpec;
 
     // Generates a batch of sidecar actions with the given paths.
     // The schema is provided as null columns affect equality checks.
@@ -1446,7 +1357,7 @@ mod tests {
         ];
 
         for (raw, data_type, expected) in &cases {
-            let value = parse_partition_value(
+            let value = crate::transforms::parse_partition_value_raw(
                 Some(&raw.to_string()),
                 &DataType::Primitive(data_type.clone()),
             )
7 changes: 4 additions & 3 deletions kernel/src/table_changes/physical_to_logical.rs
@@ -4,8 +4,8 @@ use std::sync::Arc;
 use itertools::Itertools;
 
 use crate::expressions::Scalar;
-use crate::scan::{parse_partition_value, ColumnType};
 use crate::schema::{ColumnName, DataType, SchemaRef, StructField, StructType};
+use crate::transforms::ColumnType;
 use crate::{DeltaResult, Error, Expression};
 
 use super::scan_file::{CdfScanFile, CdfScanFileType};
@@ -50,8 +50,9 @@ pub(crate) fn physical_to_logical_expr(
                 ));
             };
             let name = field.physical_name();
+            let raw_value = scan_file.partition_values.get(name);
             let value_expression =
-                parse_partition_value(scan_file.partition_values.get(name), field.data_type())?;
+                crate::transforms::parse_partition_value_raw(raw_value, field.data_type())?;
             Ok(value_expression.into())
         }
         ColumnType::Selected(field_name) => {
@@ -85,14 +86,14 @@ mod tests {
     use std::collections::HashMap;
 
     use crate::expressions::{column_expr, Expression as Expr, Scalar};
-    use crate::scan::ColumnType;
     use crate::schema::{DataType, StructField, StructType};
     use crate::table_changes::physical_to_logical::physical_to_logical_expr;
     use crate::table_changes::scan_file::{CdfScanFile, CdfScanFileType};
     use crate::table_changes::{
         ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
         REMOVE_CHANGE_TYPE,
     };
+    use crate::transforms::ColumnType;
 
     #[test]
     fn verify_physical_to_logical_expression() {
6 changes: 4 additions & 2 deletions kernel/src/table_changes/scan.rs
@@ -7,8 +7,9 @@ use tracing::debug;
 use url::Url;
 
 use crate::actions::deletion_vector::split_vector;
-use crate::scan::{ColumnType, PhysicalPredicate, ScanResult};
+use crate::scan::{PhysicalPredicate, ScanResult};
 use crate::schema::{SchemaRef, StructType};
+use crate::transforms::ColumnType;
 use crate::{DeltaResult, Engine, FileMeta, PredicateRef};
 
 use super::log_replay::{table_changes_action_iter, TableChangesScanMetadata};
@@ -361,10 +362,11 @@ mod tests {
 
     use crate::engine::sync::SyncEngine;
     use crate::expressions::{column_expr, Scalar};
-    use crate::scan::{ColumnType, PhysicalPredicate};
+    use crate::scan::PhysicalPredicate;
     use crate::schema::{DataType, StructField, StructType};
     use crate::table_changes::TableChanges;
     use crate::table_changes::COMMIT_VERSION_COL_NAME;
+    use crate::transforms::ColumnType;
     use crate::Predicate;
 
     #[test]