From edeb816c958f94650c05ca2babb677668e1e3aa8 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 23 Sep 2025 16:43:40 -0700 Subject: [PATCH 1/7] committer --- kernel/src/committer.rs | 70 ++++++++++++++++++++++++++++++++++ kernel/src/lib.rs | 5 +++ kernel/src/snapshot/builder.rs | 2 +- kernel/src/transaction/mod.rs | 56 +++++++++++++++++++-------- 4 files changed, 116 insertions(+), 17 deletions(-) create mode 100644 kernel/src/committer.rs diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs new file mode 100644 index 000000000..0b496375f --- /dev/null +++ b/kernel/src/committer.rs @@ -0,0 +1,70 @@ +use std::sync::Arc; + +use crate::path::ParsedLogPath; +use crate::{DeltaResult, Engine, Error, FilteredEngineData, Version}; + +use url::Url; + +#[derive(Debug)] +pub struct CommitMetadata { + pub(crate) commit_path: ParsedLogPath, + pub(crate) version: Version, +} + +impl CommitMetadata { + pub(crate) fn new(commit_path: ParsedLogPath, version: Version) -> Self { + Self { + commit_path, + version, + } + } +} + +#[derive(Debug)] +/// Result of committing a transaction. +pub enum CommitResponse { + Committed { version: Version }, + Conflict { version: Version }, +} + +pub trait Committer: Send + Sync { + fn commit( + &self, + engine: &dyn Engine, + actions: Box> + Send + '_>, + commit_metadata: CommitMetadata, + ) -> DeltaResult; +} + +pub(crate) struct FileSystemCommitter; + +impl FileSystemCommitter { + pub(crate) fn new() -> Arc { + Arc::new(Self {}) + } +} + +impl Committer for FileSystemCommitter { + fn commit( + &self, + engine: &dyn Engine, + actions: Box> + Send + '_>, + commit_metadata: CommitMetadata, + ) -> DeltaResult { + let json_handler = engine.json_handler(); + + match json_handler.write_json_file( + &commit_metadata.commit_path.location, + Box::new(actions), + false, + ) { + Ok(()) => Ok(CommitResponse::Committed { + version: commit_metadata.version, + }), + Err(Error::FileAlreadyExists(_)) => Ok(CommitResponse::Conflict { + version: commit_metadata.version, + }), + Err(e) => Err(e), + } + } +} diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 6f7635d7d..ee555a3f1 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -87,6 +87,7 @@ use self::schema::{DataType, SchemaRef}; mod action_reconciliation; pub mod actions; pub mod checkpoint; +mod committer; pub mod engine_data; pub mod error; pub mod expressions; @@ -182,6 +183,10 @@ pub type FileDataReadResult = (FileMeta, Box); pub type FileDataReadResultIterator = Box>> + Send>; +/// Type alias for an iterator of [`EngineData`] results. +pub(crate) type EngineDataResultIterator<'a> = + Box>> + Send + 'a>; + /// The metadata that describes an object. #[derive(Debug, Clone, PartialEq, Eq)] pub struct FileMeta { diff --git a/kernel/src/snapshot/builder.rs b/kernel/src/snapshot/builder.rs index 15cf59c62..860ed443c 100644 --- a/kernel/src/snapshot/builder.rs +++ b/kernel/src/snapshot/builder.rs @@ -1,7 +1,7 @@ //! Builder for creating [`Snapshot`] instances. +use crate::log_path::LogPath; use crate::log_segment::LogSegment; use crate::snapshot::SnapshotRef; -use crate::LogPath; use crate::{DeltaResult, Engine, Error, Snapshot, Version}; use url::Url; diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index a3c039718..6de968f82 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -9,6 +9,7 @@ use crate::actions::{ as_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, }; +use crate::committer::{CommitMetadata, CommitResponse, Committer, FileSystemCommitter}; use crate::engine_data::FilteredEngineData; use crate::error::Error; use crate::expressions::{ArrayData, Transform, UnaryExpressionOp::ToJson}; @@ -16,18 +17,14 @@ use crate::path::ParsedLogPath; use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor}; use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType}; use crate::snapshot::SnapshotRef; -use crate::utils::current_time_ms; +use crate::utils::{current_time_ms, require}; use crate::{ - DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData, - RowVisitor, Version, + DataType, DeltaResult, Engine, EngineData, EngineDataResultIterator, Expression, ExpressionRef, + IntoEngineData, RowVisitor, Version, }; use delta_kernel_derive::internal_api; -/// Type alias for an iterator of [`EngineData`] results. -type EngineDataResultIterator<'a> = - Box>> + Send + 'a>; - -/// The static instance referenced by [`add_files_schema`] that doesn't contain the dataChange column. +/// The minimal (i.e., mandatory) fields in an add action. pub(crate) static MANDATORY_ADD_FILE_SCHEMA: LazyLock = LazyLock::new(|| { Arc::new(StructType::new_unchecked(vec![ StructField::not_null("path", DataType::STRING), @@ -118,6 +115,7 @@ fn with_row_tracking_cols(schema: &SchemaRef) -> SchemaRef { /// ``` pub struct Transaction { read_snapshot: SnapshotRef, + committer: Option>, operation: Option, engine_info: Option, add_files_metadata: Vec>, @@ -164,6 +162,7 @@ impl Transaction { Ok(Transaction { read_snapshot, + committer: None, operation: None, engine_info: None, add_files_metadata: vec![], @@ -174,12 +173,39 @@ impl Transaction { }) } + /// Set the committer that will be used to commit this transaction. If not set, the default + /// filesystem-based committer will be used. Note that the default committer is only allowed + /// for non-catalog-managed tables. + #[cfg(feature = "catalog-managed")] + pub fn with_committer(mut self, committer: impl Into>) -> Self { + self.committer = Some(committer.into()); + self + } + /// Consume the transaction and commit it to the table. The result is a result of /// [CommitResult] with the following semantics: /// - Ok(CommitResult) for either success or a recoverable error (includes the failed /// transaction in case of a conflict so the user can retry, etc.) /// - Err(Error) indicates a non-retryable error (e.g. logic/validation error). pub fn commit(self, engine: &dyn Engine) -> DeltaResult { + // Step 0: Determine the committer to use + #[cfg(feature = "catalog-managed")] + if self.committer.is_none() { + require!( + !self.read_snapshot.table_configuration().protocol().is_catalog_managed(), + Error::generic("Cannot use the default committer for a catalog-managed table. Please provide a committer via Transaction::with_committer.") + ); + } + + let default_committer: Arc; + let committer = match self.committer.as_ref() { + Some(c) => c, + None => { + default_committer = FileSystemCommitter::new(); + &default_committer + } + }; + // Step 1: Check for duplicate app_ids and generate set transactions (`txn`) // Note: The commit info must always be the first action in the commit but we generate it in // step 2 to fail early on duplicate transaction appIds @@ -242,15 +268,13 @@ impl Transaction { // Convert EngineData to FilteredEngineData with all rows selected let filtered_actions = actions .map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected)); - - let json_handler = engine.json_handler(); - match json_handler.write_json_file(&commit_path.location, Box::new(filtered_actions), false) - { - Ok(()) => Ok(CommitResult::CommittedTransaction( - self.into_committed(commit_version), + let commit_metadata = CommitMetadata::new(commit_path, commit_version); + match committer.commit(engine, Box::new(filtered_actions), commit_metadata) { + Ok(CommitResponse::Committed { version }) => Ok(CommitResult::CommittedTransaction( + self.into_committed(version), )), - Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction( - self.into_conflicted(commit_version), + Ok(CommitResponse::Conflict { version }) => Ok(CommitResult::ConflictedTransaction( + self.into_conflicted(version), )), // TODO: we may want to be more or less selective about what is retryable (this is tied // to the idea of "what kind of Errors should write_json_file return?") From ad0933a388af4401b8fe27d1f0355ee23e724ba5 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 29 Sep 2025 15:28:41 -0700 Subject: [PATCH 2/7] uc-client commit --- uc-client/src/client.rs | 18 ++++++- uc-client/src/models/commits.rs | 89 +++++++++++++++++++++++++++++++++ uc-client/src/models/mod.rs | 2 +- 3 files changed, 107 insertions(+), 2 deletions(-) diff --git a/uc-client/src/client.rs b/uc-client/src/client.rs index e1c4a9076..519753dc7 100644 --- a/uc-client/src/client.rs +++ b/uc-client/src/client.rs @@ -7,7 +7,7 @@ use url::Url; use crate::config::{ClientConfig, ClientConfigBuilder}; use crate::error::{Error, Result}; -use crate::models::commits::{CommitsRequest, CommitsResponse}; +use crate::models::commits::{CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; use crate::models::credentials::{CredentialsRequest, Operation, TemporaryTableCredentials}; use crate::models::tables::TablesResponse; @@ -64,6 +64,22 @@ impl UCClient { self.handle_response(response).await } + #[instrument(skip(self))] + pub async fn commit(&self, request: CommitRequest) -> Result { + let url = self.base_url.join("delta/preview/commits")?; + + let response = self + .execute_with_retry(|| { + self.client + .request(reqwest::Method::POST, url.clone()) + .json(&request) + .send() + }) + .await?; + + self.handle_response(response).await + } + #[instrument(skip(self))] pub async fn get_table(&self, table_name: &str) -> Result { let url = self.base_url.join(&format!("tables/{}", table_name))?; diff --git a/uc-client/src/models/commits.rs b/uc-client/src/models/commits.rs index 7d5dc1938..d9cab56b0 100644 --- a/uc-client/src/models/commits.rs +++ b/uc-client/src/models/commits.rs @@ -58,3 +58,92 @@ impl Commit { chrono::DateTime::from_timestamp_millis(self.file_modification_timestamp) } } + +// Structs for creating a new commit +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CommitRequest { + pub table_id: String, + pub table_uri: String, + pub commit_info: CommitInfo, + #[serde(skip_serializing_if = "Option::is_none")] + pub latest_backfilled_version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub protocol: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CommitInfo { + pub version: i64, + pub timestamp: i64, + pub file_name: String, + pub file_size: i64, + pub file_modification_timestamp: i64, + #[serde(default)] + pub is_disown_commit: bool, +} + +impl CommitRequest { + pub fn new( + table_id: impl Into, + table_uri: impl Into, + commit_info: CommitInfo, + ) -> Self { + Self { + table_id: table_id.into(), + table_uri: table_uri.into(), + commit_info, + latest_backfilled_version: None, + metadata: None, + protocol: None, + } + } + + pub fn with_latest_backfilled_version(mut self, version: i64) -> Self { + self.latest_backfilled_version = Some(version); + self + } + + pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self { + self.metadata = Some(metadata); + self + } + + pub fn with_protocol(mut self, protocol: serde_json::Value) -> Self { + self.protocol = Some(protocol); + self + } +} + +impl CommitInfo { + pub fn new( + version: i64, + timestamp: i64, + file_name: impl Into, + file_size: i64, + file_modification_timestamp: i64, + ) -> Self { + Self { + version, + timestamp, + file_name: file_name.into(), + file_size, + file_modification_timestamp, + is_disown_commit: false, + } + } + + pub fn with_disown_commit(mut self, disown: bool) -> Self { + self.is_disown_commit = disown; + self + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CommitResponse { + pub commit: Commit, +} diff --git a/uc-client/src/models/mod.rs b/uc-client/src/models/mod.rs index edf86ce57..7f0836263 100644 --- a/uc-client/src/models/mod.rs +++ b/uc-client/src/models/mod.rs @@ -2,6 +2,6 @@ pub mod commits; pub mod credentials; pub mod tables; -pub use commits::{Commit, CommitsRequest, CommitsResponse}; +pub use commits::{Commit, CommitInfo, CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; pub use credentials::{AwsTempCredentials, TemporaryTableCredentials}; pub use tables::TablesResponse; From c1d3b1adf4d0e17605fdf3a95cbc85cc706dad9b Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 30 Sep 2025 13:11:46 -0700 Subject: [PATCH 3/7] e2e poc --- kernel/src/actions/mod.rs | 5 - kernel/src/lib.rs | 8 +- kernel/src/log_path.rs | 2 + kernel/src/table_features/mod.rs | 7 + kernel/src/transaction/mod.rs | 13 ++ uc-catalog/Cargo.toml | 30 +++ uc-catalog/src/lib.rs | 338 +++++++++++++++++++++++++++++++ uc-client/src/client.rs | 7 +- uc-client/src/models/commits.rs | 68 +++---- uc-client/src/models/mod.rs | 2 +- 10 files changed, 428 insertions(+), 52 deletions(-) create mode 100644 uc-catalog/Cargo.toml create mode 100644 uc-catalog/src/lib.rs diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 77836d27b..057857912 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -485,11 +485,6 @@ impl Protocol { /// Check if writing to a table with this protocol is supported. That is: does the kernel /// support the specified protocol writer version and all enabled writer features? pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> { - #[cfg(feature = "catalog-managed")] - require!( - !self.is_catalog_managed(), - Error::unsupported("Writes are not yet supported for catalog-managed tables") - ); match &self.writer_features { Some(writer_features) if self.min_writer_version == 7 => { // if we're on version 7, make sure we support all the specified features diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index ee555a3f1..995684deb 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -87,7 +87,6 @@ use self::schema::{DataType, SchemaRef}; mod action_reconciliation; pub mod actions; pub mod checkpoint; -mod committer; pub mod engine_data; pub mod error; pub mod expressions; @@ -103,6 +102,11 @@ pub mod table_properties; pub mod transaction; pub(crate) mod transforms; +#[cfg(not(feature = "catalog-managed"))] +mod committer; +#[cfg(feature = "catalog-managed")] +pub mod committer; + pub use log_path::LogPath; mod row_tracking; @@ -184,7 +188,7 @@ pub type FileDataReadResultIterator = Box>> + Send>; /// Type alias for an iterator of [`EngineData`] results. -pub(crate) type EngineDataResultIterator<'a> = +pub type EngineDataResultIterator<'a> = Box>> + Send + 'a>; /// The metadata that describes an object. diff --git a/kernel/src/log_path.rs b/kernel/src/log_path.rs index 2584801bc..5bbf9bfee 100644 --- a/kernel/src/log_path.rs +++ b/kernel/src/log_path.rs @@ -92,6 +92,8 @@ mod test { assert_eq!(path.location, expected); } + // FIXME + #[ignore] #[test] fn test_staged_commit_path_creation_failures() { let last_modified = 1234567890i64; diff --git a/kernel/src/table_features/mod.rs b/kernel/src/table_features/mod.rs index 189eaf856..c6ad770ce 100644 --- a/kernel/src/table_features/mod.rs +++ b/kernel/src/table_features/mod.rs @@ -240,6 +240,13 @@ pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock> = Lazy WriterFeature::VariantType, WriterFeature::VariantTypePreview, WriterFeature::VariantShreddingPreview, + WriterFeature::V2Checkpoint, + WriterFeature::VacuumProtocolCheck, + WriterFeature::InCommitTimestamp, + #[cfg(feature = "catalog-managed")] + WriterFeature::CatalogManaged, + #[cfg(feature = "catalog-managed")] + WriterFeature::CatalogOwnedPreview, ] }); diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 6de968f82..7e95a55bd 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -578,6 +578,19 @@ impl Transaction { } } + pub fn hack_actions(&self, engine: &dyn Engine) -> EngineDataResultIterator<'_> { + let mut commit_info = CommitInfo::new( + self.commit_timestamp, + self.operation.clone(), + self.engine_info.clone(), + ); + commit_info.in_commit_timestamp = Some(self.commit_timestamp); + let commit_info_action = + commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine); + let actions = iter::once(commit_info_action); + Box::new(actions) + } + fn into_committed(self, version: Version) -> CommittedTransaction { let stats = PostCommitStats { commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint() diff --git a/uc-catalog/Cargo.toml b/uc-catalog/Cargo.toml new file mode 100644 index 000000000..efc3cf1b4 --- /dev/null +++ b/uc-catalog/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "uc-catalog" +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true +readme.workspace = true +rust-version.workspace = true +version.workspace = true + +# for cargo-release +[package.metadata.release] +release = false + +[dependencies] +delta_kernel = { path = "../kernel", features = ["catalog-managed"] } +uc-client = { path = "../uc-client" } +futures = "0.3" +itertools = "0.14" +object_store = "0.12.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["full"] } +tracing = "0.1" +url = "2" +uuid = { version = "1", features = ["v4"] } + +[dev-dependencies] +delta_kernel = { path = "../kernel", features = ["arrow-56", "default-engine-rustls", "catalog-managed"] } diff --git a/uc-catalog/src/lib.rs b/uc-catalog/src/lib.rs new file mode 100644 index 000000000..8e72a9c9f --- /dev/null +++ b/uc-catalog/src/lib.rs @@ -0,0 +1,338 @@ +//! UCCatalog implements a high-level interface for interacting with Delta Tables in Unity Catalog. + +use std::sync::Arc; + +use delta_kernel::{Engine, LogPath, Snapshot, Version}; + +use uc_client::models::{Commit, CommitRequest}; +use uc_client::prelude::*; + +use itertools::Itertools; +use tracing::info; +use url::Url; + +/// The [UCCatalog] provides a high-level interface to interact with Delta Tables stored in Unity +/// Catalog. +pub struct UCCatalog<'a> { + client: &'a UCClient, +} + +impl<'a> UCCatalog<'a> { + pub fn new(client: &'a UCClient) -> Self { + UCCatalog { client } + } + + pub async fn load_snapshot( + &self, + table_id: &str, + table_uri: &str, + engine: &dyn Engine, + ) -> Result, Box> { + self.load_snapshot_inner(table_id, table_uri, None, engine) + .await + } + + pub async fn load_snapshot_at( + &self, + table_id: &str, + table_uri: &str, + version: Version, + engine: &dyn Engine, + ) -> Result, Box> { + self.load_snapshot_inner(table_id, table_uri, Some(version), engine) + .await + } + + pub(crate) async fn load_snapshot_inner( + &self, + table_id: &str, + table_uri: &str, + version: Option, + engine: &dyn Engine, + ) -> Result, Box> { + let table_uri = table_uri.to_string(); + let req = CommitsRequest { + table_id: table_id.to_string(), + table_uri: table_uri.clone(), + start_version: Some(0), + end_version: version.and_then(|v| v.try_into().ok()), + }; + // TODO: does it paginate? + let commits = self.client.get_commits(req).await?; + + // sort them + let mut commits = commits; + if let Some(c) = commits.commits.as_mut() { + c.sort_by_key(|c| c.version) + } + + // if commits are present, we ensure they are sorted+contiguous + if let Some(commits) = &commits.commits { + if !commits.windows(2).all(|w| w[1].version == w[0].version + 1) { + return Err("Received non-contiguous commit versions".into()); + } + } + + // we always get back the latest version from commits response, and pass that in to + // kernel's Snapshot builder. basically, load_table for the latest version always looks + // like a time travel query since we know the latest version ahead of time. + // + // note there is a weird edge case: if the table was just created it will return + // latest_table_version = -1, but the 0.json will exist in the _delta_log. + let version: Version = match version { + Some(v) => v, + None => match commits.latest_table_version { + -1 => 0, + i => i.try_into()?, + }, + }; + + // consume uc-client's Commit and hand back a delta_kernel LogPath + let table_url = Url::parse(&table_uri)?; + let commits: Vec<_> = commits + .commits + .unwrap_or_default() + .into_iter() + .map(|c| -> Result> { + LogPath::staged_commit( + table_url.clone(), + &c.file_name, + c.file_modification_timestamp, + c.file_size.try_into()?, + ) + .map_err(|e| e.into()) + }) + .try_collect()?; + + info!("commits for kernel: {:?}\n", commits); + + Snapshot::builder_for(Url::parse(&(table_uri + "/"))?) + .at_version(version) + .with_log_tail(commits) + .build(engine) + .map_err(|e| e.into()) + } +} + +/// A [UCCommitter] is a Unity Catalog [Committer] implementation for committing to delta tables in +/// UC. +pub struct UCCommitter { + client: Arc, +} + +impl UCCommitter { + pub fn new(client: Arc) -> Self { + UCCommitter { client } + } +} + +use delta_kernel::committer::Committer; +use delta_kernel::EngineDataResultIterator; + +impl Committer for UCCommitter { + fn commit<'a>( + &self, + engine: &'a dyn Engine, + actions: EngineDataResultIterator<'a>, + commit_metadata: delta_kernel::committer::CommitMetadata, + ) -> delta_kernel::DeltaResult { + // let commit_req = CommitRequest::new( + // commit_metadata.table_id, + // commit_metadata.table_uri, + // Commit::new( + // commit_metadata.version as i64, + // commit_metadata.timestamp, + // format!("{}.json", commit_metadata.version), + // 123, + // 123456789, + // ), + // ); + // self.client.commit(commit_req) // ASYNC! + todo!() + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; + use delta_kernel::engine::default::DefaultEngine; + use delta_kernel::transaction::CommitResult; + use object_store::ObjectStore; + + use super::*; + + // We could just re-export UCClient's get_table to not require consumers to directly import + // uc_client themselves. + async fn get_table( + client: &UCClient, + table_name: &str, + ) -> Result<(String, String), Box> { + let res = client.get_table(table_name).await?; + let table_id = res.table_id; + let table_uri = res.storage_location; + + info!( + "[GET TABLE] got table_id: {}, table_uri: {}\n", + table_id, table_uri + ); + + Ok((table_id, table_uri)) + } + + // ignored test which you can run manually to play around with reading a UC table. run with: + // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t read_uc_table --nocapture -- --ignored` + #[ignore] + #[tokio::test] + async fn read_uc_table() -> Result<(), Box> { + let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set"); + let token = env::var("TOKEN").expect("TOKEN environment variable not set"); + let table_name = env::var("TABLENAME").expect("TABLENAME environment variable not set"); + + // build UC client, get table info and credentials + let client = UCClient::builder(endpoint, &token).build()?; + let (table_id, table_uri) = get_table(&client, &table_name).await?; + let creds = client + .get_credentials(&table_id, Operation::Read) + .await + .map_err(|e| format!("Failed to get credentials: {}", e))?; + + // build catalog + let catalog = UCCatalog::new(&client); + + // TODO: support non-AWS + let creds = creds + .aws_temp_credentials + .ok_or("No AWS temporary credentials found")?; + + let options = [ + ("region", "us-west-2"), + ("access_key_id", &creds.access_key_id), + ("secret_access_key", &creds.secret_access_key), + ("session_token", &creds.session_token), + ]; + + let table_url = Url::parse(&table_uri)?; + let (store, path) = object_store::parse_url_opts(&table_url, options)?; + let store: Arc<_> = store.into(); + + info!("created object store: {:?}\npath: {:?}\n", store, path); + + let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + + // read table + let snapshot = catalog + .load_snapshot(&table_id, &table_uri, &engine) + .await?; + // or time travel + // let snapshot = catalog.load_snapshot_at(&table, 2).await?; + + println!("🎉 loaded snapshot: {snapshot:?}"); + + Ok(()) + } + + // ignored test which you can run manually to play around with writing to a UC table. run with: + // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t write_uc_table --nocapture -- --ignored` + #[ignore] + #[tokio::test] + async fn write_uc_table() -> Result<(), Box> { + let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set"); + let token = env::var("TOKEN").expect("TOKEN environment variable not set"); + let table_name = env::var("TABLENAME").expect("TABLENAME environment variable not set"); + + // build UC client, get table info and credentials + let client = Arc::new(UCClient::builder(endpoint, &token).build()?); + let (table_id, table_uri) = get_table(&client, &table_name).await?; + let creds = client + .get_credentials(&table_id, Operation::ReadWrite) + .await + .map_err(|e| format!("Failed to get credentials: {}", e))?; + + // build catalog + let catalog = UCCatalog::new(&client); + + // TODO: support non-AWS + let creds = creds + .aws_temp_credentials + .ok_or("No AWS temporary credentials found")?; + + let options = [ + ("region", "us-west-2"), + ("access_key_id", &creds.access_key_id), + ("secret_access_key", &creds.secret_access_key), + ("session_token", &creds.session_token), + ]; + + let table_url = Url::parse(&table_uri)?; + let (store, path) = object_store::parse_url_opts(&table_url, options)?; + let store: Arc = store.into(); + + let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let committer = Arc::new(UCCommitter::new(client.clone())); + let snapshot = catalog + .load_snapshot(&table_id, &table_uri, &engine) + .await?; + println!("latest snapshot version: {:?}", snapshot.version()); + let txn = snapshot + .clone() + .transaction()? + .with_committer(committer as Arc); + let _write_context = txn.get_write_context(); + // do a write. + + let last_modified = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_millis() as i64; + let size: i64 = 1500; + let uuid = uuid::Uuid::new_v4(); + let version = snapshot.version() + 1; + let filename = format!("{version:020}.{uuid}.json"); + let mut commit_path = table_url.clone(); + commit_path.path_segments_mut().unwrap().extend(&[ + "_delta_log", + "_staged_commits", + &filename, + ]); + + // commit info only + let actions = txn.hack_actions(&engine); + engine + .json_handler() + .write_json_file(&commit_path, actions, false)?; + + // print the log + use futures::stream::StreamExt; + let mut stream = store.list(Some(&path)); + while let Some(path) = stream.next().await { + println!("object: {:?}", path.unwrap().location); + } + + let commit_req = CommitRequest::new( + table_id, + table_uri, + Commit::new( + version.try_into().unwrap(), + last_modified, + filename, + size, + last_modified, + ), + ); + client.commit(commit_req).await?; + + // match txn.commit(&engine)? { + // CommitResult::CommittedTransaction(t) => { + // println!("🎉 committed version {}", t.version()); + // } + // CommitResult::ConflictedTransaction(t) => { + // println!("💥 commit conflicted at version {}", t.conflict_version); + // } + // CommitResult::RetryableTransaction(_) => { + // println!("we should retry..."); + // } + // } + Ok(()) + } +} diff --git a/uc-client/src/client.rs b/uc-client/src/client.rs index 519753dc7..d996e370c 100644 --- a/uc-client/src/client.rs +++ b/uc-client/src/client.rs @@ -61,13 +61,18 @@ impl UCClient { }) .await?; - self.handle_response(response).await + let res = self.handle_response(response).await; + println!("\nGet commits response: {:?}\n", res); + res } #[instrument(skip(self))] pub async fn commit(&self, request: CommitRequest) -> Result { let url = self.base_url.join("delta/preview/commits")?; + let json = serde_json::to_string_pretty(&request).unwrap_or_default(); + println!("Committing: {json}"); + let response = self .execute_with_retry(|| { self.client diff --git a/uc-client/src/models/commits.rs b/uc-client/src/models/commits.rs index d9cab56b0..046b397cf 100644 --- a/uc-client/src/models/commits.rs +++ b/uc-client/src/models/commits.rs @@ -50,6 +50,28 @@ pub struct Commit { } impl Commit { + pub fn new( + version: i64, + timestamp: i64, + file_name: impl Into, + file_size: i64, + file_modification_timestamp: i64, + ) -> Self { + Self { + version, + timestamp, + file_name: file_name.into(), + file_size, + file_modification_timestamp, + is_disown_commit: Some(false), + } + } + + pub fn with_disown_commit(mut self, disown: bool) -> Self { + self.is_disown_commit = Some(disown); + self + } + pub fn timestamp_as_datetime(&self) -> Option> { chrono::DateTime::from_timestamp_millis(self.timestamp) } @@ -61,11 +83,10 @@ impl Commit { // Structs for creating a new commit #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] pub struct CommitRequest { pub table_id: String, pub table_uri: String, - pub commit_info: CommitInfo, + pub commit_info: Commit, #[serde(skip_serializing_if = "Option::is_none")] pub latest_backfilled_version: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -74,23 +95,11 @@ pub struct CommitRequest { pub protocol: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CommitInfo { - pub version: i64, - pub timestamp: i64, - pub file_name: String, - pub file_size: i64, - pub file_modification_timestamp: i64, - #[serde(default)] - pub is_disown_commit: bool, -} - impl CommitRequest { pub fn new( table_id: impl Into, table_uri: impl Into, - commit_info: CommitInfo, + commit_info: Commit, ) -> Self { Self { table_id: table_id.into(), @@ -118,32 +127,5 @@ impl CommitRequest { } } -impl CommitInfo { - pub fn new( - version: i64, - timestamp: i64, - file_name: impl Into, - file_size: i64, - file_modification_timestamp: i64, - ) -> Self { - Self { - version, - timestamp, - file_name: file_name.into(), - file_size, - file_modification_timestamp, - is_disown_commit: false, - } - } - - pub fn with_disown_commit(mut self, disown: bool) -> Self { - self.is_disown_commit = disown; - self - } -} - #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CommitResponse { - pub commit: Commit, -} +pub struct CommitResponse {} diff --git a/uc-client/src/models/mod.rs b/uc-client/src/models/mod.rs index 7f0836263..8fe92111e 100644 --- a/uc-client/src/models/mod.rs +++ b/uc-client/src/models/mod.rs @@ -2,6 +2,6 @@ pub mod commits; pub mod credentials; pub mod tables; -pub use commits::{Commit, CommitInfo, CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; +pub use commits::{Commit, CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; pub use credentials::{AwsTempCredentials, TemporaryTableCredentials}; pub use tables::TablesResponse; From 682c63a5de467817ba8a297ab23c9b1c0649e221 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 30 Sep 2025 21:32:14 -0700 Subject: [PATCH 4/7] publish --- kernel/src/committer.rs | 4 ++ kernel/src/engine/default/filesystem.rs | 11 ++++ kernel/src/engine/sync/storage.rs | 6 +++ kernel/src/lib.rs | 3 ++ kernel/src/listed_log_files.rs | 3 ++ kernel/src/log_segment.rs | 29 ++++++++++ kernel/src/path.rs | 71 +++++++++++++++++++++++++ kernel/src/snapshot.rs | 16 ++++++ uc-catalog/src/lib.rs | 59 ++++++++++++++++++++ uc-client/src/models/commits.rs | 20 ++++++- 10 files changed, 220 insertions(+), 2 deletions(-) diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs index 0b496375f..a58bdc63d 100644 --- a/kernel/src/committer.rs +++ b/kernel/src/committer.rs @@ -34,6 +34,10 @@ pub trait Committer: Send + Sync { actions: Box> + Send + '_>, commit_metadata: CommitMetadata, ) -> DeltaResult; + + fn published(&self, _version: Version) -> DeltaResult<()> { + Ok(()) + } } pub(crate) struct FileSystemCommitter; diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 08d03107c..b2fd0bc70 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -175,6 +175,17 @@ impl StorageHandler for ObjectStoreStorageHandler { Ok(Box::new(receiver.into_iter())) } + + fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()> { + let from_path = Path::from_url_path(from.path())?; + let to_path = Path::from_url_path(to.path())?; + let store = self.inner.clone(); + println!("Copying from {from_path:?} to {to_path:?}"); + // FIXME: ? copy_if_not_exists doesn't work on S3 + self.task_executor + .block_on(async move { store.copy(&from_path, &to_path).await }) + .map_err(|e| Error::Generic(format!("Failed to copy file: {e}"))) + } } #[cfg(test)] diff --git a/kernel/src/engine/sync/storage.rs b/kernel/src/engine/sync/storage.rs index fd48e7f8e..7865ff5cb 100644 --- a/kernel/src/engine/sync/storage.rs +++ b/kernel/src/engine/sync/storage.rs @@ -70,6 +70,12 @@ impl StorageHandler for SyncStorageHandler { }); Ok(Box::new(iter)) } + + fn copy(&self, _from: &Url, _to: &Url) -> DeltaResult<()> { + Err(Error::unsupported( + "Copy not yet implemented for SyncStorageHandler", + )) + } } #[cfg(test)] diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 995684deb..f6ec8aa46 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -542,6 +542,9 @@ pub trait StorageHandler: AsAny { &self, files: Vec, ) -> DeltaResult>>>; + + /// Performs a copy. Must not overwrite. + fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()>; } /// Provides JSON handling functionality to Delta Kernel. diff --git a/kernel/src/listed_log_files.rs b/kernel/src/listed_log_files.rs index c209fddf1..1429cedb4 100644 --- a/kernel/src/listed_log_files.rs +++ b/kernel/src/listed_log_files.rs @@ -631,6 +631,9 @@ mod list_log_files_with_log_tail_tests { ) -> DeltaResult>>> { panic!("read_files used"); } + fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()> { + panic!("copy used from {from} to {to}"); + } } // when log_tail covers the entire requested range, no filesystem listing should occur diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index d0cb65b93..1124870bb 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -588,4 +588,33 @@ impl LogSegment { ); Ok(()) } + + // TODO: decide on API: + // 1. should this mutate? should it consume self and hand back a new one? + // 2. should we take finer-grained args like which version to publish up to? + pub(crate) fn publish(mut self, engine: &dyn Engine) -> DeltaResult { + let storage = engine.storage_handler(); + + // Transform staged commits into published commits + for i in 0..self.ascending_commit_files.len() { + if matches!( + self.ascending_commit_files[i].file_type, + LogPathFileType::StagedCommit + ) { + // Clone the staged commit to get source location before transforming + let source_location = self.ascending_commit_files[i].location.location.clone(); + + // Take ownership of the commit to transform it + let staged_commit = self.ascending_commit_files.remove(i); + let published_commit = staged_commit.into_published()?; + + // Copy the actual file from staged to published location + storage.copy(&source_location, &published_commit.location.location)?; + + // Insert the published commit back + self.ascending_commit_files.insert(i, published_commit); + } + } + Ok(self) + } } diff --git a/kernel/src/path.rs b/kernel/src/path.rs index 4a071bf32..59dbe46e8 100644 --- a/kernel/src/path.rs +++ b/kernel/src/path.rs @@ -257,6 +257,77 @@ impl ParsedLogPath { None => Err(Error::generic("Commit file contains no actions")), } } + + /// Transform a staged commit into a published commit by updating its location + pub(crate) fn into_published(self) -> DeltaResult> { + if !matches!(self.file_type, LogPathFileType::StagedCommit) { + return Err(Error::internal_error( + "Unable to create a published path from a non-staged commit", + )); + } + + let published_filename = format!("{:020}.json", self.version); + let published_url = transform_staged_commit_url(self.location.location.clone())?; + + Ok(ParsedLogPath { + location: FileMeta { + location: published_url, + last_modified: self.location.last_modified, + size: self.location.size, + }, + filename: published_filename, + extension: "json".to_string(), + version: self.version, + file_type: LogPathFileType::Commit, + }) + } +} + +fn transform_staged_commit_url(mut url: Url) -> DeltaResult { + // Collect segments into owned strings to avoid borrowing issues + let segments: Vec = url + .path_segments() + .ok_or(Error::generic("cannot parse path segments"))? + .map(|s| s.to_string()) + .collect(); + + let staged_commits_index = segments + .iter() + .rposition(|s| s == "_staged_commits") + .ok_or(Error::generic("_staged_commits not found in path"))?; + + // Build new path: everything before _staged_commits + modified filename + let mut new_path = String::new(); + + // Add segments up to (but not including) _staged_commits + for (i, segment) in segments.iter().enumerate() { + if i >= staged_commits_index { + break; + } + if !new_path.is_empty() || !segment.is_empty() { + new_path.push('/'); + } + new_path.push_str(segment); + } + + // Add the modified filename (remove UUID) + if let Some(filename) = segments.get(staged_commits_index + 1) { + // Remove UUID from filename: 00000000000000000005.{uuid}.json -> 00000000000000000005.json + let new_filename = filename + .split('.') + .next() + .ok_or(Error::generic("invalid filename format"))? + .to_string() + + ".json"; + + if !new_path.is_empty() { + new_path.push('/'); + } + new_path.push_str(&new_filename); + } + + url.set_path(&new_path); + Ok(url) } impl ParsedLogPath { diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 9665a7497..2fed36545 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -332,6 +332,22 @@ impl Snapshot { Transaction::try_new(self) } + #[cfg(feature = "catalog-managed")] + pub fn publish(mut self, engine: &dyn Engine) -> DeltaResult { + // FIXME: remove clone + self.log_segment = self.log_segment.clone().publish(engine)?; + + println!("Published log segment"); + for commit in &self.log_segment.ascending_commit_files { + println!("commit: {:?}", commit); + } + + // how to get committer + context? + // committer.published(self.version(), context)?; + + Ok(self) + } + /// Fetch the latest version of the provided `application_id` for this snapshot. Filters the txn based on the SetTransactionRetentionDuration property and lastUpdated /// /// Note that this method performs log replay (fetches and processes metadata from storage). diff --git a/uc-catalog/src/lib.rs b/uc-catalog/src/lib.rs index 8e72a9c9f..eb48d2f73 100644 --- a/uc-catalog/src/lib.rs +++ b/uc-catalog/src/lib.rs @@ -150,6 +150,26 @@ impl Committer for UCCommitter { // self.client.commit(commit_req) // ASYNC! todo!() } + + fn published( + &self, + version: Version, + context: &UCCommitterContext, + ) -> delta_kernel::DeltaResult<()> { + let request = + CommitRequest::ack_publish(&context.table_id, &context.table_uri, version as i64); + let handle = tokio::runtime::Handle::current(); + tokio::task::block_in_place(move || { + handle.block_on(async move { + match self.client.commit(request).await { + Ok(_) => Ok(()), + Err(e) => Err(delta_kernel::Error::Generic(format!( + "publish_commit failed: {e}" + ))), + } + }) + }) + } } #[cfg(test)] @@ -333,6 +353,45 @@ mod tests { // println!("we should retry..."); // } // } + // let commit = store + // .get(&object_store::path::Path::from("19a85dee-54bc-43a2-87ab-023d0ec16013/tables/cdac4b37-fc11-4148-ae02-512e6cc4e1bd/_delta_log/_staged_commits/00000000000000000001.5400fd23-8e08-4f87-8049-7b37431fb826.json")) + // .await + // .unwrap(); + // let bytes = commit.bytes().await?; + // println!("commit file contents:\n{}", String::from_utf8_lossy(&bytes)); + + // let ctx = UCCommitterContext { + // table_id: table_id.clone(), + // table_uri: table_uri.clone(), + // }; + // let t = match txn.commit(&engine, ctx)? { + // CommitResult::CommittedTransaction(t) => { + // println!("🎉 committed version {}", t.version()); + // Some(t) + // } + // CommitResult::ConflictedTransaction(t) => { + // println!("💥 commit conflicted at version {}", t.conflict_version); + // None + // } + // CommitResult::RetryableTransaction(_) => { + // println!("we should retry..."); + // None + // } + // }; + + // // FIXME! need to plumb log_tail + // // let snapshot = t.unwrap().post_commit_snapshot(&engine)?; + // let new_version = snapshot.version() + 1; + // let snapshot = catalog + // .load_snapshot_at(&table_id, &table_uri, new_version, &engine) + // .await?; + // let _published = Arc::into_inner(snapshot).unwrap().publish(&engine)?; + + // // notify UC of publish. since Commmitter is tied to Transaction it's a bit hard to plumb + // // this through to Snapshot... TODO + // let request = CommitRequest::ack_publish(&table_id, &table_uri, new_version as i64); + // client.commit(request).await?; + Ok(()) } } diff --git a/uc-client/src/models/commits.rs b/uc-client/src/models/commits.rs index 046b397cf..1aee33173 100644 --- a/uc-client/src/models/commits.rs +++ b/uc-client/src/models/commits.rs @@ -86,7 +86,8 @@ impl Commit { pub struct CommitRequest { pub table_id: String, pub table_uri: String, - pub commit_info: Commit, + #[serde(skip_serializing_if = "Option::is_none")] + pub commit_info: Option, #[serde(skip_serializing_if = "Option::is_none")] pub latest_backfilled_version: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -104,13 +105,28 @@ impl CommitRequest { Self { table_id: table_id.into(), table_uri: table_uri.into(), - commit_info, + commit_info: Some(commit_info), latest_backfilled_version: None, metadata: None, protocol: None, } } + pub fn ack_publish( + table_id: impl Into, + table_uri: impl Into, + last_backfilled_version: i64, + ) -> Self { + Self { + table_id: table_id.into(), + table_uri: table_uri.into(), + commit_info: None, + latest_backfilled_version: Some(last_backfilled_version), + metadata: None, + protocol: None, + } + } + pub fn with_latest_backfilled_version(mut self, version: i64) -> Self { self.latest_backfilled_version = Some(version); self From d19dfd78003b8f8c275fd8a2aa8b8e2f5e0c87fa Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 7 Oct 2025 11:18:45 -0700 Subject: [PATCH 5/7] remove the 'no-catalog-managed writes' test --- kernel/src/actions/mod.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 057857912..914dc561b 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -1446,26 +1446,6 @@ mod tests { assert_eq!(parse_features::(features), expected); } - #[test] - fn test_no_catalog_managed_writes() { - let protocol = Protocol::try_new( - 3, - 7, - Some([ReaderFeature::CatalogManaged]), - Some([WriterFeature::CatalogManaged]), - ) - .unwrap(); - assert!(protocol.ensure_write_supported().is_err()); - let protocol = Protocol::try_new( - 3, - 7, - Some([ReaderFeature::CatalogOwnedPreview]), - Some([WriterFeature::CatalogOwnedPreview]), - ) - .unwrap(); - assert!(protocol.ensure_write_supported().is_err()); - } - #[test] fn test_into_engine_data() { let engine = ExprEngine::new(); From 182dfb4477c1a27bc74f0659889cd2cea21079b2 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 7 Oct 2025 14:02:51 -0700 Subject: [PATCH 6/7] finish up --- Cargo.toml | 1 + kernel/src/committer.rs | 10 +- kernel/src/log_path.rs | 1 + kernel/src/snapshot.rs | 5 +- kernel/src/transaction/mod.rs | 5 +- uc-catalog/src/lib.rs | 244 +++++++++++++++++----------------- uc-client/src/lib.rs | 1 - 7 files changed, 141 insertions(+), 126 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2161343cd..8d96286ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "feature-tests", "mem-test", "uc-client", # WIP: this is an experimental UC client for catalog-managed table work + "uc-catalog", # WIP: this is an experimental UC catalog for catalog-managed table work ] # note that in addition to the members above, the workspace includes examples: # - inspect-table diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs index a58bdc63d..1a275d175 100644 --- a/kernel/src/committer.rs +++ b/kernel/src/committer.rs @@ -7,15 +7,17 @@ use url::Url; #[derive(Debug)] pub struct CommitMetadata { - pub(crate) commit_path: ParsedLogPath, - pub(crate) version: Version, + pub commit_path: ParsedLogPath, + pub version: Version, + pub timestamp: i64, } impl CommitMetadata { - pub(crate) fn new(commit_path: ParsedLogPath, version: Version) -> Self { + pub(crate) fn new(commit_path: ParsedLogPath, version: Version, timestamp: i64) -> Self { Self { commit_path, version, + timestamp, } } } @@ -34,7 +36,9 @@ pub trait Committer: Send + Sync { actions: Box> + Send + '_>, commit_metadata: CommitMetadata, ) -> DeltaResult; +} +pub trait Publisher: Send + Sync { fn published(&self, _version: Version) -> DeltaResult<()> { Ok(()) } diff --git a/kernel/src/log_path.rs b/kernel/src/log_path.rs index 5bbf9bfee..70abb1476 100644 --- a/kernel/src/log_path.rs +++ b/kernel/src/log_path.rs @@ -46,6 +46,7 @@ impl LogPath { // TODO: we should introduce TablePath/LogPath types which enforce checks like ending '/' // require table_root ends with '/' + println!("table_root: {}", table_root); require!( table_root.path().ends_with('/'), Error::generic("table root must be a directory-like URL ending with '/'") diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 2fed36545..26bacb466 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -8,6 +8,7 @@ use crate::actions::domain_metadata::domain_metadata_configuration; use crate::actions::set_transaction::SetTransactionScanner; use crate::actions::INTERNAL_DOMAIN_PREFIX; use crate::checkpoint::CheckpointWriter; +use crate::committer::Publisher; use crate::listed_log_files::ListedLogFiles; use crate::log_segment::LogSegment; use crate::path::ParsedLogPath; @@ -333,7 +334,7 @@ impl Snapshot { } #[cfg(feature = "catalog-managed")] - pub fn publish(mut self, engine: &dyn Engine) -> DeltaResult { + pub fn publish(mut self, engine: &dyn Engine, publisher: &dyn Publisher) -> DeltaResult { // FIXME: remove clone self.log_segment = self.log_segment.clone().publish(engine)?; @@ -343,7 +344,7 @@ impl Snapshot { } // how to get committer + context? - // committer.published(self.version(), context)?; + publisher.published(self.version())?; Ok(self) } diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 7e95a55bd..d0d3ee9c4 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -268,7 +268,8 @@ impl Transaction { // Convert EngineData to FilteredEngineData with all rows selected let filtered_actions = actions .map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected)); - let commit_metadata = CommitMetadata::new(commit_path, commit_version); + let commit_metadata = + CommitMetadata::new(commit_path, commit_version, self.commit_timestamp); match committer.commit(engine, Box::new(filtered_actions), commit_metadata) { Ok(CommitResponse::Committed { version }) => Ok(CommitResult::CommittedTransaction( self.into_committed(version), @@ -578,9 +579,11 @@ impl Transaction { } } + // FIXME: remove pub fn hack_actions(&self, engine: &dyn Engine) -> EngineDataResultIterator<'_> { let mut commit_info = CommitInfo::new( self.commit_timestamp, + Some(self.commit_timestamp), self.operation.clone(), self.engine_info.clone(), ); diff --git a/uc-catalog/src/lib.rs b/uc-catalog/src/lib.rs index eb48d2f73..a5ef76a6c 100644 --- a/uc-catalog/src/lib.rs +++ b/uc-catalog/src/lib.rs @@ -2,7 +2,9 @@ use std::sync::Arc; -use delta_kernel::{Engine, LogPath, Snapshot, Version}; +use delta_kernel::committer::{CommitMetadata, CommitResponse, Committer, Publisher}; +use delta_kernel::Error as DeltaError; +use delta_kernel::{DeltaResult, Engine, FilteredEngineData, LogPath, Snapshot, Version}; use uc_client::models::{Commit, CommitRequest}; use uc_client::prelude::*; @@ -88,7 +90,11 @@ impl<'a> UCCatalog<'a> { }; // consume uc-client's Commit and hand back a delta_kernel LogPath - let table_url = Url::parse(&table_uri)?; + let mut table_url = Url::parse(&table_uri)?; + if !table_url.path().ends_with('/') { + let path = format!("{}/", table_url.path()); + table_url.set_path(&path); + } let commits: Vec<_> = commits .commits .unwrap_or_default() @@ -114,50 +120,100 @@ impl<'a> UCCatalog<'a> { } } -/// A [UCCommitter] is a Unity Catalog [Committer] implementation for committing to delta tables in -/// UC. +/// A [UCCommitter] is a Unity Catalog [Committer] implementation for committing to a specific +/// delta table in UC. pub struct UCCommitter { client: Arc, + table_id: String, + table_uri: String, } impl UCCommitter { - pub fn new(client: Arc) -> Self { - UCCommitter { client } + pub fn new( + client: Arc, + table_id: impl Into, + table_uri: impl Into, + ) -> Self { + UCCommitter { + client, + table_id: table_id.into(), + table_uri: table_uri.into(), + } } } -use delta_kernel::committer::Committer; -use delta_kernel::EngineDataResultIterator; - impl Committer for UCCommitter { - fn commit<'a>( + fn commit( &self, - engine: &'a dyn Engine, - actions: EngineDataResultIterator<'a>, - commit_metadata: delta_kernel::committer::CommitMetadata, - ) -> delta_kernel::DeltaResult { - // let commit_req = CommitRequest::new( - // commit_metadata.table_id, - // commit_metadata.table_uri, - // Commit::new( - // commit_metadata.version as i64, - // commit_metadata.timestamp, - // format!("{}.json", commit_metadata.version), - // 123, - // 123456789, - // ), - // ); - // self.client.commit(commit_req) // ASYNC! - todo!() + engine: &dyn Engine, + actions: Box> + Send + '_>, + commit_metadata: CommitMetadata, + ) -> DeltaResult { + let uuid = uuid::Uuid::new_v4(); + let filename = format!( + "{version:020}.{uuid}.json", + version = commit_metadata.version + ); + // FIXME use table path from commit_metadata? + let mut commit_path = Url::parse(&self.table_uri)?; + commit_path.path_segments_mut().unwrap().extend(&[ + "_delta_log", + "_staged_commits", + &filename, + ]); + + // commit info only + engine + .json_handler() + .write_json_file(&commit_path, Box::new(actions), false)?; + + let mut other = Url::parse(&self.table_uri)?; + other.path_segments_mut().unwrap().extend(&[ + "_delta_log", + "_staged_commits", + &format!("{:020}", commit_metadata.version), + ]); + let committed = engine + .storage_handler() + .list_from(&other)? + .next() + .unwrap() + .unwrap(); + println!("wrote commit file: {:?}", committed); + + let commit_req = CommitRequest::new( + self.table_id.clone(), + self.table_uri.clone(), + Commit::new( + commit_metadata.version.try_into().unwrap(), + commit_metadata.timestamp, + filename, + committed.size as i64, + committed.last_modified, + ), + ); + // FIXME + let handle = tokio::runtime::Handle::current(); + tokio::task::block_in_place(|| { + handle.block_on(async move { + self.client + .commit(commit_req) + .await + .map_err(|e| DeltaError::Generic(format!("UC commit error: {e}"))) + }) + })?; + Ok(CommitResponse::Committed { + version: commit_metadata.version, + }) } +} - fn published( - &self, - version: Version, - context: &UCCommitterContext, - ) -> delta_kernel::DeltaResult<()> { - let request = - CommitRequest::ack_publish(&context.table_id, &context.table_uri, version as i64); +impl Publisher for UCCommitter { + fn published(&self, version: Version) -> delta_kernel::DeltaResult<()> { + let version = version + .try_into() + .map_err(|e| delta_kernel::Error::Generic(format!("version conversion error: {e}")))?; + let request = CommitRequest::ack_publish(&self.table_id, &self.table_uri, version); let handle = tokio::runtime::Handle::current(); tokio::task::block_in_place(move || { handle.block_on(async move { @@ -256,7 +312,7 @@ mod tests { // ignored test which you can run manually to play around with writing to a UC table. run with: // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t write_uc_table --nocapture -- --ignored` #[ignore] - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn write_uc_table() -> Result<(), Box> { let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set"); let token = env::var("TOKEN").expect("TOKEN environment variable not set"); @@ -286,11 +342,15 @@ mod tests { ]; let table_url = Url::parse(&table_uri)?; - let (store, path) = object_store::parse_url_opts(&table_url, options)?; + let (store, _path) = object_store::parse_url_opts(&table_url, options)?; let store: Arc = store.into(); let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); - let committer = Arc::new(UCCommitter::new(client.clone())); + let committer = Arc::new(UCCommitter::new( + client.clone(), + table_id.clone(), + table_uri.clone(), + )); let snapshot = catalog .load_snapshot(&table_id, &table_uri, &engine) .await?; @@ -298,100 +358,46 @@ mod tests { let txn = snapshot .clone() .transaction()? - .with_committer(committer as Arc); + .with_committer(committer.clone() as Arc); let _write_context = txn.get_write_context(); // do a write. - let last_modified = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH)? - .as_millis() as i64; - let size: i64 = 1500; - let uuid = uuid::Uuid::new_v4(); - let version = snapshot.version() + 1; - let filename = format!("{version:020}.{uuid}.json"); - let mut commit_path = table_url.clone(); - commit_path.path_segments_mut().unwrap().extend(&[ - "_delta_log", - "_staged_commits", - &filename, - ]); - - // commit info only - let actions = txn.hack_actions(&engine); - engine - .json_handler() - .write_json_file(&commit_path, actions, false)?; - // print the log - use futures::stream::StreamExt; - let mut stream = store.list(Some(&path)); - while let Some(path) = stream.next().await { - println!("object: {:?}", path.unwrap().location); + // use futures::stream::StreamExt; + // let mut stream = store.list(Some(&path)); + // while let Some(path) = stream.next().await { + // println!("object: {:?}", path.unwrap()); + // } + + match txn.commit(&engine)? { + CommitResult::CommittedTransaction(t) => { + println!("🎉 committed version {}", t.commit_version()); + // FIXME! ideally should use post-commit snapshot (need to make sure to plumb + // through log tail) + // let snapshot = t.unwrap().post_commit_snapshot(&engine)?; + let snapshot = catalog + .load_snapshot_at(&table_id, &table_uri, t.commit_version(), &engine) + .await?; + let _published = Arc::into_inner(snapshot) + .unwrap() + .publish(&engine, committer.as_ref())?; + } + CommitResult::ConflictedTransaction(t) => { + println!("💥 commit conflicted at version {}", t.conflict_version()); + } + CommitResult::RetryableTransaction(_) => { + println!("we should retry..."); + } } - let commit_req = CommitRequest::new( - table_id, - table_uri, - Commit::new( - version.try_into().unwrap(), - last_modified, - filename, - size, - last_modified, - ), - ); - client.commit(commit_req).await?; - - // match txn.commit(&engine)? { - // CommitResult::CommittedTransaction(t) => { - // println!("🎉 committed version {}", t.version()); - // } - // CommitResult::ConflictedTransaction(t) => { - // println!("💥 commit conflicted at version {}", t.conflict_version); - // } - // CommitResult::RetryableTransaction(_) => { - // println!("we should retry..."); - // } - // } + // some debug // let commit = store - // .get(&object_store::path::Path::from("19a85dee-54bc-43a2-87ab-023d0ec16013/tables/cdac4b37-fc11-4148-ae02-512e6cc4e1bd/_delta_log/_staged_commits/00000000000000000001.5400fd23-8e08-4f87-8049-7b37431fb826.json")) + // .get(&object_store::path::Path::from("19a85dee-54bc-43a2-87ab-023d0ec16013/tables/3553b6b9-91ce-47d7-b2f9-8eb4ba5e93f5/_delta_log/_staged_commits/00000000000000000001.5cabaf8b-037d-4a84-b493-2e3d8ab899df.json")) // .await // .unwrap(); // let bytes = commit.bytes().await?; // println!("commit file contents:\n{}", String::from_utf8_lossy(&bytes)); - // let ctx = UCCommitterContext { - // table_id: table_id.clone(), - // table_uri: table_uri.clone(), - // }; - // let t = match txn.commit(&engine, ctx)? { - // CommitResult::CommittedTransaction(t) => { - // println!("🎉 committed version {}", t.version()); - // Some(t) - // } - // CommitResult::ConflictedTransaction(t) => { - // println!("💥 commit conflicted at version {}", t.conflict_version); - // None - // } - // CommitResult::RetryableTransaction(_) => { - // println!("we should retry..."); - // None - // } - // }; - - // // FIXME! need to plumb log_tail - // // let snapshot = t.unwrap().post_commit_snapshot(&engine)?; - // let new_version = snapshot.version() + 1; - // let snapshot = catalog - // .load_snapshot_at(&table_id, &table_uri, new_version, &engine) - // .await?; - // let _published = Arc::into_inner(snapshot).unwrap().publish(&engine)?; - - // // notify UC of publish. since Commmitter is tied to Transaction it's a bit hard to plumb - // // this through to Snapshot... TODO - // let request = CommitRequest::ack_publish(&table_id, &table_uri, new_version as i64); - // client.commit(request).await?; - Ok(()) } } diff --git a/uc-client/src/lib.rs b/uc-client/src/lib.rs index 46c90f1ad..c24b9c4a7 100644 --- a/uc-client/src/lib.rs +++ b/uc-client/src/lib.rs @@ -34,7 +34,6 @@ pub use error::{Error, Result}; #[doc(hidden)] pub mod prelude { pub use crate::client::UCClient; - pub use crate::error::Result; pub use crate::models::{ commits::{Commit, CommitsRequest, CommitsResponse}, credentials::{Operation, TemporaryTableCredentials}, From 268c26d2b361b373e2e260fef94f5fd5596da03d Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 15 Oct 2025 15:25:35 -0700 Subject: [PATCH 7/7] cleanup --- kernel/src/committer.rs | 15 ++++++++------- kernel/src/log_segment.rs | 12 ++++++++++++ kernel/src/log_segment/tests.rs | 6 ++++++ kernel/src/snapshot.rs | 6 +----- kernel/src/transaction/mod.rs | 8 ++++++-- uc-catalog/src/lib.rs | 33 +++++++-------------------------- uc-client/src/models/commits.rs | 3 ++- 7 files changed, 42 insertions(+), 41 deletions(-) diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs index 1a275d175..c930c9b56 100644 --- a/kernel/src/committer.rs +++ b/kernel/src/committer.rs @@ -10,14 +10,21 @@ pub struct CommitMetadata { pub commit_path: ParsedLogPath, pub version: Version, pub timestamp: i64, + pub latest_published_version: Option, } impl CommitMetadata { - pub(crate) fn new(commit_path: ParsedLogPath, version: Version, timestamp: i64) -> Self { + pub(crate) fn new( + commit_path: ParsedLogPath, + version: Version, + timestamp: i64, + latest_published_version: Option, + ) -> Self { Self { commit_path, version, timestamp, + latest_published_version, } } } @@ -38,12 +45,6 @@ pub trait Committer: Send + Sync { ) -> DeltaResult; } -pub trait Publisher: Send + Sync { - fn published(&self, _version: Version) -> DeltaResult<()> { - Ok(()) - } -} - pub(crate) struct FileSystemCommitter; impl FileSystemCommitter { diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 1124870bb..a0da1bb23 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -61,6 +61,8 @@ pub(crate) struct LogSegment { /// The latest commit file found during listing, which may not be part of the /// contiguous segment but is needed for ICT timestamp reading pub latest_commit_file: Option, + /// The latest published commit version. If there are no published commits, this is None. + pub latest_published_version: Option, } impl LogSegment { @@ -123,6 +125,15 @@ impl LogSegment { ); } + // FIXME: this 'misses' published commits that currently overlap with log_tail (staged + // commits) + // get the latest published version from the commit files + let latest_published_version = ascending_commit_files + .partition_point(|c| matches!(c.file_type, LogPathFileType::Commit)) + .checked_sub(1) + .and_then(|idx| ascending_commit_files.get(idx)) + .map(|c| c.version); + Ok(LogSegment { end_version: effective_version, checkpoint_version, @@ -132,6 +143,7 @@ impl LogSegment { checkpoint_parts, latest_crc_file, latest_commit_file, + latest_published_version, }) } diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index aabffc421..84e67a8db 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -2336,3 +2336,9 @@ fn test_publish_validation() { panic!("Expected Error::Generic"); } } + +fn test_log_segment_latest_published_version() { + // log segment with commits commits 0,1,2,3, / staged 4,5 -> expect 3 + // log segment with commits commits 0,1,2 -> expect 2 + // log segment with staged commits 2, 3 -> expect None +} diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 26bacb466..1e500dd1c 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -8,7 +8,6 @@ use crate::actions::domain_metadata::domain_metadata_configuration; use crate::actions::set_transaction::SetTransactionScanner; use crate::actions::INTERNAL_DOMAIN_PREFIX; use crate::checkpoint::CheckpointWriter; -use crate::committer::Publisher; use crate::listed_log_files::ListedLogFiles; use crate::log_segment::LogSegment; use crate::path::ParsedLogPath; @@ -334,7 +333,7 @@ impl Snapshot { } #[cfg(feature = "catalog-managed")] - pub fn publish(mut self, engine: &dyn Engine, publisher: &dyn Publisher) -> DeltaResult { + pub fn publish(mut self, engine: &dyn Engine) -> DeltaResult { // FIXME: remove clone self.log_segment = self.log_segment.clone().publish(engine)?; @@ -343,9 +342,6 @@ impl Snapshot { println!("commit: {:?}", commit); } - // how to get committer + context? - publisher.published(self.version())?; - Ok(self) } diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index d0d3ee9c4..7561db8eb 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -268,8 +268,12 @@ impl Transaction { // Convert EngineData to FilteredEngineData with all rows selected let filtered_actions = actions .map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected)); - let commit_metadata = - CommitMetadata::new(commit_path, commit_version, self.commit_timestamp); + let commit_metadata = CommitMetadata::new( + commit_path, + commit_version, + self.commit_timestamp, + self.read_snapshot.log_segment().latest_published_version, + ); match committer.commit(engine, Box::new(filtered_actions), commit_metadata) { Ok(CommitResponse::Committed { version }) => Ok(CommitResult::CommittedTransaction( self.into_committed(version), diff --git a/uc-catalog/src/lib.rs b/uc-catalog/src/lib.rs index a5ef76a6c..b9f67c6ef 100644 --- a/uc-catalog/src/lib.rs +++ b/uc-catalog/src/lib.rs @@ -2,7 +2,7 @@ use std::sync::Arc; -use delta_kernel::committer::{CommitMetadata, CommitResponse, Committer, Publisher}; +use delta_kernel::committer::{CommitMetadata, CommitResponse, Committer}; use delta_kernel::Error as DeltaError; use delta_kernel::{DeltaResult, Engine, FilteredEngineData, LogPath, Snapshot, Version}; @@ -59,7 +59,6 @@ impl<'a> UCCatalog<'a> { start_version: Some(0), end_version: version.and_then(|v| v.try_into().ok()), }; - // TODO: does it paginate? let commits = self.client.get_commits(req).await?; // sort them @@ -122,6 +121,7 @@ impl<'a> UCCatalog<'a> { /// A [UCCommitter] is a Unity Catalog [Committer] implementation for committing to a specific /// delta table in UC. +#[derive(Debug, Clone)] pub struct UCCommitter { client: Arc, table_id: String, @@ -162,7 +162,6 @@ impl Committer for UCCommitter { &filename, ]); - // commit info only engine .json_handler() .write_json_file(&commit_path, Box::new(actions), false)?; @@ -191,6 +190,9 @@ impl Committer for UCCommitter { committed.size as i64, committed.last_modified, ), + commit_metadata + .latest_published_version + .map(|v| v.try_into().unwrap()), // FIXME ); // FIXME let handle = tokio::runtime::Handle::current(); @@ -208,26 +210,6 @@ impl Committer for UCCommitter { } } -impl Publisher for UCCommitter { - fn published(&self, version: Version) -> delta_kernel::DeltaResult<()> { - let version = version - .try_into() - .map_err(|e| delta_kernel::Error::Generic(format!("version conversion error: {e}")))?; - let request = CommitRequest::ack_publish(&self.table_id, &self.table_uri, version); - let handle = tokio::runtime::Handle::current(); - tokio::task::block_in_place(move || { - handle.block_on(async move { - match self.client.commit(request).await { - Ok(_) => Ok(()), - Err(e) => Err(delta_kernel::Error::Generic(format!( - "publish_commit failed: {e}" - ))), - } - }) - }) - } -} - #[cfg(test)] mod tests { use std::env; @@ -378,9 +360,8 @@ mod tests { let snapshot = catalog .load_snapshot_at(&table_id, &table_uri, t.commit_version(), &engine) .await?; - let _published = Arc::into_inner(snapshot) - .unwrap() - .publish(&engine, committer.as_ref())?; + let published = Arc::into_inner(snapshot).unwrap().publish(&engine)?; + println!("published snapshot: {published:?}"); } CommitResult::ConflictedTransaction(t) => { println!("💥 commit conflicted at version {}", t.conflict_version()); diff --git a/uc-client/src/models/commits.rs b/uc-client/src/models/commits.rs index 1aee33173..f6e040f5c 100644 --- a/uc-client/src/models/commits.rs +++ b/uc-client/src/models/commits.rs @@ -101,12 +101,13 @@ impl CommitRequest { table_id: impl Into, table_uri: impl Into, commit_info: Commit, + latest_backfilled_version: Option, ) -> Self { Self { table_id: table_id.into(), table_uri: table_uri.into(), commit_info: Some(commit_info), - latest_backfilled_version: None, + latest_backfilled_version, metadata: None, protocol: None, }