56 changes: 31 additions & 25 deletions kernel/src/engine/arrow_utils.rs
@@ -1,10 +1,5 @@
//! Some utilities for working with arrow data types

use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufReader};
use std::ops::Range;
use std::sync::{Arc, OnceLock};

use crate::engine::arrow_conversion::{TryFromKernel as _, TryIntoArrow as _};
use crate::engine::ensure_data_types::DataTypeCompat;
use crate::engine_data::FilteredEngineData;
@@ -15,6 +10,10 @@ use crate::{
utils::require,
DeltaResult, EngineData, Error,
};
use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufReader};
use std::ops::Range;
use std::sync::{Arc, OnceLock};

use crate::arrow::array::{
cast::AsArray, make_array, new_null_array, Array as ArrowArray, BooleanArray, GenericListArray,
@@ -1074,6 +1073,31 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
Ok(concat_batches(&schema, output.iter())?)
}

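/// Apply the selection vector of a [`FilteredEngineData`] to its underlying data and
/// return the resulting [`RecordBatch`]. A missing or short selection vector selects the
/// uncovered rows, per the contract described in the comments below.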
pub(crate) fn filter_to_record_batch(
filtered_data: FilteredEngineData,
) -> DeltaResult<RecordBatch> {
// Honor the new contract: if selection vector is shorter than the number of rows,
// then all rows not covered by the selection vector are assumed to be selected
let (underlying_data, mut selection_vector) = filtered_data.into_parts();
let batch = extract_record_batch(&*underlying_data)?;
let num_rows = batch.num_rows();

let result_batch = if selection_vector.is_empty() {
// If the selection vector is empty, all rows are selected per the contract.
batch.clone()
} else {
// Extend the selection vector with `true` for uncovered rows
if selection_vector.len() < num_rows {
selection_vector.resize(num_rows, true);
}

filter_record_batch(batch, &BooleanArray::from(selection_vector))
.map_err(|e| Error::generic(format!("Failed to filter record batch: {e}")))?
};

Ok(result_batch)
}
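
For illustration, a minimal standalone sketch of the selection-vector contract this helper implements (a hypothetical example written directly against the arrow crate rather than kernel types; the column name and values are made up):

use std::sync::Arc;

use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch};
use arrow::compute::filter_record_batch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // A batch with three rows.
    let batch = RecordBatch::try_from_iter(vec![(
        "x",
        Arc::new(Int64Array::from(vec![10, 20, 30])) as Arc<dyn Array>,
    )])?;

    // A selection vector covering only the first two rows: keep row 0, drop row 1.
    let mut selection = vec![true, false];

    // Per the contract, rows not covered by the vector count as selected,
    // so the vector is extended with `true` before filtering.
    selection.resize(batch.num_rows(), true);

    // Rows 0 and 2 survive the filter.
    let filtered = filter_record_batch(&batch, &BooleanArray::from(selection))?;
    assert_eq!(filtered.num_rows(), 2);
    Ok(())
}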

/// serialize an arrow RecordBatch to a JSON string by appending to a buffer.
// TODO (zach): this should stream data to the JSON writer and output an iterator.
#[internal_api]
@@ -1082,26 +1106,8 @@ pub(crate) fn to_json_bytes(
) -> DeltaResult<Vec<u8>> {
let mut writer = LineDelimitedWriter::new(Vec::new());
for chunk in data {
let filtered_data = chunk?;
// Honor the new contract: if selection vector is shorter than the number of rows,
// then all rows not covered by the selection vector are assumed to be selected
let (underlying_data, mut selection_vector) = filtered_data.into_parts();
let batch = extract_record_batch(&*underlying_data)?;
let num_rows = batch.num_rows();

if selection_vector.is_empty() {
// If selection vector is empty, write all rows per contract.
writer.write(batch)?;
} else {
// Extend the selection vector with `true` for uncovered rows
if selection_vector.len() < num_rows {
selection_vector.resize(num_rows, true);
}

let filtered_batch = filter_record_batch(batch, &BooleanArray::from(selection_vector))
.map_err(|e| Error::generic(format!("Failed to filter record batch: {e}")))?;
writer.write(&filtered_batch)?
};
let batch = filter_to_record_batch(chunk?)?;
writer.write(&batch)?;
}
writer.finish()?;
Ok(writer.into_inner())
204 changes: 199 additions & 5 deletions kernel/src/engine/default/parquet.rs
@@ -22,15 +22,15 @@ use super::UrlExt;
use crate::engine::arrow_conversion::TryIntoArrow as _;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{
fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
RowIndexBuilder,
filter_to_record_batch, fixup_parquet_read, generate_mask, get_requested_indices,
ordering_needs_row_indexes, RowIndexBuilder,
};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{
DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler,
PredicateRef,
DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, FilteredEngineData,
ParquetHandler, PredicateRef,
};

#[derive(Debug)]
@@ -242,6 +242,45 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
self.readahead,
)
}

fn write_parquet_file(
&self,
location: url::Url,
data: FilteredEngineData,
) -> DeltaResult<FileMeta> {
// Convert FilteredEngineData to RecordBatch, applying selection filter
let batch = filter_to_record_batch(data)?;

// We buffer it in the application first, and then push everything to the object-store.

Collaborator comment:

We should use the async_writer

As they note on that page: object_store provides its native implementation of AsyncFileWriter via ParquetObjectWriter.

So you could do something like:

let path = Path::from_url_path(location.path())?;
let object_writer = ParquetObjectWriter::new(self.store.clone(), path);
let mut writer = AsyncArrowWriter::try_new(
    object_writer,
    batch.schema(),
    None, // could be some props if needed
)?;

// Block on the async write; closing the writer flushes remaining data and writes the footer
self.task_executor
    .block_on(async move {
        writer.write(&batch).await?;
        writer.close().await
    })?;

// The storage API does not seem to allow streaming to the store, which is okay
// since object stores often allocate their own buffers anyway.
let mut buffer = Vec::new();

// Scope to ensure writer is dropped before we use buffer
{
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)?;
writer.write(&batch)?;
writer.close()?; // writer must be closed to write footer
}

let store = self.store.clone();
let path = Path::from_url_path(location.path())?;

let size: u64 = buffer
.len()
.try_into()
.map_err(|_| Error::generic("unable to convert usize to u64"))?;

// Block on the async put operation
self.task_executor
.block_on(async move { store.put(&path, buffer.into()).await })?;

Ok(FileMeta {
location,
last_modified: 0,
size,
})
}
}
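
Complementing the async-writer suggestion above: if the file is streamed to the store instead of buffered, the size for FileMeta can no longer be taken from an in-memory buffer. One option is sketched below, under the assumption that an object_store head() call is acceptable here (variable names follow the reviewer's snippet; this is not part of the PR):

// After closing the AsyncArrowWriter, ask the store for the object's size
// rather than measuring an in-memory buffer.
let store = self.store.clone();
// Reconstruct the path; `path` above was moved into ParquetObjectWriter::new.
let head_path = Path::from_url_path(location.path())?;
let meta = self
    .task_executor
    .block_on(async move { store.head(&head_path).await })?;
let size: u64 = meta
    .size
    .try_into()
    .map_err(|_| Error::generic("unable to convert object size to u64"))?;

Ok(FileMeta {
    location,
    last_modified: 0,
    size,
})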

/// Implements [`FileOpener`] for a parquet file
@@ -427,7 +466,7 @@ mod tests {
use std::path::PathBuf;
use std::slice;

use crate::arrow::array::{Array, RecordBatch};
use crate::arrow::array::{Array, BooleanArray, RecordBatch};

use crate::engine::arrow_conversion::TryIntoKernel as _;
use crate::engine::arrow_data::ArrowEngineData;
@@ -638,4 +677,159 @@
"Generic delta kernel error: Path must end with a trailing slash: memory:///data",
);
}

#[tokio::test]
async fn test_parquet_handler_trait_write() {
let store = Arc::new(InMemory::new());
let parquet_handler: Arc<dyn ParquetHandler> = Arc::new(DefaultParquetHandler::new(
store.clone(),
Arc::new(TokioBackgroundExecutor::new()),
));

let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![
(
"x",
Arc::new(Int64Array::from(vec![10, 20, 30])) as Arc<dyn Array>,
),
(
"y",
Arc::new(Int64Array::from(vec![100, 200, 300])) as Arc<dyn Array>,
),
])
.unwrap(),
));

// Wrap in FilteredEngineData with all rows selected
let filtered_data = crate::FilteredEngineData::with_all_rows_selected(engine_data);

// Test writing through the trait method
let file_url = Url::parse("memory:///test/data.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), filtered_data)
.unwrap();

// Verify we can read the file back
let path = Path::from_url_path(file_url.path()).unwrap();
let reader = ParquetObjectReader::new(store.clone(), path);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.schema()
.clone();

let file_meta = FileMeta {
location: file_url,
last_modified: 0,
size: 0,
};

let data: Vec<RecordBatch> = parquet_handler
.read_parquet_files(
slice::from_ref(&file_meta),
Arc::new(physical_schema.try_into_kernel().unwrap()),
None,
)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();

assert_eq!(data.len(), 1);

Collaborator comment:

I think we also need to verify that field IDs are populated and that we can project based on field IDs for column indirection. Will this be a follow-up?

assert_eq!(data[0].num_rows(), 3);
assert_eq!(data[0].num_columns(), 2);
}

#[tokio::test]
async fn test_parquet_handler_trait_write_and_read_roundtrip() {
let store = Arc::new(InMemory::new());
let parquet_handler: Arc<dyn ParquetHandler> = Arc::new(DefaultParquetHandler::new(
store.clone(),
Arc::new(TokioBackgroundExecutor::new()),
));

// Create test data with multiple types
let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![
(
"int_col",
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as Arc<dyn Array>,
),
(
"string_col",
Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])) as Arc<dyn Array>,
),
(
"bool_col",
Arc::new(BooleanArray::from(vec![true, false, true, false, true]))
as Arc<dyn Array>,
),
])
.unwrap(),
));

// Wrap in FilteredEngineData with all rows selected
let filtered_data = crate::FilteredEngineData::with_all_rows_selected(engine_data);

// Write the data
let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), filtered_data)
.unwrap();

// Read it back
let path = Path::from_url_path(file_url.path()).unwrap();
let reader = ParquetObjectReader::new(store.clone(), path);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.schema()
.clone();

let file_meta = FileMeta {
location: file_url.clone(),
last_modified: 0,
size: 0,
};

let data: Vec<RecordBatch> = parquet_handler
.read_parquet_files(
slice::from_ref(&file_meta),
Arc::new(physical_schema.try_into_kernel().unwrap()),
None,
)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();

// Verify the data
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 5);
assert_eq!(data[0].num_columns(), 3);

// Verify column values
let int_col = data[0]
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(int_col.values(), &[1, 2, 3, 4, 5]);

let string_col = data[0]
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(string_col.value(0), "a");
assert_eq!(string_col.value(4), "e");

let bool_col = data[0]
.column(2)
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap();
assert!(bool_col.value(0));
assert!(!bool_col.value(1));
}
}