Skip to content

Conversation

@kssenii
Copy link
Contributor

@kssenii kssenii commented Sep 22, 2025

What changes are proposed in this pull request?

Change Data Feed FFI APIs #1196

How was this change tested?

Tests in ffi/src/table_changes.rs

&self,
engine: Arc<dyn Engine>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + use<'_>> {
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you 👍

.is_some_and(|first_commit| first_commit.version == start_version),
Error::generic(format!(
"Expected the first commit to have version {start_version}"
"Expected the first commit to have version {start_version}, got {:?}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 👍

Comment on lines 5 to 8
use delta_kernel::arrow::array::{
ffi::{FFI_ArrowArray, FFI_ArrowSchema},
ArrayData, RecordBatch, StructArray,
};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
use delta_kernel::arrow::array::{
ffi::{FFI_ArrowArray, FFI_ArrowSchema},
ArrayData, RecordBatch, StructArray,
};
use delta_kernel::arrow::array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use delta_kernel::arrow::array::ArrayData, RecordBatch, StructArray;

let mut data = data
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.map_err(|_| Error::generic("poisoned mutex"))?;
.map_err(|_| Error::generic("poisoned scan table changes iterator mutex"))?;

}

#[tokio::test]
async fn test_table_changes() -> Result<(), Box<dyn std::error::Error>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async fn test_table_changes() -> Result<(), Box<dyn std::error::Error>> {
async fn test_table_changes_getters() -> Result<(), Box<dyn std::error::Error>> {

Comment on lines 741 to 743
let data_ref = &mut *data;
let array = std::mem::replace(&mut data_ref.array, FFI_ArrowArray::empty());
get_engine_data(array, &(*data).schema, allocate_err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once you make scan_table_changes_next return ArrowFFIData by value, you can do this:

Suggested change
let data_ref = &mut *data;
let array = std::mem::replace(&mut data_ref.array, FFI_ArrowArray::empty());
get_engine_data(array, &(*data).schema, allocate_err)
get_engine_data(data.array, &data.schema, allocate_err)

let mut visitor_state = KernelExpressionVisitorState::default();
let pred_id = (predicate.visitor)(predicate.predicate, &mut visitor_state);
let predicate = unwrap_kernel_predicate(&mut visitor_state, pred_id);
debug!("Got predicate: {:#?}", predicate);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
debug!("Got predicate: {:#?}", predicate);
debug!("Table changes got predicate: {:#?}", predicate);

Copy link
Collaborator

@OussamaSaoudi OussamaSaoudi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some details can be iterated on, but I think this is super valuable. Thanks @kssenii!!


let batch_struct_array: StructArray = record_batch.into();
let array_data: ArrayData = batch_struct_array.into_data();
let (out_array, out_schema) = to_ffi(&array_data)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line will produce a new schema for each call. I think we need to eventually just return ArrowData instead of ArrowFFIData. We can do that as a followup though :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to eventually just return ArrowData

e.g. FFI_ArrowArray?

.data
.lock()
.map_err(|_| Error::generic("poisoned scan table changes iterator mutex"))?;
if let Some(scan_result) = data.next().transpose()? {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We can avoid nesting by simply returning early :)

Suggested change
if let Some(scan_result) = data.next().transpose()? {
let Some(scan_result) = data.next().transpose()? else {
return Ok(ArrowFFIData::empty());
}

Comment on lines +288 to +291
#[no_mangle]
pub unsafe extern "C" fn scan_table_changes_next(
data: Handle<SharedScanTableChangesIterator>,
) -> ExternResult<ArrowFFIData> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're checking that the iteration has ended by checking that the ArrowFFIData is empty. I think we'll eventually want to change this too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be a better way to do this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likely the FFI OptionalValue once that's merged. @samansmink is working on that, so we can do this as a followup.

@OussamaSaoudi
Copy link
Collaborator

One more thing @kssenii Can you track the CDF FFI followups in the tracking issue in #1335? Main item is removing the duplicate work when constructing the schema for ArrowFFIData.

@codecov
Copy link

codecov bot commented Sep 26, 2025

Codecov Report

❌ Patch coverage is 87.62475% with 62 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.85%. Comparing base (733a117) to head (c4f5303).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
ffi/src/table_changes.rs 87.19% 14 Missing and 48 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1335      +/-   ##
==========================================
+ Coverage   84.79%   84.85%   +0.06%     
==========================================
  Files         118      119       +1     
  Lines       30366    30862     +496     
  Branches    30366    30862     +496     
==========================================
+ Hits        25748    26188     +440     
- Misses       3387     3395       +8     
- Partials     1231     1279      +48     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@OussamaSaoudi
Copy link
Collaborator

Hey @kssenii could you patch up the CI issues? You can click on the jobs to see what's failing (example: cargo clippy --benches --tests --all-features -- -D warnings)

@kssenii
Copy link
Contributor Author

kssenii commented Oct 6, 2025

@OussamaSaoudi, could you please reenable ci workflow?
All checks should be fixed now, though for Miri I made a blind fix as I cannot run it locally (getting test table_changes::tests::test_table_changes_between_commits ... error: unsupported operation: can't call foreign function 'ioctl' on OS 'linux' when running on linux and test table_changes::tests::test_table_changes_getters ... error: unsupported operation: can't call foreign function 'kqueue' on OS 'macos' when running on macos...)

@OussamaSaoudi
Copy link
Collaborator

@kssenii

Reenabled CI 👍

test table_changes::tests::test_table_changes_between_commits ... error: unsupported operation: can't call foreign function 'ioctl' on OS 'linux'

Hmm this might be a known issue with CI on FFI. Let me check.

@OussamaSaoudi
Copy link
Collaborator

@kssenii for now, please disable miri on the test.

    #[cfg_attr(miri, ignore)] // FIXME: re-enable miri (can't call foreign function `ioctl` on OS `linux`)

@kssenii
Copy link
Contributor Author

kssenii commented Oct 6, 2025

for now, please disable miri on the test.

In CI it did not get the error I have locally, but this:

test result: ok. 16 passed; 0 failed; 1 ignored; 0 measured; 0 filtered out; finished in 239.42s

error: the main thread terminated without waiting for all remaining threads

note: set `MIRIFLAGS=-Zmiri-ignore-leaks` to disable this check

error: aborting due to 1 previous error

error: test failed, to rerun pass `--lib`

(https://github.com/delta-io/delta-kernel-rs/actions/runs/18136387522/job/51844035758)
So my blind fix is: 52d4015

Though on latest CI run it fails completely differently, checking...

Comment on lines 629 to 632
unsafe {
free_table_changes_scan(table_changes_scan);
free_engine(engine);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kssenii the latest build indicates that we're leaking memory for the schema:

507 | #[derive(Debug, PartialEq, Clone, Eq)]
    |                            ^^^^^
note: inside `table_changes::table_changes_schema`
   --> ffi/src/table_changes.rs:118:14
    |
118 |     Arc::new(table_changes.schema().clone()).into()
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: inside closure
   --> ffi/src/table_changes.rs:584:31
    |
584 |         let schema = unsafe { table_changes_schema(table_changes.shallow_copy()).shallow_copy() };

I think we may need to call free_schema in the test for the newly allocated copy of the schema.

Suggested change
unsafe {
free_table_changes_scan(table_changes_scan);
free_engine(engine);
}
unsafe {
free_table_changes_scan(table_changes_scan);
free_engine(engine);
free_schema(schema);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

pub mod engine_data;
pub mod engine_funcs;
pub mod error;
pub mod table_changes;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's gate the entire feature since it depends on Arrow.

Suggested change
pub mod table_changes;
#[cfg(feature = "default-engine-base")]
pub mod table_changes;

This lets us remove the repeated #[cfg(feature = "default-engine-base")] below

@kssenii kssenii requested a review from OussamaSaoudi October 10, 2025 19:13
@kssenii
Copy link
Contributor Author

kssenii commented Oct 15, 2025

Hi @OussamaSaoudi, is there anything needed to be done to have this PR merged?

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm! thanks for the contribution!

pub schema: FFI_ArrowSchema,
}

#[cfg(feature = "default-engine-base")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too bad the arrow folks didn't implement Default for FFI_ArrowArray and FFI_ArrowSchema

@OussamaSaoudi
Copy link
Collaborator

@kssenii pls update the branch and ping me so we can merge :D

Thank you for your contribution!

@kssenii
Copy link
Contributor Author

kssenii commented Oct 21, 2025

@OussamaSaoudi-db, ping :)

@zachschuermann zachschuermann merged commit 2b49385 into delta-io:main Oct 23, 2025
22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants