1 change: 1 addition & 0 deletions Cargo.toml
@@ -9,6 +9,7 @@ members = [
"feature-tests",
"mem-test",
"uc-client", # WIP: this is an experimental UC client for catalog-managed table work
"uc-catalog", # WIP: this is an experimental UC catalog for catalog-managed table work
]
# note that in addition to the members above, the workspace includes examples:
# - inspect-table
25 changes: 0 additions & 25 deletions kernel/src/actions/mod.rs
@@ -485,11 +485,6 @@ impl Protocol {
/// Check if writing to a table with this protocol is supported. That is: does the kernel
/// support the specified protocol writer version and all enabled writer features?
pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> {
#[cfg(feature = "catalog-managed")]
require!(
!self.is_catalog_managed(),
Error::unsupported("Writes are not yet supported for catalog-managed tables")
);
match &self.writer_features {
Some(writer_features) if self.min_writer_version == 7 => {
// if we're on version 7, make sure we support all the specified features
@@ -1451,26 +1446,6 @@
assert_eq!(parse_features::<ReaderFeature>(features), expected);
}

#[test]
fn test_no_catalog_managed_writes() {
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeature::CatalogManaged]),
Some([WriterFeature::CatalogManaged]),
)
.unwrap();
assert!(protocol.ensure_write_supported().is_err());
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeature::CatalogOwnedPreview]),
Some([WriterFeature::CatalogOwnedPreview]),
)
.unwrap();
assert!(protocol.ensure_write_supported().is_err());
}

#[test]
fn test_into_engine_data() {
let engine = ExprEngine::new();
79 changes: 79 additions & 0 deletions kernel/src/committer.rs
@@ -0,0 +1,79 @@
use std::sync::Arc;

use crate::path::ParsedLogPath;
use crate::{DeltaResult, Engine, Error, FilteredEngineData, Version};

use url::Url;

#[derive(Debug)]
pub struct CommitMetadata {

Check failure on line 9 in kernel/src/committer.rs (GitHub Actions, 3 jobs): unreachable `pub` item
pub commit_path: ParsedLogPath<Url>,
pub version: Version,
pub timestamp: i64,

Check failure on line 12 in kernel/src/committer.rs (GitHub Actions, 3 jobs): fields `timestamp` and `latest_published_version` are never read
pub latest_published_version: Option<Version>,
}

impl CommitMetadata {
pub(crate) fn new(
commit_path: ParsedLogPath<Url>,
version: Version,
timestamp: i64,
latest_published_version: Option<Version>,
) -> Self {
Self {
commit_path,
version,
timestamp,
latest_published_version,
}
}
}

#[derive(Debug)]
/// Result of committing a transaction.
pub enum CommitResponse {

Check failure on line 34 in kernel/src/committer.rs (GitHub Actions, 3 jobs): unreachable `pub` item
Committed { version: Version },
Conflict { version: Version },
}

pub trait Committer: Send + Sync {

Check failure on line 39 in kernel/src/committer.rs (GitHub Actions, 3 jobs): unreachable `pub` item
fn commit(
&self,
engine: &dyn Engine,
actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
commit_metadata: CommitMetadata,
) -> DeltaResult<CommitResponse>;
}

pub(crate) struct FileSystemCommitter;

impl FileSystemCommitter {
pub(crate) fn new() -> Arc<Self> {
Arc::new(Self {})
}
}

impl Committer for FileSystemCommitter {
fn commit(
&self,
engine: &dyn Engine,
actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
commit_metadata: CommitMetadata,
) -> DeltaResult<CommitResponse> {
let json_handler = engine.json_handler();

match json_handler.write_json_file(
&commit_metadata.commit_path.location,
Box::new(actions),
false,
) {
Ok(()) => Ok(CommitResponse::Committed {
version: commit_metadata.version,
}),
Err(Error::FileAlreadyExists(_)) => Ok(CommitResponse::Conflict {
version: commit_metadata.version,
}),
Err(e) => Err(e),
}
}
}
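
For a sense of how this trait is meant to be consumed: a catalog-backed engine would plug in its own `Committer` in place of `FileSystemCommitter`. A minimal sketch is below; `CatalogClient` and its `try_commit` call are invented stand-ins (only the `Committer` trait shape comes from this diff), and the `committer` module is only public with the `catalog-managed` feature enabled.

use std::sync::Arc;

use delta_kernel::committer::{CommitMetadata, CommitResponse, Committer};
use delta_kernel::{DeltaResult, Engine, FilteredEngineData, Version};

/// Purely illustrative stand-in for a real catalog client.
struct CatalogClient;

impl CatalogClient {
    /// Pretend to ask the catalog to ratify `version`; a real client would make an RPC.
    fn try_commit(&self, _version: Version) -> Result<(), ()> {
        Ok(())
    }
}

struct CatalogCommitter {
    catalog: Arc<CatalogClient>,
}

impl Committer for CatalogCommitter {
    fn commit(
        &self,
        engine: &dyn Engine,
        actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
        commit_metadata: CommitMetadata,
    ) -> DeltaResult<CommitResponse> {
        // Write the commit file to the location the kernel passed in...
        engine.json_handler().write_json_file(
            &commit_metadata.commit_path.location,
            Box::new(actions),
            false,
        )?;
        // ...then let the catalog decide whether this version wins.
        match self.catalog.try_commit(commit_metadata.version) {
            Ok(()) => Ok(CommitResponse::Committed {
                version: commit_metadata.version,
            }),
            Err(()) => Ok(CommitResponse::Conflict {
                version: commit_metadata.version,
            }),
        }
    }
}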
11 changes: 11 additions & 0 deletions kernel/src/engine/default/filesystem.rs
@@ -175,6 +175,17 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {

Ok(Box::new(receiver.into_iter()))
}

fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()> {
let from_path = Path::from_url_path(from.path())?;
let to_path = Path::from_url_path(to.path())?;
let store = self.inner.clone();
println!("Copying from {from_path:?} to {to_path:?}");
// FIXME: ? copy_if_not_exists doesn't work on S3
self.task_executor
.block_on(async move { store.copy(&from_path, &to_path).await })
.map_err(|e| Error::Generic(format!("Failed to copy file: {e}")))
}
}
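
On the FIXME above: `ObjectStore::copy` will happily replace an existing object, which breaks the no-overwrite contract that `StorageHandler::copy` declares. The stricter call is `copy_if_not_exists`, sketched here; the catch (and presumably the reason for the FIXME) is that plain S3 needs extra configuration for it, e.g. a locking or conditional-put setup.

fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()> {
    let from_path = Path::from_url_path(from.path())?;
    let to_path = Path::from_url_path(to.path())?;
    let store = self.inner.clone();
    // copy_if_not_exists surfaces an AlreadyExists error instead of clobbering
    // `to`, matching the trait contract; it is not supported on bare S3.
    self.task_executor
        .block_on(async move { store.copy_if_not_exists(&from_path, &to_path).await })
        .map_err(|e| Error::Generic(format!("Failed to copy file: {e}")))
}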

#[cfg(test)]
6 changes: 6 additions & 0 deletions kernel/src/engine/sync/storage.rs
@@ -70,6 +70,12 @@ impl StorageHandler for SyncStorageHandler {
});
Ok(Box::new(iter))
}

fn copy(&self, _from: &Url, _to: &Url) -> DeltaResult<()> {
Err(Error::unsupported(
"Copy not yet implemented for SyncStorageHandler",
))
}
}
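
If the sync handler later gets a real implementation, it could be a thin wrapper over `std::fs`. A non-atomic sketch (the exists-then-copy check races, so this is illustrative only; real exclusivity would need `OpenOptions::create_new`-style semantics):

fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()> {
    let from_path = from
        .to_file_path()
        .map_err(|_| Error::generic("copy: `from` must be a file:// URL"))?;
    let to_path = to
        .to_file_path()
        .map_err(|_| Error::generic("copy: `to` must be a file:// URL"))?;
    // Honor the no-overwrite contract (racy check; fine for a sketch).
    if to_path.exists() {
        return Err(Error::FileAlreadyExists(to.to_string()));
    }
    std::fs::copy(&from_path, &to_path)
        .map_err(|e| Error::generic(format!("copy failed: {e}")))?;
    Ok(())
}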

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions kernel/src/lib.rs
@@ -102,6 +102,11 @@ pub mod table_properties;
pub mod transaction;
pub(crate) mod transforms;

#[cfg(not(feature = "catalog-managed"))]
mod committer;
#[cfg(feature = "catalog-managed")]
pub mod committer;

pub use log_path::LogPath;

mod row_tracking;
@@ -182,6 +187,10 @@ pub type FileDataReadResult = (FileMeta, Box<dyn EngineData>);
pub type FileDataReadResultIterator =
Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>;

/// Type alias for an iterator of [`EngineData`] results.
pub type EngineDataResultIterator<'a> =
Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a>;

/// The metadata that describes an object.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMeta {
@@ -533,6 +542,9 @@ pub trait StorageHandler: AsAny {
&self,
files: Vec<FileSlice>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>>;

/// Performs a copy from `from` to `to`. Must fail rather than overwrite if `to` already exists.
fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()>;
}

/// Provides JSON handling functionality to Delta Kernel.
3 changes: 3 additions & 0 deletions kernel/src/listed_log_files.rs
@@ -631,6 +631,9 @@
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<bytes::Bytes>>>> {
panic!("read_files used");
}
fn copy(&self, from: &Url, to: &Url) -> DeltaResult<()> {
panic!("copy used from {from} to {to}");
}
}

// when log_tail covers the entire requested range, no filesystem listing should occur
3 changes: 3 additions & 0 deletions kernel/src/log_path.rs
@@ -46,6 +46,7 @@ impl LogPath {
// TODO: we should introduce TablePath/LogPath types which enforce checks like ending '/'

// require table_root ends with '/'
println!("table_root: {}", table_root);
require!(
table_root.path().ends_with('/'),
Error::generic("table root must be a directory-like URL ending with '/'")
@@ -92,6 +93,8 @@ mod test {
assert_eq!(path.location, expected);
}

// FIXME
#[ignore]
#[test]
fn test_staged_commit_path_creation_failures() {
let last_modified = 1234567890i64;
41 changes: 41 additions & 0 deletions kernel/src/log_segment.rs
@@ -61,6 +61,8 @@
/// The latest commit file found during listing, which may not be part of the
/// contiguous segment but is needed for ICT timestamp reading
pub latest_commit_file: Option<ParsedLogPath>,
/// The latest published commit version. If there are no published commits, this is None.
pub latest_published_version: Option<Version>,
}

impl LogSegment {
@@ -123,6 +125,15 @@
);
}

// FIXME: this misses published commits whose versions overlap with log_tail (which
// carries the staged commits)
// get the latest published version from the commit files
let latest_published_version = ascending_commit_files
.partition_point(|c| matches!(c.file_type, LogPathFileType::Commit))
.checked_sub(1)
.and_then(|idx| ascending_commit_files.get(idx))
.map(|c| c.version);

Ok(LogSegment {
end_version: effective_version,
checkpoint_version,
@@ -132,6 +143,7 @@
checkpoint_parts,
latest_crc_file,
latest_commit_file,
latest_published_version,
})
}

@@ -588,4 +600,33 @@
);
Ok(())
}

// TODO: decide on API:
// 1. should this mutate? should it consume self and hand back a new one?
// 2. should we take finer-grained args like which version to publish up to?
pub(crate) fn publish(mut self, engine: &dyn Engine) -> DeltaResult<Self> {

Check failure on line 607 in kernel/src/log_segment.rs (GitHub Actions, 3 jobs): method `publish` is never used
let storage = engine.storage_handler();

// Transform staged commits into published commits
for i in 0..self.ascending_commit_files.len() {
if matches!(
self.ascending_commit_files[i].file_type,
LogPathFileType::StagedCommit
) {
// Clone the source URL before the staged commit is moved out and transformed
let source_location = self.ascending_commit_files[i].location.location.clone();

// Take ownership of the commit to transform it
let staged_commit = self.ascending_commit_files.remove(i);
let published_commit = staged_commit.into_published()?;

// Copy the actual file from staged to published location
storage.copy(&source_location, &published_commit.location.location)?;

// Insert the published commit back
self.ascending_commit_files.insert(i, published_commit);
}
}
Ok(self)
}
}
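
The `partition_point` computation in `try_new` above leans on `ascending_commit_files` keeping every published commit ahead of any staged commit. A self-contained toy version, with plain tuples standing in for `ParsedLogPath`, covering the same cases as the test stub in tests.rs:

#[derive(Debug)]
enum FileType {
    Commit,
    StagedCommit,
}

fn latest_published(files: &[(u64, FileType)]) -> Option<u64> {
    files
        .partition_point(|(_, t)| matches!(t, FileType::Commit))
        .checked_sub(1)
        .and_then(|idx| files.get(idx))
        .map(|(v, _)| *v)
}

fn main() {
    use FileType::*;
    // published 0..=3, staged 4 and 5 -> latest published is 3
    let mixed = [
        (0, Commit),
        (1, Commit),
        (2, Commit),
        (3, Commit),
        (4, StagedCommit),
        (5, StagedCommit),
    ];
    assert_eq!(latest_published(&mixed), Some(3));
    // only published commits -> the last one
    let published_only = [(0, Commit), (1, Commit), (2, Commit)];
    assert_eq!(latest_published(&published_only), Some(2));
    // only staged commits -> None
    let staged_only = [(2, StagedCommit), (3, StagedCommit)];
    assert_eq!(latest_published(&staged_only), None);
}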
6 changes: 6 additions & 0 deletions kernel/src/log_segment/tests.rs
@@ -2296,7 +2296,7 @@
create_log_path("file:///path/_delta_log/00000000000000000002.json"),
];

let log_segment = LogSegment {

Check failure on line 2299 in kernel/src/log_segment/tests.rs (GitHub Actions, 5 jobs): missing field `latest_published_version` in initializer of `LogSegment`
ascending_commit_files: regular_commits,
ascending_compaction_files: vec![],
checkpoint_parts: vec![],
@@ -2316,7 +2316,7 @@
create_log_path("file:///path/_delta_log/_staged_commits/00000000000000000002.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json"),
];

let log_segment_with_staged = LogSegment {

Check failure on line 2319 in kernel/src/log_segment/tests.rs (GitHub Actions, 5 jobs): missing field `latest_published_version` in initializer of `LogSegment`
ascending_commit_files: with_staged,
ascending_compaction_files: vec![],
checkpoint_parts: vec![],
@@ -2336,3 +2336,9 @@
panic!("Expected Error::Generic");
}
}

fn test_log_segment_latest_published_version() {
// log segment with published commits 0,1,2,3 / staged 4,5 -> expect Some(3)
// log segment with published commits 0,1,2 -> expect Some(2)
// log segment with staged commits 2,3 -> expect None
}
71 changes: 71 additions & 0 deletions kernel/src/path.rs
@@ -257,6 +257,77 @@
None => Err(Error::generic("Commit file contains no actions")),
}
}

/// Transform a staged commit into a published commit by updating its location
pub(crate) fn into_published(self) -> DeltaResult<ParsedLogPath<FileMeta>> {

Check failure on line 262 in kernel/src/path.rs (GitHub Actions, 3 jobs): method `into_published` is never used
if !matches!(self.file_type, LogPathFileType::StagedCommit) {
return Err(Error::internal_error(
"Unable to create a published path from a non-staged commit",
));
}

let published_filename = format!("{:020}.json", self.version);
let published_url = transform_staged_commit_url(self.location.location.clone())?;

Ok(ParsedLogPath {
location: FileMeta {
location: published_url,
last_modified: self.location.last_modified,
size: self.location.size,
},
filename: published_filename,
extension: "json".to_string(),
version: self.version,
file_type: LogPathFileType::Commit,
})
}
}

fn transform_staged_commit_url(mut url: Url) -> DeltaResult<Url> {

Check failure on line 286 in kernel/src/path.rs (GitHub Actions, 3 jobs): function `transform_staged_commit_url` is never used
// Collect segments into owned strings to avoid borrowing issues
let segments: Vec<String> = url
.path_segments()
.ok_or(Error::generic("cannot parse path segments"))?
.map(|s| s.to_string())
.collect();

let staged_commits_index = segments
.iter()
.rposition(|s| s == "_staged_commits")
.ok_or(Error::generic("_staged_commits not found in path"))?;

// Build new path: everything before _staged_commits + modified filename
let mut new_path = String::new();

// Add segments up to (but not including) _staged_commits
for (i, segment) in segments.iter().enumerate() {
if i >= staged_commits_index {
break;
}
if !new_path.is_empty() || !segment.is_empty() {
new_path.push('/');
}
new_path.push_str(segment);
}

// Add the modified filename (remove UUID)
if let Some(filename) = segments.get(staged_commits_index + 1) {
// Remove UUID from filename: 00000000000000000005.{uuid}.json -> 00000000000000000005.json
let new_filename = filename
.split('.')
.next()
.ok_or(Error::generic("invalid filename format"))?
.to_string()
+ ".json";

if !new_path.is_empty() {
new_path.push('/');
}
new_path.push_str(&new_filename);
}

url.set_path(&new_path);
Ok(url)
}
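
A concrete expectation for `transform_staged_commit_url`, phrased as a test; it would need to live somewhere with visibility into this pub(crate) function, and the UUID here is made up:

#[test]
fn staged_commit_url_publishes_cleanly() {
    let staged = Url::parse(
        "s3://bucket/table/_delta_log/_staged_commits/00000000000000000005.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json",
    )
    .unwrap();
    let published = transform_staged_commit_url(staged).unwrap();
    // Both the `_staged_commits/` directory and the UUID disappear.
    assert_eq!(
        published.as_str(),
        "s3://bucket/table/_delta_log/00000000000000000005.json"
    );
}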

impl ParsedLogPath<Url> {
13 changes: 13 additions & 0 deletions kernel/src/snapshot.rs
@@ -332,6 +332,19 @@ impl Snapshot {
Transaction::try_new(self)
}

#[cfg(feature = "catalog-managed")]
pub fn publish(mut self, engine: &dyn Engine) -> DeltaResult<Self> {
// FIXME: remove clone
self.log_segment = self.log_segment.clone().publish(engine)?;

println!("Published log segment");
for commit in &self.log_segment.ascending_commit_files {
println!("commit: {:?}", commit);
}

Ok(self)
}
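
The intended call pattern for the new method is small; a sketch of a driver (obtaining the `Snapshot` and `Engine` is outside this diff, and the method is gated on the `catalog-managed` feature):

use delta_kernel::{DeltaResult, Engine, Snapshot};

fn publish_all_staged(engine: &dyn Engine, snapshot: Snapshot) -> DeltaResult<Snapshot> {
    // Consumes the snapshot: each staged commit file is copied from
    // _delta_log/_staged_commits/<version>.<uuid>.json to its published
    // _delta_log/<version>.json location, and the returned snapshot's log
    // segment reflects the published paths.
    snapshot.publish(engine)
}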

/// Fetch the latest version of the provided `application_id` for this snapshot. Filters the txn actions based on the `setTransactionRetentionDuration` property and their `lastUpdated` timestamps.
///
/// Note that this method performs log replay (fetches and processes metadata from storage).