Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ object_store = "0.12.3"
default = ["default-engine-rustls"]
default-engine-native-tls = ["delta_kernel/default-engine-native-tls", "default-engine-base"]
default-engine-rustls = ["delta_kernel/default-engine-rustls", "default-engine-base"]
catalog-managed = ["delta_kernel/catalog-managed"]

# This is an 'internal' feature flag which has all the shared bits from default-engine-native-tls and
# default-engine-rustls. There is a check in kernel/lib.rs to ensure you have enabled one of
Expand Down
143 changes: 131 additions & 12 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

use delta_kernel::schema::Schema;
use delta_kernel::snapshot::Snapshot;
use delta_kernel::Version;
use delta_kernel::{DeltaResult, Engine, EngineData};
use delta_kernel::{DeltaResult, Engine, EngineData, LogPath, Version};
use delta_kernel_ffi_macros::handle_descriptor;

// cbindgen doesn't understand our use of feature flags here, and by default it parses `mod handle`
Expand All @@ -41,6 +40,8 @@
pub mod expressions;
#[cfg(feature = "tracing")]
pub mod ffi_tracing;
#[cfg(feature = "catalog-managed")]
pub mod log_path;
pub mod scan;
pub mod schema;

Expand Down Expand Up @@ -577,10 +578,36 @@
) -> ExternResult<Handle<SharedSnapshot>> {
let url = unsafe { unwrap_and_parse_path_as_url(path) };
let engine = unsafe { engine.as_ref() };
snapshot_impl(url, engine, None).into_extern_result(&engine)
snapshot_impl(url, engine, None, Vec::new()).into_extern_result(&engine)
}

/// Get the snapshot from the specified table at a specific version
/// Get the latest snapshot from the specified table with optional log tail
///
/// # Safety
///
/// Caller is responsible for passing valid handles and path pointer.
/// The log_paths array and its contents must remain valid for the duration of this call.
#[cfg(feature = "catalog-managed")]
#[no_mangle]
pub unsafe extern "C" fn snapshot_with_log_tail(
path: KernelStringSlice,
engine: Handle<SharedExternEngine>,
log_paths: log_path::LogPathArray,
) -> ExternResult<Handle<SharedSnapshot>> {
let url = unsafe { unwrap_and_parse_path_as_url(path) };
let engine_ref = unsafe { engine.as_ref() };

// Convert LogPathArray to Vec<LogPath>
let log_tail = match unsafe { log_paths.log_paths() } {
Ok(paths) => paths,
Err(e) => return DeltaResult::Err(e).into_extern_result(&engine_ref),
};

snapshot_impl(url, engine_ref, None, log_tail).into_extern_result(&engine_ref)
}

/// Get the snapshot from the specified table at a specific version. Note this is only safe for
/// non-catalog-managed tables.
///
/// # Safety
///
Expand All @@ -593,21 +620,52 @@
) -> ExternResult<Handle<SharedSnapshot>> {
let url = unsafe { unwrap_and_parse_path_as_url(path) };
let engine = unsafe { engine.as_ref() };
snapshot_impl(url, engine, version.into()).into_extern_result(&engine)
snapshot_impl(url, engine, version.into(), Vec::new()).into_extern_result(&engine)
}

/// Get the snapshot from the specified table at a specific version with log tail.
///
/// # Safety
///
/// Caller is responsible for passing valid handles and path pointer.
/// The log_tail array and its contents must remain valid for the duration of this call.
#[cfg(feature = "catalog-managed")]
#[no_mangle]
pub unsafe extern "C" fn snapshot_at_version_with_log_tail(
path: KernelStringSlice,
engine: Handle<SharedExternEngine>,
version: Version,
log_tail: log_path::LogPathArray,
) -> ExternResult<Handle<SharedSnapshot>> {
let url = unsafe { unwrap_and_parse_path_as_url(path) };
let engine_ref = unsafe { engine.as_ref() };

// Convert LogPathArray to Vec<LogPath>
let log_tail = match unsafe { log_tail.log_paths() } {
Ok(paths) => paths,
Err(e) => return DeltaResult::Err(e).into_extern_result(&engine_ref),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Err(e) => return DeltaResult::Err(e).into_extern_result(&engine_ref),
err => return err.into_extern_result(&engine_ref),

};

snapshot_impl(url, engine_ref, version.into(), log_tail).into_extern_result(&engine_ref)
}

fn snapshot_impl(
url: DeltaResult<Url>,
extern_engine: &dyn ExternEngine,
version: Option<Version>,
#[allow(unused_variables)] log_tail: Vec<LogPath>,
) -> DeltaResult<Handle<SharedSnapshot>> {
let builder = Snapshot::builder_for(url?);
let builder = if let Some(v) = version {
// TODO: should we include a `with_version_opt` method for the builder?
builder.at_version(v)
} else {
builder
};
let mut builder = Snapshot::builder_for(url?);

if let Some(v) = version {
builder = builder.at_version(v);
}

#[cfg(feature = "catalog-managed")]
if !log_tail.is_empty() {
builder = builder.with_log_tail(log_tail);
}

let snapshot = builder.build(extern_engine.engine().as_ref())?;
Ok(snapshot.into())
}
Expand Down Expand Up @@ -935,4 +993,65 @@
unsafe { free_engine(engine) }
Ok(())
}

