Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ members = [
"kernel/examples/*",
"test-utils",
"feature-tests",
"uc-client", # WIP: this is an experimental UC client for catalog-managed table work
"uc-client", # WIP: this is an experimental UC client for catalog-managed table work
"uc-catalog", # WIP: this is an experimental UC catalog implementation
]
# note that in addition to the members above, the workspace includes examples:
# - inspect-table
Expand Down
14 changes: 7 additions & 7 deletions ffi/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
/// This struct provides a safe wrapper around the underlying `Transaction` type,
/// ensuring exclusive access to transaction operations. The transaction can be used
/// to stage changes and commit them atomically to the Delta table.
#[handle_descriptor(target=Transaction, mutable=true, sized=true)]

Check failure on line 18 in ffi/src/transaction/mod.rs

View workflow job for this annotation

GitHub Actions / check_if_pr_breaks_semver

missing generics for struct `delta_kernel::transaction::Transaction`
pub struct ExclusiveTransaction;

/// Start a transaction on the latest snapshot of the table.
Expand Down Expand Up @@ -69,7 +69,7 @@
}

fn with_engine_info_impl(
txn: Transaction,

Check failure on line 72 in ffi/src/transaction/mod.rs

View workflow job for this annotation

GitHub Actions / check_if_pr_breaks_semver

missing generics for struct `delta_kernel::transaction::Transaction`
engine_info: KernelStringSlice,
) -> DeltaResult<Handle<ExclusiveTransaction>> {
let info_string: DeltaResult<&str> =
Expand Down Expand Up @@ -112,13 +112,13 @@
// TODO: for now this removes the enum, which prevents doing any conflict resolution. We should fix
// this by making the commit function return the enum somehow.
match txn.commit(engine.as_ref()) {
Ok(CommitResult::Committed {
version: v,
post_commit_stats: _,
}) => Ok(v),
Ok(CommitResult::Conflict(_, v)) => Err(delta_kernel::Error::Generic(format!(
"commit conflict at version {v}"
))),
Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.version()),
Ok(CommitResult::RetryableTransaction(_)) => Err(delta_kernel::Error::unsupported(
"commit failed: retryable transaction not supported in FFI (yet)",
)),
Ok(CommitResult::ConflictedTransaction(conflicted)) => Err(delta_kernel::Error::Generic(
format!("commit conflict at version {}", conflicted.conflict_version),
)),
Err(e) => Err(e),
}
.into_extern_result(&extern_engine)
Expand Down
47 changes: 31 additions & 16 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::transaction::CommitResult;
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};

/// An example program that writes to a Delta table and creates it if necessary.
Expand Down Expand Up @@ -99,23 +99,38 @@ async fn try_main() -> DeltaResult<()> {
// Add the file metadata to the transaction
txn.add_files(file_metadata);

// Commit the transaction
match txn.commit(&engine)? {
CommitResult::Committed { version, .. } => {
println!("✓ Committed transaction at version {version}");
println!("✓ Successfully wrote {} rows to the table", cli.num_rows);
// Commit the transaction (in a simple retry loop)
let mut retries = 0;
let committed = loop {
if retries > 5 {
return Err(Error::generic(
"Exceeded maximum 5 retries for committing transaction",
));
}
txn = match txn.commit(&engine)? {
CommitResult::CommittedTransaction(committed) => break committed,
CommitResult::ConflictedTransaction(conflicted) => {
let conflicting_version = conflicted.conflict_version;
println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}");
return Err(Error::generic("Commit failed"));
}
CommitResult::RetryableTransaction(RetryableTransaction { transaction, error }) => {
println!("✗ Failed to commit, retrying... retryable error: {error}");
transaction
}
};
retries += 1;
};

// Read and display the data
read_and_display_data(&url, engine).await?;
println!("✓ Successfully read data from the table");
let version = committed.version();
println!("✓ Committed transaction at version {version}");
println!("✓ Successfully wrote {} rows to the table", cli.num_rows);

Ok(())
}
CommitResult::Conflict(_, conflicting_version) => {
println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}");
Err(Error::generic("Commit failed"))
}
}
// Read and display the data
read_and_display_data(&url, engine).await?;
println!("✓ Successfully read data from the table");

Ok(())
}

