diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index f675f1248..bcbb7b44c 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -99,6 +99,7 @@ pub mod table_configuration;
 pub mod table_features;
 pub mod table_properties;
 pub mod transaction;
+pub(crate) mod transforms;
 
 mod row_tracking;
diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs
index fa6aec130..4438f50de 100644
--- a/kernel/src/scan/log_replay.rs
+++ b/kernel/src/scan/log_replay.rs
@@ -2,19 +2,18 @@ use std::clone::Clone;
 use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, LazyLock};
 
-use itertools::Itertools;
-
 use super::data_skipping::DataSkippingFilter;
-use super::{ScanMetadata, TransformSpec};
+use super::ScanMetadata;
 use crate::actions::deletion_vector::DeletionVectorDescriptor;
 use crate::actions::get_log_add_schema;
 use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
 use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef};
 use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _};
 use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor};
-use crate::scan::{FieldTransformSpec, Scalar};
+use crate::scan::Scalar;
 use crate::schema::ToSchema as _;
 use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
+use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec};
 use crate::utils::require;
 use crate::{DeltaResult, Engine, Error, ExpressionEvaluator};
 
@@ -122,82 +121,6 @@ impl AddRemoveDedupVisitor<'_> {
         }
     }
 
-    fn parse_partition_value(
-        &self,
-        field_idx: usize,
-        partition_values: &HashMap<String, String>,
-    ) -> DeltaResult<(usize, (String, Scalar))> {
-        let field = self.logical_schema.field_at_index(field_idx);
-        let Some(field) = field else {
-            return Err(Error::InternalError(format!(
-                "out of bounds partition column field index {field_idx}"
-            )));
-        };
-        let name = field.physical_name();
-        let partition_value =
-            super::parse_partition_value(partition_values.get(name), field.data_type())?;
-        Ok((field_idx, (name.to_string(), partition_value)))
-    }
-
-    fn parse_partition_values(
-        &self,
-        transform_spec: &TransformSpec,
-        partition_values: &HashMap<String, String>,
-    ) -> DeltaResult<HashMap<usize, (String, Scalar)>> {
-        transform_spec
-            .iter()
-            .filter_map(|field_transform| match field_transform {
-                FieldTransformSpec::PartitionColumn { field_index, .. } => {
-                    Some(self.parse_partition_value(*field_index, partition_values))
-                }
-                FieldTransformSpec::StaticInsert { .. }
-                | FieldTransformSpec::StaticReplace { .. }
-                | FieldTransformSpec::StaticDrop { .. } => None,
-            })
-            .try_collect()
-    }
-
-    /// Compute an expression that will transform from physical to logical for a given Add file action
-    ///
-    /// An empty `transform_spec` is valid and represents the case where only column mapping is needed
-    /// (e.g., no partition columns to inject). The resulting empty `Expression::Transform` will
-    /// pass all input fields through unchanged while applying the output schema for name mapping.
-    fn get_transform_expr(
-        &self,
-        transform_spec: &TransformSpec,
-        mut partition_values: HashMap<usize, (String, Scalar)>,
-    ) -> DeltaResult<ExpressionRef> {
-        let mut transform = crate::expressions::Transform::new_top_level();
-
-        for field_transform in transform_spec {
-            use FieldTransformSpec::*;
-            transform = match field_transform {
-                StaticInsert { insert_after, expr } => {
-                    transform.with_inserted_field(insert_after.clone(), expr.clone())
-                }
-                StaticReplace { field_name, expr } => {
-                    transform.with_replaced_field(field_name.clone(), expr.clone())
-                }
-                StaticDrop { field_name } => transform.with_dropped_field(field_name.clone()),
-                PartitionColumn {
-                    field_index,
-                    insert_after,
-                } => {
-                    let Some((_, partition_value)) = partition_values.remove(field_index) else {
-                        return Err(Error::InternalError(format!(
-                            "missing partition value for field index {field_index}"
-                        )));
-                    };
-
-                    let partition_value = Arc::new(partition_value.into());
-                    transform.with_inserted_field(insert_after.clone(), partition_value)
-                }
-            }
-        }
-
-        Ok(Arc::new(Expression::Transform(transform)))
-    }
-
     fn is_file_partition_pruned(
         &self,
         partition_values: &HashMap<usize, (String, Scalar)>,
@@ -244,7 +167,8 @@ impl AddRemoveDedupVisitor<'_> {
             Some(transform) if is_add => {
                 let partition_values =
                     getters[Self::ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?;
-                let partition_values = self.parse_partition_values(transform, &partition_values)?;
+                let partition_values =
+                    parse_partition_values(&self.logical_schema, transform, &partition_values)?;
                 if self.is_file_partition_pruned(&partition_values) {
                     return Ok(false);
                 }
@@ -260,7 +184,7 @@ impl AddRemoveDedupVisitor<'_> {
         let transform = self
             .transform_spec
             .as_ref()
-            .map(|transform| self.get_transform_expr(transform, partition_values))
+            .map(|transform| get_transform_expr(transform, partition_values))
             .transpose()?;
         if transform.is_some() {
             // fill in any needed `None`s for previous rows
@@ -448,7 +372,7 @@ mod tests {
         add_batch_simple, add_batch_with_partition_col, add_batch_with_remove,
         run_with_validate_callback,
     };
-    use crate::scan::{Scan, StateInfo};
+    use crate::scan::{get_transform_spec, StateInfo};
     use crate::table_features::ColumnMappingMode;
     use crate::Expression as Expr;
     use crate::{
@@ -536,7 +460,7 @@ mod tests {
         let partition_cols = ["date".to_string()];
         let state_info =
             StateInfo::try_new(schema.as_ref(), &partition_cols, ColumnMappingMode::None).unwrap();
-        let static_transform = Some(Arc::new(Scan::get_transform_spec(&state_info.all_fields)));
+        let static_transform = Some(Arc::new(get_transform_spec(&state_info.all_fields)));
         let batch = vec![add_batch_with_partition_col()];
         let iter = scan_action_iter(
             &SyncEngine::new(),
diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs
index 96fdc6573..8b06a18be 100644
--- a/kernel/src/scan/mod.rs
+++ b/kernel/src/scan/mod.rs
@@ -29,6 +29,7 @@ use crate::schema::{
 };
 use crate::snapshot::SnapshotRef;
 use crate::table_features::ColumnMappingMode;
+use crate::transforms::{get_transform_spec, ColumnType};
 use crate::{DeltaResult, Engine, EngineData, Error, FileMeta, Version};
 
 use self::log_replay::scan_action_iter;
@@ -307,21 +308,6 @@ impl ScanResult {
     }
 }
 
-/// Scan uses this to set up what kinds of top-level columns it is scanning. For `Selected` we just
-/// store the name of the column, as that's all that's needed during the actual query. For
-/// `Partition` we store an index into the logical schema for this query since later we need the
-/// data type as well to materialize the partition column.
-#[derive(PartialEq, Debug)]
-pub(crate) enum ColumnType {
-    // A column, selected from the data, as is
-    Selected(String),
-    // A partition column that needs to be added back in
-    Partition(usize),
-}
-
-/// A list of field transforms that describes a transform expression to be created at scan time.
-type TransformSpec = Vec<FieldTransformSpec>;
-
 /// utility method making it easy to get a transform for a particular row. If the requested row is
 /// outside the range of the passed slice returns `None`, otherwise returns the element at the index
 /// of the specified row
 pub fn get_transform_for_row(
     row: usize,
     transforms: &[Option<ExpressionRef>],
 ) -> Option<ExpressionRef> {
     transforms.get(row).cloned().flatten()
 }
 
-/// Transforms aren't computed all at once. So static ones can just go straight to `Expression`, but
-/// things like partition columns need to filled in. This enum holds an expression that's part of a
-/// [`TransformSpec`].
-pub(crate) enum FieldTransformSpec {
-    /// Insert the given expression after the named input column (None = prepend instead)
-    // NOTE: It's quite likely we will sometimes need to reorder columns for one reason or another,
-    // which would usually be expressed as a drop+insert pair of transforms.
-    #[allow(unused)]
-    StaticInsert {
-        insert_after: Option<String>,
-        expr: ExpressionRef,
-    },
-    /// Replace the named input column with an expression
-    // NOTE: Row tracking will eventually need to replace the physical rowid column with a COALESCE
-    // to compute non-materialized row ids and row commit versions.
-    #[allow(unused)]
-    StaticReplace {
-        field_name: String,
-        expr: ExpressionRef,
-    },
-    /// Drops the named input column
-    // NOTE: Row tracking will need to drop metadata columns that were used to compute rowids, since
-    // they should not appear in the query's output.
-    #[allow(unused)]
-    StaticDrop { field_name: String },
-    /// Inserts a partition column after the named input column. The partition column is identified
-    /// by its field index in the logical table schema (the column is not present in the physical
-    /// read schema). Its value varies from file to file and is obtained from file metadata.
-    PartitionColumn {
-        field_index: usize,
-        insert_after: Option<String>,
-    },
-}
-
 /// [`ScanMetadata`] contains (1) a batch of [`FilteredEngineData`] specifying data files to be scanned
 /// and (2) a vector of transforms (one transform per scan file) that must be applied to the data read
 /// from those files.
@@ -474,35 +426,6 @@ impl Scan {
         }
     }
 
-    /// Computes the transform spec for this scan. Static (query-level) transforms can already be
-    /// turned into expressions now, but file-level transforms like partition values can only be
-    /// described now; they are converted to expressions during the scan, using file metadata.
-    ///
-    /// NOTE: Transforms are "sparse" in the sense that they only mention fields which actually
-    /// change (added, replaced, dropped); the transform implicitly captures all fields that pass
-    /// from input to output unchanged and in the same relative order.
-    fn get_transform_spec(all_fields: &[ColumnType]) -> TransformSpec {
-        let mut transform_spec = TransformSpec::new();
-        let mut last_physical_field: Option<&str> = None;
-
-        for field in all_fields {
-            match field {
-                ColumnType::Selected(physical_name) => {
-                    // Track physical field for calculating partition value insertion points.
-                    last_physical_field = Some(physical_name);
-                }
-                ColumnType::Partition(logical_idx) => {
-                    transform_spec.push(FieldTransformSpec::PartitionColumn {
-                        insert_after: last_physical_field.map(String::from),
-                        field_index: *logical_idx,
-                    });
-                }
-            }
-        }
-
-        transform_spec
-    }
-
     /// Get an iterator of [`ScanMetadata`]s that should be used to facilitate a scan. This handles
     /// log-replay, reconciling Add and Remove actions, and applying data skipping (if possible).
     /// Each item in the returned iterator is a struct of:
@@ -672,7 +595,7 @@ impl Scan {
         // - Column mapping: Physical field names must be mapped to logical field names via output schema
         let static_transform = (self.have_partition_cols
            || self.snapshot.column_mapping_mode() != ColumnMappingMode::None)
-            .then(|| Arc::new(Scan::get_transform_spec(&self.all_fields)));
+            .then(|| Arc::new(get_transform_spec(&self.all_fields)));
         let physical_predicate = match self.physical_predicate.clone() {
             PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()),
             PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)),
@@ -841,19 +764,6 @@ pub fn scan_row_schema() -> SchemaRef {
     log_replay::SCAN_ROW_SCHEMA.clone()
 }
 
-pub(crate) fn parse_partition_value(
-    raw: Option<&String>,
-    data_type: &DataType,
-) -> DeltaResult<Scalar> {
-    match (raw, data_type.as_primitive_opt()) {
-        (Some(v), Some(primitive)) => primitive.parse_scalar(v),
-        (Some(_), None) => Err(Error::generic(format!(
-            "Unexpected partition column type: {data_type:?}"
-        ))),
-        _ => Ok(Scalar::Null(data_type.clone())),
-    }
-}
-
 /// All the state needed to process a scan.
 struct StateInfo {
     /// All fields referenced by the query.
@@ -960,7 +870,8 @@ pub(crate) mod test_utils {
         JsonHandler,
     };
 
-    use super::{state::ScanCallback, TransformSpec};
+    use super::state::ScanCallback;
+    use crate::transforms::TransformSpec;
 
     // Generates a batch of sidecar actions with the given paths.
     // The schema is provided as null columns affect equality checks.
@@ -1446,7 +1357,7 @@ mod tests {
         ];
 
         for (raw, data_type, expected) in &cases {
-            let value = parse_partition_value(
+            let value = crate::transforms::parse_partition_value_raw(
                 Some(&raw.to_string()),
                 &DataType::Primitive(data_type.clone()),
             )
diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs
index 76e129e3f..dea691f0e 100644
--- a/kernel/src/table_changes/physical_to_logical.rs
+++ b/kernel/src/table_changes/physical_to_logical.rs
@@ -4,8 +4,8 @@ use std::sync::Arc;
 use itertools::Itertools;
 
 use crate::expressions::Scalar;
-use crate::scan::{parse_partition_value, ColumnType};
 use crate::schema::{ColumnName, DataType, SchemaRef, StructField, StructType};
+use crate::transforms::ColumnType;
 use crate::{DeltaResult, Error, Expression};
 
 use super::scan_file::{CdfScanFile, CdfScanFileType};
@@ -50,8 +50,9 @@ pub(crate) fn physical_to_logical_expr(
             ));
         };
         let name = field.physical_name();
+        let raw_value = scan_file.partition_values.get(name);
         let value_expression =
-            parse_partition_value(scan_file.partition_values.get(name), field.data_type())?;
+            crate::transforms::parse_partition_value_raw(raw_value, field.data_type())?;
         Ok(value_expression.into())
     }
     ColumnType::Selected(field_name) => {
@@ -85,7 +86,6 @@ mod tests {
     use std::collections::HashMap;
 
     use crate::expressions::{column_expr, Expression as Expr, Scalar};
-    use crate::scan::ColumnType;
     use crate::schema::{DataType, StructField, StructType};
     use crate::table_changes::physical_to_logical::physical_to_logical_expr;
     use crate::table_changes::scan_file::{CdfScanFile, CdfScanFileType};
@@ -93,6 +93,7 @@ mod tests {
         ADD_CHANGE_TYPE, CHANGE_TYPE_COL_NAME, COMMIT_TIMESTAMP_COL_NAME, COMMIT_VERSION_COL_NAME,
         REMOVE_CHANGE_TYPE,
     };
+    use crate::transforms::ColumnType;
 
     #[test]
     fn verify_physical_to_logical_expression() {
diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs
index 55b0576e8..8cfe627bd 100644
--- a/kernel/src/table_changes/scan.rs
+++ b/kernel/src/table_changes/scan.rs
@@ -7,8 +7,9 @@ use tracing::debug;
 use url::Url;
 
 use crate::actions::deletion_vector::split_vector;
-use crate::scan::{ColumnType, PhysicalPredicate, ScanResult};
+use crate::scan::{PhysicalPredicate, ScanResult};
 use crate::schema::{SchemaRef, StructType};
+use crate::transforms::ColumnType;
 use crate::{DeltaResult, Engine, FileMeta, PredicateRef};
 
 use super::log_replay::{table_changes_action_iter, TableChangesScanMetadata};
@@ -361,10 +362,11 @@ mod tests {
 
     use crate::engine::sync::SyncEngine;
     use crate::expressions::{column_expr, Scalar};
-    use crate::scan::{ColumnType, PhysicalPredicate};
+    use crate::scan::PhysicalPredicate;
     use crate::schema::{DataType, StructField, StructType};
     use crate::table_changes::TableChanges;
     use crate::table_changes::COMMIT_VERSION_COL_NAME;
+    use crate::transforms::ColumnType;
     use crate::Predicate;
 
     #[test]
diff --git a/kernel/src/transforms.rs b/kernel/src/transforms.rs
new file mode 100644
index 000000000..1f2fc71ee
--- /dev/null
+++ b/kernel/src/transforms.rs
@@ -0,0 +1,377 @@
+//! Transform-related types and utilities for Delta Kernel.
+//!
+//! This module contains the types and functions needed to handle transforms
+//! during scan and table changes operations, including partition value processing
+//! and expression generation.
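+//!
+//! As a rough sketch of the intended flow (variable names here are illustrative,
+//! not part of the API), a scan with partition columns uses this module like so:
+//!
+//! ```ignore
+//! // Once per query: describe which fields pass through and which get injected.
+//! let spec = get_transform_spec(&all_fields);
+//! // Once per file: parse that file's partition values from its metadata...
+//! let values = parse_partition_values(&logical_schema, &spec, &file_partition_values)?;
+//! // ...and materialize the physical-to-logical transform expression.
+//! let expr = get_transform_expr(&spec, values)?;
+//! ```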
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use itertools::Itertools;
+
+use crate::expressions::{Expression, ExpressionRef, Scalar, Transform};
+use crate::schema::{DataType, SchemaRef};
+use crate::{DeltaResult, Error};
+
+/// Scan uses this to set up what kinds of top-level columns it is scanning. For `Selected` we just
+/// store the name of the column, as that's all that's needed during the actual query. For
+/// `Partition` we store an index into the logical schema for this query since later we need the
+/// data type as well to materialize the partition column.
+#[derive(PartialEq, Debug)]
+pub(crate) enum ColumnType {
+    // A column, selected from the data, as is
+    Selected(String),
+    // A partition column that needs to be added back in
+    Partition(usize),
+}
+
+/// A list of field transforms that describes a transform expression to be created at scan time.
+pub(crate) type TransformSpec = Vec<FieldTransformSpec>;
+
+/// Transforms aren't computed all at once. So static ones can just go straight to `Expression`, but
+/// things like partition columns need to be filled in. This enum holds an expression that's part of
+/// a [`TransformSpec`].
+#[derive(Debug)]
+pub(crate) enum FieldTransformSpec {
+    /// Insert the given expression after the named input column (None = prepend instead)
+    // NOTE: It's quite likely we will sometimes need to reorder columns for one reason or another,
+    // which would usually be expressed as a drop+insert pair of transforms.
+    #[allow(unused)]
+    StaticInsert {
+        insert_after: Option<String>,
+        expr: ExpressionRef,
+    },
+    /// Replace the named input column with an expression
+    // NOTE: Row tracking will eventually need to replace the physical rowid column with a COALESCE
+    // to compute non-materialized row ids and row commit versions.
+    #[allow(unused)]
+    StaticReplace {
+        field_name: String,
+        expr: ExpressionRef,
+    },
+    /// Drops the named input column
+    // NOTE: Row tracking will need to drop metadata columns that were used to compute rowids, since
+    // they should not appear in the query's output.
+    #[allow(unused)]
+    StaticDrop { field_name: String },
+    /// Inserts a partition column after the named input column. The partition column is identified
+    /// by its field index in the logical table schema (the column is not present in the physical
+    /// read schema). Its value varies from file to file and is obtained from file metadata.
+    PartitionColumn {
+        field_index: usize,
+        insert_after: Option<String>,
+    },
+}
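+
+// Illustrative example (hypothetical column names): for a logical schema
+// `[id (selected), region (partition, field index 1)]`, `get_transform_spec`
+// below produces a spec with a single entry:
+//
+//     FieldTransformSpec::PartitionColumn { field_index: 1, insert_after: Some("id".into()) }
+//
+// The selected `id` column is not mentioned; unchanged fields pass through implicitly.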
+
+/// Parse a single partition value from its raw string representation, resolving the field by its
+/// index in the logical schema.
+pub(crate) fn parse_partition_value(
+    field_idx: usize,
+    logical_schema: &SchemaRef,
+    partition_values: &HashMap<String, String>,
+) -> DeltaResult<(usize, (String, Scalar))> {
+    let Some(field) = logical_schema.field_at_index(field_idx) else {
+        return Err(Error::InternalError(format!(
+            "out of bounds partition column field index {field_idx}"
+        )));
+    };
+    let name = field.physical_name();
+    let partition_value = parse_partition_value_raw(partition_values.get(name), field.data_type())?;
+    Ok((field_idx, (name.to_string(), partition_value)))
+}
+
+/// Parse all partition values referenced by a transform spec, keyed by field index.
+pub(crate) fn parse_partition_values(
+    logical_schema: &SchemaRef,
+    transform_spec: &TransformSpec,
+    partition_values: &HashMap<String, String>,
+) -> DeltaResult<HashMap<usize, (String, Scalar)>> {
+    transform_spec
+        .iter()
+        .filter_map(|field_transform| match field_transform {
+            FieldTransformSpec::PartitionColumn { field_index, .. } => Some(parse_partition_value(
+                *field_index,
+                logical_schema,
+                partition_values,
+            )),
+            FieldTransformSpec::StaticInsert { .. }
+            | FieldTransformSpec::StaticReplace { .. }
+            | FieldTransformSpec::StaticDrop { .. } => None,
+        })
+        .try_collect()
+}
+
+/// Compute an expression that will transform from physical to logical for a given Add file action
+///
+/// An empty `transform_spec` is valid and represents the case where only column mapping is needed
+/// (e.g., no partition columns to inject). The resulting empty `Expression::Transform` will
+/// pass all input fields through unchanged while applying the output schema for name mapping.
+pub(crate) fn get_transform_expr(
+    transform_spec: &TransformSpec,
+    mut partition_values: HashMap<usize, (String, Scalar)>,
+) -> DeltaResult<ExpressionRef> {
+    let mut transform = Transform::new_top_level();
+
+    for field_transform in transform_spec {
+        use FieldTransformSpec::*;
+        transform = match field_transform {
+            StaticInsert { insert_after, expr } => {
+                transform.with_inserted_field(insert_after.clone(), expr.clone())
+            }
+            StaticReplace { field_name, expr } => {
+                transform.with_replaced_field(field_name.clone(), expr.clone())
+            }
+            StaticDrop { field_name } => transform.with_dropped_field(field_name.clone()),
+            PartitionColumn {
+                field_index,
+                insert_after,
+            } => {
+                let Some((_, partition_value)) = partition_values.remove(field_index) else {
+                    return Err(Error::InternalError(format!(
+                        "missing partition value for field index {field_index}"
+                    )));
+                };
+
+                let partition_value = Arc::new(partition_value.into());
+                transform.with_inserted_field(insert_after.clone(), partition_value)
+            }
+        }
+    }
+
+    Ok(Arc::new(Expression::Transform(transform)))
+}
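+
+// Sketch of the per-file flow, continuing the hypothetical `id`/`region` example above:
+//
+// ```ignore
+// let spec = vec![FieldTransformSpec::PartitionColumn {
+//     field_index: 1,
+//     insert_after: Some("id".to_string()),
+// }];
+// let mut values = HashMap::new();
+// values.insert(1, ("region".to_string(), Scalar::String("EU".to_string())));
+// // Yields an Expression::Transform inserting the literal "EU" after `id`.
+// let expr = get_transform_expr(&spec, values)?;
+// ```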
+
+/// Computes the transform spec for this scan. Static (query-level) transforms can already be
+/// turned into expressions now, but file-level transforms like partition values can only be
+/// described now; they are converted to expressions during the scan, using file metadata.
+///
+/// NOTE: Transforms are "sparse" in the sense that they only mention fields which actually
+/// change (added, replaced, dropped); the transform implicitly captures all fields that pass
+/// from input to output unchanged and in the same relative order.
+pub(crate) fn get_transform_spec(all_fields: &[ColumnType]) -> TransformSpec {
+    let mut transform_spec = TransformSpec::new();
+    let mut last_physical_field: Option<&str> = None;
+
+    for field in all_fields {
+        match field {
+            ColumnType::Selected(physical_name) => {
+                // Track physical field for calculating partition value insertion points.
+                last_physical_field = Some(physical_name);
+            }
+            ColumnType::Partition(logical_idx) => {
+                transform_spec.push(FieldTransformSpec::PartitionColumn {
+                    insert_after: last_physical_field.map(String::from),
+                    field_index: *logical_idx,
+                });
+            }
+        }
+    }
+
+    transform_spec
+}
+
+/// Parse a partition value from the raw string representation.
+/// This was originally `parse_partition_value` in scan/mod.rs.
+pub(crate) fn parse_partition_value_raw(
+    raw: Option<&String>,
+    data_type: &DataType,
+) -> DeltaResult<Scalar> {
+    match (raw, data_type.as_primitive_opt()) {
+        (Some(v), Some(primitive)) => primitive.parse_scalar(v),
+        (Some(_), None) => Err(Error::generic(format!(
+            "Unexpected partition column type: {data_type:?}"
+        ))),
+        _ => Ok(Scalar::Null(data_type.clone())),
+    }
+}
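+
+// Behavioral sketch: raw values come from the Delta log's string-typed
+// `partitionValues` map, so everything is parsed from strings.
+//
+// ```ignore
+// parse_partition_value_raw(Some(&"42".to_string()), &DataType::Primitive(PrimitiveType::Integer))?;
+// // => Scalar::Integer(42)
+// parse_partition_value_raw(None, &DataType::STRING)?;
+// // => Scalar::Null(DataType::STRING): a missing value becomes a typed null
+// ```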
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::expressions::Scalar;
+    use crate::schema::{DataType, PrimitiveType, StructField, StructType};
+    use std::collections::HashMap;
+
+    #[test]
+    fn test_parse_partition_value_invalid_index() {
+        let schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable(
+            "col1",
+            DataType::STRING,
+        )]));
+        let partition_values = HashMap::new();
+
+        let result = parse_partition_value(5, &schema, &partition_values);
+        assert!(result.is_err());
+        assert!(result.unwrap_err().to_string().contains("out of bounds"));
+    }
+
+    #[test]
+    fn test_parse_partition_values_mixed_transforms() {
+        let schema = Arc::new(StructType::new_unchecked(vec![
+            StructField::nullable("id", DataType::STRING),
+            StructField::nullable("age", DataType::LONG),
+        ]));
+        let transform_spec = vec![
+            FieldTransformSpec::PartitionColumn {
+                field_index: 1,
+                insert_after: Some("id".to_string()),
+            },
+            FieldTransformSpec::StaticDrop {
+                field_name: "unused".to_string(),
+            },
+            FieldTransformSpec::PartitionColumn {
+                field_index: 0,
+                insert_after: None,
+            },
+        ];
+        let mut partition_values = HashMap::new();
+        partition_values.insert("age".to_string(), "30".to_string());
+        partition_values.insert("id".to_string(), "test".to_string());
+
+        let result = parse_partition_values(&schema, &transform_spec, &partition_values).unwrap();
+        assert_eq!(result.len(), 2);
+        assert!(result.contains_key(&0));
+        assert!(result.contains_key(&1));
+    }
+
+    #[test]
+    fn test_parse_partition_values_empty_spec() {
+        let schema = Arc::new(StructType::new_unchecked(vec![]));
+        let transform_spec = vec![];
+        let partition_values = HashMap::new();
+
+        let result = parse_partition_values(&schema, &transform_spec, &partition_values).unwrap();
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn test_get_transform_expr_missing_partition_value() {
+        let transform_spec = vec![FieldTransformSpec::PartitionColumn {
+            field_index: 0,
+            insert_after: None,
+        }];
+        let partition_values = HashMap::new(); // Missing required partition value
+
+        let result = get_transform_expr(&transform_spec, partition_values);
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("missing partition value"));
+    }
+
+    #[test]
+    fn test_get_transform_expr_static_transforms() {
+        let expr = Arc::new(Expression::literal(42));
+        let transform_spec = vec![
+            FieldTransformSpec::StaticInsert {
+                insert_after: Some("col1".to_string()),
+                expr: expr.clone(),
+            },
+            FieldTransformSpec::StaticReplace {
+                field_name: "col2".to_string(),
+                expr: expr.clone(),
+            },
+            FieldTransformSpec::StaticDrop {
+                field_name: "col3".to_string(),
+            },
+        ];
+        let partition_values = HashMap::new();
+
+        let result = get_transform_expr(&transform_spec, partition_values).unwrap();
+        assert!(matches!(result.as_ref(), Expression::Transform(_)));
+    }
+
+    #[test]
+    fn test_get_transform_spec_selected_only() {
+        let all_fields = vec![
+            ColumnType::Selected("col1".to_string()),
+            ColumnType::Selected("col2".to_string()),
+        ];
+
+        let result = get_transform_spec(&all_fields);
+        assert!(result.is_empty()); // No partition columns = empty transform spec
+    }
+
+    #[test]
+    fn test_get_transform_spec_with_partitions() {
+        let all_fields = vec![
+            ColumnType::Selected("col1".to_string()),
+            ColumnType::Partition(1),
+            ColumnType::Selected("col2".to_string()),
+            ColumnType::Partition(2),
+        ];
+
+        let result = get_transform_spec(&all_fields);
+        assert_eq!(result.len(), 2);
+
+        // Check first partition column
+        if let FieldTransformSpec::PartitionColumn {
+            field_index,
+            insert_after,
+        } = &result[0]
+        {
+            assert_eq!(*field_index, 1);
+            assert_eq!(insert_after.as_ref().unwrap(), "col1");
+        } else {
+            panic!("Expected PartitionColumn transform");
+        }
+
+        // Check second partition column
+        if let FieldTransformSpec::PartitionColumn {
+            field_index,
+            insert_after,
+        } = &result[1]
+        {
+            assert_eq!(*field_index, 2);
+            assert_eq!(insert_after.as_ref().unwrap(), "col2");
+        } else {
+            panic!("Expected PartitionColumn transform");
+        }
+    }
+
+    #[test]
+    fn test_parse_partition_value_raw_string() {
+        let result =
+            parse_partition_value_raw(Some(&"test_string".to_string()), &DataType::STRING).unwrap();
+        assert_eq!(result, Scalar::String("test_string".to_string()));
+    }
+
+    #[test]
+    fn test_parse_partition_value_raw_integer() {
+        let result = parse_partition_value_raw(
+            Some(&"42".to_string()),
+            &DataType::Primitive(PrimitiveType::Integer),
+        )
+        .unwrap();
+        assert_eq!(result, Scalar::Integer(42));
+    }
+
+    #[test]
+    fn test_parse_partition_value_raw_null() {
+        let result = parse_partition_value_raw(None, &DataType::STRING).unwrap();
+        assert!(matches!(result, Scalar::Null(_)));
+    }
+
+    #[test]
+    fn test_parse_partition_value_raw_invalid_type() {
+        let result = parse_partition_value_raw(
+            Some(&"value".to_string()),
+            &DataType::struct_type_unchecked(vec![]), // Non-primitive type
+        );
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Unexpected partition column type"));
+    }
+
+    #[test]
+    fn test_parse_partition_value_raw_invalid_parse() {
+        let result = parse_partition_value_raw(
+            Some(&"not_a_number".to_string()),
+            &DataType::Primitive(PrimitiveType::Integer),
+        );
+        assert!(result.is_err());
+    }
+}