#[cfg(feature = "catalog-managed")]
#[tokio::test]
async fn test_snapshot_log_tail() -> Result<(), Box<dyn std::error::Error>> {
use test_utils::add_staged_commit;
let storage = Arc::new(InMemory::new());
add_commit(
storage.as_ref(),
0,
actions_to_string(vec![TestAction::Metadata]),
)
.await?;
let commit1 = add_staged_commit(
storage.as_ref(),
1,
actions_to_string(vec![TestAction::Add("path1".into())]),
)
.await?;
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
let engine = engine_to_handle(Arc::new(engine), allocate_err);
let path = "memory:///";

let commit1_path = format!(
"{}_delta_log/_staged_commits/{}",
path,
commit1.filename().unwrap()
);
let log_path =
log_path::FfiLogPath::new(kernel_string_slice!(commit1_path), 123456789, 100);
let log_tail = vec![log_path];

Check failure on line 1025 in ffi/src/lib.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

useless use of `vec!`

Check failure on line 1025 in ffi/src/lib.rs

View workflow job for this annotation

GitHub Actions / build (macOS-latest)

useless use of `vec!`
let log_tail = log_path::LogPathArray {
ptr: log_tail.as_ptr(),
len: log_tail.len(),
};
Comment on lines +1023 to +1029
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth just making a LogPathArray::fromVec.

Also, while it's handled right here, I think we need to be very careful with Vec since drop could still be called and we'd have a use-after-free.

let snapshot = unsafe {
ok_or_panic(snapshot_with_log_tail(
kernel_string_slice!(path),
engine.shallow_copy(),
log_tail.clone(),

Check failure on line 1034 in ffi/src/lib.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

using `clone` on type `LogPathArray` which implements the `Copy` trait

Check failure on line 1034 in ffi/src/lib.rs

View workflow job for this annotation

GitHub Actions / build (macOS-latest)

using `clone` on type `LogPathArray` which implements the `Copy` trait
))
};
let snapshot_version = unsafe { version(snapshot.shallow_copy()) };
assert_eq!(snapshot_version, 1);

// Test getting snapshot at version
let snapshot2 = unsafe {
ok_or_panic(snapshot_at_version_with_log_tail(
kernel_string_slice!(path),
engine.shallow_copy(),
1,
log_tail,
))
};
let snapshot_version = unsafe { version(snapshot.shallow_copy()) };
assert_eq!(snapshot_version, 1);

unsafe { free_snapshot(snapshot) }
unsafe { free_snapshot(snapshot2) }
unsafe { free_engine(engine) }
Ok(())
}
}
93 changes: 93 additions & 0 deletions ffi/src/log_path.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! FFI interface for LogPath.

use delta_kernel::{DeltaResult, FileMeta, LogPath};
use url::Url;

use crate::{KernelStringSlice, TryFromStringSlice};

/// FFI-safe array of LogPaths
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct LogPathArray {
/// Pointer to the first element of the FfiLogPath array. If len is 0, this pointer may be null,
/// otherwise it must be non-null.
pub ptr: *const FfiLogPath,
/// Number of elements in the array
pub len: usize,
}

impl LogPathArray {
/// Create an empty LogPathArray
pub fn empty() -> Self {
Self {
ptr: std::ptr::null(),
len: 0,
}
}

/// Convert this array into a Vec of kernel LogPaths
///
/// # Safety
/// The ptr must point to `len` valid FfiLogPath elements, and those elements
/// must remain valid for the duration of this call
pub(crate) unsafe fn log_paths(&self) -> DeltaResult<Vec<LogPath>> {
if self.ptr.is_null() || self.len == 0 {
return Ok(Vec::new());
}

let slice = unsafe { std::slice::from_raw_parts(self.ptr, self.len) };
slice
.iter()
.map(|ffi_path| unsafe { ffi_path.log_path() })
.collect::<Result<Vec<_>, _>>()
}
}

/// FFI-safe LogPath representation that can be passed from the engine
#[repr(C)]
pub struct FfiLogPath {
/// URL location of the log file
location: KernelStringSlice,
/// Last modified time as milliseconds since unix epoch
last_modified: i64,
/// Size in bytes of the log file
size: u64,
}

impl FfiLogPath {
/// Create a new FFI LogPath. The location string slice must be valid UTF-8.
pub fn new(location: KernelStringSlice, last_modified: i64, size: u64) -> Self {
Self {
location,
last_modified,
size,
}
}

/// URL location of the log file as a string slice
pub fn location(&self) -> &KernelStringSlice {
&self.location
}

/// Last modified time as milliseconds since unix epoch
pub fn last_modified(&self) -> i64 {
self.last_modified
}

/// Size in bytes of the log file
pub fn size(&self) -> u64 {
self.size
}

/// Convert this FFI log path into a kernel LogPath
unsafe fn log_path(&self) -> DeltaResult<LogPath> {
let location_str = unsafe { TryFromStringSlice::try_from_slice(&self.location) }?;
let url = Url::parse(location_str)?;
let file_meta = FileMeta {
location: url,
last_modified: self.last_modified,
size: self.size,
};
LogPath::try_new(file_meta)
}
}
Loading