Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ impl DomainMetadata {
}
}

// Create a new DomainMetadata action to remove a domain.
/// Create a new DomainMetadata action to remove a domain.
pub(crate) fn remove(domain: String, configuration: String) -> Self {
Self {
domain,
Expand All @@ -992,10 +992,6 @@ impl DomainMetadata {
/// Borrow the configuration string held by this domain metadata action.
pub(crate) fn configuration(&self) -> &str {
    self.configuration.as_str()
}

pub(crate) fn is_removed(&self) -> bool {
self.removed
}
}

#[cfg(test)]
Expand Down
142 changes: 85 additions & 57 deletions kernel/src/transaction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::iter;
use std::ops::Deref;
use std::sync::{Arc, LazyLock};
Expand All @@ -8,6 +8,7 @@ use url::Url;
use crate::actions::{
as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema,
get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction,
INTERNAL_DOMAIN_PREFIX,
};
#[cfg(feature = "catalog-managed")]
use crate::committer::FileSystemCommitter;
Expand Down Expand Up @@ -134,7 +135,11 @@ pub struct Transaction {
// commit-wide timestamp (in milliseconds since epoch) - used in ICT, `txn` action, etc. to
// keep all timestamps within the same commit consistent.
commit_timestamp: i64,
domain_metadatas: Vec<DomainMetadata>,
// Domain metadata additions for this transaction.
domain_metadata_additions: Vec<DomainMetadata>,
// Domain names to remove in this transaction. The configuration values are fetched during
// commit from the log to preserve the pre-image in tombstones.
domain_removals: Vec<String>,
// Whether this transaction contains any logical data changes.
data_change: bool,
}
Expand Down Expand Up @@ -177,7 +182,8 @@ impl Transaction {
add_files_metadata: vec![],
set_transactions: vec![],
commit_timestamp,
domain_metadatas: vec![],
domain_metadata_additions: vec![],
domain_removals: vec![],
data_change: true,
})
}
Expand Down Expand Up @@ -347,7 +353,7 @@ impl Transaction {
/// fail (that is, we don't eagerly check domain validity here).
/// Setting metadata for multiple distinct domains is allowed.
pub fn with_domain_metadata(mut self, domain: String, configuration: String) -> Self {
self.domain_metadatas
self.domain_metadata_additions
.push(DomainMetadata::new(domain, configuration));
self
}
Expand All @@ -362,12 +368,50 @@ impl Transaction {
/// fail (that is, we don't eagerly check domain validity here).
/// Removing metadata for multiple distinct domains is allowed.
pub fn with_domain_metadata_removed(mut self, domain: String) -> Self {
// actual configuration value determined during commit
self.domain_metadatas
.push(DomainMetadata::remove(domain, String::new()));
self.domain_removals.push(domain);
self
}

/// Check every user-requested domain operation (additions and removals) in this transaction.
///
/// Returns an error when a domain name starts with `INTERNAL_DOMAIN_PREFIX`, or when the same
/// domain name appears more than once across the additions and removals. Additions are
/// inspected first, then removals; the first violation encountered is the one reported.
fn validate_user_domain_operations(&self) -> DeltaResult<()> {
    // Names of every touched domain: additions first, then removals.
    let added = self.domain_metadata_additions.iter().map(|dm| dm.domain());
    let removed = self.domain_removals.iter().map(String::as_str);

    let mut visited = HashSet::new();
    for name in added.chain(removed) {
        if name.starts_with(INTERNAL_DOMAIN_PREFIX) {
            return Err(Error::generic(
                "Cannot modify domains that start with 'delta.' as those are system controlled",
            ));
        }

        // `insert` yields false when the name was already recorded in this transaction.
        if !visited.insert(name) {
            return Err(Error::generic(format!(
                "Metadata for domain {} already specified in this transaction",
                name
            )));
        }
    }

    Ok(())
}

/// Generate domain metadata actions with validation. Handle both user and system domains.
///
/// This function may perform an expensive log replay operation if there are any domain removals.
Expand All @@ -378,71 +422,55 @@ impl Transaction {
engine: &'a dyn Engine,
row_tracking_high_watermark: Option<RowTrackingDomainMetadata>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a> {
// if there are domain metadata actions, the table must support it
if !self.domain_metadatas.is_empty()
// Validate feature support for user domain operations
if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty())
&& !self
.read_snapshot
.table_configuration()
.is_domain_metadata_supported()
{
return Err(Error::unsupported(
"Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"
));
return Err(Error::generic("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"));
Copy link
Member

@zachschuermann zachschuermann Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error type changing from `unsupported` to `generic` without any test failing might highlight a gap in test coverage. Could we add a quick test for this?

}

// validate user domain metadata and check if we have removals
let mut seen_domains = HashSet::new();
let mut has_removals = false;
for dm in &self.domain_metadatas {
if dm.is_internal() {
return Err(Error::Generic(
"Cannot modify domains that start with 'delta.' as those are system controlled"
.to_string(),
));
}

if !seen_domains.insert(dm.domain()) {
return Err(Error::Generic(format!(
"Metadata for domain {} already specified in this transaction",
dm.domain()
)));
}

if dm.is_removed() {
has_removals = true;
}
}
// Validate user domain operations
self.validate_user_domain_operations()?;

// Generate user domain removals via log replay (expensive if non-empty)
let removal_actions = if !self.domain_removals.is_empty() {
// Scan log to fetch existing configurations for tombstones
let existing_domains =
scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?;

// Create removal tombstones with pre-image configurations
let removals: Vec<_> = self
.domain_removals
.iter()
.filter_map(|domain| {
// If domain doesn't exist in the log, this is a no-op (filter it out)
existing_domains.get(domain).map(|existing| {
DomainMetadata::remove(domain.clone(), existing.configuration().to_owned())
})
})
.collect();

// fetch previous configuration values (requires log replay)
let existing_domains = if has_removals {
scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?
removals
} else {
HashMap::new()
vec![]
};

let user_domains = self
.domain_metadatas
.iter()
.filter_map(move |dm: &DomainMetadata| {
if dm.is_removed() {
existing_domains.get(dm.domain()).map(|existing| {
DomainMetadata::remove(
dm.domain().to_string(),
existing.configuration().to_string(),
)
})
} else {
Some(dm.clone())
}
});

let system_domains = row_tracking_high_watermark
// Generate system domain actions (row tracking)
let system_domain_actions = row_tracking_high_watermark
.map(DomainMetadata::try_from)
.transpose()?
.into_iter();

Ok(user_domains
.chain(system_domains)
// Chain all domain actions and convert to EngineData
Ok(self
.domain_metadata_additions
.clone()
.into_iter()
Comment on lines +469 to +471
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this clones everything because the method takes `&self`. Is it only used in `commit` (which takes `self`)? If so, should we just make this method take `self` too?

.chain(removal_actions)
.chain(system_domain_actions)
.map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)))
}

Expand Down
Loading