-
Couldn't load subscription status.
- Fork 118
feat!: add ffi for idempotent write primitives #1191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
a0d5a4c
97f158c
7d99270
72a9fa6
1de723c
79ebb13
b32ab40
442e4ed
e454eeb
63ef6e6
fa12905
a4ef758
d5458b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,185 @@ | ||
| use crate::error::ExternResult; | ||
| use crate::handle::Handle; | ||
| use crate::transaction::ExclusiveTransaction; | ||
| use crate::{ | ||
| ExternEngine, IntoExternResult, KernelStringSlice, OptionalValue, SharedExternEngine, | ||
| SharedSnapshot, TryFromStringSlice, | ||
| }; | ||
| use delta_kernel::transaction::Transaction; | ||
| use delta_kernel::{DeltaResult, Snapshot}; | ||
| use std::sync::Arc; | ||
samansmink marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /// Associates an app_id and version with a transaction. These will be applied to the table on commit. | ||
| /// | ||
| /// # Returns | ||
| /// A new handle to the transaction that will set the `app_id` version to `version` on commit | ||
| /// | ||
| /// # Safety | ||
| /// Caller is responsible for passing [valid][Handle#Validity] handles. The `app_id` string slice must be valid. | ||
| /// CONSUMES TRANSACTION | ||
| #[no_mangle] | ||
| pub unsafe extern "C" fn with_transaction_id( | ||
| txn: Handle<ExclusiveTransaction>, | ||
| app_id: KernelStringSlice, | ||
| version: i64, | ||
| engine: Handle<SharedExternEngine>, | ||
| ) -> ExternResult<Handle<ExclusiveTransaction>> { | ||
| let txn = unsafe { txn.into_inner() }; | ||
| let engine = unsafe { engine.as_ref() }; | ||
| let app_id_string: DeltaResult<String> = unsafe { TryFromStringSlice::try_from_slice(&app_id) }; | ||
| with_transaction_id_impl(*txn, app_id_string, version).into_extern_result(&engine) | ||
samansmink marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| fn with_transaction_id_impl( | ||
| txn: Transaction, | ||
| app_id: DeltaResult<String>, | ||
| version: i64, | ||
| ) -> DeltaResult<Handle<ExclusiveTransaction>> { | ||
| Ok(Box::new(txn.with_transaction_id(app_id?, version)).into()) | ||
| } | ||
samansmink marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /// Retrieves the version associated with an app_id from a snapshot. | ||
| /// | ||
| /// # Returns | ||
| /// The version number if found, or an error of type `MissingDataError` when the app_id was not set | ||
| /// | ||
| /// # Safety | ||
| /// Caller must ensure [valid][Handle#Validity] handles are provided for snapshot and engine. The `app_id` | ||
| /// string slice must be valid. | ||
| #[no_mangle] | ||
| pub unsafe extern "C" fn get_app_id_version( | ||
| snapshot: Handle<SharedSnapshot>, | ||
| app_id: KernelStringSlice, | ||
| engine: Handle<SharedExternEngine>, | ||
| ) -> ExternResult<OptionalValue<i64>> { | ||
| let snapshot = unsafe { snapshot.clone_as_arc() }; | ||
| let engine = unsafe { engine.as_ref() }; | ||
| let app_id = unsafe { String::try_from_slice(&app_id) }; | ||
|
|
||
| get_app_id_version_impl(snapshot, app_id, engine) | ||
samansmink marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .map(OptionalValue::from) | ||
| .into_extern_result(&engine) | ||
| } | ||
|
|
||
| fn get_app_id_version_impl( | ||
| snapshot: Arc<Snapshot>, | ||
| app_id: DeltaResult<String>, | ||
| extern_engine: &dyn ExternEngine, | ||
| ) -> DeltaResult<Option<i64>> { | ||
| snapshot.get_app_id_version(&app_id?, extern_engine.engine().as_ref()) | ||
samansmink marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
Comment on lines
66
to
72
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need this or could we just inline There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. well the app_id might be a Perhaps my rust skills are lacking here, but how else would we cleanly return an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. makes sense that it's easier to return the error 👍 |
||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
|
|
||
| use crate::ffi_test_utils::ok_or_panic; | ||
| use crate::kernel_string_slice; | ||
| use crate::tests::get_default_engine; | ||
| use crate::transaction::{commit, transaction}; | ||
| use delta_kernel::schema::{DataType, StructField, StructType}; | ||
| use delta_kernel::Snapshot; | ||
| use std::sync::Arc; | ||
| use tempfile::tempdir; | ||
| use test_utils::setup_test_tables; | ||
| use url::Url; | ||
|
|
||
| #[cfg(feature = "default-engine-base")] | ||
| #[tokio::test] | ||
| #[cfg_attr(miri, ignore)] // FIXME: re-enable miri (can't call foreign function `linkat` on OS `linux`) | ||
| async fn test_write_txn_actions() -> Result<(), Box<dyn std::error::Error>> { | ||
| // Create a temporary local directory for use during this test | ||
| let tmp_test_dir = tempdir()?; | ||
| let tmp_dir_local_url = Url::from_directory_path(tmp_test_dir.path()).unwrap(); | ||
|
|
||
| // create a simple table: one int column named 'number' | ||
| let schema = Arc::new(StructType::new(vec![StructField::nullable( | ||
|
Check failure on line 96 in ffi/src/transaction/transaction_id.rs
|
||
| "number", | ||
| DataType::INTEGER, | ||
| )])); | ||
|
|
||
| for (table_url, engine, _store, _table_name) in | ||
| setup_test_tables(schema, &[], Some(&tmp_dir_local_url), "test_table").await? | ||
| { | ||
| let table_path = table_url.to_file_path().unwrap(); | ||
| let table_path_str = table_path.to_str().unwrap(); | ||
| let default_engine_handle = get_default_engine(table_path_str); | ||
|
|
||
| // Start the transaction | ||
| let txn = ok_or_panic(unsafe { | ||
| transaction( | ||
| kernel_string_slice!(table_path_str), | ||
| default_engine_handle.shallow_copy(), | ||
| ) | ||
| }); | ||
|
|
||
| // Add app ids | ||
| let app_id1 = "app_id1"; | ||
| let app_id2 = "app_id2"; | ||
| let txn = ok_or_panic(unsafe { | ||
| with_transaction_id( | ||
| txn, | ||
| kernel_string_slice!(app_id1), | ||
| 1, | ||
| default_engine_handle.shallow_copy(), | ||
| ) | ||
| }); | ||
| let txn = ok_or_panic(unsafe { | ||
| with_transaction_id( | ||
| txn, | ||
| kernel_string_slice!(app_id2), | ||
| 2, | ||
| default_engine_handle.shallow_copy(), | ||
| ) | ||
| }); | ||
|
|
||
| // commit! | ||
| ok_or_panic(unsafe { commit(txn, default_engine_handle.shallow_copy()) }); | ||
|
|
||
| // Check versions | ||
| let snapshot = Arc::new(Snapshot::try_new(table_url.clone(), &engine, Some(1))?); | ||
|
Check failure on line 140 in ffi/src/transaction/transaction_id.rs
|
||
| assert_eq!( | ||
| snapshot.clone().get_app_id_version("app_id1", &engine)?, | ||
| Some(1) | ||
| ); | ||
| assert_eq!( | ||
| snapshot.clone().get_app_id_version("app_id2", &engine)?, | ||
| Some(2) | ||
| ); | ||
| assert_eq!( | ||
| snapshot.clone().get_app_id_version("app_id3", &engine)?, | ||
| None | ||
| ); | ||
|
|
||
| // Check versions through ffi handles | ||
| let version1 = ok_or_panic(unsafe { | ||
| get_app_id_version( | ||
| Handle::from(snapshot.clone()), | ||
| kernel_string_slice!(app_id1), | ||
| default_engine_handle.shallow_copy(), | ||
| ) | ||
| }); | ||
| assert_eq!(version1, OptionalValue::Some(1)); | ||
|
|
||
| let version2 = ok_or_panic(unsafe { | ||
| get_app_id_version( | ||
| Handle::from(snapshot.clone()), | ||
| kernel_string_slice!(app_id2), | ||
| default_engine_handle.shallow_copy(), | ||
| ) | ||
| }); | ||
| assert_eq!(version2, OptionalValue::Some(2)); | ||
|
|
||
| let app_id3 = "app_id3"; | ||
| let version3 = ok_or_panic(unsafe { | ||
| get_app_id_version( | ||
| Handle::from(snapshot.clone()), | ||
| kernel_string_slice!(app_id3), | ||
| default_engine_handle.shallow_copy(), | ||
| ) | ||
| }); | ||
| assert_eq!(version3, OptionalValue::None); | ||
| } | ||
| Ok(()) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed function signature but not test change points to missing coverage? perhaps let's track an issue to fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do!