Merged
43 changes: 33 additions & 10 deletions ffi/src/transaction/mod.rs
@@ -94,6 +94,18 @@ pub unsafe extern "C" fn add_files(
txn.add_files(write_metadata);
}

///
/// Mark the transaction as having data changes or not (these are recorded at the file level).
///
/// # Safety
///
/// Caller is responsible for passing a valid handle.
#[no_mangle]
pub unsafe extern "C" fn set_data_change(mut txn: Handle<ExclusiveTransaction>, data_change: bool) {
let underlying_txn = unsafe { txn.as_mut() };
underlying_txn.set_data_change(data_change);
}
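
A minimal sketch of how an FFI consumer might use this new entry point (mirroring the test further down in this file): the flag is set once on the transaction handle before any files are added, rather than per add-file batch. `transaction`, `ok_or_panic`, `kernel_string_slice!`, and `shallow_copy` are the existing FFI helpers used in the test; `table_path` and `engine` are assumed to already exist.

```rust
// Sketch only: mark an entire transaction as carrying no data change
// (e.g. a compaction-style rewrite) before adding files and committing.
let txn = ok_or_panic(unsafe {
    transaction(kernel_string_slice!(table_path), engine.shallow_copy())
});
// Recorded on every file action added through this transaction.
unsafe { set_data_change(txn.shallow_copy(), false) };
```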

/// Attempt to commit a transaction to the table. Returns version number if successful.
/// Returns error if the commit fails.
///
@@ -136,11 +148,11 @@ mod tests {
use delta_kernel::arrow::ffi::to_ffi;
use delta_kernel::arrow::json::reader::ReaderBuilder;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;

use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::parquet::arrow::arrow_writer::ArrowWriter;
use delta_kernel::parquet::file::properties::WriterProperties;
use delta_kernel::transaction::add_files_schema;

use delta_kernel_ffi::engine_data::get_engine_data;
use delta_kernel_ffi::engine_data::ArrowFFIData;
@@ -194,25 +206,25 @@ mod tests {
fn create_file_metadata(
path: &str,
num_rows: i64,
metadata_schema: ArrowSchema,
) -> Result<ArrowFFIData, Box<dyn std::error::Error>> {
let schema: ArrowSchema = add_files_schema().as_ref().try_into_arrow()?;

let current_time: i64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;

let file_metadata = format!(
r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "dataChange": true, "stats": {{"numRecords": {num_rows}}}}}"#,
r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "stats": {{"numRecords": {num_rows}}}}}"#,
);

create_arrow_ffi_from_json(schema, file_metadata.as_str())
create_arrow_ffi_from_json(metadata_schema, file_metadata.as_str())
}

fn write_parquet_file(
delta_path: &str,
file_path: &str,
batch: &RecordBatch,
metadata_schema: ArrowSchema,
) -> Result<ArrowFFIData, Box<dyn std::error::Error>> {
// WriterProperties can be used to set Parquet file options
let props = WriterProperties::builder().build();
@@ -226,7 +238,7 @@
// writer must be closed to write footer
let res = writer.close().unwrap();

create_file_metadata(file_path, res.num_rows)
create_file_metadata(file_path, res.num_rows, metadata_schema)
}

#[tokio::test]
@@ -260,6 +272,7 @@
let txn = ok_or_panic(unsafe {
transaction(kernel_string_slice!(table_path_str), engine.shallow_copy())
});
unsafe { set_data_change(txn.shallow_copy(), false) };

// Add engine info
let engine_info = "default_engine";
@@ -313,8 +326,18 @@
),
])
.unwrap();

let file_info = write_parquet_file(table_path_str, "my_file.parquet", &batch)?;
let parquet_schema = unsafe {
txn_with_engine_info
.shallow_copy()
.as_ref()
.add_files_schema()
};
let file_info = write_parquet_file(
table_path_str,
"my_file.parquet",
&batch,
parquet_schema.as_ref().try_into_arrow()?,
)?;

let file_info_engine_data = ok_or_panic(unsafe {
get_engine_data(
@@ -365,7 +388,7 @@
"partitionValues": {},
"size": 0,
"modificationTime": 0,
"dataChange": true,
"dataChange": false,
"stats": "{\"numRecords\":5}"
}
}),
5 changes: 3 additions & 2 deletions kernel/examples/write-table/src/main.rs
@@ -91,12 +91,13 @@ async fn try_main() -> DeltaResult<()> {
let mut txn = snapshot
.transaction()?
.with_operation("INSERT".to_string())
.with_engine_info("default_engine/write-table-example");
.with_engine_info("default_engine/write-table-example")
.with_data_change(true);

// Write the data using the engine
let write_context = Arc::new(txn.get_write_context());
let file_metadata = engine
.write_parquet(&sample_data, write_context.as_ref(), HashMap::new(), true)
.write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
.await?;

// Add the file metadata to the transaction
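For reference, a hedged sketch of the full write path after this change, assuming the `snapshot`, `engine`, and `sample_data` set up earlier in this example: the data-change flag is declared once on the transaction builder, and `write_parquet` no longer takes it per call.

```rust
// Sketch of the updated flow; the final add_files call is assumed to follow
// as in the surrounding example code.
let mut txn = snapshot
    .transaction()?
    .with_operation("INSERT".to_string())
    .with_engine_info("default_engine/write-table-example")
    .with_data_change(true); // applies to every file added in this transaction

let write_context = Arc::new(txn.get_write_context());
let file_metadata = engine
    .write_parquet(&sample_data, write_context.as_ref(), HashMap::new()) // no data_change argument
    .await?;
txn.add_files(file_metadata);
```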
8 changes: 1 addition & 7 deletions kernel/src/engine/default/mod.rs
@@ -98,7 +98,6 @@ impl<E: TaskExecutor> DefaultEngine<E> {
data: &ArrowEngineData,
write_context: &WriteContext,
partition_values: HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let transform = write_context.logical_to_physical();
let input_schema = Schema::try_from_arrow(data.record_batch().schema())?;
@@ -110,12 +109,7 @@
);
let physical_data = logical_to_physical_expr.evaluate(data)?;
self.parquet
.write_parquet_file(
write_context.target_dir(),
physical_data,
partition_values,
data_change,
)
.write_parquet_file(write_context.target_dir(), physical_data, partition_values)
.await
}
}
34 changes: 14 additions & 20 deletions kernel/src/engine/default/parquet.rs
@@ -5,7 +5,7 @@ use std::ops::Range;
use std::sync::Arc;

use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray, StructArray};
use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray};
use crate::arrow::datatypes::{DataType, Field};
use crate::parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
@@ -28,7 +28,6 @@ use crate::engine::arrow_utils::{
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::transaction::add_files_schema;
use crate::{
DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler,
PredicateRef,
@@ -67,7 +66,6 @@ impl DataFileMetadata {
fn as_record_batch(
&self,
partition_values: &HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let DataFileMetadata {
file_meta:
@@ -99,7 +97,6 @@
.try_into()
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
let size = Arc::new(Int64Array::from(vec![size]));
let data_change = Arc::new(BooleanArray::from(vec![data_change]));
let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
let stats = Arc::new(StructArray::try_new_with_length(
vec![Field::new("numRecords", DataType::Int64, true)].into(),
@@ -109,15 +106,12 @@
)?);

Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(add_files_schema().as_ref().try_into_arrow()?),
vec![
path,
partitions,
size,
modification_time,
data_change,
stats,
],
Arc::new(
crate::transaction::BASE_ADD_FILES_SCHEMA
.as_ref()
.try_into_arrow()?,
),
vec![path, partitions, size, modification_time, stats],
)?)))
}
}
@@ -192,16 +186,18 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
/// metadata as an EngineData batch which matches the [add file metadata] schema (where `<uuid>`
/// is a generated UUIDv4).
///
/// [add file metadata]: crate::transaction::add_files_schema
/// Note that the schema does not contain the dataChange column. In order to set the `data_change`
/// flag, use [`crate::transaction::Transaction::with_data_change`].
///
/// [add file metadata]: crate::transaction::Transaction::add_files_schema
pub async fn write_parquet_file(
&self,
path: &url::Url,
data: Box<dyn EngineData>,
partition_values: HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let parquet_metadata = self.write_parquet(path, data).await?;
parquet_metadata.as_record_batch(&partition_values, data_change)
parquet_metadata.as_record_batch(&partition_values)
}
}
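
A short sketch of the handler-level API after this change (`handler`, `target_dir`, `physical_data`, `partition_values`, and `txn` are placeholders): the returned batch matches the base add-files schema (path, partitionValues, size, modificationTime, stats) with no dataChange column, and the flag is applied transaction-wide via `Transaction::with_data_change`, or `set_data_change` over FFI.

```rust
// Sketch only: write a parquet file and hand its add-file metadata to a
// transaction; data_change is no longer a per-file argument here.
let add_metadata = handler
    .write_parquet_file(&target_dir, physical_data, partition_values)
    .await?;
txn.add_files(add_metadata);
```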

@@ -504,14 +500,13 @@
let file_metadata = FileMeta::new(location.clone(), last_modified, size);
let data_file_metadata = DataFileMetadata::new(file_metadata, num_records);
let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]);
let data_change = true;
let actual = data_file_metadata
.as_record_batch(&partition_values, data_change)
.as_record_batch(&partition_values)
.unwrap();
let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();

let schema = Arc::new(
crate::transaction::add_files_schema()
crate::transaction::BASE_ADD_FILES_SCHEMA
.as_ref()
.try_into_arrow()
.unwrap(),
@@ -544,7 +539,6 @@
Arc::new(partition_values),
Arc::new(Int64Array::from(vec![size as i64])),
Arc::new(Int64Array::from(vec![last_modified])),
Arc::new(BooleanArray::from(vec![data_change])),
Arc::new(stats_struct),
],
)