From 2007eafca66251c4fe6acb280f257c94ec42d70f Mon Sep 17 00:00:00 2001
From: Zach Schuermann
Date: Wed, 22 Oct 2025 12:52:30 -0700
Subject: [PATCH] uc-committer

---
 kernel/src/actions/mod.rs        |  16 ++---
 kernel/src/committer.rs          |  18 +++---
 kernel/src/table_features/mod.rs |   6 ++
 uc-catalog/Cargo.toml            |   1 +
 uc-catalog/src/committer.rs      | 108 +++++++++++++++++++++++++++++++
 uc-catalog/src/lib.rs            | 104 ++++++++++++++++++++++++++++-
 uc-client/src/client.rs          |   8 ++-
 7 files changed, 237 insertions(+), 24 deletions(-)
 create mode 100644 uc-catalog/src/committer.rs

diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs
index 577983816..05459eff7 100644
--- a/kernel/src/actions/mod.rs
+++ b/kernel/src/actions/mod.rs
@@ -509,11 +509,6 @@ impl Protocol {
     /// Check if writing to a table with this protocol is supported. That is: does the kernel
     /// support the specified protocol writer version and all enabled writer features?
     pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> {
-        #[cfg(feature = "catalog-managed")]
-        require!(
-            !self.is_catalog_managed(),
-            Error::unsupported("Writes are not yet supported for catalog-managed tables")
-        );
         match &self.writer_features {
             Some(writer_features) if self.min_writer_version == 7 => {
                 // if we're on version 7, make sure we support all the specified features
@@ -1401,7 +1396,7 @@ mod tests {
         .unwrap();
         assert_result_error_with_message(
             protocol.ensure_write_supported(),
-            r#"Unsupported: Unknown WriterFeatures: "identityColumns". Supported WriterFeatures: "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
+            r#"Unsupported: Unknown WriterFeatures: "identityColumns". Supported WriterFeatures: "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview", "v2Checkpoint""#,
         );
 
         // Unknown writer features should cause an error
@@ -1414,7 +1409,7 @@ mod tests {
         .unwrap();
         assert_result_error_with_message(
             protocol.ensure_write_supported(),
-            r#"Unsupported: Unknown WriterFeatures: "unsupported writer". Supported WriterFeatures: "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
+            r#"Unsupported: Unknown WriterFeatures: "unsupported writer". Supported WriterFeatures: "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview", "v2Checkpoint""#,
         );
     }
 
@@ -1470,8 +1465,9 @@ mod tests {
         assert_eq!(parse_features::<WriterFeature>(features), expected);
     }
 
+    #[cfg(feature = "catalog-managed")]
     #[test]
-    fn test_no_catalog_managed_writes() {
+    fn test_catalog_managed_writes() {
         let protocol = Protocol::try_new(
             3,
             7,
@@ -1479,7 +1475,7 @@ mod tests {
             Some([WriterFeature::CatalogManaged]),
         )
         .unwrap();
-        assert!(protocol.ensure_write_supported().is_err());
+        assert!(protocol.ensure_write_supported().is_ok());
         let protocol = Protocol::try_new(
             3,
             7,
@@ -1487,7 +1483,7 @@ mod tests {
             Some([WriterFeature::CatalogOwnedPreview]),
         )
         .unwrap();
-        assert!(protocol.ensure_write_supported().is_err());
+        assert!(protocol.ensure_write_supported().is_ok());
     }
 
     #[test]
diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs
index cd61ca7e6..4b5bdebe5 100644
--- a/kernel/src/committer.rs
+++ b/kernel/src/committer.rs
@@ -209,7 +209,7 @@ mod tests {
 
     #[cfg(feature = "catalog-managed")]
     #[tokio::test]
-    async fn catalog_managed_tables_block_transactions() {
+    async fn disallow_filesystem_committer_for_catalog_managed_tables() {
         let storage = Arc::new(InMemory::new());
         let table_root = Url::parse("memory:///").unwrap();
         let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
@@ -226,18 +226,16 @@
         let snapshot = crate::snapshot::SnapshotBuilder::new_for(table_root)
             .build(&engine)
             .unwrap();
-        // Try to create a transaction with FileSystemCommitter
+        // Try to commit a transaction with FileSystemCommitter
         let committer = Box::new(FileSystemCommitter::new());
-        let err = snapshot.transaction(committer).unwrap_err();
+        let err = snapshot
+            .transaction(committer)
+            .unwrap()
+            .commit(&engine)
+            .unwrap_err();
         assert!(matches!(
             err,
-            crate::Error::Unsupported(e) if e.contains("Writes are not yet supported for catalog-managed tables")
+            crate::Error::Generic(e) if e.contains("The FileSystemCommitter cannot be used to commit to catalog-managed tables. Please provide a committer for your catalog via Transaction::with_committer().")
         ));
-        // after allowing writes, we will check that this disallows default committer for
-        // catalog-managed tables.
-        // assert!(matches!(
-        //     err,
-        //     crate::Error::Generic(e) if e.contains("Cannot use the default committer for a catalog-managed table")
-        // ));
     }
 }
diff --git a/kernel/src/table_features/mod.rs b/kernel/src/table_features/mod.rs
index 189eaf856..9ce7ae61f 100644
--- a/kernel/src/table_features/mod.rs
+++ b/kernel/src/table_features/mod.rs
@@ -231,15 +231,21 @@ pub(crate) static SUPPORTED_READER_FEATURES: LazyLock<Vec<ReaderFeature>> = Lazy
 pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock<Vec<WriterFeature>> = LazyLock::new(|| {
     vec![
         WriterFeature::AppendOnly,
+        #[cfg(feature = "catalog-managed")]
+        WriterFeature::CatalogManaged,
+        #[cfg(feature = "catalog-managed")]
+        WriterFeature::CatalogOwnedPreview,
         WriterFeature::DeletionVectors,
         WriterFeature::DomainMetadata,
         WriterFeature::InCommitTimestamp,
         WriterFeature::Invariants,
         WriterFeature::RowTracking,
         WriterFeature::TimestampWithoutTimezone,
+        WriterFeature::VacuumProtocolCheck,
         WriterFeature::VariantType,
         WriterFeature::VariantTypePreview,
         WriterFeature::VariantShreddingPreview,
+        WriterFeature::V2Checkpoint,
     ]
 });
 
diff --git a/uc-catalog/Cargo.toml b/uc-catalog/Cargo.toml
index 5ca92362d..81512144e 100644
--- a/uc-catalog/Cargo.toml
+++ b/uc-catalog/Cargo.toml
@@ -23,6 +23,7 @@ serde_json = "1.0"
 tokio = { version = "1", features = ["full"] }
 tracing = "0.1"
 url = "2"
+uuid = { version = "1", features = ["v4"] }
 
 [dev-dependencies]
 delta_kernel = { path = "../kernel", features = ["arrow-56", "default-engine-rustls", "catalog-managed"] }
diff --git a/uc-catalog/src/committer.rs b/uc-catalog/src/committer.rs
new file mode 100644
index 000000000..bcb65f350
--- /dev/null
+++ b/uc-catalog/src/committer.rs
@@ -0,0 +1,108 @@
+use std::sync::Arc;
+
+use delta_kernel::committer::{CommitMetadata, CommitResponse, Committer};
+use delta_kernel::{DeltaResult, Engine, Error as DeltaError, FilteredEngineData};
+use uc_client::models::commits::{Commit, CommitRequest};
+use uc_client::prelude::UCClient;
+
+use url::Url;
+
+/// A [UCCommitter] is a Unity Catalog [Committer] implementation for committing to a specific
+/// delta table in UC.
+#[derive(Debug, Clone)]
+pub struct UCCommitter {
+    client: Arc<UCClient>,
+    table_id: String,
+    table_uri: String,
+}
+
+impl UCCommitter {
+    pub fn new(
+        client: Arc<UCClient>,
+        table_id: impl Into<String>,
+        table_uri: impl Into<String>,
+    ) -> Self {
+        UCCommitter {
+            client,
+            table_id: table_id.into(),
+            table_uri: table_uri.into(),
+        }
+    }
+}
+
+impl Committer for UCCommitter {
+    fn commit(
+        &self,
+        engine: &dyn Engine,
+        actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
+        commit_metadata: CommitMetadata,
+    ) -> DeltaResult<CommitResponse> {
+        let uuid = uuid::Uuid::new_v4();
+        let filename = format!(
+            "{version:020}.{uuid}.json",
+            version = commit_metadata.version()
+        );
+        // FIXME use table path from commit_metadata?
+        let mut commit_path = Url::parse(&self.table_uri)?;
+        commit_path.path_segments_mut().unwrap().extend(&[
+            "_delta_log",
+            "_staged_commits",
+            &filename,
+        ]);
+
+        engine
+            .json_handler()
+            .write_json_file(&commit_path, Box::new(actions), false)?;
+
+        let mut other = Url::parse(&self.table_uri)?;
+        other.path_segments_mut().unwrap().extend(&[
+            "_delta_log",
+            "_staged_commits",
+            &format!("{:020}", commit_metadata.version()),
+        ]);
+        let committed = engine
+            .storage_handler()
+            .list_from(&other)?
+            .next()
+            .unwrap()
+            .unwrap();
+        println!("wrote commit file: {:?}", committed);
+
+        // FIXME: ?
+        let timestamp: i64 = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_millis()
+            .try_into()
+            .map_err(|e| DeltaError::Generic(format!("Unable to convert timestamp to i64: {e}")))?;
+
+        // let last_backfilled_version =
+        //     commit_metadata.latest_published_version.map(|v| v.try_into().unwrap()); // FIXME
+        let last_backfilled_version = None;
+        let commit_req = CommitRequest::new(
+            self.table_id.clone(),
+            self.table_uri.clone(),
+            Commit::new(
+                commit_metadata.version().try_into().unwrap(),
+                timestamp,
+                filename,
+                committed.size as i64,
+                committed.last_modified,
+            ),
+            last_backfilled_version,
+        );
+        // FIXME
+        let handle = tokio::runtime::Handle::current();
+        tokio::task::block_in_place(|| {
+            handle.block_on(async move {
+                self.client
+                    .commit(commit_req)
+                    .await
+                    .map_err(|e| DeltaError::Generic(format!("UC commit error: {e}")))
+            })
+        })?;
+        Ok(CommitResponse::Committed {
+            version: commit_metadata.version(),
+        })
+    }
+}
diff --git a/uc-catalog/src/lib.rs b/uc-catalog/src/lib.rs
index f41cd9293..8c34e5e0a 100644
--- a/uc-catalog/src/lib.rs
+++ b/uc-catalog/src/lib.rs
@@ -1,5 +1,8 @@
 //! UCCatalog implements a high-level interface for interacting with Delta Tables in Unity Catalog.
 
+mod committer;
+pub use committer::UCCommitter;
+
 use std::sync::Arc;
 
 use delta_kernel::{Engine, LogPath, Snapshot, Version};
@@ -63,8 +66,10 @@ impl<'a> UCCatalog<'a> {
             start_version: Some(0),
             end_version: version.and_then(|v| v.try_into().ok()),
         };
-        // TODO: does it paginate?
-        let commits = self.client.get_commits(req).await?;
+        let mut commits = self.client.get_commits(req).await?;
+        if let Some(commits) = &mut commits.commits {
+            commits.sort_by_key(|c| c.version);
+        }
 
         // if commits are present, we ensure they are sorted+contiguous
         if let Some(commits) = &commits.commits {
@@ -88,7 +93,11 @@
         };
 
         // consume uc-client's Commit and hand back a delta_kernel LogPath
-        let table_url = Url::parse(&table_uri)?;
+        let mut table_url = Url::parse(&table_uri)?;
+        // add trailing slash
+        if !table_url.path().ends_with('/') {
+            table_url.path_segments_mut().unwrap().push("");
+        }
         let commits: Vec<_> = commits
             .commits
             .unwrap_or_default()
@@ -120,6 +129,7 @@
 
     use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
     use delta_kernel::engine::default::DefaultEngine;
+    use delta_kernel::transaction::CommitResult;
 
     use super::*;
 
@@ -192,4 +202,92 @@
 
         Ok(())
     }
+
+    // ignored test which you can run manually to play around with writing to a UC table. run with:
+    // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t write_uc_table -- --ignored --nocapture`
+    #[ignore]
+    #[tokio::test(flavor = "multi_thread")]
+    async fn write_uc_table() -> Result<(), Box<dyn std::error::Error>> {
+        let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set");
+        let token = env::var("TOKEN").expect("TOKEN environment variable not set");
+        let table_name = env::var("TABLENAME").expect("TABLENAME environment variable not set");
+
+        // build UC client, get table info and credentials
+        let client = Arc::new(UCClient::builder(endpoint, &token).build()?);
+        let (table_id, table_uri) = get_table(&client, &table_name).await?;
+        let creds = client
+            .get_credentials(&table_id, Operation::ReadWrite)
+            .await
+            .map_err(|e| format!("Failed to get credentials: {}", e))?;
+
+        // build catalog
+        let catalog = UCCatalog::new(&client);
+
+        // TODO: support non-AWS
+        let creds = creds
+            .aws_temp_credentials
+            .ok_or("No AWS temporary credentials found")?;
+
+        let options = [
+            ("region", "us-west-2"),
+            ("access_key_id", &creds.access_key_id),
+            ("secret_access_key", &creds.secret_access_key),
+            ("session_token", &creds.session_token),
+        ];
+
+        let table_url = Url::parse(&table_uri)?;
+        let (store, _path) = object_store::parse_url_opts(&table_url, options)?;
+        let store: Arc<dyn ObjectStore> = store.into();
+
+        let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
+        let committer = Box::new(UCCommitter::new(
+            client.clone(),
+            table_id.clone(),
+            table_uri.clone(),
+        ));
+        let snapshot = catalog
+            .load_snapshot(&table_id, &table_uri, &engine)
+            .await?;
+        println!("latest snapshot version: {:?}", snapshot.version());
+        let txn = snapshot.clone().transaction(committer)?;
+        let _write_context = txn.get_write_context();
+        // do a write.
+
+        // print the log
+        // use futures::stream::StreamExt;
+        // let mut stream = store.list(Some(&path));
+        // while let Some(path) = stream.next().await {
+        //     println!("object: {:?}", path.unwrap());
+        // }
+
+        match txn.commit(&engine)? {
+            CommitResult::CommittedTransaction(t) => {
+                println!("🎉 committed version {}", t.commit_version());
+                // FIXME! ideally should use post-commit snapshot (need to make sure to plumb
+                // through log tail)
+                // let snapshot = t.unwrap().post_commit_snapshot(&engine)?;
+                let _snapshot = catalog
+                    .load_snapshot_at(&table_id, &table_uri, t.commit_version(), &engine)
+                    .await?;
+                // let published = Arc::into_inner(snapshot).unwrap().publish(&engine)?;
+                // println!("published snapshot: {published:?}");
+            }
+            CommitResult::ConflictedTransaction(t) => {
+                println!("💥 commit conflicted at version {}", t.conflict_version());
+            }
+            CommitResult::RetryableTransaction(_) => {
+                println!("we should retry...");
+            }
+        }
+
+        // some debug
+        // let commit = store
+        //     .get(&object_store::path::Path::from("19a85dee-54bc-43a2-87ab-023d0ec16013/tables/3553b6b9-91ce-47d7-b2f9-8eb4ba5e93f5/_delta_log/_staged_commits/00000000000000000001.5cabaf8b-037d-4a84-b493-2e3d8ab899df.json"))
+        //     .await
+        //     .unwrap();
+        // let bytes = commit.bytes().await?;
+        // println!("commit file contents:\n{}", String::from_utf8_lossy(&bytes));
+
+        Ok(())
+    }
 }
diff --git a/uc-client/src/client.rs b/uc-client/src/client.rs
index 071264882..f720073f4 100644
--- a/uc-client/src/client.rs
+++ b/uc-client/src/client.rs
@@ -2,6 +2,7 @@ use std::future::Future;
 use std::time::Duration;
 
 use reqwest::{header, Client, Response, StatusCode};
+use serde::Deserialize;
 use tracing::{instrument, warn};
 use url::Url;
 
@@ -80,7 +81,12 @@ impl UCClient {
         })
         .await?;
 
-        self.handle_response(response).await
+        // Note: can't just deserialize to () directly so we make an empty struct to deserialize
+        // the `{}` into. Externally we still return unit type for ease of use/understanding.
+        #[derive(Deserialize)]
+        struct EmptyResponse {}
+        let _: EmptyResponse = self.handle_response(response).await?;
+        Ok(())
     }
 
     /// Resolve the table by name.
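
Usage note (illustrative sketch, not part of the patch): the ignored write_uc_table test above boils down to the flow below. The helper name commit_via_uc and its exact parameter types are assumptions made so the example is self-contained; the calls themselves (UCCatalog::load_snapshot, Snapshot::transaction with a boxed UCCommitter, Transaction::commit returning a CommitResult) mirror what the test exercises, and the engine is assumed to already be configured with storage credentials for the table.

use std::sync::Arc;

use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::transaction::CommitResult;
use uc_catalog::{UCCatalog, UCCommitter};
use uc_client::prelude::UCClient;

// Hypothetical helper: commit one transaction against a catalog-managed UC table,
// following the same steps as the write_uc_table test in this patch.
async fn commit_via_uc(
    client: Arc<UCClient>,
    table_id: String,
    table_uri: String,
    engine: &DefaultEngine<TokioBackgroundExecutor>,
) -> Result<(), Box<dyn std::error::Error>> {
    let catalog = UCCatalog::new(&client);
    // Resolve the latest snapshot through the catalog (ratified + staged commits).
    let snapshot = catalog.load_snapshot(&table_id, &table_uri, engine).await?;

    // Hand the UC committer to the transaction instead of the default filesystem
    // committer, which the kernel now rejects for catalog-managed tables.
    let committer = Box::new(UCCommitter::new(client.clone(), table_id, table_uri));
    let txn = snapshot.transaction(committer)?;
    // ... stage file writes via txn.get_write_context() here ...

    match txn.commit(engine)? {
        CommitResult::CommittedTransaction(t) => {
            println!("committed version {}", t.commit_version());
        }
        CommitResult::ConflictedTransaction(t) => {
            println!("commit conflicted at version {}", t.conflict_version());
        }
        CommitResult::RetryableTransaction(_) => {
            println!("retryable failure; the commit can be attempted again");
        }
    }
    Ok(())
}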