CDF API for FFI #1335
| &self, | ||
| engine: Arc<dyn Engine>, | ||
| ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + use<'_>> { | ||
| ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> { |
thank you 👍
| .is_some_and(|first_commit| first_commit.version == start_version), | ||
| Error::generic(format!( | ||
| "Expected the first commit to have version {start_version}" | ||
| "Expected the first commit to have version {start_version}, got {:?}", |
nice 👍
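The revised message reports the version that was actually found, which makes a failed check much easier to diagnose. The same validation can be sketched in isolation as follows. The `Commit` struct and `check_first_commit` function are hypothetical stand-ins, and `String` replaces the kernel's `Error` type:

```rust
/// Hypothetical stand-in for a commit entry; only the version matters here.
#[derive(Debug)]
struct Commit {
    version: u64,
}

/// Checks that the first commit has `start_version`, reporting what was found otherwise.
fn check_first_commit(commits: &[Commit], start_version: u64) -> Result<(), String> {
    let first = commits.first();
    if first.is_some_and(|c| c.version == start_version) {
        Ok(())
    } else {
        // `{:?}` on an Option<u64> prints `Some(n)` or `None`, so an empty list is reported too.
        Err(format!(
            "Expected the first commit to have version {start_version}, got {:?}",
            first.map(|c| c.version)
        ))
    }
}
```

Formatting the `Option` with `{:?}` also covers the case where there are no commits at all, which prints `None`.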
ffi/src/table_changes.rs
| use delta_kernel::arrow::array::{ | ||
| ffi::{FFI_ArrowArray, FFI_ArrowSchema}, | ||
| ArrayData, RecordBatch, StructArray, | ||
| }; |
| use delta_kernel::arrow::array::{ | |
| ffi::{FFI_ArrowArray, FFI_ArrowSchema}, | |
| ArrayData, RecordBatch, StructArray, | |
| }; | |
| use delta_kernel::arrow::array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; | |
| use delta_kernel::arrow::array::{ArrayData, RecordBatch, StructArray}; |
ffi/src/table_changes.rs
| let mut data = data | ||
| .data | ||
| .lock() | ||
| .map_err(|_| Error::generic("poisoned mutex"))?; |
| .map_err(|_| Error::generic("poisoned mutex"))?; | |
| .map_err(|_| Error::generic("poisoned scan table changes iterator mutex"))?; |
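`Mutex::lock` only fails when another thread panicked while holding the lock, so naming the mutex in the message tells the reader which shared state was left poisoned. A self-contained sketch of that pattern with `std::sync::Mutex`; `next_item` and `poison` are hypothetical helpers, and a `Vec<u32>` stands in for the table changes iterator:

```rust
use std::sync::{Arc, Mutex};

/// Locks the iterator state, turning a poisoned mutex into a descriptive error message.
/// `String` stands in for the kernel's error type (an assumption for this sketch).
fn next_item(state: &Mutex<Vec<u32>>) -> Result<Option<u32>, String> {
    let mut guard = state
        .lock()
        .map_err(|_| "poisoned scan table changes iterator mutex".to_string())?;
    Ok(guard.pop())
}

/// Poisons `state` by panicking in another thread while that thread holds the lock.
fn poison(state: &Arc<Mutex<Vec<u32>>>) {
    let s = Arc::clone(state);
    let _ = std::thread::spawn(move || {
        let _guard = s.lock().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join();
}
```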
ffi/src/table_changes.rs
| } | ||
| | ||
| #[tokio::test] | ||
| async fn test_table_changes() -> Result<(), Box<dyn std::error::Error>> { |
| async fn test_table_changes() -> Result<(), Box<dyn std::error::Error>> { | |
| async fn test_table_changes_getters() -> Result<(), Box<dyn std::error::Error>> { |
ffi/src/table_changes.rs
| let data_ref = &mut *data; | ||
| let array = std::mem::replace(&mut data_ref.array, FFI_ArrowArray::empty()); | ||
| get_engine_data(array, &(*data).schema, allocate_err) |
Once you make scan_table_changes_next return ArrowFFIData by value, you can do this:
| let data_ref = &mut *data; | |
| let array = std::mem::replace(&mut data_ref.array, FFI_ArrowArray::empty()); | |
| get_engine_data(array, &(*data).schema, allocate_err) | |
| get_engine_data(data.array, &data.schema, allocate_err) |
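The `std::mem::replace` appears to be needed only because the data is reached through a mutable borrow, so the `array` field cannot be moved out without leaving a valid value in its place. A sketch of the difference, using hypothetical `Array`, `Schema`, and `FfiData` types rather than the arrow FFI structs, and a hypothetical `consume` in place of `get_engine_data`:

```rust
use std::mem;

/// Hypothetical stand-ins for FFI_ArrowArray / FFI_ArrowSchema; only ownership matters here.
#[derive(Debug, PartialEq)]
struct Array(Vec<i32>);
#[derive(Debug, PartialEq)]
struct Schema(String);

impl Array {
    fn empty() -> Self {
        Array(Vec::new())
    }
}

struct FfiData {
    array: Array,
    schema: Schema,
}

/// Hypothetical consumer that takes ownership of the array, like `get_engine_data`.
fn consume(array: Array, schema: &Schema) -> String {
    format!("{}:{}", schema.0, array.0.len())
}

/// Through `&mut` we cannot move the field out, so an empty placeholder is swapped in.
fn from_mut(data: &mut FfiData) -> String {
    let array = mem::replace(&mut data.array, Array::empty());
    consume(array, &data.schema)
}

/// Owning the struct allows a partial move of `array`; no placeholder is needed.
fn from_owned(data: FfiData) -> String {
    consume(data.array, &data.schema)
}
```

The partial move in `from_owned` compiles only because `FfiData` does not implement `Drop`; this sketch assumes the same holds for the real `ArrowFFIData`.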
ffi/src/table_changes.rs
| let mut visitor_state = KernelExpressionVisitorState::default(); | ||
| let pred_id = (predicate.visitor)(predicate.predicate, &mut visitor_state); | ||
| let predicate = unwrap_kernel_predicate(&mut visitor_state, pred_id); | ||
| debug!("Got predicate: {:#?}", predicate); |
| debug!("Got predicate: {:#?}", predicate); | |
| debug!("Table changes got predicate: {:#?}", predicate); |
I think some details can be iterated on, but I think this is super valuable. Thanks @kssenii!!
ffi/src/table_changes.rs
| | ||
| let batch_struct_array: StructArray = record_batch.into(); | ||
| let array_data: ArrayData = batch_struct_array.into_data(); | ||
| let (out_array, out_schema) = to_ffi(&array_data)?; |
This line will produce a new schema on each call. I think we eventually need to return just ArrowData instead of ArrowFFIData. We can do that as a follow-up, though :)
I think we need to eventually just return ArrowData
e.g. FFI_ArrowArray?
ffi/src/table_changes.rs
| .data | ||
| .lock() | ||
| .map_err(|_| Error::generic("poisoned scan table changes iterator mutex"))?; | ||
| if let Some(scan_result) = data.next().transpose()? { |
nit: We can avoid nesting by simply returning early :)
| if let Some(scan_result) = data.next().transpose()? { | |
| let Some(scan_result) = data.next().transpose()? else { | |
| return Ok(ArrowFFIData::empty()); | |
| }; |
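With let-else, the binding is introduced in the enclosing scope, the `else` branch must diverge, and the statement needs a trailing semicolon after the closing brace. A minimal sketch of the suggested shape, with a hypothetical `Batch` type whose `empty()` marks exhaustion, as `ArrowFFIData::empty()` does in the PR:

```rust
/// Hypothetical result type standing in for ArrowFFIData; `empty()` marks end of iteration.
#[derive(Debug, PartialEq)]
struct Batch {
    rows: usize,
}

impl Batch {
    fn empty() -> Self {
        Batch { rows: 0 }
    }
}

/// Pulls the next fallible item; `transpose` turns Option<Result<T, E>> into Result<Option<T>, E>.
fn next_batch<I>(iter: &mut I) -> Result<Batch, String>
where
    I: Iterator<Item = Result<usize, String>>,
{
    // let-else: bind on `Some`, otherwise return early without nesting the happy path.
    let Some(rows) = iter.next().transpose()? else {
        return Ok(Batch::empty());
    };
    Ok(Batch { rows })
}
```

In this sketch a genuinely empty batch (`rows: 0`) is indistinguishable from the end of iteration, which is the concern raised about the empty-`ArrowFFIData` check.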
| #[no_mangle] | ||
| pub unsafe extern "C" fn scan_table_changes_next( | ||
| data: Handle<SharedScanTableChangesIterator>, | ||
| ) -> ExternResult<ArrowFFIData> { |
We're detecting the end of iteration by checking whether the returned ArrowFFIData is empty. I think we'll eventually want to change this too.
What would be a better way to do this?
Likely the FFI OptionalValue, once that's merged. @samansmink is working on that, so we can do this as a follow-up.
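One way to signal exhaustion explicitly is a C-compatible tagged enum, so the caller checks a discriminant instead of inspecting the payload. A sketch under that assumption; `OptionalBatch`, `Batch`, and `next_batch` are hypothetical and are not the `OptionalValue` type referred to above, whose exact definition is not shown here:

```rust
/// Hypothetical FFI-safe batch payload.
#[repr(C)]
#[derive(Debug, PartialEq)]
pub struct Batch {
    pub rows: u64,
}

/// Hypothetical C-compatible optional: the tag says whether a value is present,
/// so an empty-but-valid batch is no longer confused with the end of iteration.
#[repr(C)]
#[derive(Debug, PartialEq)]
pub enum OptionalBatch {
    Some(Batch),
    None,
}

impl From<Option<Batch>> for OptionalBatch {
    fn from(value: Option<Batch>) -> Self {
        match value {
            Some(b) => OptionalBatch::Some(b),
            None => OptionalBatch::None,
        }
    }
}

/// Advances the iterator and reports exhaustion through the tag instead of a sentinel value.
pub fn next_batch(iter: &mut impl Iterator<Item = Batch>) -> OptionalBatch {
    iter.next().into()
}
```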
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1335      +/-   ##
==========================================
+ Coverage   84.79%   84.85%   +0.06%
==========================================
  Files         118      119       +1
  Lines       30366    30862     +496
  Branches    30366    30862     +496
==========================================
+ Hits        25748    26188     +440
- Misses       3387     3395       +8
- Partials     1231     1279      +48
Hey @kssenii, could you patch up the CI issues? You can click on the jobs to see what's failing (example:
@OussamaSaoudi, could you please re-enable the CI workflow?
Re-enabled CI 👍
Hmm, this might be a known issue with CI on FFI. Let me check.
@kssenii, for now please disable Miri on the test:
#[cfg_attr(miri, ignore)] // FIXME: re-enable miri (can't call foreign function `ioctl` on OS `linux`)
CI did not hit the error I get locally, but this one instead: (https://github.com/delta-io/delta-kernel-rs/actions/runs/18136387522/job/51844035758). Though the latest CI run fails in a completely different way; checking...
| unsafe { | ||
| free_table_changes_scan(table_changes_scan); | ||
| free_engine(engine); | ||
| } |
@kssenii the latest build indicates that we're leaking memory for the schema:
507 | #[derive(Debug, PartialEq, Clone, Eq)]
| ^^^^^
note: inside `table_changes::table_changes_schema`
--> ffi/src/table_changes.rs:118:14
|
118 | Arc::new(table_changes.schema().clone()).into()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: inside closure
--> ffi/src/table_changes.rs:584:31
|
584 | let schema = unsafe { table_changes_schema(table_changes.shallow_copy()).shallow_copy() };
I think we may need to call free_schema in the test for the newly allocated copy of the schema.
| unsafe { | |
| free_table_changes_scan(table_changes_scan); | |
| free_engine(engine); | |
| } | |
| unsafe { | |
| free_table_changes_scan(table_changes_scan); | |
| free_engine(engine); | |
| free_schema(schema); | |
| } |
Thank you!
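The leak occurs because `table_changes_schema` hands back a newly allocated handle (the `Arc::new(table_changes.schema().clone()).into()` in the trace above), so the caller owns it and must release it. The ownership contract can be sketched with raw `Arc` pointers. `Schema`, `schema_handle`, and `free_schema_handle` are hypothetical, not the crate's `Handle` machinery, and unlike the kernel getter (which allocates a fresh `Arc` around a cloned schema) this version clones an existing `Arc` so the reference count is observable:

```rust
use std::sync::Arc;

/// Hypothetical schema type; the real one is the kernel's schema struct.
#[derive(Debug, Clone)]
pub struct Schema {
    pub fields: Vec<String>,
}

/// Hands out a new owned reference as a raw pointer, like an FFI getter returning a handle.
/// The caller must pass it to `free_schema_handle` exactly once, or the reference leaks.
pub fn schema_handle(schema: &Arc<Schema>) -> *const Schema {
    Arc::into_raw(Arc::clone(schema))
}

/// Reclaims ownership of the reference and drops it.
/// Safety: `ptr` must come from `schema_handle` and must not be freed twice.
pub unsafe fn free_schema_handle(ptr: *const Schema) {
    unsafe { drop(Arc::from_raw(ptr)) };
}
```

Every handle returned by such a getter must be paired with exactly one free call; skipping it is what Miri reports as a leak.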
| pub mod engine_data; | ||
| pub mod engine_funcs; | ||
| pub mod error; | ||
| pub mod table_changes; |
Let's gate the entire feature since it depends on Arrow.
| pub mod table_changes; | |
| #[cfg(feature = "default-engine-base")] | |
| pub mod table_changes; |
This lets us remove the repeated #[cfg(feature = "default-engine-base")] attributes below.
Hi @OussamaSaoudi, is there anything else that needs to be done to get this PR merged?
lgtm! thanks for the contribution!
| pub schema: FFI_ArrowSchema, | ||
| } | ||
| | ||
| #[cfg(feature = "default-engine-base")] |
too bad the arrow folks didn't implement Default for FFI_ArrowArray and FFI_ArrowSchema
@kssenii, please update the branch and ping me so we can merge :D Thank you for your contribution!
@OussamaSaoudi-db, ping :)
What changes are proposed in this pull request?
Change Data Feed FFI APIs #1196
How was this change tested?
Tests in ffi/src/table_changes.rs