diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 58dff9bac..0625fbd04 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -190,27 +190,32 @@ impl TryFrom for Scalar { )] #[internal_api] pub(crate) struct Metadata { - // TODO: Make the struct fields private to force using the try_new function. /// Unique identifier for this table - pub(crate) id: String, + id: String, /// User-provided identifier for this table - pub(crate) name: Option, + name: Option, /// User-provided description for this table - pub(crate) description: Option, + description: Option, /// Specification of the encoding for the files stored in the table - pub(crate) format: Format, + format: Format, /// Schema of the table - pub(crate) schema_string: String, + schema_string: String, /// Column names by which the data should be partitioned - pub(crate) partition_columns: Vec, + partition_columns: Vec, /// The time when this metadata action is created, in milliseconds since the Unix epoch - pub(crate) created_time: Option, + created_time: Option, /// Configuration options for the metadata action. These are parsed into [`TableProperties`]. - pub(crate) configuration: HashMap, + configuration: HashMap, } impl Metadata { + /// Create a new [`Metadata`] instances. + /// + /// # Errors + /// + /// Returns an error if there are any metadata columns in the schema. // TODO: remove allow(dead_code) after we use this API in CREATE TABLE, etc. + #[internal_api] #[allow(dead_code)] pub(crate) fn try_new( name: Option, @@ -246,6 +251,13 @@ impl Metadata { }) } + #[internal_api] + pub(crate) fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { + let mut visitor = MetadataVisitor::default(); + visitor.visit_rows_of(data)?; + Ok(visitor.metadata) + } + // TODO(#1068/1069): make these just pub directly or make better internal_api macro for fields #[internal_api] #[allow(dead_code)] @@ -271,12 +283,6 @@ impl Metadata { self.created_time } - pub(crate) fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { - let mut visitor = MetadataVisitor::default(); - visitor.visit_rows_of(data)?; - Ok(visitor.metadata) - } - #[internal_api] #[allow(dead_code)] pub(crate) fn configuration(&self) -> &HashMap { diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs index bcc4e96c0..554e42292 100644 --- a/kernel/src/checkpoint/tests.rs +++ b/kernel/src/checkpoint/tests.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use crate::action_reconciliation::{ deleted_file_retention_timestamp_with_time, DEFAULT_RETENTION_SECS, @@ -9,6 +9,7 @@ use crate::arrow::datatypes::{DataType, Schema}; use crate::checkpoint::create_last_checkpoint_data; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; +use crate::schema::{DataType as KernelDataType, StructField, StructType}; use crate::utils::test_utils::Action; use crate::{DeltaResult, FileMeta, Snapshot}; @@ -206,11 +207,17 @@ fn create_v2_checkpoint_protocol_action() -> Action { /// Create a Metadata action fn create_metadata_action() -> Action { - Action::Metadata(Metadata { - id: "test-table".into(), - schema_string: "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}".to_string(), - ..Default::default() - }) + Action::Metadata( + Metadata::try_new( + Some("test-table".into()), + None, + StructType::new_unchecked([StructField::nullable("value", KernelDataType::INTEGER)]), + vec![], + 0, + HashMap::new(), + ) + .unwrap(), + ) } /// Create an Add action with the specified path diff --git a/kernel/src/engine/arrow_data.rs b/kernel/src/engine/arrow_data.rs index 153fce6f0..b6a48e581 100644 --- a/kernel/src/engine/arrow_data.rs +++ b/kernel/src/engine/arrow_data.rs @@ -371,9 +371,9 @@ mod tests { .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); let metadata = Metadata::try_new_from_data(parsed.as_ref())?.unwrap(); - assert_eq!(metadata.id, "aff5cb91-8cd9-4195-aef9-446908507302"); - assert_eq!(metadata.created_time, Some(1670892997849)); - assert_eq!(metadata.partition_columns, vec!("c1", "c2")); + assert_eq!(metadata.id(), "aff5cb91-8cd9-4195-aef9-446908507302"); + assert_eq!(metadata.created_time(), Some(1670892997849)); + assert_eq!(*metadata.partition_columns(), vec!("c1", "c2")); Ok(()) } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 4406c5ddc..0c728edec 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -118,7 +118,7 @@ impl ScanBuilder { let logical_schema = self.schema.unwrap_or_else(|| self.snapshot.schema()); let state_info = StateInfo::try_new( logical_schema.as_ref(), - &self.snapshot.metadata().partition_columns, + self.snapshot.metadata().partition_columns(), self.snapshot.table_configuration().column_mapping_mode(), )?; diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index e9d830b5c..0b8aef0ff 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -55,21 +55,26 @@ fn result_to_sv(iter: impl Iterator async fn metadata_protocol() { let engine = Arc::new(SyncEngine::new()); let mut mock_table = LocalMockTable::new(); - let schema_string = serde_json::to_string(&get_schema()).unwrap(); mock_table .commit([ - Action::Metadata(Metadata { - schema_string, - configuration: HashMap::from([ - ("delta.enableChangeDataFeed".to_string(), "true".to_string()), - ( - "delta.enableDeletionVectors".to_string(), - "true".to_string(), - ), - ("delta.columnMapping.mode".to_string(), "none".to_string()), - ]), - ..Default::default() - }), + Action::Metadata( + Metadata::try_new( + None, + None, + get_schema(), + vec![], + 0, + HashMap::from([ + ("delta.enableChangeDataFeed".to_string(), "true".to_string()), + ( + "delta.enableDeletionVectors".to_string(), + "true".to_string(), + ), + ("delta.columnMapping.mode".to_string(), "none".to_string()), + ]), + ) + .unwrap(), + ), Action::Protocol( Protocol::try_new( 3, @@ -95,16 +100,21 @@ async fn metadata_protocol() { async fn cdf_not_enabled() { let engine = Arc::new(SyncEngine::new()); let mut mock_table = LocalMockTable::new(); - let schema_string = serde_json::to_string(&get_schema()).unwrap(); mock_table - .commit([Action::Metadata(Metadata { - schema_string, - configuration: HashMap::from([( - "delta.enableDeletionVectors".to_string(), - "true".to_string(), - )]), - ..Default::default() - })]) + .commit([Action::Metadata( + Metadata::try_new( + None, + None, + get_schema(), + vec![], + 0, + HashMap::from([( + "delta.enableDeletionVectors".to_string(), + "true".to_string(), + )]), + ) + .unwrap(), + )]) .await; let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) @@ -150,20 +160,25 @@ async fn unsupported_reader_feature() { async fn column_mapping_should_fail() { let engine = Arc::new(SyncEngine::new()); let mut mock_table = LocalMockTable::new(); - let schema_string = serde_json::to_string(&get_schema()).unwrap(); mock_table - .commit([Action::Metadata(Metadata { - schema_string, - configuration: HashMap::from([ - ( - "delta.enableDeletionVectors".to_string(), - "true".to_string(), - ), - ("delta.enableChangeDataFeed".to_string(), "true".to_string()), - ("delta.columnMapping.mode".to_string(), "id".to_string()), - ]), - ..Default::default() - })]) + .commit([Action::Metadata( + Metadata::try_new( + None, + None, + get_schema(), + vec![], + 0, + HashMap::from([ + ( + "delta.enableDeletionVectors".to_string(), + "true".to_string(), + ), + ("delta.enableChangeDataFeed".to_string(), "true".to_string()), + ("delta.columnMapping.mode".to_string(), "id".to_string()), + ]), + ) + .unwrap(), + )]) .await; let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) @@ -185,16 +200,18 @@ async fn incompatible_schemas_fail() { let engine = Arc::new(SyncEngine::new()); let mut mock_table = LocalMockTable::new(); - let schema_string = serde_json::to_string(&commit_schema).unwrap(); mock_table - .commit([Action::Metadata(Metadata { - schema_string, - configuration: HashMap::from([( - "delta.enableChangeDataFeed".to_string(), - "true".to_string(), - )]), - ..Default::default() - })]) + .commit([Action::Metadata( + Metadata::try_new( + None, + None, + commit_schema, + vec![], + 0, + HashMap::from([("delta.enableChangeDataFeed".to_string(), "true".to_string())]), + ) + .unwrap(), + )]) .await; let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 83fbb7207..73b82b66b 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -236,7 +236,7 @@ impl TableChanges { } /// The partition columns that will be read. pub(crate) fn partition_columns(&self) -> &Vec { - &self.end_snapshot.metadata().partition_columns + self.end_snapshot.metadata().partition_columns() } /// Create a [`TableChangesScanBuilder`] for an `Arc`. diff --git a/kernel/src/table_configuration.rs b/kernel/src/table_configuration.rs index f8a570dfd..143497fb3 100644 --- a/kernel/src/table_configuration.rs +++ b/kernel/src/table_configuration.rs @@ -417,6 +417,7 @@ mod test { use url::Url; use crate::actions::{Metadata, Protocol}; + use crate::schema::{DataType, StructField, StructType}; use crate::table_features::{ReaderFeature, WriterFeature}; use crate::table_properties::TableProperties; use crate::utils::test_utils::assert_result_error_with_message; @@ -426,14 +427,16 @@ mod test { #[test] fn dv_supported_not_enabled() { - let metadata = Metadata { - configuration: HashMap::from_iter([( - "delta.enableChangeDataFeed".to_string(), - "true".to_string(), - )]), - schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), - ..Default::default() - }; + let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]); + let metadata = Metadata::try_new( + None, + None, + schema, + vec![], + 0, + HashMap::from_iter([("delta.enableChangeDataFeed".to_string(), "true".to_string())]), + ) + .unwrap(); let protocol = Protocol::try_new( 3, 7, @@ -448,18 +451,22 @@ mod test { } #[test] fn dv_enabled() { - let metadata = Metadata { - configuration: HashMap::from_iter([( - "delta.enableChangeDataFeed".to_string(), - "true".to_string(), - ), - ( - "delta.enableDeletionVectors".to_string(), - "true".to_string(), - )]), - schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), - ..Default::default() - }; + let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]); + let metadata = Metadata::try_new( + None, + None, + schema, + vec![], + 0, + HashMap::from_iter([ + ("delta.enableChangeDataFeed".to_string(), "true".to_string()), + ( + "delta.enableDeletionVectors".to_string(), + "true".to_string(), + ), + ]), + ) + .unwrap(); let protocol = Protocol::try_new( 3, 7, @@ -474,16 +481,29 @@ mod test { } #[test] fn ict_supported_and_enabled() { - let metadata = Metadata { - schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), - configuration: HashMap::from_iter([( - "delta.enableInCommitTimestamps".to_string(), - "true".to_string(), - ), - ("delta.inCommitTimestampEnablementVersion".to_string(), "5".to_string()), - ("delta.inCommitTimestampEnablementTimestamp".to_string(), "100".to_string())]), - ..Default::default() - }; + let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]); + let metadata = Metadata::try_new( + None, + None, + schema, + vec![], + 0, + HashMap::from_iter([ + ( + "delta.enableInCommitTimestamps".to_string(), + "true".to_string(), + ), + ( + "delta.inCommitTimestampEnablementVersion".to_string(), + "5".to_string(), + ), + ( + "delta.inCommitTimestampEnablementTimestamp".to_string(), + "100".to_string(), + ), + ]), + ) + .unwrap(); let protocol = Protocol::try_new( 3, 7, @@ -500,14 +520,19 @@ mod test { } #[test] fn ict_supported_and_enabled_without_enablement_info() { - let metadata = Metadata { - schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), - configuration: HashMap::from_iter([( + let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]); + let metadata = Metadata::try_new( + None, + None, + schema, + vec![], + 0, + HashMap::from_iter([( "delta.enableInCommitTimestamps".to_string(), "true".to_string(), )]), - ..Default::default() - }; + ) + .unwrap(); let protocol = Protocol::try_new( 3, 7, @@ -525,10 +550,8 @@ mod test { } #[test] fn ict_supported_and_not_enabled() { - let metadata = Metadata { - schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), - ..Default::default() - }; + let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]); + let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap(); let protocol = Protocol::try_new( 3, 7, @@ -543,10 +566,8 @@ mod test { } #[test] fn fails_on_unsupported_feature() { - let metadata = Metadata { - schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), - ..Default::default() - }; + let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]); + let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap(); let protocol = Protocol::try_new(3, 7, Some(["unknown"]), Some(["unknown"])).unwrap(); let table_root = Url::try_from("file:///").unwrap(); TableConfiguration::try_new(metadata, protocol, table_root, 0) @@ -554,14 +575,16 @@ mod test { } #[test] fn dv_not_supported() { - let metadata = Metadata { - configuration: HashMap::from_iter([( - "delta.enableChangeDataFeed".to_string(), - "true".to_string(), - )]), - schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), - ..Default::default() - }; + let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]); + let metadata = Metadata::try_new( + None, + None, + schema, + vec![], + 0, + HashMap::from_iter([("delta.enableChangeDataFeed".to_string(), "true".to_string())]), + ) + .unwrap(); let protocol = Protocol::try_new( 3, 7, @@ -577,15 +600,16 @@ mod test { #[test] fn test_try_new_from() { - let schema_string =r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(); - let metadata = Metadata { - configuration: HashMap::from_iter([( - "delta.enableChangeDataFeed".to_string(), - "true".to_string(), - )]), - schema_string: schema_string.clone(), - ..Default::default() - }; + let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]); + let metadata = Metadata::try_new( + None, + None, + schema, + vec![], + 0, + HashMap::from_iter([("delta.enableChangeDataFeed".to_string(), "true".to_string())]), + ) + .unwrap(); let protocol = Protocol::try_new( 3, 7, @@ -596,8 +620,15 @@ mod test { let table_root = Url::try_from("file:///").unwrap(); let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap(); - let new_metadata = Metadata { - configuration: HashMap::from_iter([ + let new_schema = + StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]); + let new_metadata = Metadata::try_new( + None, + None, + new_schema, + vec![], + 0, + HashMap::from_iter([ ( "delta.enableChangeDataFeed".to_string(), "false".to_string(), @@ -607,9 +638,8 @@ mod test { "true".to_string(), ), ]), - schema_string, - ..Default::default() - }; + ) + .unwrap(); let new_protocol = Protocol::try_new( 3, 7, @@ -652,11 +682,9 @@ mod test { #[test] fn test_timestamp_ntz_validation_integration() { // Schema with TIMESTAMP_NTZ column - let schema_string = r#"{"type":"struct","fields":[{"name":"ts","type":"timestamp_ntz","nullable":true,"metadata":{}}]}"#.to_string(); - let metadata = Metadata { - schema_string, - ..Default::default() - }; + let schema = + StructType::new_unchecked([StructField::nullable("ts", DataType::TIMESTAMP_NTZ)]); + let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap(); let protocol_without_timestamp_ntz_features = Protocol::try_new( 3, @@ -699,11 +727,9 @@ mod test { #[test] fn test_variant_validation_integration() { // Schema with VARIANT column - let schema_string = r#"{"type":"struct","fields":[{"name":"v","type":"variant","nullable":true,"metadata":{}}]}"#.to_string(); - let metadata = Metadata { - schema_string, - ..Default::default() - }; + let schema = + StructType::new_unchecked([StructField::nullable("v", DataType::unshredded_variant())]); + let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap(); let protocol_without_variant_features = Protocol::try_new( 3, diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 5ac14e513..88d34b658 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -330,7 +330,7 @@ impl Transaction { fn generate_logical_to_physical(&self) -> Expression { // for now, we just pass through all the columns except partition columns. // note this is _incorrect_ if table config deems we need partition columns. - let partition_columns = &self.read_snapshot.metadata().partition_columns; + let partition_columns = self.read_snapshot.metadata().partition_columns(); let schema = self.read_snapshot.schema(); let fields = schema .fields()