From 37aee772aa555f99578121689c243485d77890a0 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 19 Sep 2025 17:28:02 +0200 Subject: [PATCH 01/10] CDF API for FFI --- ffi/src/lib.rs | 1 + ffi/src/table_changes.rs | 849 +++++++++++++++++++++++++++++++ kernel/src/log_segment.rs | 3 +- kernel/src/table_changes/scan.rs | 12 +- kernel/tests/cdf.rs | 2 +- test-utils/src/lib.rs | 2 +- 6 files changed, 862 insertions(+), 7 deletions(-) create mode 100644 ffi/src/table_changes.rs diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index a70b531f1..50a12cbc7 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -37,6 +37,7 @@ pub use domain_metadata::get_domain_metadata; pub mod engine_data; pub mod engine_funcs; pub mod error; +pub mod table_changes; use error::{AllocateError, AllocateErrorFn, ExternResult, IntoExternResult}; pub mod expressions; #[cfg(feature = "tracing")] diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs new file mode 100644 index 000000000..202caeecf --- /dev/null +++ b/ffi/src/table_changes.rs @@ -0,0 +1,849 @@ +//! TableChanges related ffi code + +use std::sync::{Arc, Mutex}; + +use delta_kernel::arrow::array::{ + ffi::{FFI_ArrowArray, FFI_ArrowSchema}, + ArrayData, RecordBatch, StructArray, +}; +use delta_kernel::arrow::compute::filter_record_batch; +use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::scan::ScanResult; +use delta_kernel::table_changes::scan::TableChangesScan; +use delta_kernel::table_changes::TableChanges; +use delta_kernel::{DeltaResult, Error, Version}; +use delta_kernel_ffi_macros::handle_descriptor; +use tracing::debug; + +use super::handle::Handle; +use url::Url; + +use crate::engine_data::ArrowFFIData; +use crate::expressions::kernel_visitor::{unwrap_kernel_predicate, KernelExpressionVisitorState}; +use crate::scan::EnginePredicate; +use crate::{ + kernel_string_slice, unwrap_and_parse_path_as_url, AllocateStringFn, ExternEngine, + ExternResult, IntoExternResult, KernelStringSlice, NullableCvoid, SharedExternEngine, + SharedSchema, +}; + +#[handle_descriptor(target=TableChanges, mutable=true, sized=true)] +pub struct ExclusiveTableChanges; + +/// Get the table changes from the specified table at a specific version +/// +/// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) +/// - `engine`: Implementation of [`Engine`] apis. +/// - `start_version`: The start version of the change data feed +/// End version will be the newest table version. +/// +/// # Safety +/// +/// Caller is responsible for passing valid handles and path pointer. +#[no_mangle] +pub unsafe extern "C" fn table_changes_from_version( + path: KernelStringSlice, + engine: Handle, + start_version: Version, +) -> ExternResult> { + let url = unsafe { unwrap_and_parse_path_as_url(path) }; + let engine = unsafe { engine.as_ref() }; + table_changes_impl(url, engine, start_version, None).into_extern_result(&engine) +} + +/// Get the table changes from the specified table between two versions +/// +/// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) +/// - `engine`: Implementation of [`Engine`] apis. +/// - `start_version`: The start version of the change data feed +/// - `end_version`: The end version (inclusive) of the change data feed. +/// +/// # Safety +/// +/// Caller is responsible for passing valid handles and path pointer. +#[no_mangle] +pub unsafe extern "C" fn table_changes_between_versions( + path: KernelStringSlice, + engine: Handle, + start_version: Version, + end_version: Version, +) -> ExternResult> { + let url = unsafe { unwrap_and_parse_path_as_url(path) }; + let engine = unsafe { engine.as_ref() }; + table_changes_impl(url, engine, start_version, end_version.into()).into_extern_result(&engine) +} + +fn table_changes_impl( + url: DeltaResult, + extern_engine: &dyn ExternEngine, + start_version: Version, + end_version: Option, +) -> DeltaResult> { + let table_changes = TableChanges::try_new( + url?, + extern_engine.engine().as_ref(), + start_version, + end_version, + ); + Ok(Box::new(table_changes?).into()) +} + +/// Drops table changes. +/// +/// # Safety +/// Caller is responsible for passing a valid table changes handle. +#[no_mangle] +pub unsafe extern "C" fn free_table_changes(table_changes: Handle) { + table_changes.drop_handle(); +} + +/// Get schema from the specified TableChanges. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid table changes handle. +#[no_mangle] +pub unsafe extern "C" fn table_changes_schema( + table_changes: Handle, +) -> Handle { + let table_changes = unsafe { table_changes.as_ref() }; + Arc::new(table_changes.schema().clone()).into() +} + +/// Get table root from the specified TableChanges. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid table changes handle. +#[no_mangle] +pub unsafe extern "C" fn table_changes_table_root( + table_changes: Handle, + allocate_fn: AllocateStringFn, +) -> NullableCvoid { + let table_changes = unsafe { table_changes.as_ref() }; + let table_root = table_changes.table_root().to_string(); + allocate_fn(kernel_string_slice!(table_root)) +} + +/// Get start version from the specified TableChanges. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid table changes handle. +#[no_mangle] +pub unsafe extern "C" fn table_changes_start_version( + table_changes: Handle, +) -> u64 { + let table_changes = unsafe { table_changes.as_ref() }; + table_changes.start_version() +} + +/// Get end version from the specified TableChanges. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid table changes handle. +#[no_mangle] +pub unsafe extern "C" fn table_changes_end_version( + table_changes: Handle, +) -> u64 { + let table_changes = unsafe { table_changes.as_ref() }; + table_changes.end_version() +} + +#[handle_descriptor(target=TableChangesScan, mutable=false, sized=true)] +pub struct SharedTableChangesScan; + +/// Get a [`TableChangesScan`] over the table specified by the passed table changes. +/// It is the responsibility of the _engine_ to free this scan when complete by calling [`free_table_changes_scan`]. +/// Consumes TableChanges. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid table changes pointer, and engine pointer +#[no_mangle] +pub unsafe extern "C" fn table_changes_scan( + table_changes: Handle, + engine: Handle, + predicate: Option<&mut EnginePredicate>, +) -> ExternResult> { + let table_changes = unsafe { table_changes.into_inner() }; + table_changes_scan_impl(*table_changes, predicate).into_extern_result(&engine.as_ref()) +} + +fn table_changes_scan_impl( + table_changes: TableChanges, + predicate: Option<&mut EnginePredicate>, +) -> DeltaResult> { + let mut scan_builder = table_changes.into_scan_builder(); + if let Some(predicate) = predicate { + let mut visitor_state = KernelExpressionVisitorState::default(); + let pred_id = (predicate.visitor)(predicate.predicate, &mut visitor_state); + let predicate = unwrap_kernel_predicate(&mut visitor_state, pred_id); + debug!("Got predicate: {:#?}", predicate); + scan_builder = scan_builder.with_predicate(predicate.map(Arc::new)); + } + Ok(Arc::new(scan_builder.build()?).into()) +} + +/// Drops a table changes scan. +/// +/// # Safety +/// Caller is responsible for passing a valid scan handle. +#[no_mangle] +pub unsafe extern "C" fn free_table_changes_scan( + table_changes_scan: Handle, +) { + table_changes_scan.drop_handle(); +} + +/// Get the table root of a table changes scan. +/// +/// # Safety +/// Engine is responsible for providing a valid scan pointer and allocate_fn (for allocating the +/// string) +#[no_mangle] +pub unsafe extern "C" fn table_changes_scan_table_root( + table_changes_scan: Handle, + allocate_fn: AllocateStringFn, +) -> NullableCvoid { + let table_changes_scan = unsafe { table_changes_scan.as_ref() }; + let table_root = table_changes_scan.table_root().to_string(); + allocate_fn(kernel_string_slice!(table_root)) +} + +/// Get the logical schema of the specified table changes scan. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid snapshot handle. +#[no_mangle] +pub unsafe extern "C" fn table_changes_scan_logical_schema( + table_changes_scan: Handle, +) -> Handle { + let table_changes_scan = unsafe { table_changes_scan.as_ref() }; + table_changes_scan.logical_schema().clone().into() +} + +/// Get the physical schema of the specified table changes scan. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid snapshot handle. +#[no_mangle] +pub unsafe extern "C" fn table_changes_scan_physical_schema( + table_changes_scan: Handle, +) -> Handle { + let table_changes_scan = unsafe { table_changes_scan.as_ref() }; + table_changes_scan.physical_schema().clone().into() +} + +pub struct ScanTableChangesIterator { + data: Mutex> + Send>>, + engine: Arc, +} + +#[handle_descriptor(target=ScanTableChangesIterator, mutable=false, sized=true)] +pub struct SharedScanTableChangesIterator; + +impl Drop for ScanTableChangesIterator { + fn drop(&mut self) { + debug!("dropping ScanTableChangesIterator"); + } +} + +#[no_mangle] +pub unsafe extern "C" fn table_changes_scan_execute( + table_changes_scan: Handle, + engine: Handle, +) -> ExternResult> { + let table_changes_scan = unsafe { table_changes_scan.clone_as_arc() }; + let engine = unsafe { engine.clone_as_arc() }; + table_changes_scan_execute_impl(table_changes_scan, engine.clone()) + .into_extern_result(&engine.as_ref()) +} + +fn table_changes_scan_execute_impl( + table_changes_scan: Arc, + engine: Arc, +) -> DeltaResult> { + let table_changes_iter = table_changes_scan.execute(engine.engine().clone())?; + let data = ScanTableChangesIterator { + data: Mutex::new(Box::new(table_changes_iter)), + engine: engine.clone(), + }; + Ok(Arc::new(data).into()) +} + +/// # Safety +/// +/// Drops table changes iterator. +/// Caller is responsible for (at most once) passing a valid pointer returned by a call to +/// [`scan_table_changes_execute`]. +#[no_mangle] +pub unsafe extern "C" fn free_scan_table_changes_iter( + data: Handle, +) { + data.drop_handle(); +} + +#[no_mangle] +pub unsafe extern "C" fn scan_table_changes_next( + data: Handle, +) -> ExternResult<*mut ArrowFFIData> { + let data = unsafe { data.as_ref() }; + scan_table_changes_next_impl(data).into_extern_result(&data.engine.as_ref()) +} + +fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult<*mut ArrowFFIData> { + let mut data = data + .data + .lock() + .map_err(|_| Error::generic("poisoned mutex"))?; + if let Some(scan_result) = data.next().transpose()? { + let mask = scan_result.full_mask(); + let data = scan_result.raw_data?; + let mut record_batch: RecordBatch = data + .into_any() + .downcast::() + .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? + .into(); + + if let Some(mask) = mask { + record_batch = filter_record_batch(&record_batch, &mask.into())?; + } + + let sa: StructArray = record_batch.into(); + let array_data: ArrayData = sa.into(); + let array = FFI_ArrowArray::new(&array_data); + let schema = FFI_ArrowSchema::try_from(array_data.data_type())?; + let ret_data = Box::new(ArrowFFIData { array, schema }); + Ok(Box::leak(ret_data)) + } else { + Ok(std::ptr::null_mut()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ffi_test_utils::{ + allocate_err, allocate_str, ok_or_panic, recover_string, EngineErrorWithMessage, + }; + use crate::{engine_to_handle, kernel_string_slice}; + + use delta_kernel::arrow::array::{ArrayRef, Int32Array, StringArray}; + use delta_kernel::arrow::datatypes::{Field, Schema}; + use delta_kernel::arrow::error::ArrowError; + use delta_kernel::arrow::record_batch::RecordBatch; + use delta_kernel::arrow::util::pretty::pretty_format_batches; + use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; + use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; + use delta_kernel::schema::{DataType, StructField, StructType}; + use delta_kernel::Engine; + use delta_kernel_ffi::engine_data::get_engine_data; + use itertools::Itertools; + use object_store::{memory::InMemory, path::Path, ObjectStore}; + use std::sync::Arc; + use test_utils::{ + actions_to_string_with_metadata, add_commit, generate_batch, record_batch_to_bytes, + to_arrow, IntoArray as _, TestAction, + }; + + const PARQUET_FILE1: &str = + "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet"; + const PARQUET_FILE2: &str = + "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet"; + + pub const METADATA: &str = r#" + {"commitInfo": { + "timestamp": 1587968586154, + "operation": "WRITE", + "operationParameters": { + "mode": "ErrorIfExists", + "partitionBy": "[]" + }, + "isBlindAppend": true + }} + {"protocol": { + "minReaderVersion": 1, + "minWriterVersion": 2 + }} + {"metaData": { + "id": "5fba94ed-9794-4965-ba6e-6ee3c0d22af9", + "format": { + "provider": "parquet", + "options": {} + }, + "schemaString": "{ + \"type\": \"struct\", + \"fields\": [ + { + \"name\": \"id\", + \"type\": \"integer\", + \"nullable\": true, + \"metadata\": {} + }, + { + \"name\": \"val\", + \"type\": \"string\", + \"nullable\": true, + \"metadata\": {} + } + ] + }", + "partitionColumns": [], + "configuration": { + "delta.enableChangeDataFeed": "true" + }, + "createdTime": 1587968585495 + }} + "#; + + async fn commit_add_file( + storage: &dyn ObjectStore, + version: u64, + file: String, + ) -> Result<(), Box> { + add_commit( + storage, + version, + actions_to_string_with_metadata( + vec![TestAction::Metadata, TestAction::Add(file)], + METADATA, + ), + ) + .await + } + + async fn commit_remove_file( + storage: &dyn ObjectStore, + version: u64, + file: String, + ) -> Result<(), Box> { + add_commit( + storage, + version, + actions_to_string_with_metadata( + vec![TestAction::Metadata, TestAction::Remove(file)], + METADATA, + ), + ) + .await + } + + async fn put_file( + storage: &dyn ObjectStore, + file: String, + batch: &RecordBatch, + ) -> Result<(), Box> { + storage + .put(&Path::from(file), record_batch_to_bytes(&batch).into()) + .await?; + Ok(()) + } + + pub fn generate_batch_with_id(start_i: i32) -> Result { + generate_batch(vec![ + ("id", vec![start_i, start_i + 1, start_i + 2].into_array()), + ("val", vec!["a", "b", "c"].into_array()), + ]) + } + + pub fn get_batch_schema() -> Arc { + Arc::new( + StructType::try_new(vec![ + StructField::nullable("id", DataType::INTEGER), + StructField::nullable("val", DataType::STRING), + StructField::nullable("_change_type", DataType::STRING), + StructField::nullable("_commit_version", DataType::INTEGER), + ]) + .unwrap(), + ) + } + + fn check_columns_in_schema(fields: &[&str], schema: &StructType) -> bool { + fields.iter().all(|f| schema.contains(f)) + } + + fn read_scan( + scan: &TableChangesScan, + engine: Arc, + ) -> DeltaResult> { + let scan_results = scan.execute(engine)?; + scan_results + .map(|scan_result| -> DeltaResult<_> { + let scan_result = scan_result?; + let mask = scan_result.full_mask(); + let data = scan_result.raw_data?; + let record_batch = to_arrow(data)?; + if let Some(mask) = mask { + Ok(filter_record_batch(&record_batch, &mask.into())?) + } else { + Ok(record_batch) + } + }) + .try_collect() + } + + fn filter_batches(batches: Vec) -> Vec { + batches + .into_iter() + .map(|batch| { + let schema = batch.schema(); + let keep_indices: Vec = schema + .fields() + .iter() + .enumerate() + .filter_map(|(i, field)| { + if field.name() != "_commit_timestamp" { + Some(i) + } else { + None + } + }) + .collect(); + + let columns: Vec = keep_indices + .iter() + .map(|&i| batch.column(i).clone()) + .collect(); + + let fields: Vec> = keep_indices + .iter() + .map(|&i| Arc::new(schema.field(i).clone())) + .collect(); + + let filtered_schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(filtered_schema, columns).unwrap() + }) + .collect() + } + + #[tokio::test] + async fn test_table_changes() -> Result<(), Box> { + let storage = Arc::new(InMemory::new()); + commit_add_file(storage.as_ref(), 0, PARQUET_FILE1.to_string()).await?; + commit_add_file(storage.as_ref(), 1, PARQUET_FILE2.to_string()).await?; + + let batch = generate_batch_with_id(1)?; + put_file(storage.as_ref(), PARQUET_FILE1.to_string(), &batch).await?; + let batch = generate_batch_with_id(4)?; + put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?; + + let path = "memory:///"; + let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = engine_to_handle(Arc::new(engine), allocate_err); + + let table_changes = unsafe { + table_changes_from_version(kernel_string_slice!(path), engine.shallow_copy(), 0) + }; + + match table_changes { + ExternResult::Ok(ref handle) => { + assert_eq!( + unsafe { table_changes_start_version(handle.shallow_copy()) }, + 0 + ); + assert_eq!( + unsafe { table_changes_end_version(handle.shallow_copy()) }, + 1 + ); + + let table_root = + unsafe { table_changes_table_root(handle.shallow_copy(), allocate_str) }; + assert_eq!(recover_string(table_root.unwrap()), path); + + let schema = unsafe { table_changes_schema(handle.shallow_copy()).shallow_copy() }; + let schema_ref = unsafe { schema.as_ref() }; + assert_eq!(schema_ref.fields().len(), 5); + check_columns_in_schema( + &[ + "id", + "val", + "_change_type", + "_commit_version", + "_commit_timestamp", + ], + schema_ref, + ); + } + ExternResult::Err(e) => unsafe { + let err_with_msg: &EngineErrorWithMessage = &*(e as *mut EngineErrorWithMessage); + eprintln!( + "Error type: {:?}, message: {:?}", + (*err_with_msg).etype, + (*err_with_msg).message + ); + std::process::exit(1); + }, + } + + let table_changes = ok_or_panic(table_changes); + let table_changes_scan = + unsafe { table_changes_scan(table_changes, engine.shallow_copy(), None) }; + + match table_changes_scan { + ExternResult::Ok(ref handle) => { + let table_root = + unsafe { table_changes_scan_table_root(handle.shallow_copy(), allocate_str) }; + assert_eq!(recover_string(table_root.unwrap()), path); + + let logical_schema = unsafe { + table_changes_scan_logical_schema(handle.shallow_copy()).shallow_copy() + }; + let logical_schema_ref = unsafe { logical_schema.as_ref() }; + assert_eq!(logical_schema_ref.fields().len(), 5); + check_columns_in_schema( + &[ + "id", + "val", + "_change_type", + "_commit_version", + "_commit_timestamp", + ], + logical_schema_ref, + ); + + let physical_schema = unsafe { + table_changes_scan_physical_schema(handle.shallow_copy()).shallow_copy() + }; + let physical_schema_ref = unsafe { physical_schema.as_ref() }; + assert_eq!(physical_schema_ref.fields().len(), 2); + check_columns_in_schema(&["id", "val"], physical_schema_ref); + } + ExternResult::Err(e) => unsafe { + let err_with_msg: &EngineErrorWithMessage = &*(e as *mut EngineErrorWithMessage); + eprintln!( + "Error type: {:?}, message: {:?}", + (*err_with_msg).etype, + (*err_with_msg).message + ); + std::process::exit(1); + }, + } + + let table_changes_scan = ok_or_panic(table_changes_scan); + unsafe { + free_table_changes_scan(table_changes_scan); + } + Ok(()) + } + + #[tokio::test] + async fn test_table_changes_scan() -> Result<(), Box> { + let storage = Arc::new(InMemory::new()); + commit_add_file(storage.as_ref(), 0, PARQUET_FILE1.to_string()).await?; + commit_add_file(storage.as_ref(), 1, PARQUET_FILE2.to_string()).await?; + + let batch = generate_batch_with_id(1)?; + put_file(storage.as_ref(), PARQUET_FILE1.to_string(), &batch).await?; + let batch = generate_batch_with_id(4)?; + put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?; + + let path = "memory:///"; + let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = engine_to_handle(Arc::new(engine), allocate_err); + + let table_changes = ok_or_panic(unsafe { + table_changes_from_version(kernel_string_slice!(path), engine.shallow_copy(), 0) + }); + let table_changes_scan = + ok_or_panic(unsafe { table_changes_scan(table_changes, engine.shallow_copy(), None) }); + let batches = unsafe { + read_scan( + &table_changes_scan.into_inner(), + engine.into_inner().engine(), + ) + }; + let batches: Vec = batches.into_iter().flatten().collect(); + let filtered_batches: Vec = filter_batches(batches); + + let table_schema = get_batch_schema(); + let expected = &ArrowEngineData::new(RecordBatch::try_new( + Arc::new(table_schema.as_ref().try_into_arrow()?), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])), + Arc::new(StringArray::from(vec!["a", "b", "c", "a", "b", "c"])), + Arc::new(StringArray::from(vec![ + "insert", "insert", "insert", "insert", "insert", "insert", + ])), + Arc::new(Int32Array::from(vec![0, 0, 0, 1, 1, 1])), + ], + )?); + + let formatted = pretty_format_batches(&filtered_batches) + .unwrap() + .to_string(); + let expected = pretty_format_batches(&[expected.record_batch().clone()]) + .unwrap() + .to_string(); + + println!("actual:\n{formatted}"); + println!("expected:\n{expected}"); + assert_eq!(formatted, expected); + + Ok(()) + } + + #[tokio::test] + async fn test_table_changes_scan_iterator() -> Result<(), Box> { + let storage = Arc::new(InMemory::new()); + commit_add_file(storage.as_ref(), 0, PARQUET_FILE1.to_string()).await?; + commit_add_file(storage.as_ref(), 1, PARQUET_FILE2.to_string()).await?; + + let batch = generate_batch_with_id(1)?; + put_file(storage.as_ref(), PARQUET_FILE1.to_string(), &batch).await?; + let batch = generate_batch_with_id(4)?; + put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?; + + let path = "memory:///"; + let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = engine_to_handle(Arc::new(engine), allocate_err); + + let table_changes = ok_or_panic(unsafe { + table_changes_from_version(kernel_string_slice!(path), engine.shallow_copy(), 0) + }); + + let table_changes_scan = + ok_or_panic(unsafe { table_changes_scan(table_changes, engine.shallow_copy(), None) }); + + let table_changes_scan_iter = unsafe { + table_changes_scan_execute(table_changes_scan.shallow_copy(), engine.shallow_copy()) + }; + + match table_changes_scan_iter { + ExternResult::Ok(ref _handle) => {} + ExternResult::Err(e) => unsafe { + let err_with_msg: &EngineErrorWithMessage = &*(e as *mut EngineErrorWithMessage); + eprintln!( + "Error type: {:?}, message: {:?}", + (*err_with_msg).etype, + (*err_with_msg).message + ); + std::process::exit(1); + }, + } + + let table_changes_scan_iter = ok_or_panic(table_changes_scan_iter); + let mut batches: Vec = Vec::new(); + let mut i: i32 = 0; + loop { + i += 1; + let data = ok_or_panic(unsafe { + scan_table_changes_next(table_changes_scan_iter.shallow_copy()) + }); + if data.is_null() { + break; + } + + let engine_data = unsafe { + let data_ref = &mut *data; + let array = std::mem::replace(&mut data_ref.array, FFI_ArrowArray::empty()); + get_engine_data(array, &(*data).schema, allocate_err) + }; + let engine_data = ok_or_panic(engine_data); + let record_batch = unsafe { to_arrow(engine_data.into_inner()) }?; + + println!("Batch ({i}) num rows {:?}", record_batch.num_rows()); + batches.push(record_batch); + } + + let filtered_batches: Vec = filter_batches(batches); + let formatted = pretty_format_batches(&filtered_batches) + .unwrap() + .to_string(); + + let table_schema = get_batch_schema(); + let expected = &ArrowEngineData::new(RecordBatch::try_new( + Arc::new(table_schema.as_ref().try_into_arrow()?), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])), + Arc::new(StringArray::from(vec!["a", "b", "c", "a", "b", "c"])), + Arc::new(StringArray::from(vec![ + "insert", "insert", "insert", "insert", "insert", "insert", + ])), + Arc::new(Int32Array::from(vec![0, 0, 0, 1, 1, 1])), + ], + )?); + + let expected = pretty_format_batches(&[expected.record_batch().clone()]) + .unwrap() + .to_string(); + + println!("actual:\n{formatted}"); + println!("expected:\n{expected}"); + assert_eq!(formatted, expected); + + unsafe { + free_table_changes_scan(table_changes_scan); + } + unsafe { + free_scan_table_changes_iter(table_changes_scan_iter); + } + Ok(()) + } + + #[tokio::test] + async fn test_table_changes_between_commits() -> Result<(), Box> { + let storage = Arc::new(InMemory::new()); + commit_add_file(storage.as_ref(), 0, PARQUET_FILE1.to_string()).await?; + commit_add_file(storage.as_ref(), 1, PARQUET_FILE2.to_string()).await?; + commit_remove_file(storage.as_ref(), 2, PARQUET_FILE1.to_string()).await?; + commit_remove_file(storage.as_ref(), 3, PARQUET_FILE2.to_string()).await?; + + let batch = generate_batch_with_id(1)?; + put_file(storage.as_ref(), PARQUET_FILE1.to_string(), &batch).await?; + let batch = generate_batch_with_id(4)?; + put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?; + + let path = "memory:///"; + let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = engine_to_handle(Arc::new(engine), allocate_err); + + let table_changes = ok_or_panic(unsafe { + table_changes_between_versions(kernel_string_slice!(path), engine.shallow_copy(), 1, 2) + }); + let table_changes_scan = + ok_or_panic(unsafe { table_changes_scan(table_changes, engine.shallow_copy(), None) }); + let batches = unsafe { + read_scan( + &table_changes_scan.into_inner(), + engine.into_inner().engine(), + ) + }; + let batches: Vec = batches.into_iter().flatten().collect(); + let filtered_batches: Vec = filter_batches(batches); + + let table_schema = Arc::new(StructType::try_new(vec![ + StructField::nullable("id", DataType::INTEGER), + StructField::nullable("val", DataType::STRING), + StructField::nullable("_change_type", DataType::STRING), + StructField::nullable("_commit_version", DataType::INTEGER), + ])?); + let expected = &ArrowEngineData::new(RecordBatch::try_new( + Arc::new(table_schema.as_ref().try_into_arrow()?), + vec![ + Arc::new(Int32Array::from(vec![4, 5, 6, 1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c", "a", "b", "c"])), + Arc::new(StringArray::from(vec![ + "insert", "insert", "insert", "delete", "delete", "delete", + ])), + Arc::new(Int32Array::from(vec![1, 1, 1, 2, 2, 2])), + ], + )?); + + let formatted = pretty_format_batches(&filtered_batches) + .unwrap() + .to_string(); + let expected = pretty_format_batches(&[expected.record_batch().clone()]) + .unwrap() + .to_string(); + + println!("actual:\n{formatted}"); + println!("expected:\n{expected}"); + assert_eq!(formatted, expected); + + Ok(()) + } +} diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 5500e5522..58cfca42b 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -209,7 +209,8 @@ impl LogSegment { .first() .is_some_and(|first_commit| first_commit.version == start_version), Error::generic(format!( - "Expected the first commit to have version {start_version}" + "Expected the first commit to have version {start_version}, got {:?}", + listed_files.ascending_commit_files.first().map(|c| c.version) )) ); LogSegment::try_new(listed_files, log_root, end_version) diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 55b0576e8..edee70d1f 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -239,7 +239,7 @@ impl TableChangesScan { pub fn execute( &self, engine: Arc, - ) -> DeltaResult> + use<'_>> { + ) -> DeltaResult>> { let scan_metadata = self.scan_metadata(engine.clone())?; let scan_files = scan_metadata_to_scan_file(scan_metadata); @@ -248,6 +248,10 @@ impl TableChangesScan { let physical_predicate = self.physical_predicate(); let dv_engine_ref = engine.clone(); + let logical_schema = self.logical_schema().clone(); + let physical_schema = self.physical_schema().clone(); + let table_root_copy = self.table_changes.table_root().clone(); + let result = scan_files .map(move |scan_file| { resolve_scan_file_dv(dv_engine_ref.as_ref(), &table_root, scan_file?) @@ -257,9 +261,9 @@ impl TableChangesScan { read_scan_file( engine.as_ref(), resolved_scan_file?, - self.table_root(), - self.logical_schema(), - self.physical_schema(), + &table_root_copy, + &logical_schema, + &physical_schema, &all_fields, physical_predicate.clone(), ) diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs index 002edc15e..07e1fb34f 100644 --- a/kernel/tests/cdf.rs +++ b/kernel/tests/cdf.rs @@ -411,7 +411,7 @@ fn invalid_range_end_before_start() { #[test] fn invalid_range_start_after_last_version_of_table() { let res = read_cdf_for_table("cdf-table-simple", 3, 4, None); - let expected_msg = "Expected the first commit to have version 3"; + let expected_msg = "Expected the first commit to have version 3, got None"; assert!(matches!(res, Err(Error::Generic(msg)) if msg == expected_msg)); } diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index c20b05fc2..7e9ebcc34 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -68,7 +68,7 @@ pub fn actions_to_string_partitioned(actions: Vec) -> String { actions_to_string_with_metadata(actions, METADATA_WITH_PARTITION_COLS) } -fn actions_to_string_with_metadata(actions: Vec, metadata: &str) -> String { +pub fn actions_to_string_with_metadata(actions: Vec, metadata: &str) -> String { actions .into_iter() .map(|test_action| match test_action { From 87a370ee750d361e3dd55774c7d6037d6b7758fd Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 24 Sep 2025 15:02:46 +0200 Subject: [PATCH 02/10] Review fixes --- ffi/src/engine_data.rs | 9 ++ ffi/src/table_changes.rs | 215 +++++++++++++++------------------------ 2 files changed, 91 insertions(+), 133 deletions(-) diff --git a/ffi/src/engine_data.rs b/ffi/src/engine_data.rs index 0010bf7de..506dca48e 100644 --- a/ffi/src/engine_data.rs +++ b/ffi/src/engine_data.rs @@ -64,6 +64,15 @@ pub struct ArrowFFIData { pub schema: FFI_ArrowSchema, } +impl ArrowFFIData { + pub fn empty() -> Self { + Self { + array: FFI_ArrowArray::empty(), + schema: FFI_ArrowSchema::empty(), + } + } +} + // TODO: This should use a callback to avoid having to have the engine free the struct /// Get an [`ArrowFFIData`] to allow binding to the arrow [C Data /// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index 202caeecf..7e40789f2 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -2,11 +2,9 @@ use std::sync::{Arc, Mutex}; -use delta_kernel::arrow::array::{ - ffi::{FFI_ArrowArray, FFI_ArrowSchema}, - ArrayData, RecordBatch, StructArray, -}; +use delta_kernel::arrow::array::{Array, ArrayData, RecordBatch, StructArray}; use delta_kernel::arrow::compute::filter_record_batch; +use delta_kernel::arrow::ffi::to_ffi; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::scan::ScanResult; use delta_kernel::table_changes::scan::TableChangesScan; @@ -180,7 +178,7 @@ fn table_changes_scan_impl( let mut visitor_state = KernelExpressionVisitorState::default(); let pred_id = (predicate.visitor)(predicate.predicate, &mut visitor_state); let predicate = unwrap_kernel_predicate(&mut visitor_state, pred_id); - debug!("Got predicate: {:#?}", predicate); + debug!("Table changes got predicate: {:#?}", predicate); scan_builder = scan_builder.with_predicate(predicate.map(Arc::new)); } Ok(Arc::new(scan_builder.build()?).into()) @@ -257,14 +255,14 @@ pub unsafe extern "C" fn table_changes_scan_execute( table_changes_scan: Handle, engine: Handle, ) -> ExternResult> { - let table_changes_scan = unsafe { table_changes_scan.clone_as_arc() }; + let table_changes_scan = unsafe { table_changes_scan.as_ref() }; let engine = unsafe { engine.clone_as_arc() }; table_changes_scan_execute_impl(table_changes_scan, engine.clone()) .into_extern_result(&engine.as_ref()) } fn table_changes_scan_execute_impl( - table_changes_scan: Arc, + table_changes_scan: &TableChangesScan, engine: Arc, ) -> DeltaResult> { let table_changes_iter = table_changes_scan.execute(engine.engine().clone())?; @@ -290,16 +288,16 @@ pub unsafe extern "C" fn free_scan_table_changes_iter( #[no_mangle] pub unsafe extern "C" fn scan_table_changes_next( data: Handle, -) -> ExternResult<*mut ArrowFFIData> { +) -> ExternResult { let data = unsafe { data.as_ref() }; scan_table_changes_next_impl(data).into_extern_result(&data.engine.as_ref()) } -fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult<*mut ArrowFFIData> { +fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult { let mut data = data .data .lock() - .map_err(|_| Error::generic("poisoned mutex"))?; + .map_err(|_| Error::generic("poisoned scan table changes iterator mutex"))?; if let Some(scan_result) = data.next().transpose()? { let mask = scan_result.full_mask(); let data = scan_result.raw_data?; @@ -313,23 +311,22 @@ fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult< record_batch = filter_record_batch(&record_batch, &mask.into())?; } - let sa: StructArray = record_batch.into(); - let array_data: ArrayData = sa.into(); - let array = FFI_ArrowArray::new(&array_data); - let schema = FFI_ArrowSchema::try_from(array_data.data_type())?; - let ret_data = Box::new(ArrowFFIData { array, schema }); - Ok(Box::leak(ret_data)) + let batch_struct_array: StructArray = record_batch.into(); + let array_data: ArrayData = batch_struct_array.into_data(); + let (out_array, out_schema) = to_ffi(&array_data)?; + Ok(ArrowFFIData { + array: out_array, + schema: out_schema, + }) } else { - Ok(std::ptr::null_mut()) + Ok(ArrowFFIData::empty()) } } #[cfg(test)] mod tests { use super::*; - use crate::ffi_test_utils::{ - allocate_err, allocate_str, ok_or_panic, recover_string, EngineErrorWithMessage, - }; + use crate::ffi_test_utils::{allocate_err, allocate_str, ok_or_panic, recover_string}; use crate::{engine_to_handle, kernel_string_slice}; use delta_kernel::arrow::array::{ArrayRef, Int32Array, StringArray}; @@ -521,7 +518,7 @@ mod tests { } #[tokio::test] - async fn test_table_changes() -> Result<(), Box> { + async fn test_table_changes_getters() -> Result<(), Box> { let storage = Arc::new(InMemory::new()); commit_add_file(storage.as_ref(), 0, PARQUET_FILE1.to_string()).await?; commit_add_file(storage.as_ref(), 1, PARQUET_FILE2.to_string()).await?; @@ -535,95 +532,68 @@ mod tests { let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); let engine = engine_to_handle(Arc::new(engine), allocate_err); - let table_changes = unsafe { + let table_changes = ok_or_panic(unsafe { table_changes_from_version(kernel_string_slice!(path), engine.shallow_copy(), 0) - }; + }); - match table_changes { - ExternResult::Ok(ref handle) => { - assert_eq!( - unsafe { table_changes_start_version(handle.shallow_copy()) }, - 0 - ); - assert_eq!( - unsafe { table_changes_end_version(handle.shallow_copy()) }, - 1 - ); - - let table_root = - unsafe { table_changes_table_root(handle.shallow_copy(), allocate_str) }; - assert_eq!(recover_string(table_root.unwrap()), path); - - let schema = unsafe { table_changes_schema(handle.shallow_copy()).shallow_copy() }; - let schema_ref = unsafe { schema.as_ref() }; - assert_eq!(schema_ref.fields().len(), 5); - check_columns_in_schema( - &[ - "id", - "val", - "_change_type", - "_commit_version", - "_commit_timestamp", - ], - schema_ref, - ); - } - ExternResult::Err(e) => unsafe { - let err_with_msg: &EngineErrorWithMessage = &*(e as *mut EngineErrorWithMessage); - eprintln!( - "Error type: {:?}, message: {:?}", - (*err_with_msg).etype, - (*err_with_msg).message - ); - std::process::exit(1); - }, - } + assert_eq!( + unsafe { table_changes_start_version(table_changes.shallow_copy()) }, + 0 + ); + assert_eq!( + unsafe { table_changes_end_version(table_changes.shallow_copy()) }, + 1 + ); + + let table_root = + unsafe { table_changes_table_root(table_changes.shallow_copy(), allocate_str) }; + assert_eq!(recover_string(table_root.unwrap()), path); + + let schema = unsafe { table_changes_schema(table_changes.shallow_copy()).shallow_copy() }; + let schema_ref = unsafe { schema.as_ref() }; + assert_eq!(schema_ref.fields().len(), 5); + check_columns_in_schema( + &[ + "id", + "val", + "_change_type", + "_commit_version", + "_commit_timestamp", + ], + schema_ref, + ); - let table_changes = ok_or_panic(table_changes); let table_changes_scan = - unsafe { table_changes_scan(table_changes, engine.shallow_copy(), None) }; - - match table_changes_scan { - ExternResult::Ok(ref handle) => { - let table_root = - unsafe { table_changes_scan_table_root(handle.shallow_copy(), allocate_str) }; - assert_eq!(recover_string(table_root.unwrap()), path); - - let logical_schema = unsafe { - table_changes_scan_logical_schema(handle.shallow_copy()).shallow_copy() - }; - let logical_schema_ref = unsafe { logical_schema.as_ref() }; - assert_eq!(logical_schema_ref.fields().len(), 5); - check_columns_in_schema( - &[ - "id", - "val", - "_change_type", - "_commit_version", - "_commit_timestamp", - ], - logical_schema_ref, - ); - - let physical_schema = unsafe { - table_changes_scan_physical_schema(handle.shallow_copy()).shallow_copy() - }; - let physical_schema_ref = unsafe { physical_schema.as_ref() }; - assert_eq!(physical_schema_ref.fields().len(), 2); - check_columns_in_schema(&["id", "val"], physical_schema_ref); - } - ExternResult::Err(e) => unsafe { - let err_with_msg: &EngineErrorWithMessage = &*(e as *mut EngineErrorWithMessage); - eprintln!( - "Error type: {:?}, message: {:?}", - (*err_with_msg).etype, - (*err_with_msg).message - ); - std::process::exit(1); - }, - } + ok_or_panic(unsafe { table_changes_scan(table_changes, engine.shallow_copy(), None) }); + + let table_root = unsafe { + table_changes_scan_table_root(table_changes_scan.shallow_copy(), allocate_str) + }; + assert_eq!(recover_string(table_root.unwrap()), path); + + let logical_schema = unsafe { + table_changes_scan_logical_schema(table_changes_scan.shallow_copy()).shallow_copy() + }; + let logical_schema_ref = unsafe { logical_schema.as_ref() }; + assert_eq!(logical_schema_ref.fields().len(), 5); + check_columns_in_schema( + &[ + "id", + "val", + "_change_type", + "_commit_version", + "_commit_timestamp", + ], + logical_schema_ref, + ); + + let physical_schema = unsafe { + table_changes_scan_physical_schema(table_changes_scan.shallow_copy()).shallow_copy() + }; + let physical_schema_ref = unsafe { physical_schema.as_ref() }; + assert_eq!(physical_schema_ref.fields().len(), 2); + check_columns_in_schema(&["id", "val"], physical_schema_ref); - let table_changes_scan = ok_or_panic(table_changes_scan); unsafe { free_table_changes_scan(table_changes_scan); } @@ -708,41 +678,22 @@ mod tests { let table_changes_scan = ok_or_panic(unsafe { table_changes_scan(table_changes, engine.shallow_copy(), None) }); - let table_changes_scan_iter = unsafe { + let table_changes_scan_iter_result = ok_or_panic(unsafe { table_changes_scan_execute(table_changes_scan.shallow_copy(), engine.shallow_copy()) - }; - - match table_changes_scan_iter { - ExternResult::Ok(ref _handle) => {} - ExternResult::Err(e) => unsafe { - let err_with_msg: &EngineErrorWithMessage = &*(e as *mut EngineErrorWithMessage); - eprintln!( - "Error type: {:?}, message: {:?}", - (*err_with_msg).etype, - (*err_with_msg).message - ); - std::process::exit(1); - }, - } + }); - let table_changes_scan_iter = ok_or_panic(table_changes_scan_iter); let mut batches: Vec = Vec::new(); let mut i: i32 = 0; loop { i += 1; let data = ok_or_panic(unsafe { - scan_table_changes_next(table_changes_scan_iter.shallow_copy()) + scan_table_changes_next(table_changes_scan_iter_result.shallow_copy()) }); - if data.is_null() { + if data.array.is_empty() { break; } - - let engine_data = unsafe { - let data_ref = &mut *data; - let array = std::mem::replace(&mut data_ref.array, FFI_ArrowArray::empty()); - get_engine_data(array, &(*data).schema, allocate_err) - }; - let engine_data = ok_or_panic(engine_data); + let engine_data = + ok_or_panic(unsafe { get_engine_data(data.array, &data.schema, allocate_err) }); let record_batch = unsafe { to_arrow(engine_data.into_inner()) }?; println!("Batch ({i}) num rows {:?}", record_batch.num_rows()); @@ -777,9 +728,7 @@ mod tests { unsafe { free_table_changes_scan(table_changes_scan); - } - unsafe { - free_scan_table_changes_iter(table_changes_scan_iter); + free_scan_table_changes_iter(table_changes_scan_iter_result); } Ok(()) } From 5da6338358b5e93605aaa60de8c676cc2c9a3113 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 29 Sep 2025 14:50:19 +0200 Subject: [PATCH 03/10] Review fix --- ffi/src/table_changes.rs | 43 ++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index 7e40789f2..a525ab8fe 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -298,29 +298,30 @@ fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult< .data .lock() .map_err(|_| Error::generic("poisoned scan table changes iterator mutex"))?; - if let Some(scan_result) = data.next().transpose()? { - let mask = scan_result.full_mask(); - let data = scan_result.raw_data?; - let mut record_batch: RecordBatch = data - .into_any() - .downcast::() - .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? - .into(); - - if let Some(mask) = mask { - record_batch = filter_record_batch(&record_batch, &mask.into())?; - } - let batch_struct_array: StructArray = record_batch.into(); - let array_data: ArrayData = batch_struct_array.into_data(); - let (out_array, out_schema) = to_ffi(&array_data)?; - Ok(ArrowFFIData { - array: out_array, - schema: out_schema, - }) - } else { - Ok(ArrowFFIData::empty()) + let Some(scan_result) = data.next().transpose()? else { + return Ok(ArrowFFIData::empty()); + }; + + let mask = scan_result.full_mask(); + let data = scan_result.raw_data?; + let mut record_batch: RecordBatch = data + .into_any() + .downcast::() + .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? + .into(); + + if let Some(mask) = mask { + record_batch = filter_record_batch(&record_batch, &mask.into())?; } + + let batch_struct_array: StructArray = record_batch.into(); + let array_data: ArrayData = batch_struct_array.into_data(); + let (out_array, out_schema) = to_ffi(&array_data)?; + Ok(ArrowFFIData { + array: out_array, + schema: out_schema, + }) } #[cfg(test)] From 5c968c0b729cee3a07d1e935249185ccd2b6928d Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 29 Sep 2025 19:55:47 +0200 Subject: [PATCH 04/10] Apply clippy --- ffi/src/table_changes.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index a525ab8fe..2df5e417c 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -33,7 +33,7 @@ pub struct ExclusiveTableChanges; /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) /// - `engine`: Implementation of [`Engine`] apis. /// - `start_version`: The start version of the change data feed -/// End version will be the newest table version. +/// End version will be the newest table version. /// /// # Safety /// @@ -250,6 +250,13 @@ impl Drop for ScanTableChangesIterator { } } +/// Get an iterator over the data needed to perform a table changes scan. This will return a +/// [`ScanTableChangesIterator`] which can be passed to [`scan_table_changes_next`] to get the +/// actual data in the iterator. +/// +/// # Safety +/// +/// Engine is responsible for passing a valid [`SharedExternEngine`] and [`SharedTableChangesScan`] #[no_mangle] pub unsafe extern "C" fn table_changes_scan_execute( table_changes_scan: Handle, @@ -285,6 +292,12 @@ pub unsafe extern "C" fn free_scan_table_changes_iter( data.drop_handle(); } +/// Get next batch of data from the table changes iterator. +/// +/// # Safety +/// +/// The iterator must be valid (returned by [table_changes_scan_execute]) and not yet freed by +/// [`free_scan_table_changes_iter`]. #[no_mangle] pub unsafe extern "C" fn scan_table_changes_next( data: Handle, @@ -436,7 +449,7 @@ mod tests { batch: &RecordBatch, ) -> Result<(), Box> { storage - .put(&Path::from(file), record_batch_to_bytes(&batch).into()) + .put(&Path::from(file), record_batch_to_bytes(batch).into()) .await?; Ok(()) } From e39c35642cb298e8769e8157cd8e36c30b7e727c Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 30 Sep 2025 18:13:37 +0200 Subject: [PATCH 05/10] Apply cargo fmt --- kernel/src/log_segment.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 7fdc7776c..604ece668 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -219,7 +219,10 @@ impl LogSegment { .is_some_and(|first_commit| first_commit.version == start_version), Error::generic(format!( "Expected the first commit to have version {start_version}, got {:?}", - listed_files.ascending_commit_files.first().map(|c| c.version) + listed_files + .ascending_commit_files + .first() + .map(|c| c.version) )) ); LogSegment::try_new(listed_files, log_root, end_version) From f4099772d4df69a3a07f5c9101fe004895934cfb Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 3 Oct 2025 18:20:10 +0200 Subject: [PATCH 06/10] Fix Doc check --- ffi/src/table_changes.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index 2df5e417c..adda6eeb5 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -31,7 +31,7 @@ pub struct ExclusiveTableChanges; /// Get the table changes from the specified table at a specific version /// /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) -/// - `engine`: Implementation of [`Engine`] apis. +/// - `engine`: Implementation of `Engine` apis. /// - `start_version`: The start version of the change data feed /// End version will be the newest table version. /// @@ -52,7 +52,7 @@ pub unsafe extern "C" fn table_changes_from_version( /// Get the table changes from the specified table between two versions /// /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) -/// - `engine`: Implementation of [`Engine`] apis. +/// - `engine`: Implementation of `Engine` apis. /// - `start_version`: The start version of the change data feed /// - `end_version`: The end version (inclusive) of the change data feed. /// @@ -284,7 +284,7 @@ fn table_changes_scan_execute_impl( /// /// Drops table changes iterator. /// Caller is responsible for (at most once) passing a valid pointer returned by a call to -/// [`scan_table_changes_execute`]. +/// [`table_changes_scan_execute`]. #[no_mangle] pub unsafe extern "C" fn free_scan_table_changes_iter( data: Handle, From 1f6054b50d3ab3b97dc1c362c19af7ba38ed863b Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 6 Oct 2025 16:45:03 +0200 Subject: [PATCH 07/10] Fix build with --no-default-features --- ffi/src/engine_data.rs | 1 + ffi/src/table_changes.rs | 22 ++++++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/ffi/src/engine_data.rs b/ffi/src/engine_data.rs index 506dca48e..ee6b8c91d 100644 --- a/ffi/src/engine_data.rs +++ b/ffi/src/engine_data.rs @@ -64,6 +64,7 @@ pub struct ArrowFFIData { pub schema: FFI_ArrowSchema, } +#[cfg(feature = "default-engine-base")] impl ArrowFFIData { pub fn empty() -> Self { Self { diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index adda6eeb5..0a57d0bd2 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -1,21 +1,31 @@ //! TableChanges related ffi code -use std::sync::{Arc, Mutex}; +use std::sync::{Arc}; +#[cfg(feature = "default-engine-base")] +use std::sync::{Mutex}; +#[cfg(feature = "default-engine-base")] use delta_kernel::arrow::array::{Array, ArrayData, RecordBatch, StructArray}; +#[cfg(feature = "default-engine-base")] use delta_kernel::arrow::compute::filter_record_batch; +#[cfg(feature = "default-engine-base")] use delta_kernel::arrow::ffi::to_ffi; +#[cfg(feature = "default-engine-base")] use delta_kernel::engine::arrow_data::ArrowEngineData; +#[cfg(feature = "default-engine-base")] use delta_kernel::scan::ScanResult; use delta_kernel::table_changes::scan::TableChangesScan; use delta_kernel::table_changes::TableChanges; -use delta_kernel::{DeltaResult, Error, Version}; +use delta_kernel::{DeltaResult, Version}; +#[cfg(feature = "default-engine-base")] +use delta_kernel::Error; use delta_kernel_ffi_macros::handle_descriptor; use tracing::debug; use super::handle::Handle; use url::Url; +#[cfg(feature = "default-engine-base")] use crate::engine_data::ArrowFFIData; use crate::expressions::kernel_visitor::{unwrap_kernel_predicate, KernelExpressionVisitorState}; use crate::scan::EnginePredicate; @@ -236,14 +246,17 @@ pub unsafe extern "C" fn table_changes_scan_physical_schema( table_changes_scan.physical_schema().clone().into() } +#[cfg(feature = "default-engine-base")] pub struct ScanTableChangesIterator { data: Mutex> + Send>>, engine: Arc, } +#[cfg(feature = "default-engine-base")] #[handle_descriptor(target=ScanTableChangesIterator, mutable=false, sized=true)] pub struct SharedScanTableChangesIterator; +#[cfg(feature = "default-engine-base")] impl Drop for ScanTableChangesIterator { fn drop(&mut self) { debug!("dropping ScanTableChangesIterator"); @@ -257,6 +270,7 @@ impl Drop for ScanTableChangesIterator { /// # Safety /// /// Engine is responsible for passing a valid [`SharedExternEngine`] and [`SharedTableChangesScan`] +#[cfg(feature = "default-engine-base")] #[no_mangle] pub unsafe extern "C" fn table_changes_scan_execute( table_changes_scan: Handle, @@ -268,6 +282,7 @@ pub unsafe extern "C" fn table_changes_scan_execute( .into_extern_result(&engine.as_ref()) } +#[cfg(feature = "default-engine-base")] fn table_changes_scan_execute_impl( table_changes_scan: &TableChangesScan, engine: Arc, @@ -285,6 +300,7 @@ fn table_changes_scan_execute_impl( /// Drops table changes iterator. /// Caller is responsible for (at most once) passing a valid pointer returned by a call to /// [`table_changes_scan_execute`]. +#[cfg(feature = "default-engine-base")] #[no_mangle] pub unsafe extern "C" fn free_scan_table_changes_iter( data: Handle, @@ -298,6 +314,7 @@ pub unsafe extern "C" fn free_scan_table_changes_iter( /// /// The iterator must be valid (returned by [table_changes_scan_execute]) and not yet freed by /// [`free_scan_table_changes_iter`]. +#[cfg(feature = "default-engine-base")] #[no_mangle] pub unsafe extern "C" fn scan_table_changes_next( data: Handle, @@ -306,6 +323,7 @@ pub unsafe extern "C" fn scan_table_changes_next( scan_table_changes_next_impl(data).into_extern_result(&data.engine.as_ref()) } +#[cfg(feature = "default-engine-base")] fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult { let mut data = data .data From 52d4015f0245f9bdcc8bfd5d4e47d6c67015fa02 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 6 Oct 2025 17:51:12 +0200 Subject: [PATCH 08/10] Try fix miri check --- ffi/src/table_changes.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index 0a57d0bd2..479175b09 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -359,7 +359,7 @@ fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult< mod tests { use super::*; use crate::ffi_test_utils::{allocate_err, allocate_str, ok_or_panic, recover_string}; - use crate::{engine_to_handle, kernel_string_slice}; + use crate::{engine_to_handle, kernel_string_slice, free_engine}; use delta_kernel::arrow::array::{ArrayRef, Int32Array, StringArray}; use delta_kernel::arrow::datatypes::{Field, Schema}; @@ -628,6 +628,7 @@ mod tests { unsafe { free_table_changes_scan(table_changes_scan); + free_engine(engine); } Ok(()) } @@ -761,6 +762,7 @@ mod tests { unsafe { free_table_changes_scan(table_changes_scan); free_scan_table_changes_iter(table_changes_scan_iter_result); + free_engine(engine); } Ok(()) } From b9193dabe7c17f2b69b786eb7d1584a2f011da6b Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 8 Oct 2025 12:26:18 +0200 Subject: [PATCH 09/10] Apply cargo fmt; Try fix memory leak --- ffi/src/table_changes.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index 479175b09..ae3eecb35 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -1,8 +1,8 @@ //! TableChanges related ffi code -use std::sync::{Arc}; +use std::sync::Arc; #[cfg(feature = "default-engine-base")] -use std::sync::{Mutex}; +use std::sync::Mutex; #[cfg(feature = "default-engine-base")] use delta_kernel::arrow::array::{Array, ArrayData, RecordBatch, StructArray}; @@ -16,9 +16,9 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::scan::ScanResult; use delta_kernel::table_changes::scan::TableChangesScan; use delta_kernel::table_changes::TableChanges; -use delta_kernel::{DeltaResult, Version}; #[cfg(feature = "default-engine-base")] use delta_kernel::Error; +use delta_kernel::{DeltaResult, Version}; use delta_kernel_ffi_macros::handle_descriptor; use tracing::debug; @@ -359,7 +359,7 @@ fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult< mod tests { use super::*; use crate::ffi_test_utils::{allocate_err, allocate_str, ok_or_panic, recover_string}; - use crate::{engine_to_handle, kernel_string_slice, free_engine}; + use crate::{engine_to_handle, free_engine, free_schema, kernel_string_slice}; use delta_kernel::arrow::array::{ArrayRef, Int32Array, StringArray}; use delta_kernel::arrow::datatypes::{Field, Schema}; @@ -629,6 +629,9 @@ mod tests { unsafe { free_table_changes_scan(table_changes_scan); free_engine(engine); + free_schema(schema); + free_schema(logical_schema); + free_schema(physical_schema); } Ok(()) } From 8bcf6da3b54b2d221a8affc42fbb1b9d12af8970 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 9 Oct 2025 18:35:23 +0200 Subject: [PATCH 10/10] Remove repeated dependency --- ffi/src/lib.rs | 1 + ffi/src/table_changes.rs | 16 ---------------- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 50a12cbc7..47ea2c34c 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -37,6 +37,7 @@ pub use domain_metadata::get_domain_metadata; pub mod engine_data; pub mod engine_funcs; pub mod error; +#[cfg(feature = "default-engine-base")] pub mod table_changes; use error::{AllocateError, AllocateErrorFn, ExternResult, IntoExternResult}; pub mod expressions; diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index ae3eecb35..1df3a682e 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -1,22 +1,15 @@ //! TableChanges related ffi code use std::sync::Arc; -#[cfg(feature = "default-engine-base")] use std::sync::Mutex; -#[cfg(feature = "default-engine-base")] use delta_kernel::arrow::array::{Array, ArrayData, RecordBatch, StructArray}; -#[cfg(feature = "default-engine-base")] use delta_kernel::arrow::compute::filter_record_batch; -#[cfg(feature = "default-engine-base")] use delta_kernel::arrow::ffi::to_ffi; -#[cfg(feature = "default-engine-base")] use delta_kernel::engine::arrow_data::ArrowEngineData; -#[cfg(feature = "default-engine-base")] use delta_kernel::scan::ScanResult; use delta_kernel::table_changes::scan::TableChangesScan; use delta_kernel::table_changes::TableChanges; -#[cfg(feature = "default-engine-base")] use delta_kernel::Error; use delta_kernel::{DeltaResult, Version}; use delta_kernel_ffi_macros::handle_descriptor; @@ -25,7 +18,6 @@ use tracing::debug; use super::handle::Handle; use url::Url; -#[cfg(feature = "default-engine-base")] use crate::engine_data::ArrowFFIData; use crate::expressions::kernel_visitor::{unwrap_kernel_predicate, KernelExpressionVisitorState}; use crate::scan::EnginePredicate; @@ -246,17 +238,14 @@ pub unsafe extern "C" fn table_changes_scan_physical_schema( table_changes_scan.physical_schema().clone().into() } -#[cfg(feature = "default-engine-base")] pub struct ScanTableChangesIterator { data: Mutex> + Send>>, engine: Arc, } -#[cfg(feature = "default-engine-base")] #[handle_descriptor(target=ScanTableChangesIterator, mutable=false, sized=true)] pub struct SharedScanTableChangesIterator; -#[cfg(feature = "default-engine-base")] impl Drop for ScanTableChangesIterator { fn drop(&mut self) { debug!("dropping ScanTableChangesIterator"); @@ -270,7 +259,6 @@ impl Drop for ScanTableChangesIterator { /// # Safety /// /// Engine is responsible for passing a valid [`SharedExternEngine`] and [`SharedTableChangesScan`] -#[cfg(feature = "default-engine-base")] #[no_mangle] pub unsafe extern "C" fn table_changes_scan_execute( table_changes_scan: Handle, @@ -282,7 +270,6 @@ pub unsafe extern "C" fn table_changes_scan_execute( .into_extern_result(&engine.as_ref()) } -#[cfg(feature = "default-engine-base")] fn table_changes_scan_execute_impl( table_changes_scan: &TableChangesScan, engine: Arc, @@ -300,7 +287,6 @@ fn table_changes_scan_execute_impl( /// Drops table changes iterator. /// Caller is responsible for (at most once) passing a valid pointer returned by a call to /// [`table_changes_scan_execute`]. -#[cfg(feature = "default-engine-base")] #[no_mangle] pub unsafe extern "C" fn free_scan_table_changes_iter( data: Handle, @@ -314,7 +300,6 @@ pub unsafe extern "C" fn free_scan_table_changes_iter( /// /// The iterator must be valid (returned by [table_changes_scan_execute]) and not yet freed by /// [`free_scan_table_changes_iter`]. -#[cfg(feature = "default-engine-base")] #[no_mangle] pub unsafe extern "C" fn scan_table_changes_next( data: Handle, @@ -323,7 +308,6 @@ pub unsafe extern "C" fn scan_table_changes_next( scan_table_changes_next_impl(data).into_extern_result(&data.engine.as_ref()) } -#[cfg(feature = "default-engine-base")] fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult { let mut data = data .data