ffi/src/engine_data.rs (9 changes: 5 additions & 4 deletions)
@@ -13,12 +13,13 @@
use delta_kernel::DeltaResult;
use delta_kernel::EngineData;
use std::ffi::c_void;
+use std::sync::Arc;

[CI: GitHub Actions / build (ubuntu-latest) reports a failure on line 16 of ffi/src/engine_data.rs: unused import: `std::sync::Arc`]
#[cfg(feature = "default-engine-base")]
use crate::error::AllocateErrorFn;
use crate::ExclusiveEngineData;
#[cfg(feature = "default-engine-base")]
-use crate::{ExternResult, IntoExternResult, SharedExternEngine};
+use crate::{ExternResult, IntoExternResult, SharedEngineData, SharedExternEngine};

use super::handle::Handle;

@@ -116,18 +117,18 @@
array: FFI_ArrowArray,
schema: &FFI_ArrowSchema,
allocate_error: AllocateErrorFn,
-) -> ExternResult<Handle<ExclusiveEngineData>> {
+) -> ExternResult<Handle<SharedEngineData>> {
get_engine_data_impl(array, schema).into_extern_result(&allocate_error)
}

#[cfg(feature = "default-engine-base")]
unsafe fn get_engine_data_impl(
array: FFI_ArrowArray,
schema: &FFI_ArrowSchema,
-) -> DeltaResult<Handle<ExclusiveEngineData>> {
+) -> DeltaResult<Handle<SharedEngineData>> {
let array_data = unsafe { arrow::array::ffi::from_ffi(array, schema) };
let record_batch: RecordBatch = StructArray::from(array_data?).into();
let arrow_engine_data: ArrowEngineData = record_batch.into();
-let engine_data: Box<dyn EngineData> = Box::new(arrow_engine_data);
+let engine_data: Arc<dyn EngineData> = Arc::new(arrow_engine_data);
Ok(engine_data.into())
}
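
Note: for orientation, here is a hedged sketch of driving this entry point from Rust. It is not part of the PR; `batch: RecordBatch` and `allocate_err` are placeholders, and arrow's `to_ffi` is assumed as the counterpart of the `from_ffi` call above.

let struct_array = StructArray::from(batch); // RecordBatch -> StructArray
let (ffi_array, ffi_schema) = arrow::ffi::to_ffi(&struct_array.into_data())?;
// Returns ExternResult<Handle<SharedEngineData>>; the engine matches on it to
// obtain a shared, reference-counted handle to the imported data.
let result = unsafe { get_engine_data(ffi_array, &ffi_schema, allocate_err) };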
ffi/src/lib.rs (3 changes: 3 additions & 0 deletions)
@@ -343,6 +343,9 @@ pub unsafe extern "C" fn free_row_indexes(slice: KernelRowIndexArray) {
#[handle_descriptor(target=dyn EngineData, mutable=true)]
pub struct ExclusiveEngineData;

+#[handle_descriptor(target=dyn EngineData, mutable=false)]
+pub struct SharedEngineData;

/// Drop an `ExclusiveEngineData`.
///
/// # Safety
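
Note: `SharedEngineData` mirrors `ExclusiveEngineData` but with `mutable=false`, so the generated handle wraps an `Arc<dyn EngineData>` rather than a `Box<dyn EngineData>`. A minimal sketch of the ownership contrast, assuming the handle conversions exercised in `ffi/src/engine_data.rs` and `ffi/src/transaction/mod.rs`:

// Sketch only: ownership semantics implied by the two descriptors.
// Exclusive: single owner; into_inner() hands the Box back to the caller.
unsafe fn take_exclusive(h: Handle<ExclusiveEngineData>) -> Box<dyn EngineData> {
    h.into_inner()
}
// Shared: reference-counted; the returned Arc may coexist with other clones
// of the same underlying engine data.
unsafe fn take_shared(h: Handle<SharedEngineData>) -> Arc<dyn EngineData> {
    h.into_inner()
}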
ffi/src/transaction/mod.rs (4 changes: 2 additions & 2 deletions)
@@ -6,7 +6,7 @@ use crate::handle::Handle;
use crate::KernelStringSlice;
use crate::{unwrap_and_parse_path_as_url, TryFromStringSlice};
use crate::{DeltaResult, ExternEngine, Snapshot, Url};
-use crate::{ExclusiveEngineData, SharedExternEngine};
+use crate::{SharedEngineData, SharedExternEngine};
use delta_kernel::transaction::{CommitResult, Transaction};
use delta_kernel_ffi_macros::handle_descriptor;

@@ -87,7 +87,7 @@ fn with_engine_info_impl(
#[no_mangle]
pub unsafe extern "C" fn add_files(
mut txn: Handle<ExclusiveTransaction>,
-write_metadata: Handle<ExclusiveEngineData>,
+write_metadata: Handle<SharedEngineData>,
) {
let txn = unsafe { txn.as_mut() };
let write_metadata = unsafe { write_metadata.into_inner() };
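
Note: the hunk cuts off after `into_inner()`; the `Arc<dyn EngineData>` it yields is presumably forwarded to the kernel transaction unchanged, along these lines:

// Assumed continuation (not shown in the hunk): hand the Arc straight to the
// kernel-side Transaction::add_files, which now takes Arc<dyn EngineData>.
txn.add_files(write_metadata);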
kernel/src/engine/default/mod.rs (2 changes: 1 addition & 1 deletion)
@@ -99,7 +99,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
write_context: &WriteContext,
partition_values: HashMap<String, String>,
data_change: bool,
-) -> DeltaResult<Box<dyn EngineData>> {
+) -> DeltaResult<Arc<dyn EngineData>> {
let transform = write_context.logical_to_physical();
let input_schema = Schema::try_from_arrow(data.record_batch().schema())?;
let output_schema = write_context.schema();
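
Note: with the return type now `Arc<dyn EngineData>`, the write path's output can feed `Transaction::add_files` directly. A hedged usage sketch; the method name is not visible in this hunk, so `write_parquet` and the surrounding variables are assumptions:

// Hedged sketch: `engine`, `data`, `write_context`, `partition_values`,
// `data_change`, and `txn` are assumed in scope; `write_parquet` is an assumed
// name, since the hunk shows only the method's parameters and return type.
let metadata: Arc<dyn EngineData> = engine
    .write_parquet(&data, &write_context, partition_values, data_change)
    .await?;
txn.add_files(metadata); // no Box -> Arc re-wrapping needed anymore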
kernel/src/engine/default/parquet.rs (14 changes: 9 additions & 5 deletions)
@@ -68,7 +68,7 @@
&self,
partition_values: &HashMap<String, String>,
data_change: bool,
-) -> DeltaResult<Box<dyn EngineData>> {
+) -> DeltaResult<Arc<dyn EngineData>> {
let DataFileMetadata {
file_meta:
FileMeta {
@@ -108,7 +108,7 @@
1,
)?);

-Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
+Ok(Arc::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(add_files_schema().as_ref().try_into_arrow()?),
vec![
path,
@@ -199,7 +199,7 @@
data: Box<dyn EngineData>,
partition_values: HashMap<String, String>,
data_change: bool,
-) -> DeltaResult<Box<dyn EngineData>> {
+) -> DeltaResult<Arc<dyn EngineData>> {
let parquet_metadata = self.write_parquet(path, data).await?;
parquet_metadata.as_record_batch(&partition_values, data_change)
}
@@ -505,10 +505,14 @@
let data_file_metadata = DataFileMetadata::new(file_metadata, num_records);
let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]);
let data_change = true;
-let actual = data_file_metadata
+let actual_arc = data_file_metadata
.as_record_batch(&partition_values, data_change)
.unwrap();

[CI: GitHub Actions / format reports a warning on line 510 of kernel/src/engine/default/parquet.rs: rustfmt diff]
-let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();
+// Downcast Arc<dyn EngineData> to get the ArrowEngineData for testing
+let actual_any = actual_arc.as_any();
+let actual = actual_any
+.downcast_ref::<ArrowEngineData>()
+.unwrap();

let schema = Arc::new(
crate::transaction::add_files_schema()
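
Note: because the batch now arrives behind an `Arc` instead of a consumable `Box`, the test borrows the concrete type via `as_any` rather than `ArrowEngineData::try_from_engine_data`. The same borrow works anywhere a shared batch needs inspection:

// Sketch: borrow the concrete ArrowEngineData out of a shared EngineData
// without taking ownership, using the as_any accessor seen in the test above.
fn as_arrow(data: &Arc<dyn EngineData>) -> Option<&ArrowEngineData> {
    data.as_any().downcast_ref::<ArrowEngineData>()
}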
kernel/src/transaction/mod.rs (4 changes: 2 additions & 2 deletions)
@@ -117,7 +117,7 @@ pub struct Transaction {
read_snapshot: SnapshotRef,
operation: Option<String>,
engine_info: Option<String>,
-add_files_metadata: Vec<Box<dyn EngineData>>,
+add_files_metadata: Vec<Arc<dyn EngineData>>,
// NB: hashmap would require either duplicating the appid or splitting SetTransaction
// key/payload. HashSet requires Borrow<&str> with matching Eq, Ord, and Hash. Plus,
// HashSet::insert drops the to-be-inserted value without returning the existing one, which
@@ -360,7 +360,7 @@ impl Transaction {
/// to add multiple batches.
///
/// The expected schema for `add_metadata` is given by [`add_files_schema`].
-pub fn add_files(&mut self, add_metadata: Box<dyn EngineData>) {
+pub fn add_files(&mut self, add_metadata: Arc<dyn EngineData>) {
self.add_files_metadata.push(add_metadata);
}

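
Note: storing `Arc<dyn EngineData>` means a caller can retain the metadata it hands to the transaction. A short sketch under that assumption, with `batch` and `txn` as placeholders:

// Sketch: adding files is now a refcount bump, not a move of the only copy,
// so the caller can keep and later inspect the same batch it added.
let metadata: Arc<dyn EngineData> = Arc::new(ArrowEngineData::new(batch));
txn.add_files(Arc::clone(&metadata));
assert!(metadata.as_any().downcast_ref::<ArrowEngineData>().is_some());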