diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index ae7669912..5540b478b 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -969,7 +969,7 @@ impl DomainMetadata { } } - // Create a new DomainMetadata action to remove a domain. + /// Create a new DomainMetadata action to remove a domain. pub(crate) fn remove(domain: String, configuration: String) -> Self { Self { domain, @@ -992,10 +992,6 @@ impl DomainMetadata { pub(crate) fn configuration(&self) -> &str { &self.configuration } - - pub(crate) fn is_removed(&self) -> bool { - self.removed - } } #[cfg(test)] diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index f647ba0e8..1e8c2592a 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::iter; use std::ops::Deref; use std::sync::{Arc, LazyLock}; @@ -8,6 +8,7 @@ use url::Url; use crate::actions::{ as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, + INTERNAL_DOMAIN_PREFIX, }; #[cfg(feature = "catalog-managed")] use crate::committer::FileSystemCommitter; @@ -134,7 +135,11 @@ pub struct Transaction { // commit-wide timestamp (in milliseconds since epoch) - used in ICT, `txn` action, etc. to // keep all timestamps within the same commit consistent. commit_timestamp: i64, - domain_metadatas: Vec<DomainMetadata>, + // Domain metadata additions for this transaction. + domain_metadata_additions: Vec<DomainMetadata>, + // Domain names to remove in this transaction. The configuration values are fetched during + // commit from the log to preserve the pre-image in tombstones. + domain_removals: Vec<String>, // Whether this transaction contains any logical data changes. 
data_change: bool, } @@ -177,7 +182,8 @@ impl Transaction { add_files_metadata: vec![], set_transactions: vec![], commit_timestamp, - domain_metadatas: vec![], + domain_metadata_additions: vec![], + domain_removals: vec![], data_change: true, }) } @@ -347,7 +353,7 @@ impl Transaction { /// fail (that is, we don't eagerly check domain validity here). /// Setting metadata for multiple distinct domains is allowed. pub fn with_domain_metadata(mut self, domain: String, configuration: String) -> Self { - self.domain_metadatas + self.domain_metadata_additions .push(DomainMetadata::new(domain, configuration)); self } @@ -362,12 +368,50 @@ impl Transaction { /// fail (that is, we don't eagerly check domain validity here). /// Removing metadata for multiple distinct domains is allowed. pub fn with_domain_metadata_removed(mut self, domain: String) -> Self { - // actual configuration value determined during commit - self.domain_metadatas - .push(DomainMetadata::remove(domain, String::new())); + self.domain_removals.push(domain); self } + /// Validate that user domains don't conflict with system domains or each other. + fn validate_user_domain_operations(&self) -> DeltaResult<()> { + let mut seen_domains = HashSet::new(); + + // Validate domain additions + for dm in &self.domain_metadata_additions { + let domain = dm.domain(); + if domain.starts_with(INTERNAL_DOMAIN_PREFIX) { + return Err(Error::generic( + "Cannot modify domains that start with 'delta.' as those are system controlled", + )); + } + + if !seen_domains.insert(domain) { + return Err(Error::generic(format!( + "Metadata for domain {} already specified in this transaction", + domain + ))); + } + } + + // Validate domain removals + for domain in &self.domain_removals { + if domain.starts_with(INTERNAL_DOMAIN_PREFIX) { + return Err(Error::generic( + "Cannot modify domains that start with 'delta.' 
as those are system controlled", + )); + } + + if !seen_domains.insert(domain.as_str()) { + return Err(Error::generic(format!( + "Metadata for domain {} already specified in this transaction", + domain + ))); + } + } + + Ok(()) + } + /// Generate domain metadata actions with validation. Handle both user and system domains. /// /// This function may perform an expensive log replay operation if there are any domain removals. @@ -378,71 +422,55 @@ impl Transaction { engine: &'a dyn Engine, row_tracking_high_watermark: Option, ) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a> { - // if there are domain metadata actions, the table must support it - if !self.domain_metadatas.is_empty() + // Validate feature support for user domain operations + if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty()) && !self .read_snapshot .table_configuration() .is_domain_metadata_supported() { - return Err(Error::unsupported( - "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature" - )); + return Err(Error::unsupported("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature")); } - // validate user domain metadata and check if we have removals - let mut seen_domains = HashSet::new(); - let mut has_removals = false; - for dm in &self.domain_metadatas { - if dm.is_internal() { - return Err(Error::Generic( - "Cannot modify domains that start with 'delta.' 
as those are system controlled" - .to_string(), - )); - } - - if !seen_domains.insert(dm.domain()) { - return Err(Error::Generic(format!( - "Metadata for domain {} already specified in this transaction", - dm.domain() - ))); - } - - if dm.is_removed() { - has_removals = true; - } - } + // Validate user domain operations + self.validate_user_domain_operations()?; + + // Generate user domain removals via log replay (expensive if non-empty) + let removal_actions = if !self.domain_removals.is_empty() { + // Scan log to fetch existing configurations for tombstones + let existing_domains = + scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?; + + // Create removal tombstones with pre-image configurations + let removals: Vec<_> = self + .domain_removals + .iter() + .filter_map(|domain| { + // If domain doesn't exist in the log, this is a no-op (filter it out) + existing_domains.get(domain).map(|existing| { + DomainMetadata::remove(domain.clone(), existing.configuration().to_owned()) + }) + }) + .collect(); - // fetch previous configuration values (requires log replay) - let existing_domains = if has_removals { - scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)? + removals } else { - HashMap::new() + vec![] }; - let user_domains = self - .domain_metadatas - .iter() - .filter_map(move |dm: &DomainMetadata| { - if dm.is_removed() { - existing_domains.get(dm.domain()).map(|existing| { - DomainMetadata::remove( - dm.domain().to_string(), - existing.configuration().to_string(), - ) - }) - } else { - Some(dm.clone()) - } - }); - - let system_domains = row_tracking_high_watermark + // Generate system domain actions (row tracking) + let system_domain_actions = row_tracking_high_watermark .map(DomainMetadata::try_from) .transpose()? 
.into_iter(); - Ok(user_domains - .chain(system_domains) + // Chain all domain actions and convert to EngineData + Ok(self + .domain_metadata_additions + .clone() + .into_iter() + .chain(removal_actions) + .chain(system_domain_actions) .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine))) } diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index cc534b36d..53c6efc81 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1409,6 +1409,42 @@ async fn test_set_domain_metadata_unsupported_writer_feature( Ok(()) } +#[tokio::test] +async fn test_remove_domain_metadata_unsupported_writer_feature( +) -> Result<(), Box<dyn std::error::Error>> { + let _ = tracing_subscriber::fmt::try_init(); + + let schema = Arc::new(StructType::try_new(vec![StructField::nullable( + "number", + DataType::INTEGER, + )])?); + + let table_name = "test_remove_domain_metadata_unsupported"; + + // Create table WITHOUT domain metadata writer feature support + let (store, engine, table_location) = engine_store_setup(table_name, None); + let table_url = create_table( + store.clone(), + table_location, + schema.clone(), + &[], + true, + vec![], + vec![], + ) + .await?; + + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let res = snapshot + .transaction(Box::new(FileSystemCommitter::new()))? + .with_domain_metadata_removed("app.config".to_string()) + .commit(&engine); + + assert_result_error_with_message(res, "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"); + + Ok(()) +} + #[tokio::test] async fn test_remove_domain_metadata_non_existent_domain() -> Result<(), Box<dyn std::error::Error>> {