Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,15 @@ impl DomainMetadata {
}
}

// Create a new DomainMetadata action to remove a domain.
pub(crate) fn remove(domain: String) -> Self {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like remove is a somewhat misleading name for a method that is actually a constructor. I don't have a strong opinion here, but maybe something like new_tombstone communicates the function's intent more clearly?

Self {
domain,
configuration: String::new(),
removed: true,
}
}

// returns true if the domain metadata is an system-controlled domain (all domains that start
// with "delta.")
#[allow(unused)]
Expand Down
14 changes: 14 additions & 0 deletions kernel/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,20 @@ impl Transaction {
self
}

/// Remove domain metadata from the Delta log.
/// This creates a tombstone to logically delete the specified domain. We don't check
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: IMO it is good style to have a paragraph break (i.e., empty line) after the one-line summary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I'm not following what you mean. Do you mean the "We don't check" should start on the next line perhaps?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant

/// Remove domain metadata from the Delta log.
///
/// This creates a tombstone to logically delete the specified domain. We don't check

See this clippy lint for reference.

/// if the domain metadata exists in the log, the remove action is simply a tombstone record.
/// Note that each domain can only appear once per transaction. That is, multiple operations
/// on the same domain are disallowed in a single transaction, as well as setting and removing
/// the same domain in a single transaction. If a duplicate domain is included, the `commit` will
/// fail (that is, we don't eagerly check domain validity here).
/// Removing metadata for multiple distinct domains is allowed.
pub fn with_domain_metadata_removed(mut self, domain: String) -> Self {
self.domain_metadatas
.push(DomainMetadata::remove(domain.clone()));
self
}

/// Generate domain metadata actions with validation. Handle both user and system domains.
fn generate_domain_metadata_actions<'a>(
&'a self,
Expand Down
195 changes: 195 additions & 0 deletions kernel/tests/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1382,3 +1382,198 @@ async fn test_set_domain_metadata_unsupported_writer_feature(

Ok(())
}

#[tokio::test]
async fn test_remove_domain_metadata_basic() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();

let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"number",
DataType::INTEGER,
)])?);

let table_name = "test_domain_metadata_unsupported";

let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec!["domainMetadata"],
)
.await?;

let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot.transaction()?;

let domain = "app.deprecated";

// removing domain metadata that is not present is allowed; the log simply contains a tombstone action
txn.with_domain_metadata_removed(domain.to_string())
.commit(&engine)?;

let commit_data = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000001.json"
)))
.await?
.bytes()
.await?;
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
.into_iter()
.try_collect()?;

let domain_action = actions
.iter()
.find(|v| v.get("domainMetadata").is_some())
.unwrap();
assert_eq!(domain_action["domainMetadata"]["domain"], "app.deprecated");
assert_eq!(domain_action["domainMetadata"]["configuration"], "");
assert_eq!(domain_action["domainMetadata"]["removed"], true);

let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let config = final_snapshot.get_domain_metadata(domain, &engine)?;
assert_eq!(config, None);

Ok(())
}

#[tokio::test]
async fn test_domain_metadata_set_remove_conflicts() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();

let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"number",
DataType::INTEGER,
)])?);

let table_name = "test_domain_metadata_unsupported";

let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec!["domainMetadata"],
)
.await?;

let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;

// set then remove same domain
let txn = snapshot.clone().transaction()?;
let err = txn
.with_domain_metadata("app.config".to_string(), "v1".to_string())
.with_domain_metadata_removed("app.config".to_string())
.commit(&engine)
.unwrap_err();
assert!(err
.to_string()
.contains("already specified in this transaction"));

// remove then set same domain
let txn2 = snapshot.clone().transaction()?;
let err = txn2
.with_domain_metadata_removed("test.domain".to_string())
.with_domain_metadata("test.domain".to_string(), "v1".to_string())
.commit(&engine)
.unwrap_err();
assert!(err
.to_string()
.contains("already specified in this transaction"));

// remove same domain twice
let txn3 = snapshot.clone().transaction()?;
let err = txn3
.with_domain_metadata_removed("another.domain".to_string())
.with_domain_metadata_removed("another.domain".to_string())
.commit(&engine)
.unwrap_err();
assert!(err
.to_string()
.contains("already specified in this transaction"));

// remove system domain
let txn4 = snapshot.clone().transaction()?;
let err = txn4
.with_domain_metadata_removed("delta.system".to_string())
.commit(&engine)
.unwrap_err();
assert!(err
.to_string()
.contains("Cannot modify domains that start with 'delta.' as those are system controlled"));

Ok(())
}

#[tokio::test]
async fn test_domain_metadata_set_then_remove_across_transactions(
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();

let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"number",
DataType::INTEGER,
)])?);

let table_name = "test_domain_metadata_unsupported";

let (store, engine, table_location) = engine_store_setup(table_name, None);
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![],
vec!["domainMetadata"],
)
.await?;

let domain = "app.config";

// txn 1: Set domain metadata
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot.transaction()?;
txn.with_domain_metadata(domain.to_string(), r#"{"version": 1}"#.to_string())
.commit(&engine)?;

// txn 2: Remove the same domain metadata
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let txn = snapshot.transaction()?;
txn.with_domain_metadata_removed(domain.to_string())
.commit(&engine)?;

// verify removal commit
let commit_data = store
.get(&Path::from(format!(
"/{table_name}/_delta_log/00000000000000000002.json"
)))
.await?
.bytes()
.await?;
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
.into_iter()
.try_collect()?;

let domain_action = actions
.iter()
.find(|v| v.get("domainMetadata").is_some())
.unwrap();
assert_eq!(domain_action["domainMetadata"]["domain"], domain);
assert_eq!(domain_action["domainMetadata"]["configuration"], "");
assert_eq!(domain_action["domainMetadata"]["removed"], true);

// verify reads see the metadata removal
let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let domain_config = final_snapshot.get_domain_metadata(domain, &engine)?;
assert_eq!(domain_config, None);

Ok(())
}
Loading