Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 2 additions & 15 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,20 +961,11 @@ pub(crate) struct DomainMetadata {

impl DomainMetadata {
/// Create a new DomainMetadata action.
pub(crate) fn new(domain: String, configuration: String) -> Self {
pub(crate) fn new(domain: String, configuration: String, removed: bool) -> Self {
Self {
domain,
configuration,
removed: false,
}
}

// Create a new DomainMetadata action to remove a domain.
pub(crate) fn remove(domain: String, configuration: String) -> Self {
Self {
domain,
configuration,
removed: true,
removed,
}
}

Expand All @@ -992,10 +983,6 @@ impl DomainMetadata {
pub(crate) fn configuration(&self) -> &str {
&self.configuration
}

pub(crate) fn is_removed(&self) -> bool {
self.removed
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions kernel/src/row_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl TryFrom<RowTrackingDomainMetadata> for DomainMetadata {
Ok(DomainMetadata::new(
RowTrackingDomainMetadata::ROW_TRACKING_DOMAIN_NAME.to_string(),
serde_json::to_string(&metadata)?,
false,
))
}
}
Expand Down
77 changes: 40 additions & 37 deletions kernel/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use url::Url;
use crate::actions::{
as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema,
get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction,
INTERNAL_DOMAIN_PREFIX,
};
#[cfg(feature = "catalog-managed")]
use crate::committer::FileSystemCommitter;
Expand Down Expand Up @@ -134,7 +135,11 @@ pub struct Transaction {
// commit-wide timestamp (in milliseconds since epoch) - used in ICT, `txn` action, etc. to
// keep all timestamps within the same commit consistent.
commit_timestamp: i64,
domain_metadatas: Vec<DomainMetadata>,
// Domain metadata additions for this transaction.
domain_metadatas_to_add: Vec<DomainMetadata>,
// Domain names to remove in this transaction. The configuration values are fetched during
// commit from the log to preserve the pre-image in tombstones.
domains_to_remove: Vec<String>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add_domain_metadata, remove_domains ?

// Whether this transaction contains any logical data changes.
data_change: bool,
}
Expand Down Expand Up @@ -177,7 +182,8 @@ impl Transaction {
add_files_metadata: vec![],
set_transactions: vec![],
commit_timestamp,
domain_metadatas: vec![],
domain_metadatas_to_add: vec![],
domains_to_remove: vec![],
data_change: true,
})
}
Expand Down Expand Up @@ -347,8 +353,8 @@ impl Transaction {
/// fail (that is, we don't eagerly check domain validity here).
/// Setting metadata for multiple distinct domains is allowed.
pub fn with_domain_metadata(mut self, domain: String, configuration: String) -> Self {
self.domain_metadatas
.push(DomainMetadata::new(domain, configuration));
self.domain_metadatas_to_add
.push(DomainMetadata::new(domain, configuration, false));
self
}

Expand All @@ -362,9 +368,7 @@ impl Transaction {
/// fail (that is, we don't eagerly check domain validity here).
/// Removing metadata for multiple distinct domains is allowed.
pub fn with_domain_metadata_removed(mut self, domain: String) -> Self {
// actual configuration value determined during commit
self.domain_metadatas
.push(DomainMetadata::remove(domain, String::new()));
self.domains_to_remove.push(domain);
self
}

Expand All @@ -378,8 +382,8 @@ impl Transaction {
engine: &'a dyn Engine,
row_tracking_high_watermark: Option<RowTrackingDomainMetadata>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a> {
// if there are domain metadata actions, the table must support it
if !self.domain_metadatas.is_empty()
// if there are domain metadata actions (additions or removals), the table must support it
if (!self.domain_metadatas_to_add.is_empty() || !self.domains_to_remove.is_empty())
&& !self
.read_snapshot
.table_configuration()
Expand All @@ -390,58 +394,57 @@ impl Transaction {
));
}

// validate user domain metadata and check if we have removals
// validate all domain metadata operations (additions and removals)
let mut seen_domains = HashSet::new();
let mut has_removals = false;
for dm in &self.domain_metadatas {
if dm.is_internal() {

// chain both additions and removals into a single iterator of domain names
let all_domains = self
.domain_metadatas_to_add
.iter()
.map(|dm| dm.domain())
.chain(self.domains_to_remove.iter().map(String::as_str));

for domain in all_domains {
if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
return Err(Error::Generic(
"Cannot modify domains that start with 'delta.' as those are system controlled"
.to_string(),
));
}

if !seen_domains.insert(dm.domain()) {
if !seen_domains.insert(domain) {
return Err(Error::Generic(format!(
"Metadata for domain {} already specified in this transaction",
dm.domain()
domain
)));
}

if dm.is_removed() {
has_removals = true;
}
}

// fetch previous configuration values (requires log replay)
let existing_domains = if has_removals {
// fetch previous configuration values if we have removals (requires log replay)
let existing_domains = if !self.domains_to_remove.is_empty() {
scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?
} else {
HashMap::new()
};

let user_domains = self
.domain_metadatas
.iter()
.filter_map(move |dm: &DomainMetadata| {
if dm.is_removed() {
existing_domains.get(dm.domain()).map(|existing| {
DomainMetadata::remove(
dm.domain().to_string(),
existing.configuration().to_string(),
)
})
} else {
Some(dm.clone())
}
});
// process domain additions - clone directly since domain_metadatas_to_add only contains additions
let domain_additions = self.domain_metadatas_to_add.iter().cloned();

// process domain removals - fetch configuration from existing domains and create tombstones
let domain_removals = self.domains_to_remove.iter().filter_map(move |domain| {
// if domain doesn't exist in the log, this is a no-op (filter it out)
existing_domains.get(domain).map(|existing| {
DomainMetadata::new(domain.clone(), existing.configuration().to_owned(), true)
})
});

let system_domains = row_tracking_high_watermark
.map(DomainMetadata::try_from)
.transpose()?
.into_iter();

Ok(user_domains
Ok(domain_additions
.chain(domain_removals)
.chain(system_domains)
.map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)))
}
Expand Down
Loading