-
Couldn't load subscription status.
- Fork 118
feat: support writing domain metadata (2/2) #1275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
7897691
27f59c6
eae4394
535d242
9ade4c7
a7aefc0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,13 @@ | ||
| use std::collections::HashSet; | ||
| use std::collections::{HashMap, HashSet}; | ||
| use std::iter; | ||
| use std::ops::Deref; | ||
| use std::sync::{Arc, LazyLock}; | ||
|
|
||
| use url::Url; | ||
|
|
||
| use crate::actions::{ | ||
| as_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema, | ||
| get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, | ||
| as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema, | ||
| get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, | ||
| }; | ||
| #[cfg(feature = "catalog-managed")] | ||
| use crate::committer::FileSystemCommitter; | ||
|
|
@@ -352,7 +352,27 @@ impl Transaction { | |
| self | ||
| } | ||
|
|
||
| /// Remove domain metadata from the Delta log. | ||
| /// If the domain exists in the Delta log, this creates a tombstone to logically delete | ||
| /// the domain. The tombstone preserves the previous configuration value. | ||
| /// If the domain does not exist in the Delta log, this is a no-op. | ||
| /// Note that each domain can only appear once per transaction. That is, multiple operations | ||
| /// on the same domain are disallowed in a single transaction, as well as setting and removing | ||
| /// the same domain in a single transaction. If a duplicate domain is included, the `commit` will | ||
| /// fail (that is, we don't eagerly check domain validity here). | ||
| /// Removing metadata for multiple distinct domains is allowed. | ||
| pub fn with_domain_metadata_removed(mut self, domain: String) -> Self { | ||
| // actual configuration value determined during commit | ||
| self.domain_metadatas | ||
| .push(DomainMetadata::remove(domain, String::new())); | ||
|
Comment on lines
+366
to
+367
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we seem to be conflating the API communicating domain metadata additions/removals with the underlying DomainMetadata itself. Can we simplify and make our lives easier below if we just take a set of domains to remove? Then we (1) don't have the 'fill-in-later' behavior here, and (2) immediately know if we have removals I suppose we might need to still have something that knows our global set of domains since it's a logic error to add/remove in the same transaciton.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm this is interesting. if we have a separate state variable for removals, we can make it something like regarding point 1, the "fill-in" behavior, we can do log replay and store that value in a wdyt about adding a separate state variable as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. resolved offline; will merge pr as-is. (can cut a follow-ups as this is internal state only - no external API) |
||
| self | ||
| } | ||
|
|
||
| /// Generate domain metadata actions with validation. Handle both user and system domains. | ||
| /// | ||
| /// This function may perform an expensive log replay operation if there are any domain removals. | ||
| /// The log replay is required to fetch the previous configuration value for the domain to preserve | ||
| /// in removal tombstones as mandated by the Delta spec. | ||
| fn generate_domain_metadata_actions<'a>( | ||
| &'a self, | ||
| engine: &'a dyn Engine, | ||
|
|
@@ -370,32 +390,58 @@ impl Transaction { | |
| )); | ||
| } | ||
|
|
||
| // validate domain metadata | ||
| let mut domains = HashSet::new(); | ||
| for domain_metadata in &self.domain_metadatas { | ||
| if domain_metadata.is_internal() { | ||
| // validate user domain metadata and check if we have removals | ||
| let mut seen_domains = HashSet::new(); | ||
| let mut has_removals = false; | ||
| for dm in &self.domain_metadatas { | ||
| if dm.is_internal() { | ||
| return Err(Error::Generic( | ||
| "Cannot modify domains that start with 'delta.' as those are system controlled" | ||
| .to_string(), | ||
| )); | ||
| } | ||
| if !domains.insert(domain_metadata.domain()) { | ||
|
|
||
| if !seen_domains.insert(dm.domain()) { | ||
| return Err(Error::Generic(format!( | ||
| "Metadata for domain {} already specified in this transaction", | ||
| domain_metadata.domain() | ||
| dm.domain() | ||
| ))); | ||
| } | ||
|
Comment on lines
+397
to
409
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These feel like checks that should be done in
Generally I prefer to fail early. Feel free to make this a followup issue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I generally agree with the fail fast principle, but this case warrants an exception imo. We're treating these methods as builder methods (return wdyt? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I was willing to deal with an awkward interface
But this is a really good point! GJ |
||
|
|
||
| if dm.is_removed() { | ||
| has_removals = true; | ||
| } | ||
MannDP marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // fetch previous configuration values (requires log replay) | ||
| let existing_domains = if has_removals { | ||
| scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)? | ||
| } else { | ||
| HashMap::new() | ||
| }; | ||
OussamaSaoudi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| let user_domains = self | ||
| .domain_metadatas | ||
| .iter() | ||
| .filter_map(move |dm: &DomainMetadata| { | ||
| if dm.is_removed() { | ||
| existing_domains.get(dm.domain()).map(|existing| { | ||
| DomainMetadata::remove( | ||
| dm.domain().to_string(), | ||
| existing.configuration().to_string(), | ||
| ) | ||
| }) | ||
| } else { | ||
| Some(dm.clone()) | ||
| } | ||
| }); | ||
MannDP marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| let system_domains = row_tracking_high_watermark | ||
| .map(DomainMetadata::try_from) | ||
| .transpose()? | ||
| .into_iter(); | ||
|
|
||
| Ok(self | ||
| .domain_metadatas | ||
| .iter() | ||
| .cloned() | ||
| Ok(user_domains | ||
| .chain(system_domains) | ||
| .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine))) | ||
| } | ||
|
|
||
MannDP marked this conversation as resolved.
Show resolved
Hide resolved
|
Uh oh!
There was an error while loading. Please reload this page.