diff --git a/Cargo.toml b/Cargo.toml index 331d965c81..00144945b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.15.2", features = [ +delta_kernel = { version = "0.16.0", features = [ "arrow-56", "default-engine-rustls", "internal-api", @@ -93,4 +93,4 @@ arro3 = "arro3" Arro3 = "Arro3" AKS = "AKS" # to avoid using 'type' as a field name. -tpe = "tpe" \ No newline at end of file +tpe = "tpe" diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs index 49bacbfc64..721f6fe440 100644 --- a/crates/aws/tests/integration_s3_dynamodb.rs +++ b/crates/aws/tests/integration_s3_dynamodb.rs @@ -92,11 +92,11 @@ async fn test_create_s3_table() -> TestResult<()> { let table_name = format!("{}_{}", "create_test", Uuid::new_v4()); let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned())); - let schema = StructType::new(vec![StructField::new( + let schema = StructType::try_new(vec![StructField::new( "id".to_string(), DataType::Primitive(PrimitiveType::Integer), true, - )]); + )])?; let storage_options: HashMap = HashMap::from([ ( deltalake_aws::constants::AWS_ALLOW_HTTP.into(), @@ -454,11 +454,11 @@ fn add_action(name: &str) -> Action { async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult { let table_name = format!("{table_name}_{}", Uuid::new_v4()); let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned())); - let schema = StructType::new(vec![StructField::new( + let schema = StructType::try_new(vec![StructField::new( "Id".to_string(), DataType::Primitive(PrimitiveType::Integer), true, - )]); + )])?; let table_url = Url::parse(&table_uri).unwrap(); let table = DeltaTableBuilder::from_uri(table_url) .unwrap() diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 80396f84bb..ec92048c82 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -82,9 +82,8 @@ parking_lot = "0.12" percent-encoding = "2" tracing = { workspace = true } rand = "0.8" -maplit = "1" sqlparser = { version = "0.56.0" } -humantime = { version = "2.1.0" } +humantime = { version = "2.1.0", optional = true } validator = { version = "0.19", features = ["derive"] } [dev-dependencies] @@ -94,7 +93,6 @@ datatest-stable = "0.2" deltalake-test = { path = "../test" } dotenvy = "0" fs_extra = "1.2.0" -maplit = "1" pretty_assertions = "1.2.1" pretty_env_logger = "0.5.0" rstest = { version = "0.26.1" } @@ -110,7 +108,7 @@ json = ["parquet/json"] python = ["arrow/pyarrow"] native-tls = ["delta_kernel/default-engine-native-tls"] rustls = ["delta_kernel/default-engine-rustls"] -cloud = ["object_store/cloud"] +cloud = ["object_store/cloud", "dep:humantime"] # enable caching some file I/O operations when scanning delta logs delta-cache = ["foyer", "tempfile", "url/serde"] diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index b6cb446d74..c506355905 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -605,7 +605,7 @@ mod test { } async fn setup_table() -> DeltaTable { - let schema = StructType::new(vec![ + let schema = StructType::try_new(vec![ StructField::new( "id".to_string(), DataType::Primitive(PrimitiveType::String), @@ -668,18 +668,24 @@ mod test { ), StructField::new( "_struct".to_string(), - DataType::Struct(Box::new(StructType::new(vec![ - StructField::new("a", DataType::Primitive(PrimitiveType::Integer), true), - StructField::new( - "nested", - DataType::Struct(Box::new(StructType::new(vec![StructField::new( - "b", - DataType::Primitive(PrimitiveType::Integer), + DataType::Struct(Box::new( + StructType::try_new(vec![ + StructField::new("a", DataType::Primitive(PrimitiveType::Integer), true), + StructField::new( + "nested", + DataType::Struct(Box::new( + StructType::try_new(vec![StructField::new( + "b", + DataType::Primitive(PrimitiveType::Integer), + true, + )]) + .unwrap(), + )), true, - )]))), - true, - ), - ]))), + ), + ]) + .unwrap(), + )), true, ), StructField::new( @@ -690,7 +696,8 @@ mod test { ))), true, ), - ]); + ]) + .unwrap(); let table = DeltaOps::new_in_memory() .create() diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index 9af78fa0ac..9696bfcd7a 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -1193,7 +1193,7 @@ mod tests { let tmp_dir = TempDir::new().unwrap(); let table_path = tmp_dir.path().to_str().unwrap(); - let schema = StructType::new(vec![ + let schema = StructType::try_new(vec![ StructField::new( "id".to_string(), DataType::Primitive(PrimitiveType::Long), @@ -1204,7 +1204,7 @@ mod tests { DataType::Primitive(PrimitiveType::String), false, ), - ]); + ])?; CreateBuilder::new() .with_location(table_path) diff --git a/crates/core/src/kernel/arrow/engine_ext.rs b/crates/core/src/kernel/arrow/engine_ext.rs index b503c0e196..48321fcdd5 100644 --- a/crates/core/src/kernel/arrow/engine_ext.rs +++ b/crates/core/src/kernel/arrow/engine_ext.rs @@ -127,12 +127,12 @@ impl SnapshotExt for Snapshot { fn stats_schema(&self) -> DeltaResult { let partition_columns = self.metadata().partition_columns(); let column_mapping_mode = self.table_configuration().column_mapping_mode(); - let physical_schema = StructType::new( + let physical_schema = StructType::try_new( self.schema() .fields() .filter(|field| !partition_columns.contains(field.name())) .map(|field| field.make_physical(column_mapping_mode)), - ); + )?; Ok(Arc::new(stats_schema( &physical_schema, self.table_properties(), @@ -225,7 +225,7 @@ fn partitions_schema( if partition_columns.is_empty() { return Ok(None); } - Ok(Some(StructType::new( + Ok(Some(StructType::try_new( partition_columns .iter() .map(|col| { @@ -234,7 +234,7 @@ fn partitions_schema( }) }) .collect::, _>>()?, - ))) + )?)) } /// Generates the expected schema for file statistics. @@ -293,7 +293,7 @@ pub(crate) fn stats_schema( } } - StructType::new(fields) + StructType::try_new(fields).expect("Failed to construct StructType for stats_schema") } // Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG) @@ -424,7 +424,7 @@ impl<'a> SchemaTransform<'a> for BaseStatsTransform { self.path.pop(); // exclude struct fields with no children - if matches!(field.data_type(), DataType::Struct(dt) if dt.fields.is_empty()) { + if matches!(field.data_type(), DataType::Struct(dt) if dt.num_fields() == 0) { None } else { Some(field) @@ -610,15 +610,17 @@ mod tests { #[test] fn test_stats_schema_simple() { let properties: TableProperties = [("key", "value")].into(); - let file_schema = StructType::new([StructField::nullable("id", DataType::LONG)]); + let file_schema = + StructType::try_new([StructField::nullable("id", DataType::LONG)]).unwrap(); let stats_schema = stats_schema(&file_schema, &properties); - let expected = StructType::new([ + let expected = StructType::try_new([ StructField::nullable("numRecords", DataType::LONG), StructField::nullable("nullCount", file_schema.clone()), StructField::nullable("minValues", file_schema.clone()), StructField::nullable("maxValues", file_schema), - ]); + ]) + .unwrap(); assert_eq!(&expected, &stats_schema); } @@ -636,46 +638,53 @@ mod tests { // Create array type for a field that's not eligible for data skipping let array_type = DataType::Array(Box::new(ArrayType::new(DataType::STRING, false))); - let metadata_struct = StructType::new([ + let metadata_struct = StructType::try_new([ StructField::nullable("name", DataType::STRING), StructField::nullable("tags", array_type), StructField::nullable("score", DataType::DOUBLE), - ]); - let file_schema = StructType::new([ + ]) + .unwrap(); + let file_schema = StructType::try_new([ StructField::nullable("id", DataType::LONG), StructField::nullable( "metadata", DataType::Struct(Box::new(metadata_struct.clone())), ), - ]); + ]) + .unwrap(); let stats_schema = stats_schema(&file_schema, &properties); - let expected_null_nested = StructType::new([ + let expected_null_nested = StructType::try_new([ StructField::nullable("name", DataType::LONG), StructField::nullable("tags", DataType::LONG), StructField::nullable("score", DataType::LONG), - ]); - let expected_null = StructType::new([ + ]) + .unwrap(); + let expected_null = StructType::try_new([ StructField::nullable("id", DataType::LONG), StructField::nullable("metadata", DataType::Struct(Box::new(expected_null_nested))), - ]); + ]) + .unwrap(); - let expected_nested = StructType::new([ + let expected_nested = StructType::try_new([ StructField::nullable("name", DataType::STRING), StructField::nullable("score", DataType::DOUBLE), - ]); - let expected_fields = StructType::new([ + ]) + .unwrap(); + let expected_fields = StructType::try_new([ StructField::nullable("id", DataType::LONG), StructField::nullable("metadata", DataType::Struct(Box::new(expected_nested))), - ]); + ]) + .unwrap(); - let expected = StructType::new([ + let expected = StructType::try_new([ StructField::nullable("numRecords", DataType::LONG), StructField::nullable("nullCount", expected_null), StructField::nullable("minValues", expected_fields.clone()), StructField::nullable("maxValues", expected_fields.clone()), - ]); + ]) + .unwrap(); assert_eq!(&expected, &stats_schema); } @@ -688,33 +697,38 @@ mod tests { )] .into(); - let user_struct = StructType::new([ + let user_struct = StructType::try_new([ StructField::nullable("name", DataType::STRING), StructField::nullable("age", DataType::INTEGER), - ]); - let file_schema = StructType::new([ + ]) + .unwrap(); + let file_schema = StructType::try_new([ StructField::nullable("id", DataType::LONG), StructField::nullable("user.info", DataType::Struct(Box::new(user_struct.clone()))), - ]); + ]) + .unwrap(); let stats_schema = stats_schema(&file_schema, &properties); - let expected_nested = StructType::new([StructField::nullable("name", DataType::STRING)]); - let expected_fields = StructType::new([StructField::nullable( + let expected_nested = + StructType::try_new([StructField::nullable("name", DataType::STRING)]).unwrap(); + let expected_fields = StructType::try_new([StructField::nullable( "user.info", DataType::Struct(Box::new(expected_nested)), - )]); + )]) + .unwrap(); let null_count = NullCountStatsTransform .transform_struct(&expected_fields) .unwrap() .into_owned(); - let expected = StructType::new([ + let expected = StructType::try_new([ StructField::nullable("numRecords", DataType::LONG), StructField::nullable("nullCount", null_count), StructField::nullable("minValues", expected_fields.clone()), StructField::nullable("maxValues", expected_fields.clone()), - ]); + ]) + .unwrap(); assert_eq!(&expected, &stats_schema); } @@ -727,25 +741,28 @@ mod tests { )] .into(); - let logical_schema = StructType::new([ + let logical_schema = StructType::try_new([ StructField::nullable("name", DataType::STRING), StructField::nullable("age", DataType::INTEGER), - ]); + ]) + .unwrap(); let stats_schema = stats_schema(&logical_schema, &properties); - let expected_fields = StructType::new([StructField::nullable("name", DataType::STRING)]); + let expected_fields = + StructType::try_new([StructField::nullable("name", DataType::STRING)]).unwrap(); let null_count = NullCountStatsTransform .transform_struct(&expected_fields) .unwrap() .into_owned(); - let expected = StructType::new([ + let expected = StructType::try_new([ StructField::nullable("numRecords", DataType::LONG), StructField::nullable("nullCount", null_count), StructField::nullable("minValues", expected_fields.clone()), StructField::nullable("maxValues", expected_fields.clone()), - ]); + ]) + .unwrap(); assert_eq!(&expected, &stats_schema); } @@ -758,30 +775,34 @@ mod tests { // - "id" (LONG) - eligible for both null count and min/max // - "is_active" (BOOLEAN) - eligible for null count but NOT for min/max // - "metadata" (BINARY) - eligible for null count but NOT for min/max - let file_schema = StructType::new([ + let file_schema = StructType::try_new([ StructField::nullable("id", DataType::LONG), StructField::nullable("is_active", DataType::BOOLEAN), StructField::nullable("metadata", DataType::BINARY), - ]); + ]) + .unwrap(); let stats_schema = stats_schema(&file_schema, &properties); // Expected nullCount schema: all fields converted to LONG - let expected_null_count = StructType::new([ + let expected_null_count = StructType::try_new([ StructField::nullable("id", DataType::LONG), StructField::nullable("is_active", DataType::LONG), StructField::nullable("metadata", DataType::LONG), - ]); + ]) + .unwrap(); // Expected minValues/maxValues schema: only eligible fields (no boolean, no binary) - let expected_min_max = StructType::new([StructField::nullable("id", DataType::LONG)]); + let expected_min_max = + StructType::try_new([StructField::nullable("id", DataType::LONG)]).unwrap(); - let expected = StructType::new([ + let expected = StructType::try_new([ StructField::nullable("numRecords", DataType::LONG), StructField::nullable("nullCount", expected_null_count), StructField::nullable("minValues", expected_min_max.clone()), StructField::nullable("maxValues", expected_min_max), - ]); + ]) + .unwrap(); assert_eq!(&expected, &stats_schema); } @@ -791,50 +812,57 @@ mod tests { let properties: TableProperties = [("key", "value")].into(); // Create a nested schema where some nested fields are eligible for min/max and others aren't - let user_struct = StructType::new([ + let user_struct = StructType::try_new([ StructField::nullable("name", DataType::STRING), // eligible for min/max StructField::nullable("is_admin", DataType::BOOLEAN), // NOT eligible for min/max StructField::nullable("age", DataType::INTEGER), // eligible for min/max StructField::nullable("profile_pic", DataType::BINARY), // NOT eligible for min/max - ]); + ]) + .unwrap(); - let file_schema = StructType::new([ + let file_schema = StructType::try_new([ StructField::nullable("id", DataType::LONG), StructField::nullable("user", DataType::Struct(Box::new(user_struct.clone()))), StructField::nullable("is_deleted", DataType::BOOLEAN), // NOT eligible for min/max - ]); + ]) + .unwrap(); let stats_schema = stats_schema(&file_schema, &properties); // Expected nullCount schema: all fields converted to LONG, maintaining structure - let expected_null_user = StructType::new([ + let expected_null_user = StructType::try_new([ StructField::nullable("name", DataType::LONG), StructField::nullable("is_admin", DataType::LONG), StructField::nullable("age", DataType::LONG), StructField::nullable("profile_pic", DataType::LONG), - ]); - let expected_null_count = StructType::new([ + ]) + .unwrap(); + let expected_null_count = StructType::try_new([ StructField::nullable("id", DataType::LONG), StructField::nullable("user", DataType::Struct(Box::new(expected_null_user))), StructField::nullable("is_deleted", DataType::LONG), - ]); + ]) + .unwrap(); // Expected minValues/maxValues schema: only eligible fields - let expected_minmax_user = StructType::new([ + let expected_minmax_user = StructType::try_new([ StructField::nullable("name", DataType::STRING), StructField::nullable("age", DataType::INTEGER), - ]); - let expected_min_max = StructType::new([ + ]) + .unwrap(); + let expected_min_max = StructType::try_new([ StructField::nullable("id", DataType::LONG), StructField::nullable("user", DataType::Struct(Box::new(expected_minmax_user))), - ]); + ]) + .unwrap(); - let expected = StructType::new([ + let expected = StructType::try_new([ StructField::nullable("numRecords", DataType::LONG), StructField::nullable("nullCount", expected_null_count), StructField::nullable("minValues", expected_min_max.clone()), StructField::nullable("maxValues", expected_min_max), - ]); + ]) + .unwrap(); assert_eq!(&expected, &stats_schema); } @@ -844,41 +872,45 @@ mod tests { let properties: TableProperties = [("key", "value")].into(); // Create a schema with only fields that are NOT eligible for min/max skipping - let file_schema = StructType::new([ + let file_schema = StructType::try_new([ StructField::nullable("is_active", DataType::BOOLEAN), StructField::nullable("metadata", DataType::BINARY), StructField::nullable( "tags", DataType::Array(Box::new(ArrayType::new(DataType::STRING, false))), ), - ]); + ]) + .unwrap(); let stats_schema = stats_schema(&file_schema, &properties); // Expected nullCount schema: all fields converted to LONG - let expected_null_count = StructType::new([ + let expected_null_count = StructType::try_new([ StructField::nullable("is_active", DataType::LONG), StructField::nullable("metadata", DataType::LONG), StructField::nullable("tags", DataType::LONG), - ]); + ]) + .unwrap(); // Expected minValues/maxValues schema: empty since no fields are eligible // Since there are no eligible fields, minValues and maxValues should not be present - let expected = StructType::new([ + let expected = StructType::try_new([ StructField::nullable("numRecords", DataType::LONG), StructField::nullable("nullCount", expected_null_count), // No minValues or maxValues fields since no primitive fields are eligible - ]); + ]) + .unwrap(); assert_eq!(&expected, &stats_schema); } #[test] fn test_partitions_schema() -> DeltaResultLocal<()> { - let logical_schema = StructType::new([ + let logical_schema = StructType::try_new([ StructField::nullable("name", DataType::STRING), StructField::nullable("age", DataType::INTEGER), - ]); + ]) + .unwrap(); let result = partitions_schema(&logical_schema, &[])?; assert_eq!(None, result); diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index ecabddb5c7..328c211b01 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -4,7 +4,6 @@ use std::str::FromStr; use delta_kernel::schema::{DataType, StructField}; use delta_kernel::table_features::{ReaderFeature, WriterFeature}; -use maplit::hashset; use serde::{Deserialize, Serialize}; use crate::kernel::{error::Error, DeltaResult}; @@ -497,7 +496,7 @@ impl ProtocolInner { } None => { self.writer_features = - Some(hashset! {WriterFeature::ChangeDataFeed}) + Some(HashSet::from([WriterFeature::ChangeDataFeed])) } } } else if self.min_writer_version <= 3 { @@ -520,14 +519,14 @@ impl ProtocolInner { features.insert(WriterFeature::DeletionVectors); features } - None => hashset! {WriterFeature::DeletionVectors}, + None => HashSet::from([WriterFeature::DeletionVectors]), }; let reader_features = match self.reader_features { Some(mut features) => { features.insert(ReaderFeature::DeletionVectors); features } - None => hashset! {ReaderFeature::DeletionVectors}, + None => HashSet::from([ReaderFeature::DeletionVectors]), }; self.min_reader_version = 3; self.min_writer_version = 7; diff --git a/crates/core/src/kernel/models/fields.rs b/crates/core/src/kernel/models/fields.rs index 4b9f0692dd..b4c198f276 100644 --- a/crates/core/src/kernel/models/fields.rs +++ b/crates/core/src/kernel/models/fields.rs @@ -1,5 +1,5 @@ //! Schema definitions for action types -use std::sync::{Arc, LazyLock}; +use std::sync::LazyLock; use delta_kernel::schema::{ArrayType, DataType, MapType, StructField, StructType}; @@ -27,20 +27,21 @@ impl ActionType { static METADATA_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "metaData", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("id", DataType::STRING, true), StructField::new("name", DataType::STRING, true), StructField::new("description", DataType::STRING, true), StructField::new( "format", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("provider", DataType::STRING, true), StructField::new( "options", MapType::new(DataType::STRING, DataType::STRING, true), false, ), - ]), + ]) + .expect("Failed to construct format StructType in METADATA_FIELD"), false, ), StructField::new("schemaString", DataType::STRING, true), @@ -55,7 +56,8 @@ static METADATA_FIELD: LazyLock = LazyLock::new(|| { MapType::new(DataType::STRING, DataType::STRING, true), false, ), - ]), + ]) + .expect("Failed to construct StructType for METADATA_FIELD"), true, ) }); @@ -63,7 +65,7 @@ static METADATA_FIELD: LazyLock = LazyLock::new(|| { static PROTOCOL_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "protocol", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("minReaderVersion", DataType::INTEGER, true), StructField::new("minWriterVersion", DataType::INTEGER, true), StructField::new( @@ -76,7 +78,8 @@ static PROTOCOL_FIELD: LazyLock = LazyLock::new(|| { ArrayType::new(DataType::STRING, true), true, ), - ]), + ]) + .expect("Failed to construct StructType for PROTOCOL_FIELD"), true, ) }); @@ -84,7 +87,7 @@ static PROTOCOL_FIELD: LazyLock = LazyLock::new(|| { static COMMIT_INFO_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "commitInfo", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("timestamp", DataType::LONG, false), StructField::new("operation", DataType::STRING, false), StructField::new("isolationLevel", DataType::STRING, true), @@ -101,7 +104,8 @@ static COMMIT_INFO_FIELD: LazyLock = LazyLock::new(|| { MapType::new(DataType::STRING, DataType::STRING, true), true, ), - ]), + ]) + .expect("Failed to construct StructType for COMMIT_INFO_FIELD"), true, ) }); @@ -109,7 +113,7 @@ static COMMIT_INFO_FIELD: LazyLock = LazyLock::new(|| { static ADD_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "add", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("path", DataType::STRING, true), partition_values_field(), StructField::new("size", DataType::LONG, true), @@ -121,7 +125,8 @@ static ADD_FIELD: LazyLock = LazyLock::new(|| { StructField::new("baseRowId", DataType::LONG, true), StructField::new("defaultRowCommitVersion", DataType::LONG, true), StructField::new("clusteringProvider", DataType::STRING, true), - ]), + ]) + .expect("Failed to construct StructType for ADD_FIELD"), true, ) }); @@ -129,7 +134,7 @@ static ADD_FIELD: LazyLock = LazyLock::new(|| { static REMOVE_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "remove", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("path", DataType::STRING, true), StructField::new("deletionTimestamp", DataType::LONG, true), StructField::new("dataChange", DataType::BOOLEAN, true), @@ -141,7 +146,8 @@ static REMOVE_FIELD: LazyLock = LazyLock::new(|| { deletion_vector_field(), StructField::new("baseRowId", DataType::LONG, true), StructField::new("defaultRowCommitVersion", DataType::LONG, true), - ]), + ]) + .expect("Failed to construct StructType for REMOVE_FIELD"), true, ) }); @@ -150,11 +156,12 @@ static REMOVE_FIELD: LazyLock = LazyLock::new(|| { static REMOVE_FIELD_CHECKPOINT: LazyLock = LazyLock::new(|| { StructField::new( "remove", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("path", DataType::STRING, false), StructField::new("deletionTimestamp", DataType::LONG, true), StructField::new("dataChange", DataType::BOOLEAN, false), - ]), + ]) + .expect("Failed to construct StructType for REMOVE_FIELD_CHECKPOINT"), true, ) }); @@ -162,13 +169,14 @@ static REMOVE_FIELD_CHECKPOINT: LazyLock = LazyLock::new(|| { static CDC_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "cdc", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("path", DataType::STRING, false), partition_values_field(), StructField::new("size", DataType::LONG, false), StructField::new("dataChange", DataType::BOOLEAN, false), tags_field(), - ]), + ]) + .expect("Failed to construct StructType for CDC_FIELD"), true, ) }); @@ -176,11 +184,12 @@ static CDC_FIELD: LazyLock = LazyLock::new(|| { static TXN_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "txn", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("appId", DataType::STRING, false), StructField::new("version", DataType::LONG, false), StructField::new("lastUpdated", DataType::LONG, true), - ]), + ]) + .expect("Failed to construct StructType for TXN_FIELD"), true, ) }); @@ -188,7 +197,7 @@ static TXN_FIELD: LazyLock = LazyLock::new(|| { static DOMAIN_METADATA_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "domainMetadata", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("domain", DataType::STRING, false), StructField::new( "configuration", @@ -196,7 +205,8 @@ static DOMAIN_METADATA_FIELD: LazyLock = LazyLock::new(|| { true, ), StructField::new("removed", DataType::BOOLEAN, false), - ]), + ]) + .expect("Failed to construct StructType for DOMAIN_METADATA_FIELD"), true, ) }); @@ -204,10 +214,11 @@ static DOMAIN_METADATA_FIELD: LazyLock = LazyLock::new(|| { static CHECKPOINT_METADATA_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "checkpointMetadata", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("flavor", DataType::STRING, false), tags_field(), - ]), + ]) + .expect("Failed to construct StructType for CHECKPOINT_METADATA_FIELD"), true, ) }); @@ -215,20 +226,21 @@ static CHECKPOINT_METADATA_FIELD: LazyLock = LazyLock::new(|| { static SIDECAR_FIELD: LazyLock = LazyLock::new(|| { StructField::new( "sidecar", - StructType::new(vec![ + StructType::try_new(vec![ StructField::new("path", DataType::STRING, false), StructField::new("sizeInBytes", DataType::LONG, true), StructField::new("modificationTime", DataType::LONG, false), StructField::new("type", DataType::STRING, false), tags_field(), - ]), + ]) + .expect("Failed to construct StructType for SIDECAR_FIELD"), true, ) }); #[allow(unused)] static LOG_SCHEMA: LazyLock = LazyLock::new(|| { - StructType::new(vec![ + StructType::try_new(vec![ ADD_FIELD.clone(), CDC_FIELD.clone(), COMMIT_INFO_FIELD.clone(), @@ -238,6 +250,7 @@ static LOG_SCHEMA: LazyLock = LazyLock::new(|| { REMOVE_FIELD.clone(), TXN_FIELD.clone(), ]) + .expect("Failed to construct StructType for LOG_SCHEMA") }); fn tags_field() -> StructField { @@ -259,20 +272,24 @@ fn partition_values_field() -> StructField { fn deletion_vector_field() -> StructField { StructField::new( "deletionVector", - DataType::Struct(Box::new(StructType::new(vec![ - StructField::new("storageType", DataType::STRING, false), - StructField::new("pathOrInlineDv", DataType::STRING, false), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, false), - StructField::new("cardinality", DataType::LONG, false), - ]))), + DataType::Struct(Box::new( + StructType::try_new(vec![ + StructField::new("storageType", DataType::STRING, false), + StructField::new("pathOrInlineDv", DataType::STRING, false), + StructField::new("offset", DataType::INTEGER, true), + StructField::new("sizeInBytes", DataType::INTEGER, false), + StructField::new("cardinality", DataType::LONG, false), + ]) + .expect("Failed to construct StructType for deletion_vector_field"), + )), true, ) } -pub(crate) fn log_schema_ref() -> &'static Arc { - static LOG_SCHEMA_REF: LazyLock> = - LazyLock::new(|| Arc::new(LOG_SCHEMA.clone())); +#[cfg(feature = "datafusion")] +pub(crate) fn log_schema_ref() -> &'static std::sync::Arc { + static LOG_SCHEMA_REF: LazyLock> = + LazyLock::new(|| std::sync::Arc::new(LOG_SCHEMA.clone())); &LOG_SCHEMA_REF } diff --git a/crates/core/src/kernel/schema/cast/merge_schema.rs b/crates/core/src/kernel/schema/cast/merge_schema.rs index d93c859418..abfd5fc2d4 100644 --- a/crates/core/src/kernel/schema/cast/merge_schema.rs +++ b/crates/core/src/kernel/schema/cast/merge_schema.rs @@ -112,7 +112,8 @@ pub(crate) fn merge_delta_struct( } } - Ok(StructType::new(fields)) + Ok(StructType::try_new(fields) + .map_err(|e| ArrowError::from_external_error(Box::new(e)))?) } Err(e) => { errors.push(e.to_string()); diff --git a/crates/core/src/kernel/schema/cast/mod.rs b/crates/core/src/kernel/schema/cast/mod.rs index bc275e6de8..3885254125 100644 --- a/crates/core/src/kernel/schema/cast/mod.rs +++ b/crates/core/src/kernel/schema/cast/mod.rs @@ -264,20 +264,22 @@ mod tests { fn test_merge_delta_schema_with_meta() { let mut left_meta = HashMap::new(); left_meta.insert("a".to_string(), "a1".to_string()); - let left_schema = DeltaStructType::new(vec![DeltaStructField::new( + let left_schema = DeltaStructType::try_new(vec![DeltaStructField::new( "f", DeltaDataType::STRING, false, ) - .with_metadata(left_meta)]); + .with_metadata(left_meta)]) + .unwrap(); let mut right_meta = HashMap::new(); right_meta.insert("b".to_string(), "b2".to_string()); - let right_schema = DeltaStructType::new(vec![DeltaStructField::new( + let right_schema = DeltaStructType::try_new(vec![DeltaStructField::new( "f", DeltaDataType::STRING, true, ) - .with_metadata(right_meta)]); + .with_metadata(right_meta)]) + .unwrap(); let result = merge_delta_struct(&left_schema, &right_schema).unwrap(); let fields = result.fields().collect_vec(); diff --git a/crates/core/src/kernel/schema/partitions.rs b/crates/core/src/kernel/schema/partitions.rs index 9cae659448..7a392b4276 100644 --- a/crates/core/src/kernel/schema/partitions.rs +++ b/crates/core/src/kernel/schema/partitions.rs @@ -348,10 +348,11 @@ mod tests { #[test] fn test_filter_to_kernel_predicate_equal() { - let schema = StructType::new(vec![ + let schema = StructType::try_new(vec![ StructField::new("name", DataType::Primitive(PrimitiveType::String), true), StructField::new("age", DataType::Primitive(PrimitiveType::Integer), true), - ]); + ]) + .unwrap(); let filter = PartitionFilter { key: "name".to_string(), value: PartitionValue::Equal("Alice".to_string()), @@ -365,11 +366,12 @@ mod tests { #[test] fn test_filter_to_kernel_predicate_not_equal() { - let schema = StructType::new(vec![StructField::new( + let schema = StructType::try_new(vec![StructField::new( "status", DataType::Primitive(PrimitiveType::String), true, - )]); + )]) + .unwrap(); let filter = PartitionFilter { key: "status".to_string(), value: PartitionValue::NotEqual("inactive".to_string()), @@ -383,10 +385,11 @@ mod tests { #[test] fn test_filter_to_kernel_predicate_comparisons() { - let schema = StructType::new(vec![ + let schema = StructType::try_new(vec![ StructField::new("score", DataType::Primitive(PrimitiveType::Integer), true), StructField::new("price", DataType::Primitive(PrimitiveType::Long), true), - ]); + ]) + .unwrap(); // Test less than let filter = PartitionFilter { @@ -427,11 +430,12 @@ mod tests { #[test] fn test_filter_to_kernel_predicate_in_operations() { - let schema = StructType::new(vec![StructField::new( + let schema = StructType::try_new(vec![StructField::new( "category", DataType::Primitive(PrimitiveType::String), true, - )]); + )]) + .unwrap(); let column = Expression::column(["category"]); let categories = [ @@ -469,11 +473,12 @@ mod tests { #[test] fn test_filter_to_kernel_predicate_empty_in_list() { - let schema = StructType::new(vec![StructField::new( + let schema = StructType::try_new(vec![StructField::new( "tag", DataType::Primitive(PrimitiveType::String), true, - )]); + )]) + .unwrap(); let filter = PartitionFilter { key: "tag".to_string(), @@ -485,11 +490,12 @@ mod tests { #[test] fn test_filter_to_kernel_predicate_field_not_found() { - let schema = StructType::new(vec![StructField::new( + let schema = StructType::try_new(vec![StructField::new( "existing_field", DataType::Primitive(PrimitiveType::String), true, - )]); + )]) + .unwrap(); let filter = PartitionFilter { key: "nonexistent_field".to_string(), @@ -506,16 +512,18 @@ mod tests { #[test] fn test_filter_to_kernel_predicate_non_primitive_field() { - let nested_struct = StructType::new(vec![StructField::new( + let nested_struct = StructType::try_new(vec![StructField::new( "inner", DataType::Primitive(PrimitiveType::String), true, - )]); - let schema = StructType::new(vec![StructField::new( + )]) + .unwrap(); + let schema = StructType::try_new(vec![StructField::new( "nested", DataType::Struct(Box::new(nested_struct)), true, - )]); + )]) + .unwrap(); let filter = PartitionFilter { key: "nested".to_string(), @@ -532,7 +540,7 @@ mod tests { #[test] fn test_filter_to_kernel_predicate_different_data_types() { - let schema = StructType::new(vec![ + let schema = StructType::try_new(vec![ StructField::new( "bool_field", DataType::Primitive(PrimitiveType::Boolean), @@ -554,7 +562,8 @@ mod tests { DataType::Primitive(PrimitiveType::Float), true, ), - ]); + ]) + .unwrap(); // Test boolean field let filter = PartitionFilter { @@ -580,11 +589,12 @@ mod tests { #[test] fn test_filter_to_kernel_predicate_invalid_scalar_value() { - let schema = StructType::new(vec![StructField::new( + let schema = StructType::try_new(vec![StructField::new( "number", DataType::Primitive(PrimitiveType::Integer), true, - )]); + )]) + .unwrap(); let filter = PartitionFilter { key: "number".to_string(), diff --git a/crates/core/src/kernel/snapshot/iterators.rs b/crates/core/src/kernel/snapshot/iterators.rs index 96edebe9fd..3bab97ea61 100644 --- a/crates/core/src/kernel/snapshot/iterators.rs +++ b/crates/core/src/kernel/snapshot/iterators.rs @@ -24,8 +24,6 @@ const FIELD_NAME_SIZE: &str = "size"; const FIELD_NAME_MODIFICATION_TIME: &str = "modificationTime"; const FIELD_NAME_STATS: &str = "stats"; const FIELD_NAME_STATS_PARSED: &str = "stats_parsed"; -const FIELD_NAME_FILE_CONSTANT_VALUES: &str = "fileConstantValues"; -const FIELD_NAME_PARTITION_VALUES: &str = "partitionValues"; const FIELD_NAME_PARTITION_VALUES_PARSED: &str = "partitionValues_parsed"; const FIELD_NAME_DELETION_VECTOR: &str = "deletionVector"; diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 90d882814e..029581ae03 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use arrow_array::{Array, RecordBatch, StringArray, StructArray}; +use arrow_array::RecordBatch; use delta_kernel::actions::{Metadata, Protocol}; use delta_kernel::expressions::{Scalar, StructData}; use delta_kernel::table_configuration::TableConfiguration; @@ -10,14 +10,6 @@ use indexmap::IndexMap; use super::super::scalars::ScalarExt; use super::iterators::LogicalFileView; -use crate::kernel::arrow::extract::extract_and_cast; -use crate::{DeltaResult, DeltaTableError}; - -const COL_NUM_RECORDS: &str = "numRecords"; -const COL_MIN_VALUES: &str = "minValues"; -const COL_MAX_VALUES: &str = "maxValues"; -const COL_NULL_COUNT: &str = "nullCount"; - pub(crate) trait PartitionsExt { fn hive_partition_path(&self) -> String; } @@ -134,6 +126,7 @@ mod datafusion { use ::datafusion::physical_optimizer::pruning::PruningStatistics; use ::datafusion::physical_plan::Accumulator; use arrow_arith::aggregate::sum; + use arrow_array::{Array, RecordBatch, StringArray, StructArray}; use arrow_array::{ArrayRef, BooleanArray, Int64Array, UInt64Array}; use arrow_schema::DataType as ArrowDataType; use delta_kernel::expressions::Expression; @@ -143,8 +136,16 @@ mod datafusion { use super::*; use crate::kernel::arrow::engine_ext::ExpressionEvaluatorExt as _; use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column}; + use crate::{DeltaResult, DeltaTableError}; + + use crate::kernel::arrow::extract::extract_and_cast; use crate::kernel::ARROW_HANDLER; + const COL_NUM_RECORDS: &str = "numRecords"; + const COL_MIN_VALUES: &str = "minValues"; + const COL_MAX_VALUES: &str = "maxValues"; + const COL_NULL_COUNT: &str = "nullCount"; + #[derive(Debug, Default, Clone)] enum AccumulatorType { Min, diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 836605a403..25aec85158 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -92,7 +92,11 @@ impl Snapshot { table_root.set_path(&format!("{}/", table_root.path())); } let snapshot = match spawn_blocking(move || { - KernelSnapshot::try_new(table_root, engine.as_ref(), version) + let mut builder = KernelSnapshot::builder_for(table_root); + if let Some(version) = version { + builder = builder.at_version(version); + } + builder.build(engine.as_ref()) }) .await .map_err(|e| DeltaTableError::Generic(e.to_string()))? @@ -111,7 +115,7 @@ impl Snapshot { let schema = snapshot.table_configuration().schema(); Ok(Self { - inner: Arc::new(snapshot), + inner: snapshot, config, schema, table_url: log_store.config().location.clone(), @@ -142,12 +146,12 @@ impl Snapshot { ); let engine = log_store.engine(None); - let snapshot = KernelSnapshot::try_new(table_url.clone(), engine.as_ref(), None)?; + let snapshot = KernelSnapshot::builder_for(table_url.clone()).build(engine.as_ref())?; let schema = snapshot.table_configuration().schema(); Ok(( Self { - inner: Arc::new(snapshot), + inner: snapshot, config: Default::default(), schema, table_url, @@ -175,7 +179,11 @@ impl Snapshot { let engine = log_store.engine(None); let current = self.inner.clone(); let snapshot = spawn_blocking(move || { - KernelSnapshot::try_new_from(current, engine.as_ref(), target_version) + let mut builder = KernelSnapshot::builder_from(current); + if let Some(version) = target_version { + builder = builder.at_version(version); + } + builder.build(engine.as_ref()) }) .await .map_err(|e| DeltaTableError::Generic(e.to_string()))??; @@ -354,13 +362,16 @@ impl Snapshot { log_store: &dyn LogStore, ) -> BoxStream<'_, DeltaResult> { static TOMBSTONE_SCHEMA: LazyLock> = LazyLock::new(|| { - Arc::new(StructType::new(vec![ - ActionType::Remove.schema_field().clone(), - ActionType::Sidecar.schema_field().clone(), - ])) + Arc::new( + StructType::try_new(vec![ + ActionType::Remove.schema_field().clone(), + ActionType::Sidecar.schema_field().clone(), + ]) + .expect("Failed to create a StructType somehow"), + ) }); - // TODO: which capacity to choose? + // TODO: which capacity to choose let mut builder = RecordBatchReceiverStreamBuilder::new(100); let tx = builder.tx(); @@ -677,6 +688,7 @@ impl EagerSnapshot { } } +#[cfg(any(test, feature = "integration_test"))] pub(crate) fn partitions_schema( schema: &StructType, partition_columns: &[String], @@ -684,7 +696,7 @@ pub(crate) fn partitions_schema( if partition_columns.is_empty() { return Ok(None); } - Ok(Some(StructType::new( + Ok(Some(StructType::try_new( partition_columns .iter() .map(|col| { @@ -693,7 +705,7 @@ pub(crate) fn partitions_schema( }) }) .collect::, _>>()?, - ))) + )?)) } #[cfg(test)] @@ -813,11 +825,12 @@ mod tests { #[tokio::test] async fn test_partition_schema() { - let schema = StructType::new(vec![ + let schema = StructType::try_new(vec![ StructField::new("id", DataType::LONG, true), StructField::new("name", DataType::STRING, true), StructField::new("date", DataType::DATE, true), - ]); + ]) + .unwrap(); let partition_columns = vec!["date".to_string()]; let metadata = ActionFactory::metadata(&schema, Some(&partition_columns), None); @@ -839,11 +852,9 @@ mod tests { let (snapshot, _) = Snapshot::new_test(vec![&commit_data]).await.unwrap(); - let expected = Arc::new(StructType::new(vec![StructField::new( - "date", - DataType::DATE, - true, - )])); + let expected = Arc::new( + StructType::try_new(vec![StructField::new("date", DataType::DATE, true)]).unwrap(), + ); assert_eq!(snapshot.inner.partitions_schema().unwrap(), Some(expected)); let metadata = ActionFactory::metadata(&schema, None::>, None); diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 769244a663..9393791f39 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -224,7 +224,7 @@ mod tests { "size", "fileConstantValues", ])?; - let partition_schema = StructType::new(vec![StructField::nullable( + let partition_schema = StructType::try_new(vec![StructField::nullable( "Company Very Short", DataType::STRING, ) @@ -237,13 +237,14 @@ mod tests { "delta.columnMapping.physicalName".to_string(), MetadataValue::String(physical_partition_name.clone()), ), - ])]); + ])]) + .unwrap(); let partition_values = MapType::new(DataType::STRING, DataType::STRING, true); - let file_constant_values: SchemaRef = Arc::new(StructType::new([StructField::nullable( - "partitionValues", - partition_values, - )])); + let file_constant_values: SchemaRef = Arc::new( + StructType::try_new([StructField::nullable("partitionValues", partition_values)]) + .unwrap(), + ); // Inspecting the schema of file_constant_values: let _: ArrowSchema = file_constant_values.as_ref().try_into_arrow()?; diff --git a/crates/core/src/logstore/config.rs b/crates/core/src/logstore/config.rs index 7337d6373b..9969599a91 100644 --- a/crates/core/src/logstore/config.rs +++ b/crates/core/src/logstore/config.rs @@ -262,6 +262,7 @@ pub fn parse_f64(value: &str) -> DeltaResult { .map_err(|_| DeltaTableError::Generic(format!("failed to parse \"{value}\" as f64"))) } +#[cfg(feature = "cloud")] pub fn parse_duration(value: &str) -> DeltaResult { humantime::parse_duration(value) .map_err(|_| DeltaTableError::Generic(format!("failed to parse \"{value}\" as Duration"))) @@ -299,7 +300,6 @@ pub fn str_is_truthy(val: &str) -> bool { #[cfg(test)] mod tests { use super::*; - use maplit::hashmap; use std::time::Duration; // Test retry config parsing @@ -307,13 +307,13 @@ mod tests { #[test] fn test_retry_config_from_options() { use object_store::RetryConfig; - let options = hashmap! { - "max_retries".to_string() => "100".to_string() , - "retry_timeout".to_string() => "300s".to_string() , - "backoff_config.init_backoff".to_string() => "20s".to_string() , - "backoff_config.max_backoff".to_string() => "1h".to_string() , - "backoff_config.base".to_string() => "50.0".to_string() , - }; + let options = HashMap::from([ + ("max_retries".to_string(), "100".to_string()), + ("retry_timeout".to_string(), "300s".to_string()), + ("backoff_config.init_backoff".to_string(), "20s".to_string()), + ("backoff_config.max_backoff".to_string(), "1h".to_string()), + ("backoff_config.base".to_string(), "50.0".to_string()), + ]); let (retry_config, remainder): (RetryConfig, _) = super::try_parse_impl(options).unwrap(); assert!(remainder.is_empty()); @@ -329,11 +329,11 @@ mod tests { #[test] fn test_parse_result_handling() { use object_store::RetryConfig; - let options = hashmap! { - "retry_timeout".to_string() => "300s".to_string(), - "max_retries".to_string() => "not_a_number".to_string(), - "unknown_key".to_string() => "value".to_string(), - }; + let options = HashMap::from([ + ("retry_timeout".to_string(), "300s".to_string()), + ("max_retries".to_string(), "not_a_number".to_string()), + ("unknown_key".to_string(), "value".to_string()), + ]); let result: ParseResult = options.into_iter().collect(); println!("result: {result:?}"); @@ -348,11 +348,11 @@ mod tests { #[cfg(feature = "cloud")] #[test] fn test_storage_config_parsing() { - let options = hashmap! { - "max_retries".to_string() => "5".to_string(), - "retry_timeout".to_string() => "10s".to_string(), - "unknown_prop".to_string() => "value".to_string(), - }; + let options = HashMap::from([ + ("max_retries".to_string(), "5".to_string()), + ("retry_timeout".to_string(), "10s".to_string()), + ("unknown_prop".to_string(), "value".to_string()), + ]); let config = StorageConfig::parse_options(options).unwrap(); assert_eq!(config.retry.max_retries, 5); @@ -370,9 +370,6 @@ mod tests { assert_eq!(parse_f64("3.14").unwrap(), 3.14); assert!(parse_f64("not_a_number").is_err()); - assert_eq!(parse_duration("1h").unwrap(), Duration::from_secs(3600)); - assert!(parse_duration("invalid").is_err()); - assert!(parse_bool("true").unwrap()); assert!(parse_bool("1").unwrap()); assert!(!parse_bool("false").unwrap()); @@ -381,6 +378,13 @@ mod tests { assert_eq!(parse_string("test").unwrap(), "test"); } + #[cfg(feature = "cloud")] + #[test] + fn test_parsing_duration() { + assert_eq!(parse_duration("1h").unwrap(), Duration::from_secs(3600)); + assert!(parse_duration("invalid").is_err()); + } + // Test str_is_truthy function #[test] fn test_str_is_truthy() { diff --git a/crates/core/src/logstore/storage/retry_ext.rs b/crates/core/src/logstore/storage/retry_ext.rs index 7028615463..bfccb4cedb 100644 --- a/crates/core/src/logstore/storage/retry_ext.rs +++ b/crates/core/src/logstore/storage/retry_ext.rs @@ -4,6 +4,7 @@ use ::object_store::path::Path; use ::object_store::{Error, ObjectStore, PutPayload, PutResult, Result}; use tracing::log::*; +#[cfg(feature = "cloud")] use crate::logstore::config; impl ObjectStoreRetryExt for T {} diff --git a/crates/core/src/operations/add_column.rs b/crates/core/src/operations/add_column.rs index 0e263349dd..fe00722ea0 100644 --- a/crates/core/src/operations/add_column.rs +++ b/crates/core/src/operations/add_column.rs @@ -84,7 +84,7 @@ impl std::future::IntoFuture for AddColumnBuilder { let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; - let fields_right = &StructType::new(fields.clone()); + let fields_right = &StructType::try_new(fields.clone())?; if !fields_right .get_generated_columns() diff --git a/crates/core/src/operations/create.rs b/crates/core/src/operations/create.rs index 5805d06871..ba07fa84d4 100644 --- a/crates/core/src/operations/create.rs +++ b/crates/core/src/operations/create.rs @@ -303,7 +303,7 @@ impl CreateBuilder { }) .unwrap_or_else(|| current_protocol); - let schema = StructType::new(self.columns); + let schema = StructType::try_new(self.columns)?; let protocol = protocol .apply_properties_to_protocol(&configuration, self.raise_if_key_not_exists)? diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 55d87c0c8b..a2a531f08e 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -406,6 +406,7 @@ pub fn get_num_idx_cols_and_stats_columns( /// Get the target_file_size from the table configuration in the sates /// If table_config does not exist (only can occur in the first write action) it takes /// the configuration that was passed to the writerBuilder. +#[cfg(feature = "datafusion")] pub(crate) fn get_target_file_size( config: Option<&TableProperties>, configuration: &HashMap>, diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 75c1d7b7f3..8e6dbeb908 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -861,7 +861,7 @@ mod tests { #[tokio::test] async fn test_update_case_sensitive() { - let schema = StructType::new(vec![ + let schema = StructType::try_new(vec![ StructField::new( "Id".to_string(), DeltaDataType::Primitive(PrimitiveType::String), @@ -877,7 +877,8 @@ mod tests { DeltaDataType::Primitive(PrimitiveType::String), true, ), - ]); + ]) + .unwrap(); let arrow_schema = Arc::new(ArrowSchema::new(vec![ Field::new("Id", DataType::Utf8, true), @@ -1084,7 +1085,7 @@ mod tests { #[tokio::test] async fn test_update_with_array() { - let schema = StructType::new(vec![ + let schema = StructType::try_new(vec![ StructField::new( "id".to_string(), DeltaDataType::Primitive(PrimitiveType::Integer), @@ -1103,7 +1104,8 @@ mod tests { ))), true, ), - ]); + ]) + .unwrap(); let arrow_schema: ArrowSchema = (&schema).try_into_arrow().unwrap(); // Create the first batch @@ -1154,7 +1156,7 @@ mod tests { #[tokio::test] async fn test_update_with_array_that_must_be_coerced() { let _ = pretty_env_logger::try_init(); - let schema = StructType::new(vec![ + let schema = StructType::try_new(vec![ StructField::new( "id".to_string(), DeltaDataType::Primitive(PrimitiveType::Integer), @@ -1173,7 +1175,8 @@ mod tests { ))), true, ), - ]); + ]) + .unwrap(); let arrow_schema: ArrowSchema = (&schema).try_into_arrow().unwrap(); // Create the first batch diff --git a/crates/core/src/operations/update_field_metadata.rs b/crates/core/src/operations/update_field_metadata.rs index 76ceab6d02..320c626c59 100644 --- a/crates/core/src/operations/update_field_metadata.rs +++ b/crates/core/src/operations/update_field_metadata.rs @@ -92,14 +92,14 @@ impl std::future::IntoFuture for UpdateFieldMetadataBuilder { let table_schema = this.snapshot.schema(); - let mut fields = table_schema.fields.clone(); // Check if the field exists in the schema. Otherwise, no need to continue the // operation - let Some(field) = fields.get_mut(&this.field_name) else { + let Some(field) = table_schema.field(&this.field_name) else { return Err(DeltaTableError::Generic( "No field with the provided name in the schema".to_string(), )); }; + let mut field = field.clone(); // DO NOT MODIFY PROTECTED METADATA. // Since `delta_kernel::schema::ColumnMetadataKey` does not `impl` any parsing (e.g. `std::core::From``) - at the time of implementation - @@ -125,7 +125,15 @@ impl std::future::IntoFuture for UpdateFieldMetadataBuilder { .or_insert(value); }); - let updated_table_schema = StructType::new(fields.into_values()); + // This feels a little silly but I could not find a better way to modify the StructType + // "in place" as of delta-kernel-rs 0.16.0 + let updated_table_schema = StructType::try_new(table_schema.fields().map(|f| { + match f.name == field.name { + // return our modified field instead + true => field.clone(), + false => f.clone(), + } + }))?; let mut metadata = this.snapshot.metadata().clone(); diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index aaaa02e16a..828e80617b 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -1,6 +1,6 @@ //! Implementation for writing delta checkpoints. -use std::sync::{Arc, LazyLock}; +use std::sync::LazyLock; use url::Url; @@ -42,11 +42,13 @@ pub(crate) async fn create_checkpoint_for( let engine = log_store.engine(operation_id); let task_engine = engine.clone(); - let snapshot = - spawn_blocking(move || Snapshot::try_new(table_root, task_engine.as_ref(), Some(version))) - .await - .map_err(|e| DeltaTableError::Generic(e.to_string()))??; - let snapshot = Arc::new(snapshot); + let snapshot = spawn_blocking(move || { + Snapshot::builder_for(table_root) + .at_version(version) + .build(task_engine.as_ref()) + }) + .await + .map_err(|e| DeltaTableError::Generic(e.to_string()))??; let cp_writer = snapshot.checkpoint()?; diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index ba60d6ed6c..2a97c2d530 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -306,13 +306,13 @@ impl DeltaTableState { if let Some(partition_schema) = self.snapshot.snapshot().inner.partitions_schema()? { fields.push(StructField::nullable( "partition", - DataType::struct_type(partition_schema.fields().cloned()), + DataType::try_struct_type(partition_schema.fields().cloned())?, )); expressions.push(column_expr_ref!("partitionValues_parsed")); } let expression = Expression::Struct(expressions); - let table_schema = DataType::struct_type(fields); + let table_schema = DataType::try_struct_type(fields)?; let input_schema = self.snapshot.files.schema(); let input_schema = Arc::new(input_schema.as_ref().try_into_kernel()?); diff --git a/crates/core/src/test_utils/factories/actions.rs b/crates/core/src/test_utils/factories/actions.rs index f4464d2b4a..773606c3c6 100644 --- a/crates/core/src/test_utils/factories/actions.rs +++ b/crates/core/src/test_utils/factories/actions.rs @@ -74,12 +74,13 @@ impl ActionFactory { HashMap::new() }; - let data_schema = StructType::new( + let data_schema = StructType::try_new( schema .fields() .filter(|f| !partition_columns.contains(f.name())) .cloned(), - ); + ) + .unwrap(); let batch = DataFactory::record_batch(&data_schema, 10, &bounds).unwrap(); let stats = DataFactory::file_stats(&batch).unwrap(); diff --git a/crates/core/src/test_utils/factories/mod.rs b/crates/core/src/test_utils/factories/mod.rs index 612d807fad..2e81885207 100644 --- a/crates/core/src/test_utils/factories/mod.rs +++ b/crates/core/src/test_utils/factories/mod.rs @@ -43,7 +43,7 @@ impl TestSchemas { /// - modified: string pub fn simple() -> &'static StructType { static SIMPLE: LazyLock = LazyLock::new(|| { - StructType::new(vec![ + StructType::try_new(vec![ StructField::new( "id".to_string(), DataType::Primitive(PrimitiveType::String), @@ -60,6 +60,7 @@ impl TestSchemas { true, ), ]) + .expect("Failed to construct StructType") }); &SIMPLE diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 60b5fda955..66e45de70e 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -29,7 +29,6 @@ use super::utils::{ ShareableBuffer, }; use super::{DeltaWriter, DeltaWriterError, WriteMode}; -use crate::ensure_table_uri; use crate::errors::DeltaTableError; use crate::kernel::schema::merge_arrow_schema; use crate::kernel::MetadataExt as _; @@ -970,7 +969,7 @@ mod tests { DataType as DeltaDataType, PrimitiveType, StructField, StructType, }; - let table_schema = StructType::new(vec![ + let table_schema = StructType::try_new(vec![ StructField::new( "id".to_string(), DeltaDataType::Primitive(PrimitiveType::String), @@ -986,7 +985,8 @@ mod tests { DeltaDataType::Primitive(PrimitiveType::String), true, ), - ]); + ]) + .unwrap(); let table_dir = tempfile::tempdir().unwrap(); let table_path = table_dir.path(); diff --git a/crates/core/src/writer/test_utils.rs b/crates/core/src/writer/test_utils.rs index 9f27f3cc7c..782f8b6edf 100644 --- a/crates/core/src/writer/test_utils.rs +++ b/crates/core/src/writer/test_utils.rs @@ -133,7 +133,7 @@ fn data_without_null() -> (Int32Array, StringArray, StringArray) { } pub fn get_delta_schema() -> StructType { - StructType::new(vec![ + StructType::try_new(vec![ StructField::new( "id".to_string(), DeltaDataType::Primitive(PrimitiveType::String), @@ -150,6 +150,7 @@ pub fn get_delta_schema() -> StructType { true, ), ]) + .unwrap() } pub fn get_delta_metadata(partition_cols: &[String]) -> Metadata { @@ -247,7 +248,7 @@ pub fn get_record_batch_with_nested_struct() -> RecordBatch { } pub fn get_delta_schema_with_nested_struct() -> StructType { - StructType::new(vec![ + StructType::try_new(vec![ StructField::new( "id".to_string(), DeltaDataType::Primitive(PrimitiveType::String), @@ -265,14 +266,18 @@ pub fn get_delta_schema_with_nested_struct() -> StructType { ), StructField::new( String::from("nested"), - DeltaDataType::Struct(Box::new(StructType::new(vec![StructField::new( - String::from("count"), - DeltaDataType::Primitive(PrimitiveType::Integer), - true, - )]))), + DeltaDataType::Struct(Box::new( + StructType::try_new(vec![StructField::new( + String::from("count"), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + )]) + .unwrap(), + )), true, ), ]) + .unwrap() } pub async fn setup_table_with_configuration( diff --git a/crates/core/tests/checkpoint_writer.rs b/crates/core/tests/checkpoint_writer.rs index 74ba66115c..2ccde70a78 100644 --- a/crates/core/tests/checkpoint_writer.rs +++ b/crates/core/tests/checkpoint_writer.rs @@ -180,6 +180,7 @@ mod simple_checkpoint { mod delete_expired_delta_log_in_checkpoint { use super::*; + use std::collections::HashMap; use std::fs::{FileTimes, OpenOptions}; use std::ops::Sub; use std::time::{Duration, SystemTime}; @@ -187,16 +188,21 @@ mod delete_expired_delta_log_in_checkpoint { use ::object_store::path::Path as ObjectStorePath; use deltalake_core::table::config::TableProperty; use deltalake_core::*; - use maplit::hashmap; #[tokio::test] async fn test_delete_expired_logs() { let mut table = fs_common::create_table( "../test/tests/data/checkpoints_with_expired_logs/expired", - Some(hashmap! { - TableProperty::LogRetentionDuration.as_ref().into() => Some("interval 10 minute".to_string()), - TableProperty::EnableExpiredLogCleanup.as_ref().into() => Some("true".to_string()) - }), + Some(HashMap::from([ + ( + TableProperty::LogRetentionDuration.as_ref().into(), + Some("interval 10 minute".to_string()), + ), + ( + TableProperty::EnableExpiredLogCleanup.as_ref().into(), + Some("true".to_string()), + ), + ])), ) .await; @@ -265,10 +271,16 @@ mod delete_expired_delta_log_in_checkpoint { async fn test_not_delete_expired_logs() { let mut table = fs_common::create_table( "../test/tests/data/checkpoints_with_expired_logs/not_delete_expired", - Some(hashmap! { - TableProperty::LogRetentionDuration.as_ref().into() => Some("interval 1 second".to_string()), - TableProperty::EnableExpiredLogCleanup.as_ref().into() => Some("false".to_string()) - }), + Some(HashMap::from([ + ( + TableProperty::LogRetentionDuration.as_ref().into(), + Some("interval 1 second".to_string()), + ), + ( + TableProperty::EnableExpiredLogCleanup.as_ref().into(), + Some("false".to_string()), + ), + ])), ) .await; @@ -322,16 +334,20 @@ mod checkpoints_with_tombstones { use deltalake_core::kernel::*; use deltalake_core::table::config::TableProperty; use deltalake_core::*; - use maplit::hashmap; use pretty_assertions::assert_eq; use std::collections::{HashMap, HashSet}; #[tokio::test] #[ignore] async fn test_expired_tombstones() { - let mut table = fs_common::create_table("../test/tests/data/checkpoints_tombstones/expired", Some(hashmap! { - TableProperty::DeletedFileRetentionDuration.as_ref().into() => Some("interval 1 minute".to_string()) - })).await; + let mut table = fs_common::create_table( + "../test/tests/data/checkpoints_tombstones/expired", + Some(HashMap::from([( + TableProperty::DeletedFileRetentionDuration.as_ref().into(), + Some("interval 1 minute".to_string()), + )])), + ) + .await; let a1 = fs_common::add(3 * 60 * 1000); // 3 mins ago, let a2 = fs_common::add(2 * 60 * 1000); // 2 mins ago, diff --git a/crates/core/tests/command_merge.rs b/crates/core/tests/command_merge.rs index d0948e96c5..8e55d3896b 100644 --- a/crates/core/tests/command_merge.rs +++ b/crates/core/tests/command_merge.rs @@ -31,7 +31,7 @@ async fn create_table(table_uri: &str, partition: Option>) -> DeltaTab } fn get_delta_schema() -> StructType { - StructType::new(vec![ + StructType::try_new(vec![ StructField::new( "id".to_string(), DeltaDataType::Primitive(PrimitiveType::String), @@ -48,6 +48,7 @@ fn get_delta_schema() -> StructType { true, ), ]) + .unwrap() } fn get_arrow_schema() -> Arc { diff --git a/crates/core/tests/fs_common/mod.rs b/crates/core/tests/fs_common/mod.rs index 04910bc9b5..f179040a96 100644 --- a/crates/core/tests/fs_common/mod.rs +++ b/crates/core/tests/fs_common/mod.rs @@ -90,11 +90,12 @@ pub async fn create_table( fs::create_dir_all(&log_dir).unwrap(); cleanup_dir_except(log_dir, vec![]); - let schema = StructType::new(vec![StructField::new( + let schema = StructType::try_new(vec![StructField::new( "id".to_string(), DataType::Primitive(PrimitiveType::Integer), true, - )]); + )]) + .unwrap(); create_test_table(path, schema, Vec::new(), config.unwrap_or_default()).await } diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index 5f28b6a992..b371021a1f 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -1321,7 +1321,7 @@ mod local { ))), true, )]; - let schema = StructType::new(fields); + let schema = StructType::try_new(fields).unwrap(); let table = deltalake_core::DeltaTableBuilder::from_uri( url::Url::from_directory_path( std::path::Path::new("./tests/data/issue-1619") diff --git a/crates/core/tests/read_delta_partitions_test.rs b/crates/core/tests/read_delta_partitions_test.rs index a36bf57cca..7aa58a9a78 100644 --- a/crates/core/tests/read_delta_partitions_test.rs +++ b/crates/core/tests/read_delta_partitions_test.rs @@ -8,8 +8,8 @@ mod fs_common; #[tokio::test] async fn read_null_partitions_from_checkpoint() { use deltalake_core::kernel::Add; - use maplit::hashmap; use serde_json::json; + use std::collections::HashMap; let mut table = fs_common::create_table_from_json( "../test/tests/data/read_null_partitions_from_checkpoint", @@ -28,9 +28,7 @@ async fn read_null_partitions_from_checkpoint() { let delta_log = std::path::Path::new(&table.table_uri()).join("_delta_log"); let add = |partition: Option| Add { - partition_values: hashmap! { - "color".to_string() => partition - }, + partition_values: HashMap::from([("color".to_string(), partition)]), ..fs_common::add(0) }; diff --git a/crates/test/src/concurrent.rs b/crates/test/src/concurrent.rs index 4d2ff9d1dc..0a13aa5ba4 100644 --- a/crates/test/src/concurrent.rs +++ b/crates/test/src/concurrent.rs @@ -20,11 +20,11 @@ pub async fn test_concurrent_writes(context: &IntegrationContext) -> TestResult async fn prepare_table( context: &IntegrationContext, ) -> Result<(DeltaTable, String), Box> { - let schema = StructType::new(vec![StructField::new( + let schema = StructType::try_new(vec![StructField::new( "Id".to_string(), DataType::Primitive(PrimitiveType::Integer), true, - )]); + )])?; let table_uri = context.uri_for_table(TestTables::Custom("concurrent_workers".into())); diff --git a/python/src/schema.rs b/python/src/schema.rs index e5613ea5ec..1d1127578f 100644 --- a/python/src/schema.rs +++ b/python/src/schema.rs @@ -22,6 +22,7 @@ use pyo3_arrow::PySchema as PyArrow3Schema; use std::collections::HashMap; use std::sync::Arc; +use crate::error::SchemaMismatchError; use crate::utils::warn; // PyO3 doesn't yet support converting classes with inheritance with Python @@ -636,7 +637,8 @@ impl StructType { .into_iter() .map(|field| field.inner.clone()) .collect(); - let inner_type = DeltaStructType::new(fields); + let inner_type = + DeltaStructType::try_new(fields).expect("Failed to construct a StructType"); Self { inner_type } } @@ -757,7 +759,8 @@ impl PySchema { .into_iter() .map(|field| field.inner.clone()) .collect(); - let inner_type = DeltaStructType::new(fields); + let inner_type = DeltaStructType::try_new(fields) + .map_err(|e| SchemaMismatchError::new_err(e.to_string()))?; Ok((Self {}, StructType { inner_type })) } diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 3404664d47..6ebdff1251 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1618,23 +1618,6 @@ def test_issue_1651_roundtrip_timestamp(tmp_path: pathlib.Path): assert dataset.count_rows() == 1 -@pytest.mark.pyarrow -def test_invalid_decimals(tmp_path: pathlib.Path): - import re - - import pyarrow as pa - - data = pa.table( - {"x": pa.array([Decimal("10000000000000000000000000000000000000.0")])} - ) - - with pytest.raises( - SchemaMismatchError, - match=re.escape("Invalid data type for Delta Lake: Decimal256(39, 1)"), - ): - write_deltalake(table_or_uri=tmp_path, mode="append", data=data) - - @pytest.mark.pyarrow def test_write_large_decimal(tmp_path: pathlib.Path): import pyarrow as pa