From 5b5ee3a2b979a6870b8f314df543c162bbb1f508 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Fri, 26 Sep 2025 09:33:46 -0700 Subject: [PATCH] arc instead of box --- ffi/src/engine_data.rs | 9 +++++---- ffi/src/lib.rs | 3 +++ ffi/src/transaction/mod.rs | 4 ++-- kernel/src/engine/default/mod.rs | 2 +- kernel/src/engine/default/parquet.rs | 14 +++++++++----- kernel/src/transaction/mod.rs | 4 ++-- 6 files changed, 22 insertions(+), 14 deletions(-) diff --git a/ffi/src/engine_data.rs b/ffi/src/engine_data.rs index 0010bf7de..219c38b67 100644 --- a/ffi/src/engine_data.rs +++ b/ffi/src/engine_data.rs @@ -13,12 +13,13 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::DeltaResult; use delta_kernel::EngineData; use std::ffi::c_void; +use std::sync::Arc; #[cfg(feature = "default-engine-base")] use crate::error::AllocateErrorFn; use crate::ExclusiveEngineData; #[cfg(feature = "default-engine-base")] -use crate::{ExternResult, IntoExternResult, SharedExternEngine}; +use crate::{ExternResult, IntoExternResult, SharedEngineData, SharedExternEngine}; use super::handle::Handle; @@ -116,7 +117,7 @@ pub unsafe extern "C" fn get_engine_data( array: FFI_ArrowArray, schema: &FFI_ArrowSchema, allocate_error: AllocateErrorFn, -) -> ExternResult> { +) -> ExternResult> { get_engine_data_impl(array, schema).into_extern_result(&allocate_error) } @@ -124,10 +125,10 @@ pub unsafe extern "C" fn get_engine_data( unsafe fn get_engine_data_impl( array: FFI_ArrowArray, schema: &FFI_ArrowSchema, -) -> DeltaResult> { +) -> DeltaResult> { let array_data = unsafe { arrow::array::ffi::from_ffi(array, schema) }; let record_batch: RecordBatch = StructArray::from(array_data?).into(); let arrow_engine_data: ArrowEngineData = record_batch.into(); - let engine_data: Box = Box::new(arrow_engine_data); + let engine_data: Arc = Arc::new(arrow_engine_data); Ok(engine_data.into()) } diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index a70b531f1..d9882b6ab 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -343,6 +343,9 @@ pub unsafe extern "C" fn free_row_indexes(slice: KernelRowIndexArray) { #[handle_descriptor(target=dyn EngineData, mutable=true)] pub struct ExclusiveEngineData; +#[handle_descriptor(target=dyn EngineData, mutable=false)] +pub struct SharedEngineData; + /// Drop an `ExclusiveEngineData`. /// /// # Safety diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 8bb3681b0..4dfa8d569 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -6,7 +6,7 @@ use crate::handle::Handle; use crate::KernelStringSlice; use crate::{unwrap_and_parse_path_as_url, TryFromStringSlice}; use crate::{DeltaResult, ExternEngine, Snapshot, Url}; -use crate::{ExclusiveEngineData, SharedExternEngine}; +use crate::{SharedEngineData, SharedExternEngine}; use delta_kernel::transaction::{CommitResult, Transaction}; use delta_kernel_ffi_macros::handle_descriptor; @@ -87,7 +87,7 @@ fn with_engine_info_impl( #[no_mangle] pub unsafe extern "C" fn add_files( mut txn: Handle, - write_metadata: Handle, + write_metadata: Handle, ) { let txn = unsafe { txn.as_mut() }; let write_metadata = unsafe { write_metadata.into_inner() }; diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 051a6a32c..a6a7802a2 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -99,7 +99,7 @@ impl DefaultEngine { write_context: &WriteContext, partition_values: HashMap, data_change: bool, - ) -> DeltaResult> { + ) -> DeltaResult> { let transform = write_context.logical_to_physical(); let input_schema = Schema::try_from_arrow(data.record_batch().schema())?; let output_schema = write_context.schema(); diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index fa77b3294..57f8c7c02 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -68,7 +68,7 @@ impl DataFileMetadata { &self, partition_values: &HashMap, data_change: bool, - ) -> DeltaResult> { + ) -> DeltaResult> { let DataFileMetadata { file_meta: FileMeta { @@ -108,7 +108,7 @@ impl DataFileMetadata { 1, )?); - Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new( + Ok(Arc::new(ArrowEngineData::new(RecordBatch::try_new( Arc::new(add_files_schema().as_ref().try_into_arrow()?), vec![ path, @@ -199,7 +199,7 @@ impl DefaultParquetHandler { data: Box, partition_values: HashMap, data_change: bool, - ) -> DeltaResult> { + ) -> DeltaResult> { let parquet_metadata = self.write_parquet(path, data).await?; parquet_metadata.as_record_batch(&partition_values, data_change) } @@ -505,10 +505,14 @@ mod tests { let data_file_metadata = DataFileMetadata::new(file_metadata, num_records); let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]); let data_change = true; - let actual = data_file_metadata + let actual_arc = data_file_metadata .as_record_batch(&partition_values, data_change) .unwrap(); - let actual = ArrowEngineData::try_from_engine_data(actual).unwrap(); + // Downcast Arc to get the ArrowEngineData for testing + let actual_any = actual_arc.as_any(); + let actual = actual_any + .downcast_ref::() + .unwrap(); let schema = Arc::new( crate::transaction::add_files_schema() diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 88d34b658..87d4e0445 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -117,7 +117,7 @@ pub struct Transaction { read_snapshot: SnapshotRef, operation: Option, engine_info: Option, - add_files_metadata: Vec>, + add_files_metadata: Vec>, // NB: hashmap would require either duplicating the appid or splitting SetTransaction // key/payload. HashSet requires Borrow<&str> with matching Eq, Ord, and Hash. Plus, // HashSet::insert drops the to-be-inserted value without returning the existing one, which @@ -360,7 +360,7 @@ impl Transaction { /// to add multiple batches. /// /// The expected schema for `add_metadata` is given by [`add_files_schema`]. - pub fn add_files(&mut self, add_metadata: Box) { + pub fn add_files(&mut self, add_metadata: Arc) { self.add_files_metadata.push(add_metadata); }