From b1310103d7d43c6789349041d4b3319ccd1f7cb7 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 18 Sep 2025 14:58:06 -0700 Subject: [PATCH 1/8] uc-catalog --- Cargo.toml | 3 +- kernel/src/log_path.rs | 17 ++-- uc-catalog/Cargo.toml | 28 ++++++ uc-catalog/src/lib.rs | 188 +++++++++++++++++++++++++++++++++++++++++ uc-client/src/lib.rs | 1 - 5 files changed, 224 insertions(+), 13 deletions(-) create mode 100644 uc-catalog/Cargo.toml create mode 100644 uc-catalog/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index b8a9c76d5..176d3c86f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,8 @@ members = [ "kernel/examples/*", "test-utils", "feature-tests", - "uc-client", # WIP: this is an experimental UC client for catalog-managed table work + "uc-client", # WIP: this is an experimental UC client for catalog-managed table work + "uc-catalog", # WIP: this is an experimental UC catalog implementation ] # note that in addition to the members above, the workspace includes examples: # - inspect-table diff --git a/kernel/src/log_path.rs b/kernel/src/log_path.rs index 2584801bc..42f8c1dd3 100644 --- a/kernel/src/log_path.rs +++ b/kernel/src/log_path.rs @@ -44,18 +44,13 @@ impl LogPath { size: FileSize, ) -> DeltaResult { // TODO: we should introduce TablePath/LogPath types which enforce checks like ending '/' - - // require table_root ends with '/' - require!( - table_root.path().ends_with('/'), - Error::generic("table root must be a directory-like URL ending with '/'") - ); - let location = table_root - .join("_delta_log/")? - .join("_staged_commits/")? - .join(filename)?; + let mut commit_path = table_root.clone(); + commit_path + .path_segments_mut() + .map_err(|()| Error::invalid_table_location(table_root))? + .extend(&["_delta_log", "_staged_commits", filename]); let file_meta = FileMeta { - location, + location: commit_path, last_modified, size, }; diff --git a/uc-catalog/Cargo.toml b/uc-catalog/Cargo.toml new file mode 100644 index 000000000..5ca92362d --- /dev/null +++ b/uc-catalog/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "uc-catalog" +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true +readme.workspace = true +rust-version.workspace = true +version.workspace = true + +# for cargo-release +[package.metadata.release] +release = false + +[dependencies] +delta_kernel = { path = "../kernel", features = ["catalog-managed"] } +uc-client = { path = "../uc-client" } +itertools = "0.14" +object_store = "0.12.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["full"] } +tracing = "0.1" +url = "2" + +[dev-dependencies] +delta_kernel = { path = "../kernel", features = ["arrow-56", "default-engine-rustls", "catalog-managed"] } diff --git a/uc-catalog/src/lib.rs b/uc-catalog/src/lib.rs new file mode 100644 index 000000000..915fc2257 --- /dev/null +++ b/uc-catalog/src/lib.rs @@ -0,0 +1,188 @@ +//! UCCatalog implements a high-level interface for interacting with Delta Tables in Unity Catalog. + +use std::sync::Arc; + +use delta_kernel::{Engine, LogPath, Snapshot, Version}; + +use uc_client::prelude::*; + +use itertools::Itertools; +use tracing::info; +use url::Url; + +/// The [UCCatalog] provides a high-level interface to interact with Delta Tables stored in Unity +/// Catalog. +pub struct UCCatalog<'a> { + client: &'a UCClient, +} + +impl<'a> UCCatalog<'a> { + pub fn new(client: &'a UCClient) -> Self { + UCCatalog { client } + } + + pub async fn load_snapshot( + &self, + table_id: &str, + table_uri: &str, + engine: &dyn Engine, + ) -> Result, Box> { + self.load_snapshot_inner(table_id, table_uri, None, engine) + .await + } + + pub async fn load_snapshot_at( + &self, + table_id: &str, + table_uri: &str, + version: Version, + engine: &dyn Engine, + ) -> Result, Box> { + self.load_snapshot_inner(table_id, table_uri, Some(version), engine) + .await + } + + pub(crate) async fn load_snapshot_inner( + &self, + table_id: &str, + table_uri: &str, + version: Option, + engine: &dyn Engine, + ) -> Result, Box> { + let table_uri = table_uri.to_string(); + let req = CommitsRequest { + table_id: table_id.to_string(), + table_uri: table_uri.clone(), + start_version: Some(0), + end_version: version.and_then(|v| v.try_into().ok()), + }; + // TODO: does it paginate? + let commits = self.client.get_commits(req).await?; + + // if commits are present, we ensure they are sorted+contiguous + if let Some(commits) = &commits.commits { + if !commits.windows(2).all(|w| w[1].version == w[0].version + 1) { + return Err("Received non-contiguous commit versions".into()); + } + } + + // we always get back the latest version from commits response, and pass that in to + // kernel's Snapshot builder. basically, load_table for the latest version always looks + // like a time travel query since we know the latest version ahead of time. + // + // note there is a weird edge case: if the table was just created it will return + // latest_table_version = -1, but the 0.json will exist in the _delta_log. + let version: Version = match version { + Some(v) => v, + None => match commits.latest_table_version { + -1 => 0, + i => i.try_into()?, + }, + }; + + // consume uc-client's Commit and hand back a delta_kernel LogPath + let table_url = Url::parse(&table_uri)?; + let commits: Vec<_> = commits + .commits + .unwrap_or_default() + .into_iter() + .map(|c| -> Result> { + LogPath::staged_commit( + table_url.clone(), + &c.file_name, + c.file_modification_timestamp, + c.file_size.try_into()?, + ) + .map_err(|e| e.into()) + }) + .try_collect()?; + + info!("commits for kernel: {:?}\n", commits); + + Snapshot::builder_for(Url::parse(&(table_uri + "/"))?) + .at_version(version) + .with_log_tail(commits) + .build(engine) + .map_err(|e| e.into()) + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; + use delta_kernel::engine::default::DefaultEngine; + + use super::*; + + // We could just re-export UCClient's get_table to not require consumers to directly import + // uc_client themselves. + async fn get_table( + client: &UCClient, + table_name: &str, + ) -> Result<(String, String), Box> { + let res = client.get_table(table_name).await?; + let table_id = res.table_id; + let table_uri = res.storage_location; + + info!( + "[GET TABLE] got table_id: {}, table_uri: {}\n", + table_id, table_uri + ); + + Ok((table_id, table_uri)) + } + + // ignored test which you can run manually to play around with reading a UC table. run with: + // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t read_uc_table --nocapture -- --ignored` + #[ignore] + #[tokio::test] + async fn read_uc_table() -> Result<(), Box> { + let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set"); + let token = env::var("TOKEN").expect("TOKEN environment variable not set"); + let table_name = env::var("TABLENAME").expect("TABLENAME environment variable not set"); + + // build UC client, get table info and credentials + let client = UCClient::builder(endpoint, &token).build()?; + let (table_id, table_uri) = get_table(&client, &table_name).await?; + let creds = client + .get_credentials(&table_id, Operation::Read) + .await + .map_err(|e| format!("Failed to get credentials: {}", e))?; + + // build catalog + let catalog = UCCatalog::new(&client); + + // TODO: support non-AWS + let creds = creds + .aws_temp_credentials + .ok_or("No AWS temporary credentials found")?; + + let options = [ + ("region", "us-west-2"), + ("access_key_id", &creds.access_key_id), + ("secret_access_key", &creds.secret_access_key), + ("session_token", &creds.session_token), + ]; + + let table_url = Url::parse(&table_uri)?; + let (store, path) = object_store::parse_url_opts(&table_url, options)?; + let store: Arc<_> = store.into(); + + info!("created object store: {:?}\npath: {:?}\n", store, path); + + let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + + // read table + let snapshot = catalog + .load_snapshot(&table_id, &table_uri, &engine) + .await?; + // or time travel + // let snapshot = catalog.load_snapshot_at(&table, 2).await?; + + println!("🎉 loaded snapshot: {snapshot:?}"); + + Ok(()) + } +} diff --git a/uc-client/src/lib.rs b/uc-client/src/lib.rs index 46c90f1ad..c24b9c4a7 100644 --- a/uc-client/src/lib.rs +++ b/uc-client/src/lib.rs @@ -34,7 +34,6 @@ pub use error::{Error, Result}; #[doc(hidden)] pub mod prelude { pub use crate::client::UCClient; - pub use crate::error::Result; pub use crate::models::{ commits::{Commit, CommitsRequest, CommitsResponse}, credentials::{Operation, TemporaryTableCredentials}, From 702ab431e12338704aef1a2a78faa7165f0f2d2f Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 23 Sep 2025 16:43:40 -0700 Subject: [PATCH 2/8] commit type changes --- ffi/src/transaction/mod.rs | 14 ++-- kernel/examples/write-table/src/main.rs | 6 +- kernel/src/transaction/mod.rs | 93 ++++++++++++++++++------- kernel/tests/row_tracking.rs | 29 +++++--- kernel/tests/write.rs | 11 ++- 5 files changed, 105 insertions(+), 48 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 8bb3681b0..95ed5e38c 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -112,13 +112,13 @@ pub unsafe extern "C" fn commit( // TODO: for now this removes the enum, which prevents doing any conflict resolution. We should fix // this by making the commit function return the enum somehow. match txn.commit(engine.as_ref()) { - Ok(CommitResult::Committed { - version: v, - post_commit_stats: _, - }) => Ok(v), - Ok(CommitResult::Conflict(_, v)) => Err(delta_kernel::Error::Generic(format!( - "commit conflict at version {v}" - ))), + Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.version()), + Ok(CommitResult::ConflictedTransaction(conflicted)) => { + Err(delta_kernel::Error::Generic(format!( + "commit conflict at version {}", + conflicted.conflict_version + ))) + } Err(e) => Err(e), } .into_extern_result(&extern_engine) diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 138724ff4..f4d324c39 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -101,7 +101,8 @@ async fn try_main() -> DeltaResult<()> { // Commit the transaction match txn.commit(&engine)? { - CommitResult::Committed { version, .. } => { + CommitResult::CommittedTransaction(committed) => { + let version = committed.version(); println!("✓ Committed transaction at version {version}"); println!("✓ Successfully wrote {} rows to the table", cli.num_rows); @@ -111,7 +112,8 @@ async fn try_main() -> DeltaResult<()> { Ok(()) } - CommitResult::Conflict(_, conflicting_version) => { + CommitResult::ConflictedTransaction(conflicted) => { + let conflicting_version = conflicted.conflict_version; println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); Err(Error::generic("Commit failed")) } diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 88d34b658..c462fd542 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -18,7 +18,7 @@ use crate::snapshot::SnapshotRef; use crate::utils::current_time_ms; use crate::{ DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData, - RowVisitor, Version, + RowVisitor, Snapshot, Version, }; /// Type alias for an iterator of [`EngineData`] results. @@ -221,22 +221,12 @@ impl Transaction { let json_handler = engine.json_handler(); match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) { - Ok(()) => Ok(CommitResult::Committed { - version: commit_version, - post_commit_stats: PostCommitStats { - commits_since_checkpoint: self - .read_snapshot - .log_segment() - .commits_since_checkpoint() - + 1, - commits_since_log_compaction: self - .read_snapshot - .log_segment() - .commits_since_log_compaction_or_checkpoint() - + 1, - }, - }), - Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), + Ok(()) => Ok(CommitResult::CommittedTransaction( + self.into_committed(commit_version), + )), + Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction( + self.into_conflicted(commit_version), + )), Err(e) => Err(e), } } @@ -479,6 +469,31 @@ impl Transaction { Ok((Box::new(add_actions), None)) } } + + fn into_committed(self, version: Version) -> CommittedTransaction { + let stats = PostCommitStats { + commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint() + + 1, + commits_since_log_compaction: self + .read_snapshot + .log_segment() + .commits_since_log_compaction_or_checkpoint() + + 1, + }; + + CommittedTransaction { + transaction: self, + commit_version: version, + post_commit_stats: stats, + } + } + + fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction { + ConflictedTransaction { + transaction: self, + conflict_version, + } + } } /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to @@ -530,18 +545,48 @@ pub struct PostCommitStats { #[derive(Debug)] pub enum CommitResult { /// The transaction was successfully committed. - Committed { - /// the version of the table that was just committed - version: Version, - /// The [`PostCommitStats`] for this transaction - post_commit_stats: PostCommitStats, - }, + CommittedTransaction(CommittedTransaction), /// This transaction conflicted with an existing version (at the version given). The transaction /// is returned so the caller can resolve the conflict (along with the version which /// conflicted). // TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to // update the transaction to a new version etc. - Conflict(Transaction, Version), + ConflictedTransaction(ConflictedTransaction), +} + +#[derive(Debug)] +pub struct CommittedTransaction { + transaction: Transaction, + /// the version of the table that was just committed + commit_version: Version, + /// The [`PostCommitStats`] for this transaction + post_commit_stats: PostCommitStats, +} + +impl CommittedTransaction { + /// The version of the table that was just committed + pub fn version(&self) -> Version { + self.commit_version + } + + /// The [`PostCommitStats`] for this transaction + pub fn post_commit_stats(&self) -> &PostCommitStats { + &self.post_commit_stats + } + + /// Compute a new snapshot for the table at the commit version. Note this is generally more + /// efficient than creating a new snapshot from scratch. + pub fn post_commit_snapshot(&self, engine: &dyn Engine) -> DeltaResult { + Snapshot::builder_from(self.transaction.read_snapshot.clone()) + .at_version(self.commit_version) + .build(engine) + } +} + +#[derive(Debug)] +pub struct ConflictedTransaction { + pub transaction: Transaction, + pub conflict_version: Version, } #[cfg(test)] diff --git a/kernel/tests/row_tracking.rs b/kernel/tests/row_tracking.rs index 10b75d260..0621f4990 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -660,22 +660,35 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { // Commit the first transaction - this should succeed let result1 = txn1.commit(engine1.as_ref())?; match result1 { - CommitResult::Committed { version, .. } => { - assert_eq!(version, 1, "First transaction should commit at version 1"); + CommitResult::CommittedTransaction(committed) => { + assert_eq!( + committed.version(), + 1, + "First transaction should commit at version 1" + ); } - CommitResult::Conflict(_, version) => { - panic!("First transaction should not conflict, got conflict at version {version}"); + CommitResult::ConflictedTransaction(conflicted) => { + panic!( + "First transaction should not conflict, got conflict at version {}", + conflicted.conflict_version + ); } } // Commit the second transaction - this should result in a conflict let result2 = txn2.commit(engine2.as_ref())?; match result2 { - CommitResult::Committed { version, .. } => { - panic!("Second transaction should conflict, but got committed at version {version}"); + CommitResult::CommittedTransaction(committed) => { + panic!( + "Second transaction should conflict, but got committed at version {}", + committed.version() + ); } - CommitResult::Conflict(_conflicted_txn, version) => { - assert_eq!(version, 1, "Conflict should be at version 1"); + CommitResult::ConflictedTransaction(conflicted) => { + assert_eq!( + conflicted.conflict_version, 1, + "Conflict should be at version 1" + ); // TODO: In the future, we need to resolve conflicts and retry the commit // For now, we just verify that we got the conflict as expected diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index a46dc5f1b..096728703 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -183,17 +183,14 @@ async fn write_data_and_check_result_and_stats( // commit! match txn.commit(engine.as_ref())? { - CommitResult::Committed { - version, - post_commit_stats, - } => { - assert_eq!(version, expected_since_commit as Version); + CommitResult::CommittedTransaction(committed) => { + assert_eq!(committed.version(), expected_since_commit as Version); assert_eq!( - post_commit_stats.commits_since_checkpoint, + committed.post_commit_stats().commits_since_checkpoint, expected_since_commit ); assert_eq!( - post_commit_stats.commits_since_log_compaction, + committed.post_commit_stats().commits_since_log_compaction, expected_since_commit ); } From 6b7823638ec14aaf128f7e198465e65d356ef3b6 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 29 Sep 2025 13:38:08 -0700 Subject: [PATCH 3/8] clean it up --- ffi/src/transaction/mod.rs | 12 +++--- kernel/examples/write-table/src/main.rs | 49 ++++++++++++++--------- kernel/src/transaction/mod.rs | 43 ++++++++++++++++++-- kernel/tests/row_tracking.rs | 52 +++++++++++++++++++------ kernel/tests/write.rs | 21 ++++++---- 5 files changed, 129 insertions(+), 48 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 95ed5e38c..24f6dd3f6 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -113,12 +113,12 @@ pub unsafe extern "C" fn commit( // this by making the commit function return the enum somehow. match txn.commit(engine.as_ref()) { Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.version()), - Ok(CommitResult::ConflictedTransaction(conflicted)) => { - Err(delta_kernel::Error::Generic(format!( - "commit conflict at version {}", - conflicted.conflict_version - ))) - } + Ok(CommitResult::RetryableTransaction(_)) => Err(delta_kernel::Error::unsupported( + "commit failed: retryable transaction not supported in FFI (yet)", + )), + Ok(CommitResult::ConflictedTransaction(conflicted)) => Err(delta_kernel::Error::Generic( + format!("commit conflict at version {}", conflicted.conflict_version), + )), Err(e) => Err(e), } .into_extern_result(&extern_engine) diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index f4d324c39..a669a2c76 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -18,7 +18,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; -use delta_kernel::transaction::CommitResult; +use delta_kernel::transaction::{CommitResult, RetryableTransaction}; use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef}; /// An example program that writes to a Delta table and creates it if necessary. @@ -99,25 +99,38 @@ async fn try_main() -> DeltaResult<()> { // Add the file metadata to the transaction txn.add_files(file_metadata); - // Commit the transaction - match txn.commit(&engine)? { - CommitResult::CommittedTransaction(committed) => { - let version = committed.version(); - println!("✓ Committed transaction at version {version}"); - println!("✓ Successfully wrote {} rows to the table", cli.num_rows); + // Commit the transaction (in a simple retry loop) + let mut retries = 0; + let committed = loop { + if retries > 5 { + return Err(Error::generic( + "Exceeded maximum 5 retries for committing transaction", + )); + } + txn = match txn.commit(&engine)? { + CommitResult::CommittedTransaction(committed) => break committed, + CommitResult::ConflictedTransaction(conflicted) => { + let conflicting_version = conflicted.conflict_version; + println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); + return Err(Error::generic("Commit failed")); + } + CommitResult::RetryableTransaction(RetryableTransaction { transaction, error }) => { + println!("✗ Failed to commit, retrying... retryable error: {error}"); + transaction + } + }; + retries += 1; + }; - // Read and display the data - read_and_display_data(&url, engine).await?; - println!("✓ Successfully read data from the table"); + let version = committed.version(); + println!("✓ Committed transaction at version {version}"); + println!("✓ Successfully wrote {} rows to the table", cli.num_rows); - Ok(()) - } - CommitResult::ConflictedTransaction(conflicted) => { - let conflicting_version = conflicted.conflict_version; - println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); - Err(Error::generic("Commit failed")) - } - } + // Read and display the data + read_and_display_data(&url, engine).await?; + println!("✓ Successfully read data from the table"); + + Ok(()) } /// Creates a new Delta table or gets an existing one. diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index c462fd542..faec1b55d 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -168,8 +168,11 @@ impl Transaction { }) } - /// Consume the transaction and commit it to the table. The result is a [CommitResult] which - /// will include the failed transaction in case of a conflict so the user can retry. + /// Consume the transaction and commit it to the table. The result is a result of + /// [CommitResult] with the following semantics: + /// - Ok(CommitResult) for either success or a recoverable error (includes the failed + /// transaction in case of a conflict so the user can retry, etc.) + /// - Err(Error) indicates a non-retryable error (e.g. logic/validation error). pub fn commit(self, engine: &dyn Engine) -> DeltaResult { // Step 1: Check for duplicate app_ids and generate set transactions (`txn`) // Note: The commit info must always be the first action in the commit but we generate it in @@ -227,7 +230,8 @@ impl Transaction { Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction( self.into_conflicted(commit_version), )), - Err(e) => Err(e), + // TODO: we may want to be more selective about what is retryable + Err(e) => Ok(CommitResult::RetryableTransaction(self.into_retryable(e))), } } @@ -494,6 +498,13 @@ impl Transaction { conflict_version, } } + + fn into_retryable(self, error: Error) -> RetryableTransaction { + RetryableTransaction { + transaction: self, + error, + } + } } /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to @@ -541,8 +552,17 @@ pub struct PostCommitStats { pub commits_since_log_compaction: u64, } -/// Result of committing a transaction. +/// The result of attempting to commit this transaction. +/// +/// The commit result can be one of the following: +/// - CommittedTransaction: the transaction was successfully committed. post-commit stats and +/// post-commit snapshot can be obtained from the committed transaction. +/// - ConflictedTransaction: the transaction conflicted with an existing version. This transcation +/// must be rebased before retrying. +/// - RetryableTransaction: an IO (retryable) error occurred during the commit. This transaction +/// can be retried without rebasing. #[derive(Debug)] +#[must_use] pub enum CommitResult { /// The transaction was successfully committed. CommittedTransaction(CommittedTransaction), @@ -552,6 +572,15 @@ pub enum CommitResult { // TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to // update the transaction to a new version etc. ConflictedTransaction(ConflictedTransaction), + /// An IO (retryable) error occurred during the commit. + RetryableTransaction(RetryableTransaction), +} + +impl CommitResult { + /// Returns true if the commit was successful. + pub fn is_committed(&self) -> bool { + matches!(self, CommitResult::CommittedTransaction(_)) + } } #[derive(Debug)] @@ -589,6 +618,12 @@ pub struct ConflictedTransaction { pub conflict_version: Version, } +#[derive(Debug)] +pub struct RetryableTransaction { + pub transaction: Transaction, + pub error: Error, +} + #[cfg(test)] mod tests { use super::*; diff --git a/kernel/tests/row_tracking.rs b/kernel/tests/row_tracking.rs index 0621f4990..a75e36b96 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -217,7 +217,9 @@ async fn test_row_tracking_append() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -264,7 +266,9 @@ async fn test_row_tracking_single_record_batches() -> DeltaResult<()> { vec![int32_array(vec![3])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -295,7 +299,9 @@ async fn test_row_tracking_large_batch() -> DeltaResult<()> { // Write a large batch with 1000 records let large_batch: Vec = (1..=1000).collect(); let data = generate_data(schema.clone(), [vec![int32_array(large_batch.clone())]])?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -342,7 +348,9 @@ async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data_1).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_1) + .await? + .is_committed()); // Verify first commit verify_row_tracking_in_commit( @@ -357,7 +365,9 @@ async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> { // Second transaction: write one batch with 2 records // This should read the existing row tracking domain metadata and assign base row IDs starting from 6 let data_2 = generate_data(schema.clone(), [vec![int32_array(vec![7, 8])]])?; - write_data_to_table(&table_url, engine.clone(), data_2).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_2) + .await? + .is_committed()); // Verify second commit verify_row_tracking_in_commit( @@ -410,7 +420,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { ], ], )?; - write_data_to_table(&table_url, engine.clone(), data_1).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_1) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -429,7 +441,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { string_array(vec!["g".to_string(), "h".to_string()]), ]], )?; - write_data_to_table(&table_url, engine.clone(), data_2).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_2) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -454,7 +468,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { ], ], )?; - write_data_to_table(&table_url, engine.clone(), data_3).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_3) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -490,7 +506,9 @@ async fn test_row_tracking_with_regular_and_empty_adds() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -536,7 +554,9 @@ async fn test_row_tracking_with_empty_adds() -> DeltaResult<()> { vec![int32_array(Vec::::new())], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly // NB: The expected high water mark is a bit unintuitive here, as we are appending empty batches. @@ -579,7 +599,7 @@ async fn test_row_tracking_without_adds() -> DeltaResult<()> { let txn = snapshot.transaction()?; // Commit without adding any add files - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Fetch and parse the commit let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?; @@ -673,6 +693,9 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { conflicted.conflict_version ); } + CommitResult::RetryableTransaction(_) => { + panic!("First transaction should not be retryable error"); + } } // Commit the second transaction - this should result in a conflict @@ -693,6 +716,9 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { // TODO: In the future, we need to resolve conflicts and retry the commit // For now, we just verify that we got the conflict as expected } + CommitResult::RetryableTransaction(_) => { + panic!("Second transaction should not be retryable error"); + } } // Verify that the winning transaction is in the log and that it has the correct metadata @@ -758,7 +784,9 @@ async fn test_no_row_tracking_fields_without_feature() -> DeltaResult<()> { )?; // Write data to the table - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify that the commit does NOT contain row tracking fields let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?; diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 096728703..208372736 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -67,7 +67,7 @@ async fn test_commit_info() -> Result<(), Box> { let txn = snapshot.transaction()?.with_engine_info("default engine"); // commit! - txn.commit(&engine)?; + let _ = txn.commit(&engine)?; let commit1 = store .get(&Path::from(format!( @@ -216,7 +216,7 @@ async fn test_commit_info_action() -> Result<(), Box> { let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); - txn.commit(&engine)?; + let _ = txn.commit(&engine)?; let commit = store .get(&Path::from(format!( @@ -360,7 +360,7 @@ async fn test_no_add_actions() -> Result<(), Box> { let txn = snapshot.transaction()?.with_engine_info("default engine"); // Commit without adding any add files - txn.commit(&engine)?; + assert!(txn.commit(&engine)?.is_committed()); let commit1 = store .get(&Path::from(format!( @@ -474,7 +474,7 @@ async fn test_append_partitioned() -> Result<(), Box> { } // commit! - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); let commit1 = store .get(&Path::from(format!( @@ -652,7 +652,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { .with_transaction_id("app_id2".to_string(), 2); // commit! - txn.commit(&engine)?; + assert!(txn.commit(&engine)?.is_committed()); let snapshot = Snapshot::builder_for(table_url.clone()) .at_version(1) @@ -813,7 +813,7 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { txn.add_files(add_files_metadata); // Commit the transaction - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Verify the commit was written correctly let commit1 = store @@ -1020,7 +1020,7 @@ async fn test_append_variant() -> Result<(), Box> { txn.add_files(add_files_metadata); // Commit the transaction - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Verify the commit was written correctly let commit1_url = tmp_test_dir_url @@ -1192,7 +1192,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box Result<(), Box Date: Tue, 23 Sep 2025 16:43:40 -0700 Subject: [PATCH 4/8] committer --- kernel/src/committer.rs | 69 ++++++++++++++++++++++++++++++++++ kernel/src/lib.rs | 5 +++ kernel/src/snapshot/builder.rs | 2 +- kernel/src/transaction/mod.rs | 54 +++++++++++++++++++------- 4 files changed, 115 insertions(+), 15 deletions(-) create mode 100644 kernel/src/committer.rs diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs new file mode 100644 index 000000000..1371f9797 --- /dev/null +++ b/kernel/src/committer.rs @@ -0,0 +1,69 @@ +use std::sync::Arc; + +use crate::path::ParsedLogPath; +use crate::{DeltaResult, Engine, EngineDataResultIterator, Error, Version}; + +use url::Url; + +#[derive(Debug)] +pub struct CommitMetadata { + pub(crate) commit_path: ParsedLogPath, + pub(crate) version: Version, +} + +impl CommitMetadata { + pub(crate) fn new(commit_path: ParsedLogPath, version: Version) -> Self { + Self { + commit_path, + version, + } + } +} + +#[derive(Debug)] +/// Result of committing a transaction. +pub enum CommitResponse { + Committed { version: Version }, + Conflict { version: Version }, +} + +pub trait Committer: Send + Sync { + fn commit( + &self, + engine: &dyn Engine, + actions: EngineDataResultIterator<'_>, + commit_metadata: CommitMetadata, + ) -> DeltaResult; +} + +pub(crate) struct FileSystemCommitter; + +impl FileSystemCommitter { + pub(crate) fn new() -> Arc { + Arc::new(Self {}) + } +} + +impl Committer for FileSystemCommitter { + fn commit( + &self, + engine: &dyn Engine, + actions: EngineDataResultIterator<'_>, + commit_metadata: CommitMetadata, + ) -> DeltaResult { + let json_handler = engine.json_handler(); + match json_handler.write_json_file( + &commit_metadata.commit_path.location, + Box::new(actions), + false, + ) { + Ok(()) => Ok(CommitResponse::Committed { + version: commit_metadata.version, + }), + Err(Error::FileAlreadyExists(_)) => Ok(CommitResponse::Conflict { + version: commit_metadata.version, + }), + Err(e) => Err(e), + } + } +} diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 228c3ed61..63eab0021 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -89,6 +89,7 @@ use self::schema::{DataType, SchemaRef}; mod action_reconciliation; pub mod actions; pub mod checkpoint; +mod committer; pub mod engine_data; pub mod error; pub mod expressions; @@ -183,6 +184,10 @@ pub type FileDataReadResult = (FileMeta, Box); pub type FileDataReadResultIterator = Box>> + Send>; +/// Type alias for an iterator of [`EngineData`] results. +pub(crate) type EngineDataResultIterator<'a> = + Box>> + Send + 'a>; + /// The metadata that describes an object. #[derive(Debug, Clone, PartialEq, Eq)] pub struct FileMeta { diff --git a/kernel/src/snapshot/builder.rs b/kernel/src/snapshot/builder.rs index 15cf59c62..edc91dd44 100644 --- a/kernel/src/snapshot/builder.rs +++ b/kernel/src/snapshot/builder.rs @@ -1,7 +1,7 @@ //! Builder for creating [`Snapshot`] instances. use crate::log_segment::LogSegment; use crate::snapshot::SnapshotRef; -use crate::LogPath; +use crate::log_path::LogPath; use crate::{DeltaResult, Engine, Error, Snapshot, Version}; use url::Url; diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index faec1b55d..1fed00604 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -9,22 +9,19 @@ use crate::actions::{ as_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, }; +use crate::committer::{CommitMetadata, CommitResponse, Committer, FileSystemCommitter}; use crate::error::Error; use crate::expressions::{ArrayData, Transform, UnaryExpressionOp::ToJson}; use crate::path::ParsedLogPath; use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor}; use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType}; use crate::snapshot::SnapshotRef; -use crate::utils::current_time_ms; +use crate::utils::{current_time_ms, require}; use crate::{ - DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData, - RowVisitor, Snapshot, Version, + DataType, DeltaResult, Engine, EngineData, EngineDataResultIterator, Expression, ExpressionRef, + IntoEngineData, RowVisitor, Snapshot, Version, }; -/// Type alias for an iterator of [`EngineData`] results. -type EngineDataResultIterator<'a> = - Box>> + Send + 'a>; - /// The minimal (i.e., mandatory) fields in an add action. pub(crate) static MANDATORY_ADD_FILE_SCHEMA: LazyLock = LazyLock::new(|| { Arc::new(StructType::new_unchecked(vec![ @@ -115,6 +112,7 @@ fn with_row_tracking_cols(schema: &SchemaRef) -> SchemaRef { /// ``` pub struct Transaction { read_snapshot: SnapshotRef, + committer: Option>, operation: Option, engine_info: Option, add_files_metadata: Vec>, @@ -159,6 +157,7 @@ impl Transaction { Ok(Transaction { read_snapshot, + committer: None, operation: None, engine_info: None, add_files_metadata: vec![], @@ -168,12 +167,39 @@ impl Transaction { }) } + /// Set the committer that will be used to commit this transaction. If not set, the default + /// filesystem-based committer will be used. Note that the default committer is only allowed + /// for non-catalog-managed tables. + #[cfg(feature = "catalog-managed")] + pub fn with_committer(mut self, committer: impl Into>) -> Self { + self.committer = Some(committer.into()); + self + } + /// Consume the transaction and commit it to the table. The result is a result of /// [CommitResult] with the following semantics: /// - Ok(CommitResult) for either success or a recoverable error (includes the failed /// transaction in case of a conflict so the user can retry, etc.) /// - Err(Error) indicates a non-retryable error (e.g. logic/validation error). pub fn commit(self, engine: &dyn Engine) -> DeltaResult { + // Step 0: Determine the committer to use + #[cfg(feature = "catalog-managed")] + if self.committer.is_none() { + require!( + !self.read_snapshot.table_configuration().protocol().is_catalog_managed(), + Error::generic("Cannot use the default committer for a catalog-managed table. Please provide a committer via Transaction::with_committer.") + ); + } + + let default_committer: Arc; + let committer = match self.committer.as_ref() { + Some(c) => c, + None => { + default_committer = FileSystemCommitter::new(); + &default_committer + } + }; + // Step 1: Check for duplicate app_ids and generate set transactions (`txn`) // Note: The commit info must always be the first action in the commit but we generate it in // step 2 to fail early on duplicate transaction appIds @@ -222,15 +248,15 @@ impl Transaction { .chain(set_transaction_actions) .chain(domain_metadata_actions); - let json_handler = engine.json_handler(); - match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) { - Ok(()) => Ok(CommitResult::CommittedTransaction( - self.into_committed(commit_version), + let commit_metadata = CommitMetadata::new(commit_path, commit_version); + match committer.commit(engine, Box::new(actions), commit_metadata) { + Ok(CommitResponse::Committed { version }) => Ok(CommitResult::CommittedTransaction( + self.into_committed(version), )), - Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction( - self.into_conflicted(commit_version), + Ok(CommitResponse::Conflict { version }) => Ok(CommitResult::ConflictedTransaction( + self.into_conflicted(version), )), - // TODO: we may want to be more selective about what is retryable + // TODO: we want to be more selective about what is retryable Err(e) => Ok(CommitResult::RetryableTransaction(self.into_retryable(e))), } } From 7913b12245bdf300beb7d830caea10f24f0383df Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 29 Sep 2025 15:28:41 -0700 Subject: [PATCH 5/8] uc-client commit --- uc-client/src/client.rs | 18 ++++++- uc-client/src/models/commits.rs | 89 +++++++++++++++++++++++++++++++++ uc-client/src/models/mod.rs | 2 +- 3 files changed, 107 insertions(+), 2 deletions(-) diff --git a/uc-client/src/client.rs b/uc-client/src/client.rs index e1c4a9076..519753dc7 100644 --- a/uc-client/src/client.rs +++ b/uc-client/src/client.rs @@ -7,7 +7,7 @@ use url::Url; use crate::config::{ClientConfig, ClientConfigBuilder}; use crate::error::{Error, Result}; -use crate::models::commits::{CommitsRequest, CommitsResponse}; +use crate::models::commits::{CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; use crate::models::credentials::{CredentialsRequest, Operation, TemporaryTableCredentials}; use crate::models::tables::TablesResponse; @@ -64,6 +64,22 @@ impl UCClient { self.handle_response(response).await } + #[instrument(skip(self))] + pub async fn commit(&self, request: CommitRequest) -> Result { + let url = self.base_url.join("delta/preview/commits")?; + + let response = self + .execute_with_retry(|| { + self.client + .request(reqwest::Method::POST, url.clone()) + .json(&request) + .send() + }) + .await?; + + self.handle_response(response).await + } + #[instrument(skip(self))] pub async fn get_table(&self, table_name: &str) -> Result { let url = self.base_url.join(&format!("tables/{}", table_name))?; diff --git a/uc-client/src/models/commits.rs b/uc-client/src/models/commits.rs index 7d5dc1938..d9cab56b0 100644 --- a/uc-client/src/models/commits.rs +++ b/uc-client/src/models/commits.rs @@ -58,3 +58,92 @@ impl Commit { chrono::DateTime::from_timestamp_millis(self.file_modification_timestamp) } } + +// Structs for creating a new commit +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CommitRequest { + pub table_id: String, + pub table_uri: String, + pub commit_info: CommitInfo, + #[serde(skip_serializing_if = "Option::is_none")] + pub latest_backfilled_version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub protocol: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CommitInfo { + pub version: i64, + pub timestamp: i64, + pub file_name: String, + pub file_size: i64, + pub file_modification_timestamp: i64, + #[serde(default)] + pub is_disown_commit: bool, +} + +impl CommitRequest { + pub fn new( + table_id: impl Into, + table_uri: impl Into, + commit_info: CommitInfo, + ) -> Self { + Self { + table_id: table_id.into(), + table_uri: table_uri.into(), + commit_info, + latest_backfilled_version: None, + metadata: None, + protocol: None, + } + } + + pub fn with_latest_backfilled_version(mut self, version: i64) -> Self { + self.latest_backfilled_version = Some(version); + self + } + + pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self { + self.metadata = Some(metadata); + self + } + + pub fn with_protocol(mut self, protocol: serde_json::Value) -> Self { + self.protocol = Some(protocol); + self + } +} + +impl CommitInfo { + pub fn new( + version: i64, + timestamp: i64, + file_name: impl Into, + file_size: i64, + file_modification_timestamp: i64, + ) -> Self { + Self { + version, + timestamp, + file_name: file_name.into(), + file_size, + file_modification_timestamp, + is_disown_commit: false, + } + } + + pub fn with_disown_commit(mut self, disown: bool) -> Self { + self.is_disown_commit = disown; + self + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CommitResponse { + pub commit: Commit, +} diff --git a/uc-client/src/models/mod.rs b/uc-client/src/models/mod.rs index edf86ce57..7f0836263 100644 --- a/uc-client/src/models/mod.rs +++ b/uc-client/src/models/mod.rs @@ -2,6 +2,6 @@ pub mod commits; pub mod credentials; pub mod tables; -pub use commits::{Commit, CommitsRequest, CommitsResponse}; +pub use commits::{Commit, CommitInfo, CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; pub use credentials::{AwsTempCredentials, TemporaryTableCredentials}; pub use tables::TablesResponse; From b9272f1f29eb7aa67a6c6de49cbd9f407cc896d2 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 30 Sep 2025 13:11:46 -0700 Subject: [PATCH 6/8] e2e poc --- kernel/src/actions/mod.rs | 5 -- kernel/src/lib.rs | 8 +- kernel/src/log_path.rs | 2 + kernel/src/table_features/mod.rs | 7 ++ kernel/src/transaction/mod.rs | 13 +++ kernel/tests/write.rs | 3 - uc-catalog/Cargo.toml | 2 + uc-catalog/src/lib.rs | 150 +++++++++++++++++++++++++++++++ uc-client/src/client.rs | 7 +- uc-client/src/models/commits.rs | 68 ++++++-------- uc-client/src/models/mod.rs | 2 +- 11 files changed, 212 insertions(+), 55 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 0625fbd04..52b72b72f 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -485,11 +485,6 @@ impl Protocol { /// Check if writing to a table with this protocol is supported. That is: does the kernel /// support the specified protocol writer version and all enabled writer features? pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> { - #[cfg(feature = "catalog-managed")] - require!( - !self.is_catalog_managed(), - Error::unsupported("Writes are not yet supported for catalog-managed tables") - ); match &self.writer_features { Some(writer_features) if self.min_writer_version == 7 => { // if we're on version 7, make sure we support all the specified features diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 63eab0021..44aa42a1f 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -89,7 +89,6 @@ use self::schema::{DataType, SchemaRef}; mod action_reconciliation; pub mod actions; pub mod checkpoint; -mod committer; pub mod engine_data; pub mod error; pub mod expressions; @@ -105,6 +104,11 @@ pub mod table_properties; pub mod transaction; pub(crate) mod transforms; +#[cfg(not(feature = "catalog-managed"))] +mod committer; +#[cfg(feature = "catalog-managed")] +pub mod committer; + pub use log_path::LogPath; mod row_tracking; @@ -185,7 +189,7 @@ pub type FileDataReadResultIterator = Box>> + Send>; /// Type alias for an iterator of [`EngineData`] results. -pub(crate) type EngineDataResultIterator<'a> = +pub type EngineDataResultIterator<'a> = Box>> + Send + 'a>; /// The metadata that describes an object. diff --git a/kernel/src/log_path.rs b/kernel/src/log_path.rs index 42f8c1dd3..bf8d8a45b 100644 --- a/kernel/src/log_path.rs +++ b/kernel/src/log_path.rs @@ -87,6 +87,8 @@ mod test { assert_eq!(path.location, expected); } + // FIXME + #[ignore] #[test] fn test_staged_commit_path_creation_failures() { let last_modified = 1234567890i64; diff --git a/kernel/src/table_features/mod.rs b/kernel/src/table_features/mod.rs index 10ebed0d9..9b712b4fa 100644 --- a/kernel/src/table_features/mod.rs +++ b/kernel/src/table_features/mod.rs @@ -239,6 +239,13 @@ pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock> = Lazy WriterFeature::VariantType, WriterFeature::VariantTypePreview, WriterFeature::VariantShreddingPreview, + WriterFeature::V2Checkpoint, + WriterFeature::VacuumProtocolCheck, + WriterFeature::InCommitTimestamp, + #[cfg(feature = "catalog-managed")] + WriterFeature::CatalogManaged, + #[cfg(feature = "catalog-managed")] + WriterFeature::CatalogOwnedPreview, ] }); diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 1fed00604..980adaca6 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -500,6 +500,19 @@ impl Transaction { } } + pub fn hack_actions(&self, engine: &dyn Engine) -> EngineDataResultIterator<'_> { + let mut commit_info = CommitInfo::new( + self.commit_timestamp, + self.operation.clone(), + self.engine_info.clone(), + ); + commit_info.in_commit_timestamp = Some(self.commit_timestamp); + let commit_info_action = + commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine); + let actions = iter::once(commit_info_action); + Box::new(actions) + } + fn into_committed(self, version: Version) -> CommittedTransaction { let stats = PostCommitStats { commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint() diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 208372736..aea9034cd 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1255,9 +1255,6 @@ async fn test_set_domain_metadata_basic() -> Result<(), Box UCCatalog<'a> { // TODO: does it paginate? let commits = self.client.get_commits(req).await?; + // sort them + let mut commits = commits; + if let Some(c) = commits.commits.as_mut() { + c.sort_by_key(|c| c.version) + } + // if commits are present, we ensure they are sorted+contiguous if let Some(commits) = &commits.commits { if !commits.windows(2).all(|w| w[1].version == w[0].version + 1) { @@ -107,12 +114,52 @@ impl<'a> UCCatalog<'a> { } } +/// A [UCCommitter] is a Unity Catalog [Committer] implementation for committing to delta tables in +/// UC. +pub struct UCCommitter { + client: Arc, +} + +impl UCCommitter { + pub fn new(client: Arc) -> Self { + UCCommitter { client } + } +} + +use delta_kernel::committer::Committer; +use delta_kernel::EngineDataResultIterator; + +impl Committer for UCCommitter { + fn commit<'a>( + &self, + engine: &'a dyn Engine, + actions: EngineDataResultIterator<'a>, + commit_metadata: delta_kernel::committer::CommitMetadata, + ) -> delta_kernel::DeltaResult { + // let commit_req = CommitRequest::new( + // commit_metadata.table_id, + // commit_metadata.table_uri, + // Commit::new( + // commit_metadata.version as i64, + // commit_metadata.timestamp, + // format!("{}.json", commit_metadata.version), + // 123, + // 123456789, + // ), + // ); + // self.client.commit(commit_req) // ASYNC! + todo!() + } +} + #[cfg(test)] mod tests { use std::env; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; + use delta_kernel::transaction::CommitResult; + use object_store::ObjectStore; use super::*; @@ -185,4 +232,107 @@ mod tests { Ok(()) } + + // ignored test which you can run manually to play around with writing to a UC table. run with: + // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t write_uc_table --nocapture -- --ignored` + #[ignore] + #[tokio::test] + async fn write_uc_table() -> Result<(), Box> { + let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set"); + let token = env::var("TOKEN").expect("TOKEN environment variable not set"); + let table_name = env::var("TABLENAME").expect("TABLENAME environment variable not set"); + + // build UC client, get table info and credentials + let client = Arc::new(UCClient::builder(endpoint, &token).build()?); + let (table_id, table_uri) = get_table(&client, &table_name).await?; + let creds = client + .get_credentials(&table_id, Operation::ReadWrite) + .await + .map_err(|e| format!("Failed to get credentials: {}", e))?; + + // build catalog + let catalog = UCCatalog::new(&client); + + // TODO: support non-AWS + let creds = creds + .aws_temp_credentials + .ok_or("No AWS temporary credentials found")?; + + let options = [ + ("region", "us-west-2"), + ("access_key_id", &creds.access_key_id), + ("secret_access_key", &creds.secret_access_key), + ("session_token", &creds.session_token), + ]; + + let table_url = Url::parse(&table_uri)?; + let (store, path) = object_store::parse_url_opts(&table_url, options)?; + let store: Arc = store.into(); + + let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let committer = Arc::new(UCCommitter::new(client.clone())); + let snapshot = catalog + .load_snapshot(&table_id, &table_uri, &engine) + .await?; + println!("latest snapshot version: {:?}", snapshot.version()); + let txn = snapshot + .clone() + .transaction()? + .with_committer(committer as Arc); + let _write_context = txn.get_write_context(); + // do a write. + + let last_modified = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_millis() as i64; + let size: i64 = 1500; + let uuid = uuid::Uuid::new_v4(); + let version = snapshot.version() + 1; + let filename = format!("{version:020}.{uuid}.json"); + let mut commit_path = table_url.clone(); + commit_path.path_segments_mut().unwrap().extend(&[ + "_delta_log", + "_staged_commits", + &filename, + ]); + + // commit info only + let actions = txn.hack_actions(&engine); + engine + .json_handler() + .write_json_file(&commit_path, actions, false)?; + + // print the log + use futures::stream::StreamExt; + let mut stream = store.list(Some(&path)); + while let Some(path) = stream.next().await { + println!("object: {:?}", path.unwrap().location); + } + + let commit_req = CommitRequest::new( + table_id, + table_uri, + Commit::new( + version.try_into().unwrap(), + last_modified, + filename, + size, + last_modified, + ), + ); + client.commit(commit_req).await?; + + // match txn.commit(&engine)? { + // CommitResult::CommittedTransaction(t) => { + // println!("🎉 committed version {}", t.version()); + // } + // CommitResult::ConflictedTransaction(t) => { + // println!("💥 commit conflicted at version {}", t.conflict_version); + // } + // CommitResult::RetryableTransaction(_) => { + // println!("we should retry..."); + // } + // } + Ok(()) + } } diff --git a/uc-client/src/client.rs b/uc-client/src/client.rs index 519753dc7..d996e370c 100644 --- a/uc-client/src/client.rs +++ b/uc-client/src/client.rs @@ -61,13 +61,18 @@ impl UCClient { }) .await?; - self.handle_response(response).await + let res = self.handle_response(response).await; + println!("\nGet commits response: {:?}\n", res); + res } #[instrument(skip(self))] pub async fn commit(&self, request: CommitRequest) -> Result { let url = self.base_url.join("delta/preview/commits")?; + let json = serde_json::to_string_pretty(&request).unwrap_or_default(); + println!("Committing: {json}"); + let response = self .execute_with_retry(|| { self.client diff --git a/uc-client/src/models/commits.rs b/uc-client/src/models/commits.rs index d9cab56b0..046b397cf 100644 --- a/uc-client/src/models/commits.rs +++ b/uc-client/src/models/commits.rs @@ -50,6 +50,28 @@ pub struct Commit { } impl Commit { + pub fn new( + version: i64, + timestamp: i64, + file_name: impl Into, + file_size: i64, + file_modification_timestamp: i64, + ) -> Self { + Self { + version, + timestamp, + file_name: file_name.into(), + file_size, + file_modification_timestamp, + is_disown_commit: Some(false), + } + } + + pub fn with_disown_commit(mut self, disown: bool) -> Self { + self.is_disown_commit = Some(disown); + self + } + pub fn timestamp_as_datetime(&self) -> Option> { chrono::DateTime::from_timestamp_millis(self.timestamp) } @@ -61,11 +83,10 @@ impl Commit { // Structs for creating a new commit #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] pub struct CommitRequest { pub table_id: String, pub table_uri: String, - pub commit_info: CommitInfo, + pub commit_info: Commit, #[serde(skip_serializing_if = "Option::is_none")] pub latest_backfilled_version: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -74,23 +95,11 @@ pub struct CommitRequest { pub protocol: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CommitInfo { - pub version: i64, - pub timestamp: i64, - pub file_name: String, - pub file_size: i64, - pub file_modification_timestamp: i64, - #[serde(default)] - pub is_disown_commit: bool, -} - impl CommitRequest { pub fn new( table_id: impl Into, table_uri: impl Into, - commit_info: CommitInfo, + commit_info: Commit, ) -> Self { Self { table_id: table_id.into(), @@ -118,32 +127,5 @@ impl CommitRequest { } } -impl CommitInfo { - pub fn new( - version: i64, - timestamp: i64, - file_name: impl Into, - file_size: i64, - file_modification_timestamp: i64, - ) -> Self { - Self { - version, - timestamp, - file_name: file_name.into(), - file_size, - file_modification_timestamp, - is_disown_commit: false, - } - } - - pub fn with_disown_commit(mut self, disown: bool) -> Self { - self.is_disown_commit = disown; - self - } -} - #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CommitResponse { - pub commit: Commit, -} +pub struct CommitResponse {} diff --git a/uc-client/src/models/mod.rs b/uc-client/src/models/mod.rs index 7f0836263..8fe92111e 100644 --- a/uc-client/src/models/mod.rs +++ b/uc-client/src/models/mod.rs @@ -2,6 +2,6 @@ pub mod commits; pub mod credentials; pub mod tables; -pub use commits::{Commit, CommitInfo, CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; +pub use commits::{Commit, CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; pub use credentials::{AwsTempCredentials, TemporaryTableCredentials}; pub use tables::TablesResponse; From 650bb30753da71f2d3db7a4543d034b9d345dbe4 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 30 Sep 2025 16:04:22 -0700 Subject: [PATCH 7/8] trait checkpoint --- kernel/src/committer.rs | 36 ++++---- kernel/src/log_path.rs | 2 +- kernel/src/path.rs | 3 +- kernel/src/snapshot.rs | 3 +- kernel/src/transaction/mod.rs | 110 +++++++++++++------------ uc-catalog/src/lib.rs | 150 ++++++++++++++++++---------------- 6 files changed, 157 insertions(+), 147 deletions(-) diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs index 1371f9797..f779ce9fd 100644 --- a/kernel/src/committer.rs +++ b/kernel/src/committer.rs @@ -7,8 +7,8 @@ use url::Url; #[derive(Debug)] pub struct CommitMetadata { - pub(crate) commit_path: ParsedLogPath, - pub(crate) version: Version, + pub commit_path: ParsedLogPath, + pub version: Version, } impl CommitMetadata { @@ -28,41 +28,33 @@ pub enum CommitResponse { } pub trait Committer: Send + Sync { + type Context; + fn commit( &self, engine: &dyn Engine, actions: EngineDataResultIterator<'_>, - commit_metadata: CommitMetadata, + version: Version, + context: &Self::Context, ) -> DeltaResult; } -pub(crate) struct FileSystemCommitter; - -impl FileSystemCommitter { - pub(crate) fn new() -> Arc { - Arc::new(Self {}) - } -} +pub struct FileSystemCommitter; impl Committer for FileSystemCommitter { + type Context = Url; // Table root fn commit( &self, engine: &dyn Engine, actions: EngineDataResultIterator<'_>, - commit_metadata: CommitMetadata, + version: Version, + context: &Self::Context, ) -> DeltaResult { + let path = ParsedLogPath::new_commit(context, version)?; let json_handler = engine.json_handler(); - match json_handler.write_json_file( - &commit_metadata.commit_path.location, - Box::new(actions), - false, - ) { - Ok(()) => Ok(CommitResponse::Committed { - version: commit_metadata.version, - }), - Err(Error::FileAlreadyExists(_)) => Ok(CommitResponse::Conflict { - version: commit_metadata.version, - }), + match json_handler.write_json_file(&path.location, Box::new(actions), false) { + Ok(()) => Ok(CommitResponse::Committed { version }), + Err(Error::FileAlreadyExists(_)) => Ok(CommitResponse::Conflict { version }), Err(e) => Err(e), } } diff --git a/kernel/src/log_path.rs b/kernel/src/log_path.rs index bf8d8a45b..5a4091451 100644 --- a/kernel/src/log_path.rs +++ b/kernel/src/log_path.rs @@ -12,7 +12,7 @@ use url::Url; /// Today, a `LogPath` is a file in the `_delta_log` directory of a Delta table; in the future, /// this will expand to support providing inline data in the log path itself. #[derive(Debug, Clone, PartialEq)] -pub struct LogPath(ParsedLogPath); +pub struct LogPath(pub ParsedLogPath); impl From for ParsedLogPath { fn from(p: LogPath) -> Self { diff --git a/kernel/src/path.rs b/kernel/src/path.rs index e94752b12..95a7df0af 100644 --- a/kernel/src/path.rs +++ b/kernel/src/path.rs @@ -52,8 +52,7 @@ pub(crate) enum LogPathFileType { /// the _delta_log we may see _staged_commits/00000000000000000000.{uuid}.json, but we MUST NOT /// include those in listing, as only the catalog can tell us which are valid commits. #[derive(Debug, Clone, PartialEq, Eq)] -#[internal_api] -pub(crate) struct ParsedLogPath { +pub struct ParsedLogPath { pub location: Location, #[allow(unused)] pub filename: String, diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 5a7d4b013..b9db87736 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -8,6 +8,7 @@ use crate::actions::domain_metadata::domain_metadata_configuration; use crate::actions::set_transaction::SetTransactionScanner; use crate::actions::{Metadata, Protocol, INTERNAL_DOMAIN_PREFIX}; use crate::checkpoint::CheckpointWriter; +use crate::committer::FileSystemCommitter; use crate::listed_log_files::ListedLogFiles; use crate::log_segment::LogSegment; use crate::scan::ScanBuilder; @@ -341,7 +342,7 @@ impl Snapshot { } /// Create a [`Transaction`] for this `SnapshotRef`. - pub fn transaction(self: Arc) -> DeltaResult { + pub fn transaction(self: Arc) -> DeltaResult> { Transaction::try_new(self) } diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 980adaca6..83064c0cb 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -19,7 +19,7 @@ use crate::snapshot::SnapshotRef; use crate::utils::{current_time_ms, require}; use crate::{ DataType, DeltaResult, Engine, EngineData, EngineDataResultIterator, Expression, ExpressionRef, - IntoEngineData, RowVisitor, Snapshot, Version, + IntoEngineData, LogPath, RowVisitor, Snapshot, Version, }; /// The minimal (i.e., mandatory) fields in an add action. @@ -110,9 +110,9 @@ fn with_row_tracking_cols(schema: &SchemaRef) -> SchemaRef { /// // commit! (consume the transaction) /// txn.commit(&engine)?; /// ``` -pub struct Transaction { +pub struct Transaction { read_snapshot: SnapshotRef, - committer: Option>, + committer: Option, operation: Option, engine_info: Option, add_files_metadata: Vec>, @@ -128,7 +128,7 @@ pub struct Transaction { domain_metadatas: Vec, } -impl std::fmt::Debug for Transaction { +impl std::fmt::Debug for Transaction { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(&format!( "Transaction {{ read_snapshot version: {}, engine_info: {} }}", @@ -138,7 +138,7 @@ impl std::fmt::Debug for Transaction { } } -impl Transaction { +impl Transaction { /// Create a new transaction from a snapshot. The snapshot will be used to read the current /// state of the table (e.g. to read the current version). /// @@ -157,7 +157,7 @@ impl Transaction { Ok(Transaction { read_snapshot, - committer: None, + committer: Some(FileSystemCommitter {}), operation: None, engine_info: None, add_files_metadata: vec![], @@ -166,22 +166,39 @@ impl Transaction { domain_metadatas: vec![], }) } +} +impl Transaction { /// Set the committer that will be used to commit this transaction. If not set, the default /// filesystem-based committer will be used. Note that the default committer is only allowed /// for non-catalog-managed tables. #[cfg(feature = "catalog-managed")] - pub fn with_committer(mut self, committer: impl Into>) -> Self { - self.committer = Some(committer.into()); - self + pub fn with_committer(self, committer: impl Into) -> Transaction + where + T: Committer, + { + // self.committer = Some(committer.into()); + // self + Transaction { + read_snapshot: self.read_snapshot, + committer: Some(committer.into()), + operation: self.operation, + engine_info: self.engine_info, + add_files_metadata: self.add_files_metadata, + set_transactions: self.set_transactions, + commit_timestamp: self.commit_timestamp, + domain_metadatas: self.domain_metadatas, + } } +} +impl Transaction { /// Consume the transaction and commit it to the table. The result is a result of /// [CommitResult] with the following semantics: /// - Ok(CommitResult) for either success or a recoverable error (includes the failed /// transaction in case of a conflict so the user can retry, etc.) /// - Err(Error) indicates a non-retryable error (e.g. logic/validation error). - pub fn commit(self, engine: &dyn Engine) -> DeltaResult { + pub fn commit(self, engine: &dyn Engine, context: C::Context) -> DeltaResult> { // Step 0: Determine the committer to use #[cfg(feature = "catalog-managed")] if self.committer.is_none() { @@ -191,14 +208,14 @@ impl Transaction { ); } - let default_committer: Arc; - let committer = match self.committer.as_ref() { - Some(c) => c, - None => { - default_committer = FileSystemCommitter::new(); - &default_committer - } - }; + // let default_committer: FileSystemCommitter; + // let committer = match self.committer.as_ref() { + // Some(c) => c, + // None => { + // default_committer = FileSystemCommitter::new(); + // &default_committer + // } + // }; // Step 1: Check for duplicate app_ids and generate set transactions (`txn`) // Note: The commit info must always be the first action in the commit but we generate it in @@ -223,11 +240,12 @@ impl Transaction { .map(|txn| txn.into_engine_data(get_log_txn_schema().clone(), engine)); // Step 2: Construct commit info and initialize the action iterator - let commit_info = CommitInfo::new( + let mut commit_info = CommitInfo::new( self.commit_timestamp, self.operation.clone(), self.engine_info.clone(), ); + commit_info.in_commit_timestamp = Some(self.commit_timestamp); let commit_info_action = commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine); @@ -241,15 +259,18 @@ impl Transaction { self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?; // Step 5: Commit the actions as a JSON file to the Delta log - let commit_path = - ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?; + // let commit_path = + // ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?; let actions = iter::once(commit_info_action) .chain(add_actions) .chain(set_transaction_actions) .chain(domain_metadata_actions); - let commit_metadata = CommitMetadata::new(commit_path, commit_version); - match committer.commit(engine, Box::new(actions), commit_metadata) { + // let path = commit_path.0.location.location; + // let path = ParsedLogPath::try_from(path).unwrap().unwrap(); + // let commit_metadata = CommitMetadata::new(path, commit_version); + let committer = self.committer.as_ref().unwrap(); + match committer.commit(engine, Box::new(actions), commit_version, &context) { Ok(CommitResponse::Committed { version }) => Ok(CommitResult::CommittedTransaction( self.into_committed(version), )), @@ -500,20 +521,7 @@ impl Transaction { } } - pub fn hack_actions(&self, engine: &dyn Engine) -> EngineDataResultIterator<'_> { - let mut commit_info = CommitInfo::new( - self.commit_timestamp, - self.operation.clone(), - self.engine_info.clone(), - ); - commit_info.in_commit_timestamp = Some(self.commit_timestamp); - let commit_info_action = - commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine); - let actions = iter::once(commit_info_action); - Box::new(actions) - } - - fn into_committed(self, version: Version) -> CommittedTransaction { + fn into_committed(self, version: Version) -> CommittedTransaction { let stats = PostCommitStats { commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint() + 1, @@ -531,14 +539,14 @@ impl Transaction { } } - fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction { + fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction { ConflictedTransaction { transaction: self, conflict_version, } } - fn into_retryable(self, error: Error) -> RetryableTransaction { + fn into_retryable(self, error: Error) -> RetryableTransaction { RetryableTransaction { transaction: self, error, @@ -602,20 +610,20 @@ pub struct PostCommitStats { /// can be retried without rebasing. #[derive(Debug)] #[must_use] -pub enum CommitResult { +pub enum CommitResult { /// The transaction was successfully committed. - CommittedTransaction(CommittedTransaction), + CommittedTransaction(CommittedTransaction), /// This transaction conflicted with an existing version (at the version given). The transaction /// is returned so the caller can resolve the conflict (along with the version which /// conflicted). // TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to // update the transaction to a new version etc. - ConflictedTransaction(ConflictedTransaction), + ConflictedTransaction(ConflictedTransaction), /// An IO (retryable) error occurred during the commit. - RetryableTransaction(RetryableTransaction), + RetryableTransaction(RetryableTransaction), } -impl CommitResult { +impl CommitResult { /// Returns true if the commit was successful. pub fn is_committed(&self) -> bool { matches!(self, CommitResult::CommittedTransaction(_)) @@ -623,15 +631,15 @@ impl CommitResult { } #[derive(Debug)] -pub struct CommittedTransaction { - transaction: Transaction, +pub struct CommittedTransaction { + transaction: Transaction, /// the version of the table that was just committed commit_version: Version, /// The [`PostCommitStats`] for this transaction post_commit_stats: PostCommitStats, } -impl CommittedTransaction { +impl CommittedTransaction { /// The version of the table that was just committed pub fn version(&self) -> Version { self.commit_version @@ -652,14 +660,14 @@ impl CommittedTransaction { } #[derive(Debug)] -pub struct ConflictedTransaction { - pub transaction: Transaction, +pub struct ConflictedTransaction { + pub transaction: Transaction, pub conflict_version: Version, } #[derive(Debug)] -pub struct RetryableTransaction { - pub transaction: Transaction, +pub struct RetryableTransaction { + pub transaction: Transaction, pub error: Error, } diff --git a/uc-catalog/src/lib.rs b/uc-catalog/src/lib.rs index 8e72a9c9f..4f136a39b 100644 --- a/uc-catalog/src/lib.rs +++ b/uc-catalog/src/lib.rs @@ -129,26 +129,60 @@ impl UCCommitter { use delta_kernel::committer::Committer; use delta_kernel::EngineDataResultIterator; +pub struct UCCommitterContext { + table_id: String, + table_uri: String, +} + impl Committer for UCCommitter { + type Context = UCCommitterContext; fn commit<'a>( &self, engine: &'a dyn Engine, actions: EngineDataResultIterator<'a>, - commit_metadata: delta_kernel::committer::CommitMetadata, + version: Version, + context: &UCCommitterContext, ) -> delta_kernel::DeltaResult { - // let commit_req = CommitRequest::new( - // commit_metadata.table_id, - // commit_metadata.table_uri, - // Commit::new( - // commit_metadata.version as i64, - // commit_metadata.timestamp, - // format!("{}.json", commit_metadata.version), - // 123, - // 123456789, - // ), - // ); - // self.client.commit(commit_req) // ASYNC! - todo!() + let last_modified = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + .try_into() + .unwrap(); + let size = 1500; + let filename = format!("{:020}.{}.json", version, uuid::Uuid::new_v4()); + let staged_commit_path = LogPath::staged_commit( + Url::parse(&context.table_uri)?, + &filename, + last_modified, + size, + )?; + engine.json_handler().write_json_file( + &staged_commit_path.0.location.location, + actions, + false, + )?; + let commit_req = CommitRequest::new( + &context.table_id, + &context.table_uri, + Commit::new( + version.try_into().unwrap(), + last_modified, + filename, + size.try_into().unwrap(), + last_modified, + ), + ); + + let handle = tokio::runtime::Handle::current(); + tokio::task::block_in_place(move || { + handle.block_on(async move { + match self.client.commit(commit_req).await { + Ok(_) => Ok(delta_kernel::committer::CommitResponse::Committed { version }), + Err(e) => Err(delta_kernel::Error::Generic(format!("commit failed: {e}"))), + } + }) + }) } } @@ -158,7 +192,7 @@ mod tests { use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; - use delta_kernel::transaction::CommitResult; + use delta_kernel::transaction::{CommitResult, Transaction}; use object_store::ObjectStore; use super::*; @@ -236,7 +270,7 @@ mod tests { // ignored test which you can run manually to play around with writing to a UC table. run with: // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t write_uc_table --nocapture -- --ignored` #[ignore] - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn write_uc_table() -> Result<(), Box> { let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set"); let token = env::var("TOKEN").expect("TOKEN environment variable not set"); @@ -266,73 +300,49 @@ mod tests { ]; let table_url = Url::parse(&table_uri)?; - let (store, path) = object_store::parse_url_opts(&table_url, options)?; + let (store, _path) = object_store::parse_url_opts(&table_url, options)?; let store: Arc = store.into(); let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); - let committer = Arc::new(UCCommitter::new(client.clone())); + let committer = UCCommitter::new(client.clone()); let snapshot = catalog .load_snapshot(&table_id, &table_uri, &engine) .await?; println!("latest snapshot version: {:?}", snapshot.version()); - let txn = snapshot - .clone() - .transaction()? - .with_committer(committer as Arc); + let txn: Transaction = + snapshot.clone().transaction()?.with_committer(committer); let _write_context = txn.get_write_context(); // do a write. - let last_modified = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH)? - .as_millis() as i64; - let size: i64 = 1500; - let uuid = uuid::Uuid::new_v4(); - let version = snapshot.version() + 1; - let filename = format!("{version:020}.{uuid}.json"); - let mut commit_path = table_url.clone(); - commit_path.path_segments_mut().unwrap().extend(&[ - "_delta_log", - "_staged_commits", - &filename, - ]); + // print the log + // use futures::stream::StreamExt; + // let mut stream = store.list(Some(&path)); + // while let Some(path) = stream.next().await { + // println!("object: {:?}", path.unwrap().location); + // } - // commit info only - let actions = txn.hack_actions(&engine); - engine - .json_handler() - .write_json_file(&commit_path, actions, false)?; + // let commit = store + // .get(&object_store::path::Path::from("19a85dee-54bc-43a2-87ab-023d0ec16013/tables/cdac4b37-fc11-4148-ae02-512e6cc4e1bd/_delta_log/_staged_commits/00000000000000000001.5400fd23-8e08-4f87-8049-7b37431fb826.json")) + // .await + // .unwrap(); + // let bytes = commit.bytes().await?; + // println!("commit file contents:\n{}", String::from_utf8_lossy(&bytes)); - // print the log - use futures::stream::StreamExt; - let mut stream = store.list(Some(&path)); - while let Some(path) = stream.next().await { - println!("object: {:?}", path.unwrap().location); + let ctx = UCCommitterContext { + table_id: table_id.clone(), + table_uri: table_uri.clone(), + }; + match txn.commit(&engine, ctx)? { + CommitResult::CommittedTransaction(t) => { + println!("🎉 committed version {}", t.version()); + } + CommitResult::ConflictedTransaction(t) => { + println!("💥 commit conflicted at version {}", t.conflict_version); + } + CommitResult::RetryableTransaction(_) => { + println!("we should retry..."); + } } - - let commit_req = CommitRequest::new( - table_id, - table_uri, - Commit::new( - version.try_into().unwrap(), - last_modified, - filename, - size, - last_modified, - ), - ); - client.commit(commit_req).await?; - - // match txn.commit(&engine)? { - // CommitResult::CommittedTransaction(t) => { - // println!("🎉 committed version {}", t.version()); - // } - // CommitResult::ConflictedTransaction(t) => { - // println!("💥 commit conflicted at version {}", t.conflict_version); - // } - // CommitResult::RetryableTransaction(_) => { - // println!("we should retry..."); - // } - // } Ok(()) } } From 955281f9e1f1c313bf2e2791b435f1fdeac91dd8 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 30 Sep 2025 21:32:14 -0700 Subject: [PATCH 8/8] publish --- kernel/src/committer.rs | 5 ++ kernel/src/engine/default/filesystem.rs | 11 ++++ kernel/src/lib.rs | 3 + kernel/src/log_segment.rs | 26 +++++++++ kernel/src/path.rs | 73 +++++++++++++++++++++++++ kernel/src/snapshot.rs | 16 ++++++ kernel/src/transaction/mod.rs | 1 + uc-catalog/src/lib.rs | 53 +++++++++++++++--- uc-client/src/models/commits.rs | 20 ++++++- 9 files changed, 198 insertions(+), 10 deletions(-) diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs index f779ce9fd..408e40eda 100644 --- a/kernel/src/committer.rs +++ b/kernel/src/committer.rs @@ -37,6 +37,11 @@ pub trait Committer: Send + Sync { version: Version, context: &Self::Context, ) -> DeltaResult; + + fn published(&self, version: Version, context: &Self::Context) -> DeltaResult<()> { + let _ = (version, context); + Ok(()) + } } pub struct FileSystemCommitter; diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 08d03107c..b2fd0bc70 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -175,6 +175,17 @@ impl StorageHandler for ObjectStoreStorageHandler { Ok(Box::new(receiver.into_iter())) } + + fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()> { + let from_path = Path::from_url_path(from.path())?; + let to_path = Path::from_url_path(to.path())?; + let store = self.inner.clone(); + println!("Copying from {from_path:?} to {to_path:?}"); + // FIXME: ? copy_if_not_exists doesn't work on S3 + self.task_executor + .block_on(async move { store.copy(&from_path, &to_path).await }) + .map_err(|e| Error::Generic(format!("Failed to copy file: {e}"))) + } } #[cfg(test)] diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 44aa42a1f..6a9ee3800 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -543,6 +543,9 @@ pub trait StorageHandler: AsAny { &self, files: Vec, ) -> DeltaResult>>>; + + /// Performs a copy. Must not overwrite. + fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()>; } /// Provides JSON handling functionality to Delta Kernel. diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index a656b6654..ab15ed209 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -570,4 +570,30 @@ impl LogSegment { debug_assert!(to_sub <= self.end_version); self.end_version - to_sub } + + // TODO: decide on API: + // 1. should this mutate? should it consume self and hand back a new one? + // 2. should we take finer-grained args like which version to publish up to? + pub(crate) fn publish(mut self, engine: &dyn Engine) -> DeltaResult { + let storage = engine.storage_handler(); + + // Transform staged commits into published commits + for i in 0..self.ascending_commit_files.len() { + if matches!(self.ascending_commit_files[i].file_type, LogPathFileType::StagedCommit) { + // Clone the staged commit to get source location before transforming + let source_location = self.ascending_commit_files[i].location.location.clone(); + + // Take ownership of the commit to transform it + let staged_commit = self.ascending_commit_files.remove(i); + let published_commit = staged_commit.into_published()?; + + // Copy the actual file from staged to published location + storage.copy(&source_location, &published_commit.location.location)?; + + // Insert the published commit back + self.ascending_commit_files.insert(i, published_commit); + } + } + Ok(self) + } } diff --git a/kernel/src/path.rs b/kernel/src/path.rs index 95a7df0af..a4e372abf 100644 --- a/kernel/src/path.rs +++ b/kernel/src/path.rs @@ -214,6 +214,79 @@ impl ParsedLogPath { } } +impl ParsedLogPath { + /// Transform a staged commit into a published commit by updating its location + pub(crate) fn into_published(self) -> DeltaResult> { + if !matches!(self.file_type, LogPathFileType::StagedCommit) { + return Err(Error::internal_error( + "Unable to create a published path from a non-staged commit", + )); + } + + let published_filename = format!("{:020}.json", self.version); + let published_url = transform_staged_commit_url(self.location.location.clone())?; + + Ok(ParsedLogPath { + location: FileMeta { + location: published_url, + last_modified: self.location.last_modified, + size: self.location.size, + }, + filename: published_filename, + extension: "json".to_string(), + version: self.version, + file_type: LogPathFileType::Commit, + }) + } +} + +fn transform_staged_commit_url(mut url: Url) -> DeltaResult { + // Collect segments into owned strings to avoid borrowing issues + let segments: Vec = url + .path_segments() + .ok_or(Error::generic("cannot parse path segments"))? + .map(|s| s.to_string()) + .collect(); + + let staged_commits_index = segments + .iter() + .rposition(|s| s == "_staged_commits") + .ok_or(Error::generic("_staged_commits not found in path"))?; + + // Build new path: everything before _staged_commits + modified filename + let mut new_path = String::new(); + + // Add segments up to (but not including) _staged_commits + for (i, segment) in segments.iter().enumerate() { + if i >= staged_commits_index { + break; + } + if !new_path.is_empty() || !segment.is_empty() { + new_path.push('/'); + } + new_path.push_str(segment); + } + + // Add the modified filename (remove UUID) + if let Some(filename) = segments.get(staged_commits_index + 1) { + // Remove UUID from filename: 00000000000000000005.{uuid}.json -> 00000000000000000005.json + let new_filename = filename + .split('.') + .next() + .ok_or(Error::generic("invalid filename format"))? + .to_string() + + ".json"; + + if !new_path.is_empty() { + new_path.push('/'); + } + new_path.push_str(&new_filename); + } + + url.set_path(&new_path); + Ok(url) +} + impl ParsedLogPath { const DELTA_LOG_DIR: &'static str = "_delta_log/"; diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index b9db87736..6e2f57ce7 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -346,6 +346,22 @@ impl Snapshot { Transaction::try_new(self) } + #[cfg(feature = "catalog-managed")] + pub fn publish(mut self, engine: &dyn Engine) -> DeltaResult { + // FIXME: remove clone + self.log_segment = self.log_segment.clone().publish(engine)?; + + println!("Published log segment"); + for commit in &self.log_segment.ascending_commit_files { + println!("commit: {:?}", commit); + } + + // how to get committer + context? + // committer.published(self.version(), context)?; + + Ok(self) + } + /// Fetch the latest version of the provided `application_id` for this snapshot. Filters the txn based on the SetTransactionRetentionDuration property and lastUpdated /// /// Note that this method performs log replay (fetches and processes metadata from storage). diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 83064c0cb..9bd471764 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -654,6 +654,7 @@ impl CommittedTransaction { /// efficient than creating a new snapshot from scratch. pub fn post_commit_snapshot(&self, engine: &dyn Engine) -> DeltaResult { Snapshot::builder_from(self.transaction.read_snapshot.clone()) + // .with_log_tail(log_tail) .at_version(self.commit_version) .build(engine) } diff --git a/uc-catalog/src/lib.rs b/uc-catalog/src/lib.rs index 4f136a39b..e04ec7535 100644 --- a/uc-catalog/src/lib.rs +++ b/uc-catalog/src/lib.rs @@ -184,6 +184,26 @@ impl Committer for UCCommitter { }) }) } + + fn published( + &self, + version: Version, + context: &UCCommitterContext, + ) -> delta_kernel::DeltaResult<()> { + let request = + CommitRequest::ack_publish(&context.table_id, &context.table_uri, version as i64); + let handle = tokio::runtime::Handle::current(); + tokio::task::block_in_place(move || { + handle.block_on(async move { + match self.client.commit(request).await { + Ok(_) => Ok(()), + Err(e) => Err(delta_kernel::Error::Generic(format!( + "publish_commit failed: {e}" + ))), + } + }) + }) + } } #[cfg(test)] @@ -300,7 +320,7 @@ mod tests { ]; let table_url = Url::parse(&table_uri)?; - let (store, _path) = object_store::parse_url_opts(&table_url, options)?; + let (store, path) = object_store::parse_url_opts(&table_url, options)?; let store: Arc = store.into(); let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); @@ -315,11 +335,11 @@ mod tests { // do a write. // print the log - // use futures::stream::StreamExt; - // let mut stream = store.list(Some(&path)); - // while let Some(path) = stream.next().await { - // println!("object: {:?}", path.unwrap().location); - // } + use futures::stream::StreamExt; + let mut stream = store.list(Some(&path)); + while let Some(path) = stream.next().await { + println!("object: {:?}", path.unwrap().location); + } // let commit = store // .get(&object_store::path::Path::from("19a85dee-54bc-43a2-87ab-023d0ec16013/tables/cdac4b37-fc11-4148-ae02-512e6cc4e1bd/_delta_log/_staged_commits/00000000000000000001.5400fd23-8e08-4f87-8049-7b37431fb826.json")) @@ -332,17 +352,34 @@ mod tests { table_id: table_id.clone(), table_uri: table_uri.clone(), }; - match txn.commit(&engine, ctx)? { + let t = match txn.commit(&engine, ctx)? { CommitResult::CommittedTransaction(t) => { println!("🎉 committed version {}", t.version()); + Some(t) } CommitResult::ConflictedTransaction(t) => { println!("💥 commit conflicted at version {}", t.conflict_version); + None } CommitResult::RetryableTransaction(_) => { println!("we should retry..."); + None } - } + }; + + // FIXME! need to plumb log_tail + // let snapshot = t.unwrap().post_commit_snapshot(&engine)?; + let new_version = snapshot.version() + 1; + let snapshot = catalog + .load_snapshot_at(&table_id, &table_uri, new_version, &engine) + .await?; + let _published = Arc::into_inner(snapshot).unwrap().publish(&engine)?; + + // notify UC of publish. since Commmitter is tied to Transaction it's a bit hard to plumb + // this through to Snapshot... TODO + let request = CommitRequest::ack_publish(&table_id, &table_uri, new_version as i64); + client.commit(request).await?; + Ok(()) } } diff --git a/uc-client/src/models/commits.rs b/uc-client/src/models/commits.rs index 046b397cf..1aee33173 100644 --- a/uc-client/src/models/commits.rs +++ b/uc-client/src/models/commits.rs @@ -86,7 +86,8 @@ impl Commit { pub struct CommitRequest { pub table_id: String, pub table_uri: String, - pub commit_info: Commit, + #[serde(skip_serializing_if = "Option::is_none")] + pub commit_info: Option, #[serde(skip_serializing_if = "Option::is_none")] pub latest_backfilled_version: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -104,13 +105,28 @@ impl CommitRequest { Self { table_id: table_id.into(), table_uri: table_uri.into(), - commit_info, + commit_info: Some(commit_info), latest_backfilled_version: None, metadata: None, protocol: None, } } + pub fn ack_publish( + table_id: impl Into, + table_uri: impl Into, + last_backfilled_version: i64, + ) -> Self { + Self { + table_id: table_id.into(), + table_uri: table_uri.into(), + commit_info: None, + latest_backfilled_version: Some(last_backfilled_version), + metadata: None, + protocol: None, + } + } + pub fn with_latest_backfilled_version(mut self, version: i64) -> Self { self.latest_backfilled_version = Some(version); self