From f9e24c60ffa647fe5fea6f7215f05d698669499c Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 7 Oct 2025 15:15:09 -0700 Subject: [PATCH] snapshot log tail FFI --- ffi/Cargo.toml | 1 + ffi/src/lib.rs | 143 ++++++++++++++++++++++++++++++++++++++++---- ffi/src/log_path.rs | 93 ++++++++++++++++++++++++++++ 3 files changed, 225 insertions(+), 12 deletions(-) create mode 100644 ffi/src/log_path.rs diff --git a/ffi/Cargo.toml b/ffi/Cargo.toml index 438bb0e80..86cb3c1a9 100644 --- a/ffi/Cargo.toml +++ b/ffi/Cargo.toml @@ -46,6 +46,7 @@ object_store = "0.12.3" default = ["default-engine-rustls"] default-engine-native-tls = ["delta_kernel/default-engine-native-tls", "default-engine-base"] default-engine-rustls = ["delta_kernel/default-engine-rustls", "default-engine-base"] +catalog-managed = ["delta_kernel/catalog-managed"] # This is an 'internal' feature flag which has all the shared bits from default-engine-native-tls and # default-engine-rustls. There is a check in kernel/lib.rs to ensure you have enabled one of diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index a70b531f1..d67e18183 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -13,8 +13,7 @@ use url::Url; use delta_kernel::schema::Schema; use delta_kernel::snapshot::Snapshot; -use delta_kernel::Version; -use delta_kernel::{DeltaResult, Engine, EngineData}; +use delta_kernel::{DeltaResult, Engine, EngineData, LogPath, Version}; use delta_kernel_ffi_macros::handle_descriptor; // cbindgen doesn't understand our use of feature flags here, and by default it parses `mod handle` @@ -41,6 +40,8 @@ use error::{AllocateError, AllocateErrorFn, ExternResult, IntoExternResult}; pub mod expressions; #[cfg(feature = "tracing")] pub mod ffi_tracing; +#[cfg(feature = "catalog-managed")] +pub mod log_path; pub mod scan; pub mod schema; @@ -577,10 +578,36 @@ pub unsafe extern "C" fn snapshot( ) -> ExternResult> { let url = unsafe { unwrap_and_parse_path_as_url(path) }; let engine = unsafe { engine.as_ref() }; - snapshot_impl(url, engine, None).into_extern_result(&engine) + snapshot_impl(url, engine, None, Vec::new()).into_extern_result(&engine) } -/// Get the snapshot from the specified table at a specific version +/// Get the latest snapshot from the specified table with optional log tail +/// +/// # Safety +/// +/// Caller is responsible for passing valid handles and path pointer. +/// The log_paths array and its contents must remain valid for the duration of this call. +#[cfg(feature = "catalog-managed")] +#[no_mangle] +pub unsafe extern "C" fn snapshot_with_log_tail( + path: KernelStringSlice, + engine: Handle, + log_paths: log_path::LogPathArray, +) -> ExternResult> { + let url = unsafe { unwrap_and_parse_path_as_url(path) }; + let engine_ref = unsafe { engine.as_ref() }; + + // Convert LogPathArray to Vec + let log_tail = match unsafe { log_paths.log_paths() } { + Ok(paths) => paths, + Err(e) => return DeltaResult::Err(e).into_extern_result(&engine_ref), + }; + + snapshot_impl(url, engine_ref, None, log_tail).into_extern_result(&engine_ref) +} + +/// Get the snapshot from the specified table at a specific version. Note this is only safe for +/// non-catalog-managed tables. /// /// # Safety /// @@ -593,21 +620,52 @@ pub unsafe extern "C" fn snapshot_at_version( ) -> ExternResult> { let url = unsafe { unwrap_and_parse_path_as_url(path) }; let engine = unsafe { engine.as_ref() }; - snapshot_impl(url, engine, version.into()).into_extern_result(&engine) + snapshot_impl(url, engine, version.into(), Vec::new()).into_extern_result(&engine) +} + +/// Get the snapshot from the specified table at a specific version with log tail. +/// +/// # Safety +/// +/// Caller is responsible for passing valid handles and path pointer. +/// The log_tail array and its contents must remain valid for the duration of this call. +#[cfg(feature = "catalog-managed")] +#[no_mangle] +pub unsafe extern "C" fn snapshot_at_version_with_log_tail( + path: KernelStringSlice, + engine: Handle, + version: Version, + log_tail: log_path::LogPathArray, +) -> ExternResult> { + let url = unsafe { unwrap_and_parse_path_as_url(path) }; + let engine_ref = unsafe { engine.as_ref() }; + + // Convert LogPathArray to Vec + let log_tail = match unsafe { log_tail.log_paths() } { + Ok(paths) => paths, + Err(e) => return DeltaResult::Err(e).into_extern_result(&engine_ref), + }; + + snapshot_impl(url, engine_ref, version.into(), log_tail).into_extern_result(&engine_ref) } fn snapshot_impl( url: DeltaResult, extern_engine: &dyn ExternEngine, version: Option, + #[allow(unused_variables)] log_tail: Vec, ) -> DeltaResult> { - let builder = Snapshot::builder_for(url?); - let builder = if let Some(v) = version { - // TODO: should we include a `with_version_opt` method for the builder? - builder.at_version(v) - } else { - builder - }; + let mut builder = Snapshot::builder_for(url?); + + if let Some(v) = version { + builder = builder.at_version(v); + } + + #[cfg(feature = "catalog-managed")] + if !log_tail.is_empty() { + builder = builder.with_log_tail(log_tail); + } + let snapshot = builder.build(extern_engine.engine().as_ref())?; Ok(snapshot.into()) } @@ -935,4 +993,65 @@ mod tests { unsafe { free_engine(engine) } Ok(()) } + + #[cfg(feature = "catalog-managed")] + #[tokio::test] + async fn test_snapshot_log_tail() -> Result<(), Box> { + use test_utils::add_staged_commit; + let storage = Arc::new(InMemory::new()); + add_commit( + storage.as_ref(), + 0, + actions_to_string(vec![TestAction::Metadata]), + ) + .await?; + let commit1 = add_staged_commit( + storage.as_ref(), + 1, + actions_to_string(vec![TestAction::Add("path1".into())]), + ) + .await?; + let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = engine_to_handle(Arc::new(engine), allocate_err); + let path = "memory:///"; + + let commit1_path = format!( + "{}_delta_log/_staged_commits/{}", + path, + commit1.filename().unwrap() + ); + let log_path = + log_path::FfiLogPath::new(kernel_string_slice!(commit1_path), 123456789, 100); + let log_tail = vec![log_path]; + let log_tail = log_path::LogPathArray { + ptr: log_tail.as_ptr(), + len: log_tail.len(), + }; + let snapshot = unsafe { + ok_or_panic(snapshot_with_log_tail( + kernel_string_slice!(path), + engine.shallow_copy(), + log_tail.clone(), + )) + }; + let snapshot_version = unsafe { version(snapshot.shallow_copy()) }; + assert_eq!(snapshot_version, 1); + + // Test getting snapshot at version + let snapshot2 = unsafe { + ok_or_panic(snapshot_at_version_with_log_tail( + kernel_string_slice!(path), + engine.shallow_copy(), + 1, + log_tail, + )) + }; + let snapshot_version = unsafe { version(snapshot.shallow_copy()) }; + assert_eq!(snapshot_version, 1); + + unsafe { free_snapshot(snapshot) } + unsafe { free_snapshot(snapshot2) } + unsafe { free_engine(engine) } + Ok(()) + } } diff --git a/ffi/src/log_path.rs b/ffi/src/log_path.rs new file mode 100644 index 000000000..7b830dc0d --- /dev/null +++ b/ffi/src/log_path.rs @@ -0,0 +1,93 @@ +//! FFI interface for LogPath. + +use delta_kernel::{DeltaResult, FileMeta, LogPath}; +use url::Url; + +use crate::{KernelStringSlice, TryFromStringSlice}; + +/// FFI-safe array of LogPaths +#[repr(C)] +#[derive(Debug, Clone, Copy)] +pub struct LogPathArray { + /// Pointer to the first element of the FfiLogPath array. If len is 0, this pointer may be null, + /// otherwise it must be non-null. + pub ptr: *const FfiLogPath, + /// Number of elements in the array + pub len: usize, +} + +impl LogPathArray { + /// Create an empty LogPathArray + pub fn empty() -> Self { + Self { + ptr: std::ptr::null(), + len: 0, + } + } + + /// Convert this array into a Vec of kernel LogPaths + /// + /// # Safety + /// The ptr must point to `len` valid FfiLogPath elements, and those elements + /// must remain valid for the duration of this call + pub(crate) unsafe fn log_paths(&self) -> DeltaResult> { + if self.ptr.is_null() || self.len == 0 { + return Ok(Vec::new()); + } + + let slice = unsafe { std::slice::from_raw_parts(self.ptr, self.len) }; + slice + .iter() + .map(|ffi_path| unsafe { ffi_path.log_path() }) + .collect::, _>>() + } +} + +/// FFI-safe LogPath representation that can be passed from the engine +#[repr(C)] +pub struct FfiLogPath { + /// URL location of the log file + location: KernelStringSlice, + /// Last modified time as milliseconds since unix epoch + last_modified: i64, + /// Size in bytes of the log file + size: u64, +} + +impl FfiLogPath { + /// Create a new FFI LogPath. The location string slice must be valid UTF-8. + pub fn new(location: KernelStringSlice, last_modified: i64, size: u64) -> Self { + Self { + location, + last_modified, + size, + } + } + + /// URL location of the log file as a string slice + pub fn location(&self) -> &KernelStringSlice { + &self.location + } + + /// Last modified time as milliseconds since unix epoch + pub fn last_modified(&self) -> i64 { + self.last_modified + } + + /// Size in bytes of the log file + pub fn size(&self) -> u64 { + self.size + } + + /// Convert this FFI log path into a kernel LogPath + unsafe fn log_path(&self) -> DeltaResult { + let location_str = unsafe { TryFromStringSlice::try_from_slice(&self.location) }?; + let url = Url::parse(location_str)?; + let file_meta = FileMeta { + location: url, + last_modified: self.last_modified, + size: self.size, + }; + LogPath::try_new(file_meta) + } +}