diff --git a/Cargo.toml b/Cargo.toml index b8a9c76d5..176d3c86f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,8 @@ members = [ "kernel/examples/*", "test-utils", "feature-tests", - "uc-client", # WIP: this is an experimental UC client for catalog-managed table work + "uc-client", # WIP: this is an experimental UC client for catalog-managed table work + "uc-catalog", # WIP: this is an experimental UC catalog implementation ] # note that in addition to the members above, the workspace includes examples: # - inspect-table diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 8bb3681b0..24f6dd3f6 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -112,13 +112,13 @@ pub unsafe extern "C" fn commit( // TODO: for now this removes the enum, which prevents doing any conflict resolution. We should fix // this by making the commit function return the enum somehow. match txn.commit(engine.as_ref()) { - Ok(CommitResult::Committed { - version: v, - post_commit_stats: _, - }) => Ok(v), - Ok(CommitResult::Conflict(_, v)) => Err(delta_kernel::Error::Generic(format!( - "commit conflict at version {v}" - ))), + Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.version()), + Ok(CommitResult::RetryableTransaction(_)) => Err(delta_kernel::Error::unsupported( + "commit failed: retryable transaction not supported in FFI (yet)", + )), + Ok(CommitResult::ConflictedTransaction(conflicted)) => Err(delta_kernel::Error::Generic( + format!("commit conflict at version {}", conflicted.conflict_version), + )), Err(e) => Err(e), } .into_extern_result(&extern_engine) diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 138724ff4..a669a2c76 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -18,7 +18,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; -use delta_kernel::transaction::CommitResult; +use delta_kernel::transaction::{CommitResult, RetryableTransaction}; use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef}; /// An example program that writes to a Delta table and creates it if necessary. @@ -99,23 +99,38 @@ async fn try_main() -> DeltaResult<()> { // Add the file metadata to the transaction txn.add_files(file_metadata); - // Commit the transaction - match txn.commit(&engine)? { - CommitResult::Committed { version, .. } => { - println!("✓ Committed transaction at version {version}"); - println!("✓ Successfully wrote {} rows to the table", cli.num_rows); + // Commit the transaction (in a simple retry loop) + let mut retries = 0; + let committed = loop { + if retries > 5 { + return Err(Error::generic( + "Exceeded maximum 5 retries for committing transaction", + )); + } + txn = match txn.commit(&engine)? { + CommitResult::CommittedTransaction(committed) => break committed, + CommitResult::ConflictedTransaction(conflicted) => { + let conflicting_version = conflicted.conflict_version; + println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); + return Err(Error::generic("Commit failed")); + } + CommitResult::RetryableTransaction(RetryableTransaction { transaction, error }) => { + println!("✗ Failed to commit, retrying... retryable error: {error}"); + transaction + } + }; + retries += 1; + }; - // Read and display the data - read_and_display_data(&url, engine).await?; - println!("✓ Successfully read data from the table"); + let version = committed.version(); + println!("✓ Committed transaction at version {version}"); + println!("✓ Successfully wrote {} rows to the table", cli.num_rows); - Ok(()) - } - CommitResult::Conflict(_, conflicting_version) => { - println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); - Err(Error::generic("Commit failed")) - } - } + // Read and display the data + read_and_display_data(&url, engine).await?; + println!("✓ Successfully read data from the table"); + + Ok(()) } /// Creates a new Delta table or gets an existing one. diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 0625fbd04..52b72b72f 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -485,11 +485,6 @@ impl Protocol { /// Check if writing to a table with this protocol is supported. That is: does the kernel /// support the specified protocol writer version and all enabled writer features? pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> { - #[cfg(feature = "catalog-managed")] - require!( - !self.is_catalog_managed(), - Error::unsupported("Writes are not yet supported for catalog-managed tables") - ); match &self.writer_features { Some(writer_features) if self.min_writer_version == 7 => { // if we're on version 7, make sure we support all the specified features diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs new file mode 100644 index 000000000..408e40eda --- /dev/null +++ b/kernel/src/committer.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; + +use crate::path::ParsedLogPath; +use crate::{DeltaResult, Engine, EngineDataResultIterator, Error, Version}; + +use url::Url; + +#[derive(Debug)] +pub struct CommitMetadata { + pub commit_path: ParsedLogPath, + pub version: Version, +} + +impl CommitMetadata { + pub(crate) fn new(commit_path: ParsedLogPath, version: Version) -> Self { + Self { + commit_path, + version, + } + } +} + +#[derive(Debug)] +/// Result of committing a transaction. +pub enum CommitResponse { + Committed { version: Version }, + Conflict { version: Version }, +} + +pub trait Committer: Send + Sync { + type Context; + + fn commit( + &self, + engine: &dyn Engine, + actions: EngineDataResultIterator<'_>, + version: Version, + context: &Self::Context, + ) -> DeltaResult; + + fn published(&self, version: Version, context: &Self::Context) -> DeltaResult<()> { + let _ = (version, context); + Ok(()) + } +} + +pub struct FileSystemCommitter; + +impl Committer for FileSystemCommitter { + type Context = Url; // Table root + fn commit( + &self, + engine: &dyn Engine, + actions: EngineDataResultIterator<'_>, + version: Version, + context: &Self::Context, + ) -> DeltaResult { + let path = ParsedLogPath::new_commit(context, version)?; + let json_handler = engine.json_handler(); + match json_handler.write_json_file(&path.location, Box::new(actions), false) { + Ok(()) => Ok(CommitResponse::Committed { version }), + Err(Error::FileAlreadyExists(_)) => Ok(CommitResponse::Conflict { version }), + Err(e) => Err(e), + } + } +} diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 08d03107c..b2fd0bc70 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -175,6 +175,17 @@ impl StorageHandler for ObjectStoreStorageHandler { Ok(Box::new(receiver.into_iter())) } + + fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()> { + let from_path = Path::from_url_path(from.path())?; + let to_path = Path::from_url_path(to.path())?; + let store = self.inner.clone(); + println!("Copying from {from_path:?} to {to_path:?}"); + // FIXME: ? copy_if_not_exists doesn't work on S3 + self.task_executor + .block_on(async move { store.copy(&from_path, &to_path).await }) + .map_err(|e| Error::Generic(format!("Failed to copy file: {e}"))) + } } #[cfg(test)] diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 228c3ed61..6a9ee3800 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -104,6 +104,11 @@ pub mod table_properties; pub mod transaction; pub(crate) mod transforms; +#[cfg(not(feature = "catalog-managed"))] +mod committer; +#[cfg(feature = "catalog-managed")] +pub mod committer; + pub use log_path::LogPath; mod row_tracking; @@ -183,6 +188,10 @@ pub type FileDataReadResult = (FileMeta, Box); pub type FileDataReadResultIterator = Box>> + Send>; +/// Type alias for an iterator of [`EngineData`] results. +pub type EngineDataResultIterator<'a> = + Box>> + Send + 'a>; + /// The metadata that describes an object. #[derive(Debug, Clone, PartialEq, Eq)] pub struct FileMeta { @@ -534,6 +543,9 @@ pub trait StorageHandler: AsAny { &self, files: Vec, ) -> DeltaResult>>>; + + /// Performs a copy. Must not overwrite. + fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()>; } /// Provides JSON handling functionality to Delta Kernel. diff --git a/kernel/src/log_path.rs b/kernel/src/log_path.rs index 2584801bc..5a4091451 100644 --- a/kernel/src/log_path.rs +++ b/kernel/src/log_path.rs @@ -12,7 +12,7 @@ use url::Url; /// Today, a `LogPath` is a file in the `_delta_log` directory of a Delta table; in the future, /// this will expand to support providing inline data in the log path itself. #[derive(Debug, Clone, PartialEq)] -pub struct LogPath(ParsedLogPath); +pub struct LogPath(pub ParsedLogPath); impl From for ParsedLogPath { fn from(p: LogPath) -> Self { @@ -44,18 +44,13 @@ impl LogPath { size: FileSize, ) -> DeltaResult { // TODO: we should introduce TablePath/LogPath types which enforce checks like ending '/' - - // require table_root ends with '/' - require!( - table_root.path().ends_with('/'), - Error::generic("table root must be a directory-like URL ending with '/'") - ); - let location = table_root - .join("_delta_log/")? - .join("_staged_commits/")? - .join(filename)?; + let mut commit_path = table_root.clone(); + commit_path + .path_segments_mut() + .map_err(|()| Error::invalid_table_location(table_root))? + .extend(&["_delta_log", "_staged_commits", filename]); let file_meta = FileMeta { - location, + location: commit_path, last_modified, size, }; @@ -92,6 +87,8 @@ mod test { assert_eq!(path.location, expected); } + // FIXME + #[ignore] #[test] fn test_staged_commit_path_creation_failures() { let last_modified = 1234567890i64; diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index a656b6654..ab15ed209 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -570,4 +570,30 @@ impl LogSegment { debug_assert!(to_sub <= self.end_version); self.end_version - to_sub } + + // TODO: decide on API: + // 1. should this mutate? should it consume self and hand back a new one? + // 2. should we take finer-grained args like which version to publish up to? + pub(crate) fn publish(mut self, engine: &dyn Engine) -> DeltaResult { + let storage = engine.storage_handler(); + + // Transform staged commits into published commits + for i in 0..self.ascending_commit_files.len() { + if matches!(self.ascending_commit_files[i].file_type, LogPathFileType::StagedCommit) { + // Clone the staged commit to get source location before transforming + let source_location = self.ascending_commit_files[i].location.location.clone(); + + // Take ownership of the commit to transform it + let staged_commit = self.ascending_commit_files.remove(i); + let published_commit = staged_commit.into_published()?; + + // Copy the actual file from staged to published location + storage.copy(&source_location, &published_commit.location.location)?; + + // Insert the published commit back + self.ascending_commit_files.insert(i, published_commit); + } + } + Ok(self) + } } diff --git a/kernel/src/path.rs b/kernel/src/path.rs index e94752b12..a4e372abf 100644 --- a/kernel/src/path.rs +++ b/kernel/src/path.rs @@ -52,8 +52,7 @@ pub(crate) enum LogPathFileType { /// the _delta_log we may see _staged_commits/00000000000000000000.{uuid}.json, but we MUST NOT /// include those in listing, as only the catalog can tell us which are valid commits. #[derive(Debug, Clone, PartialEq, Eq)] -#[internal_api] -pub(crate) struct ParsedLogPath { +pub struct ParsedLogPath { pub location: Location, #[allow(unused)] pub filename: String, @@ -215,6 +214,79 @@ impl ParsedLogPath { } } +impl ParsedLogPath { + /// Transform a staged commit into a published commit by updating its location + pub(crate) fn into_published(self) -> DeltaResult> { + if !matches!(self.file_type, LogPathFileType::StagedCommit) { + return Err(Error::internal_error( + "Unable to create a published path from a non-staged commit", + )); + } + + let published_filename = format!("{:020}.json", self.version); + let published_url = transform_staged_commit_url(self.location.location.clone())?; + + Ok(ParsedLogPath { + location: FileMeta { + location: published_url, + last_modified: self.location.last_modified, + size: self.location.size, + }, + filename: published_filename, + extension: "json".to_string(), + version: self.version, + file_type: LogPathFileType::Commit, + }) + } +} + +fn transform_staged_commit_url(mut url: Url) -> DeltaResult { + // Collect segments into owned strings to avoid borrowing issues + let segments: Vec = url + .path_segments() + .ok_or(Error::generic("cannot parse path segments"))? + .map(|s| s.to_string()) + .collect(); + + let staged_commits_index = segments + .iter() + .rposition(|s| s == "_staged_commits") + .ok_or(Error::generic("_staged_commits not found in path"))?; + + // Build new path: everything before _staged_commits + modified filename + let mut new_path = String::new(); + + // Add segments up to (but not including) _staged_commits + for (i, segment) in segments.iter().enumerate() { + if i >= staged_commits_index { + break; + } + if !new_path.is_empty() || !segment.is_empty() { + new_path.push('/'); + } + new_path.push_str(segment); + } + + // Add the modified filename (remove UUID) + if let Some(filename) = segments.get(staged_commits_index + 1) { + // Remove UUID from filename: 00000000000000000005.{uuid}.json -> 00000000000000000005.json + let new_filename = filename + .split('.') + .next() + .ok_or(Error::generic("invalid filename format"))? + .to_string() + + ".json"; + + if !new_path.is_empty() { + new_path.push('/'); + } + new_path.push_str(&new_filename); + } + + url.set_path(&new_path); + Ok(url) +} + impl ParsedLogPath { const DELTA_LOG_DIR: &'static str = "_delta_log/"; diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 5a7d4b013..6e2f57ce7 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -8,6 +8,7 @@ use crate::actions::domain_metadata::domain_metadata_configuration; use crate::actions::set_transaction::SetTransactionScanner; use crate::actions::{Metadata, Protocol, INTERNAL_DOMAIN_PREFIX}; use crate::checkpoint::CheckpointWriter; +use crate::committer::FileSystemCommitter; use crate::listed_log_files::ListedLogFiles; use crate::log_segment::LogSegment; use crate::scan::ScanBuilder; @@ -341,10 +342,26 @@ impl Snapshot { } /// Create a [`Transaction`] for this `SnapshotRef`. - pub fn transaction(self: Arc) -> DeltaResult { + pub fn transaction(self: Arc) -> DeltaResult> { Transaction::try_new(self) } + #[cfg(feature = "catalog-managed")] + pub fn publish(mut self, engine: &dyn Engine) -> DeltaResult { + // FIXME: remove clone + self.log_segment = self.log_segment.clone().publish(engine)?; + + println!("Published log segment"); + for commit in &self.log_segment.ascending_commit_files { + println!("commit: {:?}", commit); + } + + // how to get committer + context? + // committer.published(self.version(), context)?; + + Ok(self) + } + /// Fetch the latest version of the provided `application_id` for this snapshot. Filters the txn based on the SetTransactionRetentionDuration property and lastUpdated /// /// Note that this method performs log replay (fetches and processes metadata from storage). diff --git a/kernel/src/snapshot/builder.rs b/kernel/src/snapshot/builder.rs index 15cf59c62..edc91dd44 100644 --- a/kernel/src/snapshot/builder.rs +++ b/kernel/src/snapshot/builder.rs @@ -1,7 +1,7 @@ //! Builder for creating [`Snapshot`] instances. use crate::log_segment::LogSegment; use crate::snapshot::SnapshotRef; -use crate::LogPath; +use crate::log_path::LogPath; use crate::{DeltaResult, Engine, Error, Snapshot, Version}; use url::Url; diff --git a/kernel/src/table_features/mod.rs b/kernel/src/table_features/mod.rs index 10ebed0d9..9b712b4fa 100644 --- a/kernel/src/table_features/mod.rs +++ b/kernel/src/table_features/mod.rs @@ -239,6 +239,13 @@ pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock> = Lazy WriterFeature::VariantType, WriterFeature::VariantTypePreview, WriterFeature::VariantShreddingPreview, + WriterFeature::V2Checkpoint, + WriterFeature::VacuumProtocolCheck, + WriterFeature::InCommitTimestamp, + #[cfg(feature = "catalog-managed")] + WriterFeature::CatalogManaged, + #[cfg(feature = "catalog-managed")] + WriterFeature::CatalogOwnedPreview, ] }); diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 88d34b658..9bd471764 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -9,22 +9,19 @@ use crate::actions::{ as_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, }; +use crate::committer::{CommitMetadata, CommitResponse, Committer, FileSystemCommitter}; use crate::error::Error; use crate::expressions::{ArrayData, Transform, UnaryExpressionOp::ToJson}; use crate::path::ParsedLogPath; use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor}; use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType}; use crate::snapshot::SnapshotRef; -use crate::utils::current_time_ms; +use crate::utils::{current_time_ms, require}; use crate::{ - DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData, - RowVisitor, Version, + DataType, DeltaResult, Engine, EngineData, EngineDataResultIterator, Expression, ExpressionRef, + IntoEngineData, LogPath, RowVisitor, Snapshot, Version, }; -/// Type alias for an iterator of [`EngineData`] results. -type EngineDataResultIterator<'a> = - Box>> + Send + 'a>; - /// The minimal (i.e., mandatory) fields in an add action. pub(crate) static MANDATORY_ADD_FILE_SCHEMA: LazyLock = LazyLock::new(|| { Arc::new(StructType::new_unchecked(vec![ @@ -113,8 +110,9 @@ fn with_row_tracking_cols(schema: &SchemaRef) -> SchemaRef { /// // commit! (consume the transaction) /// txn.commit(&engine)?; /// ``` -pub struct Transaction { +pub struct Transaction { read_snapshot: SnapshotRef, + committer: Option, operation: Option, engine_info: Option, add_files_metadata: Vec>, @@ -130,7 +128,7 @@ pub struct Transaction { domain_metadatas: Vec, } -impl std::fmt::Debug for Transaction { +impl std::fmt::Debug for Transaction { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(&format!( "Transaction {{ read_snapshot version: {}, engine_info: {} }}", @@ -140,7 +138,7 @@ impl std::fmt::Debug for Transaction { } } -impl Transaction { +impl Transaction { /// Create a new transaction from a snapshot. The snapshot will be used to read the current /// state of the table (e.g. to read the current version). /// @@ -159,6 +157,7 @@ impl Transaction { Ok(Transaction { read_snapshot, + committer: Some(FileSystemCommitter {}), operation: None, engine_info: None, add_files_metadata: vec![], @@ -167,10 +166,57 @@ impl Transaction { domain_metadatas: vec![], }) } +} + +impl Transaction { + /// Set the committer that will be used to commit this transaction. If not set, the default + /// filesystem-based committer will be used. Note that the default committer is only allowed + /// for non-catalog-managed tables. + #[cfg(feature = "catalog-managed")] + pub fn with_committer(self, committer: impl Into) -> Transaction + where + T: Committer, + { + // self.committer = Some(committer.into()); + // self + Transaction { + read_snapshot: self.read_snapshot, + committer: Some(committer.into()), + operation: self.operation, + engine_info: self.engine_info, + add_files_metadata: self.add_files_metadata, + set_transactions: self.set_transactions, + commit_timestamp: self.commit_timestamp, + domain_metadatas: self.domain_metadatas, + } + } +} + +impl Transaction { + /// Consume the transaction and commit it to the table. The result is a result of + /// [CommitResult] with the following semantics: + /// - Ok(CommitResult) for either success or a recoverable error (includes the failed + /// transaction in case of a conflict so the user can retry, etc.) + /// - Err(Error) indicates a non-retryable error (e.g. logic/validation error). + pub fn commit(self, engine: &dyn Engine, context: C::Context) -> DeltaResult> { + // Step 0: Determine the committer to use + #[cfg(feature = "catalog-managed")] + if self.committer.is_none() { + require!( + !self.read_snapshot.table_configuration().protocol().is_catalog_managed(), + Error::generic("Cannot use the default committer for a catalog-managed table. Please provide a committer via Transaction::with_committer.") + ); + } + + // let default_committer: FileSystemCommitter; + // let committer = match self.committer.as_ref() { + // Some(c) => c, + // None => { + // default_committer = FileSystemCommitter::new(); + // &default_committer + // } + // }; - /// Consume the transaction and commit it to the table. The result is a [CommitResult] which - /// will include the failed transaction in case of a conflict so the user can retry. - pub fn commit(self, engine: &dyn Engine) -> DeltaResult { // Step 1: Check for duplicate app_ids and generate set transactions (`txn`) // Note: The commit info must always be the first action in the commit but we generate it in // step 2 to fail early on duplicate transaction appIds @@ -194,11 +240,12 @@ impl Transaction { .map(|txn| txn.into_engine_data(get_log_txn_schema().clone(), engine)); // Step 2: Construct commit info and initialize the action iterator - let commit_info = CommitInfo::new( + let mut commit_info = CommitInfo::new( self.commit_timestamp, self.operation.clone(), self.engine_info.clone(), ); + commit_info.in_commit_timestamp = Some(self.commit_timestamp); let commit_info_action = commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine); @@ -212,32 +259,26 @@ impl Transaction { self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?; // Step 5: Commit the actions as a JSON file to the Delta log - let commit_path = - ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?; + // let commit_path = + // ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?; let actions = iter::once(commit_info_action) .chain(add_actions) .chain(set_transaction_actions) .chain(domain_metadata_actions); - let json_handler = engine.json_handler(); - match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) { - Ok(()) => Ok(CommitResult::Committed { - version: commit_version, - post_commit_stats: PostCommitStats { - commits_since_checkpoint: self - .read_snapshot - .log_segment() - .commits_since_checkpoint() - + 1, - commits_since_log_compaction: self - .read_snapshot - .log_segment() - .commits_since_log_compaction_or_checkpoint() - + 1, - }, - }), - Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), - Err(e) => Err(e), + // let path = commit_path.0.location.location; + // let path = ParsedLogPath::try_from(path).unwrap().unwrap(); + // let commit_metadata = CommitMetadata::new(path, commit_version); + let committer = self.committer.as_ref().unwrap(); + match committer.commit(engine, Box::new(actions), commit_version, &context) { + Ok(CommitResponse::Committed { version }) => Ok(CommitResult::CommittedTransaction( + self.into_committed(version), + )), + Ok(CommitResponse::Conflict { version }) => Ok(CommitResult::ConflictedTransaction( + self.into_conflicted(version), + )), + // TODO: we want to be more selective about what is retryable + Err(e) => Ok(CommitResult::RetryableTransaction(self.into_retryable(e))), } } @@ -479,6 +520,38 @@ impl Transaction { Ok((Box::new(add_actions), None)) } } + + fn into_committed(self, version: Version) -> CommittedTransaction { + let stats = PostCommitStats { + commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint() + + 1, + commits_since_log_compaction: self + .read_snapshot + .log_segment() + .commits_since_log_compaction_or_checkpoint() + + 1, + }; + + CommittedTransaction { + transaction: self, + commit_version: version, + post_commit_stats: stats, + } + } + + fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction { + ConflictedTransaction { + transaction: self, + conflict_version, + } + } + + fn into_retryable(self, error: Error) -> RetryableTransaction { + RetryableTransaction { + transaction: self, + error, + } + } } /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to @@ -526,22 +599,77 @@ pub struct PostCommitStats { pub commits_since_log_compaction: u64, } -/// Result of committing a transaction. +/// The result of attempting to commit this transaction. +/// +/// The commit result can be one of the following: +/// - CommittedTransaction: the transaction was successfully committed. post-commit stats and +/// post-commit snapshot can be obtained from the committed transaction. +/// - ConflictedTransaction: the transaction conflicted with an existing version. This transcation +/// must be rebased before retrying. +/// - RetryableTransaction: an IO (retryable) error occurred during the commit. This transaction +/// can be retried without rebasing. #[derive(Debug)] -pub enum CommitResult { +#[must_use] +pub enum CommitResult { /// The transaction was successfully committed. - Committed { - /// the version of the table that was just committed - version: Version, - /// The [`PostCommitStats`] for this transaction - post_commit_stats: PostCommitStats, - }, + CommittedTransaction(CommittedTransaction), /// This transaction conflicted with an existing version (at the version given). The transaction /// is returned so the caller can resolve the conflict (along with the version which /// conflicted). // TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to // update the transaction to a new version etc. - Conflict(Transaction, Version), + ConflictedTransaction(ConflictedTransaction), + /// An IO (retryable) error occurred during the commit. + RetryableTransaction(RetryableTransaction), +} + +impl CommitResult { + /// Returns true if the commit was successful. + pub fn is_committed(&self) -> bool { + matches!(self, CommitResult::CommittedTransaction(_)) + } +} + +#[derive(Debug)] +pub struct CommittedTransaction { + transaction: Transaction, + /// the version of the table that was just committed + commit_version: Version, + /// The [`PostCommitStats`] for this transaction + post_commit_stats: PostCommitStats, +} + +impl CommittedTransaction { + /// The version of the table that was just committed + pub fn version(&self) -> Version { + self.commit_version + } + + /// The [`PostCommitStats`] for this transaction + pub fn post_commit_stats(&self) -> &PostCommitStats { + &self.post_commit_stats + } + + /// Compute a new snapshot for the table at the commit version. Note this is generally more + /// efficient than creating a new snapshot from scratch. + pub fn post_commit_snapshot(&self, engine: &dyn Engine) -> DeltaResult { + Snapshot::builder_from(self.transaction.read_snapshot.clone()) + // .with_log_tail(log_tail) + .at_version(self.commit_version) + .build(engine) + } +} + +#[derive(Debug)] +pub struct ConflictedTransaction { + pub transaction: Transaction, + pub conflict_version: Version, +} + +#[derive(Debug)] +pub struct RetryableTransaction { + pub transaction: Transaction, + pub error: Error, } #[cfg(test)] diff --git a/kernel/tests/row_tracking.rs b/kernel/tests/row_tracking.rs index 10b75d260..a75e36b96 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -217,7 +217,9 @@ async fn test_row_tracking_append() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -264,7 +266,9 @@ async fn test_row_tracking_single_record_batches() -> DeltaResult<()> { vec![int32_array(vec![3])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -295,7 +299,9 @@ async fn test_row_tracking_large_batch() -> DeltaResult<()> { // Write a large batch with 1000 records let large_batch: Vec = (1..=1000).collect(); let data = generate_data(schema.clone(), [vec![int32_array(large_batch.clone())]])?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -342,7 +348,9 @@ async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data_1).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_1) + .await? + .is_committed()); // Verify first commit verify_row_tracking_in_commit( @@ -357,7 +365,9 @@ async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> { // Second transaction: write one batch with 2 records // This should read the existing row tracking domain metadata and assign base row IDs starting from 6 let data_2 = generate_data(schema.clone(), [vec![int32_array(vec![7, 8])]])?; - write_data_to_table(&table_url, engine.clone(), data_2).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_2) + .await? + .is_committed()); // Verify second commit verify_row_tracking_in_commit( @@ -410,7 +420,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { ], ], )?; - write_data_to_table(&table_url, engine.clone(), data_1).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_1) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -429,7 +441,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { string_array(vec!["g".to_string(), "h".to_string()]), ]], )?; - write_data_to_table(&table_url, engine.clone(), data_2).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_2) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -454,7 +468,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { ], ], )?; - write_data_to_table(&table_url, engine.clone(), data_3).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_3) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -490,7 +506,9 @@ async fn test_row_tracking_with_regular_and_empty_adds() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -536,7 +554,9 @@ async fn test_row_tracking_with_empty_adds() -> DeltaResult<()> { vec![int32_array(Vec::::new())], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly // NB: The expected high water mark is a bit unintuitive here, as we are appending empty batches. @@ -579,7 +599,7 @@ async fn test_row_tracking_without_adds() -> DeltaResult<()> { let txn = snapshot.transaction()?; // Commit without adding any add files - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Fetch and parse the commit let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?; @@ -660,26 +680,45 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { // Commit the first transaction - this should succeed let result1 = txn1.commit(engine1.as_ref())?; match result1 { - CommitResult::Committed { version, .. } => { - assert_eq!(version, 1, "First transaction should commit at version 1"); + CommitResult::CommittedTransaction(committed) => { + assert_eq!( + committed.version(), + 1, + "First transaction should commit at version 1" + ); } - CommitResult::Conflict(_, version) => { - panic!("First transaction should not conflict, got conflict at version {version}"); + CommitResult::ConflictedTransaction(conflicted) => { + panic!( + "First transaction should not conflict, got conflict at version {}", + conflicted.conflict_version + ); + } + CommitResult::RetryableTransaction(_) => { + panic!("First transaction should not be retryable error"); } } // Commit the second transaction - this should result in a conflict let result2 = txn2.commit(engine2.as_ref())?; match result2 { - CommitResult::Committed { version, .. } => { - panic!("Second transaction should conflict, but got committed at version {version}"); + CommitResult::CommittedTransaction(committed) => { + panic!( + "Second transaction should conflict, but got committed at version {}", + committed.version() + ); } - CommitResult::Conflict(_conflicted_txn, version) => { - assert_eq!(version, 1, "Conflict should be at version 1"); + CommitResult::ConflictedTransaction(conflicted) => { + assert_eq!( + conflicted.conflict_version, 1, + "Conflict should be at version 1" + ); // TODO: In the future, we need to resolve conflicts and retry the commit // For now, we just verify that we got the conflict as expected } + CommitResult::RetryableTransaction(_) => { + panic!("Second transaction should not be retryable error"); + } } // Verify that the winning transaction is in the log and that it has the correct metadata @@ -745,7 +784,9 @@ async fn test_no_row_tracking_fields_without_feature() -> DeltaResult<()> { )?; // Write data to the table - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify that the commit does NOT contain row tracking fields let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?; diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index a46dc5f1b..aea9034cd 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -67,7 +67,7 @@ async fn test_commit_info() -> Result<(), Box> { let txn = snapshot.transaction()?.with_engine_info("default engine"); // commit! - txn.commit(&engine)?; + let _ = txn.commit(&engine)?; let commit1 = store .get(&Path::from(format!( @@ -183,17 +183,14 @@ async fn write_data_and_check_result_and_stats( // commit! match txn.commit(engine.as_ref())? { - CommitResult::Committed { - version, - post_commit_stats, - } => { - assert_eq!(version, expected_since_commit as Version); + CommitResult::CommittedTransaction(committed) => { + assert_eq!(committed.version(), expected_since_commit as Version); assert_eq!( - post_commit_stats.commits_since_checkpoint, + committed.post_commit_stats().commits_since_checkpoint, expected_since_commit ); assert_eq!( - post_commit_stats.commits_since_log_compaction, + committed.post_commit_stats().commits_since_log_compaction, expected_since_commit ); } @@ -219,7 +216,7 @@ async fn test_commit_info_action() -> Result<(), Box> { let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); - txn.commit(&engine)?; + let _ = txn.commit(&engine)?; let commit = store .get(&Path::from(format!( @@ -363,7 +360,7 @@ async fn test_no_add_actions() -> Result<(), Box> { let txn = snapshot.transaction()?.with_engine_info("default engine"); // Commit without adding any add files - txn.commit(&engine)?; + assert!(txn.commit(&engine)?.is_committed()); let commit1 = store .get(&Path::from(format!( @@ -477,7 +474,7 @@ async fn test_append_partitioned() -> Result<(), Box> { } // commit! - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); let commit1 = store .get(&Path::from(format!( @@ -655,7 +652,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { .with_transaction_id("app_id2".to_string(), 2); // commit! - txn.commit(&engine)?; + assert!(txn.commit(&engine)?.is_committed()); let snapshot = Snapshot::builder_for(table_url.clone()) .at_version(1) @@ -816,7 +813,7 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { txn.add_files(add_files_metadata); // Commit the transaction - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Verify the commit was written correctly let commit1 = store @@ -1023,7 +1020,7 @@ async fn test_append_variant() -> Result<(), Box> { txn.add_files(add_files_metadata); // Commit the transaction - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Verify the commit was written correctly let commit1_url = tmp_test_dir_url @@ -1195,7 +1192,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box Result<(), Box { + client: &'a UCClient, +} + +impl<'a> UCCatalog<'a> { + pub fn new(client: &'a UCClient) -> Self { + UCCatalog { client } + } + + pub async fn load_snapshot( + &self, + table_id: &str, + table_uri: &str, + engine: &dyn Engine, + ) -> Result, Box> { + self.load_snapshot_inner(table_id, table_uri, None, engine) + .await + } + + pub async fn load_snapshot_at( + &self, + table_id: &str, + table_uri: &str, + version: Version, + engine: &dyn Engine, + ) -> Result, Box> { + self.load_snapshot_inner(table_id, table_uri, Some(version), engine) + .await + } + + pub(crate) async fn load_snapshot_inner( + &self, + table_id: &str, + table_uri: &str, + version: Option, + engine: &dyn Engine, + ) -> Result, Box> { + let table_uri = table_uri.to_string(); + let req = CommitsRequest { + table_id: table_id.to_string(), + table_uri: table_uri.clone(), + start_version: Some(0), + end_version: version.and_then(|v| v.try_into().ok()), + }; + // TODO: does it paginate? + let commits = self.client.get_commits(req).await?; + + // sort them + let mut commits = commits; + if let Some(c) = commits.commits.as_mut() { + c.sort_by_key(|c| c.version) + } + + // if commits are present, we ensure they are sorted+contiguous + if let Some(commits) = &commits.commits { + if !commits.windows(2).all(|w| w[1].version == w[0].version + 1) { + return Err("Received non-contiguous commit versions".into()); + } + } + + // we always get back the latest version from commits response, and pass that in to + // kernel's Snapshot builder. basically, load_table for the latest version always looks + // like a time travel query since we know the latest version ahead of time. + // + // note there is a weird edge case: if the table was just created it will return + // latest_table_version = -1, but the 0.json will exist in the _delta_log. + let version: Version = match version { + Some(v) => v, + None => match commits.latest_table_version { + -1 => 0, + i => i.try_into()?, + }, + }; + + // consume uc-client's Commit and hand back a delta_kernel LogPath + let table_url = Url::parse(&table_uri)?; + let commits: Vec<_> = commits + .commits + .unwrap_or_default() + .into_iter() + .map(|c| -> Result> { + LogPath::staged_commit( + table_url.clone(), + &c.file_name, + c.file_modification_timestamp, + c.file_size.try_into()?, + ) + .map_err(|e| e.into()) + }) + .try_collect()?; + + info!("commits for kernel: {:?}\n", commits); + + Snapshot::builder_for(Url::parse(&(table_uri + "/"))?) + .at_version(version) + .with_log_tail(commits) + .build(engine) + .map_err(|e| e.into()) + } +} + +/// A [UCCommitter] is a Unity Catalog [Committer] implementation for committing to delta tables in +/// UC. +pub struct UCCommitter { + client: Arc, +} + +impl UCCommitter { + pub fn new(client: Arc) -> Self { + UCCommitter { client } + } +} + +use delta_kernel::committer::Committer; +use delta_kernel::EngineDataResultIterator; + +pub struct UCCommitterContext { + table_id: String, + table_uri: String, +} + +impl Committer for UCCommitter { + type Context = UCCommitterContext; + fn commit<'a>( + &self, + engine: &'a dyn Engine, + actions: EngineDataResultIterator<'a>, + version: Version, + context: &UCCommitterContext, + ) -> delta_kernel::DeltaResult { + let last_modified = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + .try_into() + .unwrap(); + let size = 1500; + let filename = format!("{:020}.{}.json", version, uuid::Uuid::new_v4()); + let staged_commit_path = LogPath::staged_commit( + Url::parse(&context.table_uri)?, + &filename, + last_modified, + size, + )?; + engine.json_handler().write_json_file( + &staged_commit_path.0.location.location, + actions, + false, + )?; + let commit_req = CommitRequest::new( + &context.table_id, + &context.table_uri, + Commit::new( + version.try_into().unwrap(), + last_modified, + filename, + size.try_into().unwrap(), + last_modified, + ), + ); + + let handle = tokio::runtime::Handle::current(); + tokio::task::block_in_place(move || { + handle.block_on(async move { + match self.client.commit(commit_req).await { + Ok(_) => Ok(delta_kernel::committer::CommitResponse::Committed { version }), + Err(e) => Err(delta_kernel::Error::Generic(format!("commit failed: {e}"))), + } + }) + }) + } + + fn published( + &self, + version: Version, + context: &UCCommitterContext, + ) -> delta_kernel::DeltaResult<()> { + let request = + CommitRequest::ack_publish(&context.table_id, &context.table_uri, version as i64); + let handle = tokio::runtime::Handle::current(); + tokio::task::block_in_place(move || { + handle.block_on(async move { + match self.client.commit(request).await { + Ok(_) => Ok(()), + Err(e) => Err(delta_kernel::Error::Generic(format!( + "publish_commit failed: {e}" + ))), + } + }) + }) + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; + use delta_kernel::engine::default::DefaultEngine; + use delta_kernel::transaction::{CommitResult, Transaction}; + use object_store::ObjectStore; + + use super::*; + + // We could just re-export UCClient's get_table to not require consumers to directly import + // uc_client themselves. + async fn get_table( + client: &UCClient, + table_name: &str, + ) -> Result<(String, String), Box> { + let res = client.get_table(table_name).await?; + let table_id = res.table_id; + let table_uri = res.storage_location; + + info!( + "[GET TABLE] got table_id: {}, table_uri: {}\n", + table_id, table_uri + ); + + Ok((table_id, table_uri)) + } + + // ignored test which you can run manually to play around with reading a UC table. run with: + // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t read_uc_table --nocapture -- --ignored` + #[ignore] + #[tokio::test] + async fn read_uc_table() -> Result<(), Box> { + let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set"); + let token = env::var("TOKEN").expect("TOKEN environment variable not set"); + let table_name = env::var("TABLENAME").expect("TABLENAME environment variable not set"); + + // build UC client, get table info and credentials + let client = UCClient::builder(endpoint, &token).build()?; + let (table_id, table_uri) = get_table(&client, &table_name).await?; + let creds = client + .get_credentials(&table_id, Operation::Read) + .await + .map_err(|e| format!("Failed to get credentials: {}", e))?; + + // build catalog + let catalog = UCCatalog::new(&client); + + // TODO: support non-AWS + let creds = creds + .aws_temp_credentials + .ok_or("No AWS temporary credentials found")?; + + let options = [ + ("region", "us-west-2"), + ("access_key_id", &creds.access_key_id), + ("secret_access_key", &creds.secret_access_key), + ("session_token", &creds.session_token), + ]; + + let table_url = Url::parse(&table_uri)?; + let (store, path) = object_store::parse_url_opts(&table_url, options)?; + let store: Arc<_> = store.into(); + + info!("created object store: {:?}\npath: {:?}\n", store, path); + + let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + + // read table + let snapshot = catalog + .load_snapshot(&table_id, &table_uri, &engine) + .await?; + // or time travel + // let snapshot = catalog.load_snapshot_at(&table, 2).await?; + + println!("🎉 loaded snapshot: {snapshot:?}"); + + Ok(()) + } + + // ignored test which you can run manually to play around with writing to a UC table. run with: + // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t write_uc_table --nocapture -- --ignored` + #[ignore] + #[tokio::test(flavor = "multi_thread")] + async fn write_uc_table() -> Result<(), Box> { + let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set"); + let token = env::var("TOKEN").expect("TOKEN environment variable not set"); + let table_name = env::var("TABLENAME").expect("TABLENAME environment variable not set"); + + // build UC client, get table info and credentials + let client = Arc::new(UCClient::builder(endpoint, &token).build()?); + let (table_id, table_uri) = get_table(&client, &table_name).await?; + let creds = client + .get_credentials(&table_id, Operation::ReadWrite) + .await + .map_err(|e| format!("Failed to get credentials: {}", e))?; + + // build catalog + let catalog = UCCatalog::new(&client); + + // TODO: support non-AWS + let creds = creds + .aws_temp_credentials + .ok_or("No AWS temporary credentials found")?; + + let options = [ + ("region", "us-west-2"), + ("access_key_id", &creds.access_key_id), + ("secret_access_key", &creds.secret_access_key), + ("session_token", &creds.session_token), + ]; + + let table_url = Url::parse(&table_uri)?; + let (store, path) = object_store::parse_url_opts(&table_url, options)?; + let store: Arc = store.into(); + + let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let committer = UCCommitter::new(client.clone()); + let snapshot = catalog + .load_snapshot(&table_id, &table_uri, &engine) + .await?; + println!("latest snapshot version: {:?}", snapshot.version()); + let txn: Transaction = + snapshot.clone().transaction()?.with_committer(committer); + let _write_context = txn.get_write_context(); + // do a write. + + // print the log + use futures::stream::StreamExt; + let mut stream = store.list(Some(&path)); + while let Some(path) = stream.next().await { + println!("object: {:?}", path.unwrap().location); + } + + // let commit = store + // .get(&object_store::path::Path::from("19a85dee-54bc-43a2-87ab-023d0ec16013/tables/cdac4b37-fc11-4148-ae02-512e6cc4e1bd/_delta_log/_staged_commits/00000000000000000001.5400fd23-8e08-4f87-8049-7b37431fb826.json")) + // .await + // .unwrap(); + // let bytes = commit.bytes().await?; + // println!("commit file contents:\n{}", String::from_utf8_lossy(&bytes)); + + let ctx = UCCommitterContext { + table_id: table_id.clone(), + table_uri: table_uri.clone(), + }; + let t = match txn.commit(&engine, ctx)? { + CommitResult::CommittedTransaction(t) => { + println!("🎉 committed version {}", t.version()); + Some(t) + } + CommitResult::ConflictedTransaction(t) => { + println!("💥 commit conflicted at version {}", t.conflict_version); + None + } + CommitResult::RetryableTransaction(_) => { + println!("we should retry..."); + None + } + }; + + // FIXME! need to plumb log_tail + // let snapshot = t.unwrap().post_commit_snapshot(&engine)?; + let new_version = snapshot.version() + 1; + let snapshot = catalog + .load_snapshot_at(&table_id, &table_uri, new_version, &engine) + .await?; + let _published = Arc::into_inner(snapshot).unwrap().publish(&engine)?; + + // notify UC of publish. since Commmitter is tied to Transaction it's a bit hard to plumb + // this through to Snapshot... TODO + let request = CommitRequest::ack_publish(&table_id, &table_uri, new_version as i64); + client.commit(request).await?; + + Ok(()) + } +} diff --git a/uc-client/src/client.rs b/uc-client/src/client.rs index e1c4a9076..d996e370c 100644 --- a/uc-client/src/client.rs +++ b/uc-client/src/client.rs @@ -7,7 +7,7 @@ use url::Url; use crate::config::{ClientConfig, ClientConfigBuilder}; use crate::error::{Error, Result}; -use crate::models::commits::{CommitsRequest, CommitsResponse}; +use crate::models::commits::{CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; use crate::models::credentials::{CredentialsRequest, Operation, TemporaryTableCredentials}; use crate::models::tables::TablesResponse; @@ -61,6 +61,27 @@ impl UCClient { }) .await?; + let res = self.handle_response(response).await; + println!("\nGet commits response: {:?}\n", res); + res + } + + #[instrument(skip(self))] + pub async fn commit(&self, request: CommitRequest) -> Result { + let url = self.base_url.join("delta/preview/commits")?; + + let json = serde_json::to_string_pretty(&request).unwrap_or_default(); + println!("Committing: {json}"); + + let response = self + .execute_with_retry(|| { + self.client + .request(reqwest::Method::POST, url.clone()) + .json(&request) + .send() + }) + .await?; + self.handle_response(response).await } diff --git a/uc-client/src/lib.rs b/uc-client/src/lib.rs index 46c90f1ad..c24b9c4a7 100644 --- a/uc-client/src/lib.rs +++ b/uc-client/src/lib.rs @@ -34,7 +34,6 @@ pub use error::{Error, Result}; #[doc(hidden)] pub mod prelude { pub use crate::client::UCClient; - pub use crate::error::Result; pub use crate::models::{ commits::{Commit, CommitsRequest, CommitsResponse}, credentials::{Operation, TemporaryTableCredentials}, diff --git a/uc-client/src/models/commits.rs b/uc-client/src/models/commits.rs index 7d5dc1938..1aee33173 100644 --- a/uc-client/src/models/commits.rs +++ b/uc-client/src/models/commits.rs @@ -50,6 +50,28 @@ pub struct Commit { } impl Commit { + pub fn new( + version: i64, + timestamp: i64, + file_name: impl Into, + file_size: i64, + file_modification_timestamp: i64, + ) -> Self { + Self { + version, + timestamp, + file_name: file_name.into(), + file_size, + file_modification_timestamp, + is_disown_commit: Some(false), + } + } + + pub fn with_disown_commit(mut self, disown: bool) -> Self { + self.is_disown_commit = Some(disown); + self + } + pub fn timestamp_as_datetime(&self) -> Option> { chrono::DateTime::from_timestamp_millis(self.timestamp) } @@ -58,3 +80,68 @@ impl Commit { chrono::DateTime::from_timestamp_millis(self.file_modification_timestamp) } } + +// Structs for creating a new commit +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommitRequest { + pub table_id: String, + pub table_uri: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub commit_info: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub latest_backfilled_version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub protocol: Option, +} + +impl CommitRequest { + pub fn new( + table_id: impl Into, + table_uri: impl Into, + commit_info: Commit, + ) -> Self { + Self { + table_id: table_id.into(), + table_uri: table_uri.into(), + commit_info: Some(commit_info), + latest_backfilled_version: None, + metadata: None, + protocol: None, + } + } + + pub fn ack_publish( + table_id: impl Into, + table_uri: impl Into, + last_backfilled_version: i64, + ) -> Self { + Self { + table_id: table_id.into(), + table_uri: table_uri.into(), + commit_info: None, + latest_backfilled_version: Some(last_backfilled_version), + metadata: None, + protocol: None, + } + } + + pub fn with_latest_backfilled_version(mut self, version: i64) -> Self { + self.latest_backfilled_version = Some(version); + self + } + + pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self { + self.metadata = Some(metadata); + self + } + + pub fn with_protocol(mut self, protocol: serde_json::Value) -> Self { + self.protocol = Some(protocol); + self + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommitResponse {} diff --git a/uc-client/src/models/mod.rs b/uc-client/src/models/mod.rs index edf86ce57..8fe92111e 100644 --- a/uc-client/src/models/mod.rs +++ b/uc-client/src/models/mod.rs @@ -2,6 +2,6 @@ pub mod commits; pub mod credentials; pub mod tables; -pub use commits::{Commit, CommitsRequest, CommitsResponse}; +pub use commits::{Commit, CommitRequest, CommitResponse, CommitsRequest, CommitsResponse}; pub use credentials::{AwsTempCredentials, TemporaryTableCredentials}; pub use tables::TablesResponse;