/// Creates a new Delta table or gets an existing one.
Expand Down
5 changes: 0 additions & 5 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,6 @@ impl Protocol {
/// Check if writing to a table with this protocol is supported. That is: does the kernel
/// support the specified protocol writer version and all enabled writer features?
pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> {
#[cfg(feature = "catalog-managed")]
require!(
!self.is_catalog_managed(),
Error::unsupported("Writes are not yet supported for catalog-managed tables")
);
match &self.writer_features {
Some(writer_features) if self.min_writer_version == 7 => {
// if we're on version 7, make sure we support all the specified features
Expand Down
66 changes: 66 additions & 0 deletions kernel/src/committer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::sync::Arc;

use crate::path::ParsedLogPath;
use crate::{DeltaResult, Engine, EngineDataResultIterator, Error, Version};

use url::Url;

#[derive(Debug)]
pub struct CommitMetadata {
pub commit_path: ParsedLogPath<Url>,
pub version: Version,
}

impl CommitMetadata {
pub(crate) fn new(commit_path: ParsedLogPath<Url>, version: Version) -> Self {
Self {
commit_path,
version,
}
}
}

#[derive(Debug)]
/// Result of committing a transaction.
pub enum CommitResponse {
Committed { version: Version },
Conflict { version: Version },
}

pub trait Committer: Send + Sync {
type Context;

fn commit(
&self,
engine: &dyn Engine,
actions: EngineDataResultIterator<'_>,
version: Version,
context: &Self::Context,
) -> DeltaResult<CommitResponse>;

fn published(&self, version: Version, context: &Self::Context) -> DeltaResult<()> {
let _ = (version, context);
Ok(())
}
}

pub struct FileSystemCommitter;

impl Committer for FileSystemCommitter {
type Context = Url; // Table root
fn commit(
&self,
engine: &dyn Engine,
actions: EngineDataResultIterator<'_>,
version: Version,
context: &Self::Context,
) -> DeltaResult<CommitResponse> {
let path = ParsedLogPath::new_commit(context, version)?;
let json_handler = engine.json_handler();
match json_handler.write_json_file(&path.location, Box::new(actions), false) {
Ok(()) => Ok(CommitResponse::Committed { version }),
Err(Error::FileAlreadyExists(_)) => Ok(CommitResponse::Conflict { version }),
Err(e) => Err(e),
}
}
}
11 changes: 11 additions & 0 deletions kernel/src/engine/default/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {

Ok(Box::new(receiver.into_iter()))
}

fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()> {
let from_path = Path::from_url_path(from.path())?;
let to_path = Path::from_url_path(to.path())?;
let store = self.inner.clone();
println!("Copying from {from_path:?} to {to_path:?}");
// FIXME: ? copy_if_not_exists doesn't work on S3
self.task_executor
.block_on(async move { store.copy(&from_path, &to_path).await })
.map_err(|e| Error::Generic(format!("Failed to copy file: {e}")))
}
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ pub mod table_properties;
pub mod transaction;
pub(crate) mod transforms;

#[cfg(not(feature = "catalog-managed"))]
mod committer;
#[cfg(feature = "catalog-managed")]
pub mod committer;

pub use log_path::LogPath;

mod row_tracking;
Expand Down Expand Up @@ -183,6 +188,10 @@ pub type FileDataReadResult = (FileMeta, Box<dyn EngineData>);
pub type FileDataReadResultIterator =
Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>;

/// Type alias for an iterator of [`EngineData`] results.
pub type EngineDataResultIterator<'a> =
Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a>;

/// The metadata that describes an object.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMeta {
Expand Down Expand Up @@ -534,6 +543,9 @@ pub trait StorageHandler: AsAny {
&self,
files: Vec<FileSlice>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>>;

/// Performs a copy. Must not overwrite.
fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()>;
}

/// Provides JSON handling functionality to Delta Kernel.
Expand Down
21 changes: 9 additions & 12 deletions kernel/src/log_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use url::Url;
/// Today, a `LogPath` is a file in the `_delta_log` directory of a Delta table; in the future,
/// this will expand to support providing inline data in the log path itself.
#[derive(Debug, Clone, PartialEq)]
pub struct LogPath(ParsedLogPath);
pub struct LogPath(pub ParsedLogPath);

impl From<LogPath> for ParsedLogPath {
fn from(p: LogPath) -> Self {
Expand Down Expand Up @@ -44,18 +44,13 @@ impl LogPath {
size: FileSize,
) -> DeltaResult<LogPath> {
// TODO: we should introduce TablePath/LogPath types which enforce checks like ending '/'

// require table_root ends with '/'
require!(
table_root.path().ends_with('/'),
Error::generic("table root must be a directory-like URL ending with '/'")
);
let location = table_root
.join("_delta_log/")?
.join("_staged_commits/")?
.join(filename)?;
let mut commit_path = table_root.clone();
commit_path
.path_segments_mut()
.map_err(|()| Error::invalid_table_location(table_root))?
.extend(&["_delta_log", "_staged_commits", filename]);
let file_meta = FileMeta {
location,
location: commit_path,
last_modified,
size,
};
Expand Down Expand Up @@ -92,6 +87,8 @@ mod test {
assert_eq!(path.location, expected);
}

// FIXME
#[ignore]
#[test]
fn test_staged_commit_path_creation_failures() {
let last_modified = 1234567890i64;
Expand Down
26 changes: 26 additions & 0 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,4 +570,30 @@ impl LogSegment {
debug_assert!(to_sub <= self.end_version);
self.end_version - to_sub
}

// TODO: decide on API:
// 1. should this mutate? should it consume self and hand back a new one?
// 2. should we take finer-grained args like which version to publish up to?
pub(crate) fn publish(mut self, engine: &dyn Engine) -> DeltaResult<Self> {
let storage = engine.storage_handler();

// Transform staged commits into published commits
for i in 0..self.ascending_commit_files.len() {
if matches!(self.ascending_commit_files[i].file_type, LogPathFileType::StagedCommit) {
// Clone the staged commit to get source location before transforming
let source_location = self.ascending_commit_files[i].location.location.clone();

// Take ownership of the commit to transform it
let staged_commit = self.ascending_commit_files.remove(i);
let published_commit = staged_commit.into_published()?;

// Copy the actual file from staged to published location
storage.copy(&source_location, &published_commit.location.location)?;

// Insert the published commit back
self.ascending_commit_files.insert(i, published_commit);
}
}
Ok(self)
}
}
76 changes: 74 additions & 2 deletions kernel/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ pub(crate) enum LogPathFileType {
/// the _delta_log we may see _staged_commits/00000000000000000000.{uuid}.json, but we MUST NOT
/// include those in listing, as only the catalog can tell us which are valid commits.
#[derive(Debug, Clone, PartialEq, Eq)]
#[internal_api]
pub(crate) struct ParsedLogPath<Location: AsUrl = FileMeta> {
pub struct ParsedLogPath<Location: AsUrl = FileMeta> {
pub location: Location,
#[allow(unused)]
pub filename: String,
Expand Down Expand Up @@ -215,6 +214,79 @@ impl<Location: AsUrl> ParsedLogPath<Location> {
}
}

impl ParsedLogPath<FileMeta> {
/// Transform a staged commit into a published commit by updating its location
pub(crate) fn into_published(self) -> DeltaResult<ParsedLogPath<FileMeta>> {
if !matches!(self.file_type, LogPathFileType::StagedCommit) {
return Err(Error::internal_error(
"Unable to create a published path from a non-staged commit",
));
}

let published_filename = format!("{:020}.json", self.version);
let published_url = transform_staged_commit_url(self.location.location.clone())?;

Ok(ParsedLogPath {
location: FileMeta {
location: published_url,
last_modified: self.location.last_modified,
size: self.location.size,
},
filename: published_filename,
extension: "json".to_string(),
version: self.version,
file_type: LogPathFileType::Commit,
})
}
}

fn transform_staged_commit_url(mut url: Url) -> DeltaResult<Url> {
// Collect segments into owned strings to avoid borrowing issues
let segments: Vec<String> = url
.path_segments()
.ok_or(Error::generic("cannot parse path segments"))?
.map(|s| s.to_string())
.collect();

let staged_commits_index = segments
.iter()
.rposition(|s| s == "_staged_commits")
.ok_or(Error::generic("_staged_commits not found in path"))?;

// Build new path: everything before _staged_commits + modified filename
let mut new_path = String::new();

// Add segments up to (but not including) _staged_commits
for (i, segment) in segments.iter().enumerate() {
if i >= staged_commits_index {
break;
}
if !new_path.is_empty() || !segment.is_empty() {
new_path.push('/');
}
new_path.push_str(segment);
}

// Add the modified filename (remove UUID)
if let Some(filename) = segments.get(staged_commits_index + 1) {
// Remove UUID from filename: 00000000000000000005.{uuid}.json -> 00000000000000000005.json
let new_filename = filename
.split('.')
.next()
.ok_or(Error::generic("invalid filename format"))?
.to_string()
+ ".json";

if !new_path.is_empty() {
new_path.push('/');
}
new_path.push_str(&new_filename);
}

url.set_path(&new_path);
Ok(url)
}

impl ParsedLogPath<Url> {
const DELTA_LOG_DIR: &'static str = "_delta_log/";

Expand Down
Loading
Loading