168 changes: 168 additions & 0 deletions kernel/src/engine/default/parquet.rs
@@ -246,6 +246,25 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
self.readahead,
)
}

fn write_parquet_file(&self, url: url::Url, data: Box<dyn EngineData>) -> DeltaResult<()> {
let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
let record_batch = batch.record_batch();

let mut buffer = vec![];
let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?;
writer.write(record_batch)?;
writer.close()?; // writer must be closed to write footer

let store = self.store.clone();
let path = Path::from_url_path(url.path())?;

// Block on the async put operation
self.task_executor
.block_on(async move { store.put(&path, buffer.into()).await })?;

Ok(())
}
}

/// Implements [`FileOpener`] for a parquet file
@@ -644,4 +663,153 @@ mod tests {
"Generic delta kernel error: Path must end with a trailing slash: memory:///data",
);
}

#[tokio::test]
async fn test_parquet_handler_trait_write() {
let store = Arc::new(InMemory::new());
let parquet_handler: Arc<dyn ParquetHandler> = Arc::new(DefaultParquetHandler::new(
store.clone(),
Arc::new(TokioBackgroundExecutor::new()),
));

let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![
(
"x",
Arc::new(Int64Array::from(vec![10, 20, 30])) as Arc<dyn Array>,
),
(
"y",
Arc::new(Int64Array::from(vec![100, 200, 300])) as Arc<dyn Array>,
),
])
.unwrap(),
));

// Test writing through the trait method
let file_url = Url::parse("memory:///test/data.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), data)
.unwrap();

// Verify we can read the file back
let path = Path::from_url_path(file_url.path()).unwrap();
let reader = ParquetObjectReader::new(store.clone(), path);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.schema()
.clone();

let file_meta = FileMeta {
location: file_url,
last_modified: 0,
size: 0,
};

let data: Vec<RecordBatch> = parquet_handler
.read_parquet_files(
slice::from_ref(&file_meta),
Arc::new(physical_schema.try_into_kernel().unwrap()),
None,
)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();

assert_eq!(data.len(), 1);
Collaborator: I think we also need to verify that field IDs are populated and that we can project based on field IDs for column indirection? Will this be a follow-up? (A hedged sketch of such a check follows this test.)
assert_eq!(data[0].num_rows(), 3);
assert_eq!(data[0].num_columns(), 2);
}
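
A hedged sketch (not part of this PR) of what such a follow-up check could look like, reusing the imports already used by the tests above. It assumes the Arrow Parquet writer and reader round-trip field IDs through the PARQUET:field_id field-metadata key; the test name and data are illustrative only.

// Hedged sketch only: verify that a field ID attached to the Arrow schema
// survives the write/read round trip through the handler.
#[tokio::test]
async fn test_write_preserves_field_ids_sketch() {
    use std::collections::HashMap;

    use crate::arrow::datatypes::{DataType, Field, Schema};

    let store = Arc::new(InMemory::new());
    let handler = DefaultParquetHandler::new(
        store.clone(),
        Arc::new(TokioBackgroundExecutor::new()),
    );

    // Build a schema whose single field carries a Parquet field ID via Arrow metadata.
    let field = Field::new("x", DataType::Int64, false).with_metadata(HashMap::from([(
        "PARQUET:field_id".to_string(),
        "1".to_string(),
    )]));
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![field])),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc<dyn Array>],
    )
    .unwrap();

    let url = Url::parse("memory:///field_id/test.parquet").unwrap();
    handler
        .write_parquet_file(url.clone(), Box::new(ArrowEngineData::new(batch)))
        .unwrap();

    // Read the Arrow schema back and check that the field ID metadata round-tripped.
    let path = Path::from_url_path(url.path()).unwrap();
    let reader = ParquetObjectReader::new(store, path);
    let read_schema = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .schema()
        .clone();
    assert_eq!(
        read_schema.field(0).metadata().get("PARQUET:field_id"),
        Some(&"1".to_string())
    );
}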

#[tokio::test]
async fn test_parquet_handler_trait_write_and_read_roundtrip() {
let store = Arc::new(InMemory::new());
let parquet_handler: Arc<dyn ParquetHandler> = Arc::new(DefaultParquetHandler::new(
store.clone(),
Arc::new(TokioBackgroundExecutor::new()),
));

// Create test data with multiple types
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![
(
"int_col",
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as Arc<dyn Array>,
),
(
"string_col",
Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])) as Arc<dyn Array>,
),
(
"bool_col",
Arc::new(BooleanArray::from(vec![true, false, true, false, true]))
as Arc<dyn Array>,
),
])
.unwrap(),
));

// Write the data
let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), data)
.unwrap();

// Read it back
let path = Path::from_url_path(file_url.path()).unwrap();
let reader = ParquetObjectReader::new(store.clone(), path);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.schema()
.clone();

let file_meta = FileMeta {
location: file_url.clone(),
last_modified: 0,
size: 0,
};

let data: Vec<RecordBatch> = parquet_handler
.read_parquet_files(
slice::from_ref(&file_meta),
Arc::new(physical_schema.try_into_kernel().unwrap()),
None,
)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();

// Verify the data
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 5);
assert_eq!(data[0].num_columns(), 3);

// Verify column values
let int_col = data[0]
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(int_col.values(), &[1, 2, 3, 4, 5]);

let string_col = data[0]
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(string_col.value(0), "a");
assert_eq!(string_col.value(4), "e");

let bool_col = data[0]
.column(2)
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap();
assert!(bool_col.value(0));
assert!(!bool_col.value(1));
}
}
113 changes: 110 additions & 3 deletions kernel/src/engine/sync/parquet.rs
@@ -1,7 +1,8 @@
use std::fs::File;

use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use crate::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};
use crate::parquet::arrow::arrow_writer::ArrowWriter;
use std::fs::File;
use url::Url;

use super::read_files;
use crate::engine::arrow_data::ArrowEngineData;
@@ -11,7 +12,9 @@ use crate::engine::arrow_utils::{
};
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef};
use crate::{
DeltaResult, EngineData, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef,
};

pub(crate) struct SyncParquetHandler;

@@ -52,4 +55,108 @@ impl ParquetHandler for SyncParquetHandler {
) -> DeltaResult<FileDataReadResultIterator> {
read_files(files, schema, predicate, try_create_from_parquet)
}

fn write_parquet_file(&self, url: Url, data: Box<dyn EngineData>) -> DeltaResult<()> {
let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
let record_batch = batch.record_batch();

// Convert URL to file path
let path = url
.to_file_path()
.map_err(|_| crate::Error::generic(format!("Invalid file URL: {}", url)))?;
let mut file = File::create(&path)?;

let mut writer = ArrowWriter::try_new(&mut file, record_batch.schema(), None)?;
writer.write(record_batch)?;
writer.close()?; // writer must be closed to write footer

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::array::{Array, Int64Array, RecordBatch, StringArray};
use crate::engine::arrow_conversion::TryIntoKernel as _;
use std::sync::Arc;
use tempfile::tempdir;

#[test]
fn test_sync_write_parquet_file() {
let handler = SyncParquetHandler;
let temp_dir = tempdir().unwrap();
let file_path = temp_dir.path().join("test.parquet");
let url = Url::from_file_path(&file_path).unwrap();

// Create test data
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![
(
"id",
Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
),
(
"name",
Arc::new(StringArray::from(vec!["a", "b", "c"])) as Arc<dyn Array>,
),
])
.unwrap(),
));

// Write the file
handler.write_parquet_file(url.clone(), data).unwrap();

// Verify the file exists
assert!(file_path.exists());

// Read it back to verify
let file = File::open(&file_path).unwrap();
let reader =
crate::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)
.unwrap();
let schema = reader.schema().clone();

let file_meta = FileMeta {
location: url,
last_modified: 0,
size: 0,
};

let mut result = handler
.read_parquet_files(
&[file_meta],
Arc::new(schema.try_into_kernel().unwrap()),
None,
)
.unwrap();

let engine_data = result.next().unwrap().unwrap();
let batch = ArrowEngineData::try_from_engine_data(engine_data).unwrap();
let record_batch = batch.record_batch();

// Verify shape
assert_eq!(record_batch.num_rows(), 3);
assert_eq!(record_batch.num_columns(), 2);

// Verify content - id column
let id_col = record_batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(id_col.values(), &[1, 2, 3]);

// Verify content - name column
let name_col = record_batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(name_col.value(0), "a");
assert_eq!(name_col.value(1), "b");
assert_eq!(name_col.value(2), "c");

assert!(result.next().is_none());
}
}
15 changes: 15 additions & 0 deletions kernel/src/lib.rs
@@ -685,6 +685,21 @@ pub trait ParquetHandler: AsAny {
physical_schema: SchemaRef,
predicate: Option<PredicateRef>,
) -> DeltaResult<FileDataReadResultIterator>;

/// Write data to a Parquet file at the specified URL.
Collaborator: We should specify the semantics around what to do if the file already exists.

///
/// This method writes the provided `data` to a Parquet file at the given `url`.
///
/// # Parameters
///
/// - `url` - The full URL path where the Parquet file should be written
/// (e.g., `s3://bucket/path/file.parquet`).
/// - `data` - The data to write to the Parquet file, as EngineData.
///
/// # Returns
///
/// A [`DeltaResult`] indicating success or failure.
fn write_parquet_file(&self, url: url::Url, data: Box<dyn EngineData>) -> DeltaResult<()>;
Collaborator: Would it make sense for the API to take in write options, e.g. compression, row group size, etc.?

Collaborator (author): Compression would make sense to me, but row group size is often more complex: some writers take the number of rows, while others take the size in bytes. Instead, we could also let the engine decide this? (A hypothetical sketch of such an options parameter follows below.)
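
Purely as an illustration of the options idea above, a hypothetical sketch (struct name and field invented here, not part of this PR) in which the kernel only constrains compression and leaves row-group sizing to the engine:

// Hypothetical sketch only -- the struct name and field are invented for illustration.
#[derive(Default)]
pub struct ParquetWriteOptions {
    /// Requested compression codec (e.g. "snappy" or "zstd"); None means engine default.
    pub compression: Option<String>,
}

// The trait method could then become something like:
// fn write_parquet_file(
//     &self,
//     url: url::Url,
//     data: Box<dyn EngineData>,
//     options: ParquetWriteOptions,
// ) -> DeltaResult<()>;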

Collaborator: Shouldn't data be an iterator of FilteredEngineData, since that is what the checkpoint producer produces?

Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>>>

Collaborator (author): After digging a bit more into the code, I think this makes sense. Having the writer do filtering was not immediately obvious to me, but it looks like we are also delegating that to the engine.

Collaborator: Yeah, it avoids a copy in cases where the kernel has to filter out some rows. It is also consistent with the existing JSON write API.

Collaborator (author): Yes, I agree. I think it would be nice to have a convenience From implementation to convert EngineData into FilteredEngineData: #1397
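
For reference, a hedged sketch of the alternative signature discussed in this thread (not what this PR implements), written as it might appear on ParquetHandler and mirroring the existing JSON write API by accepting pre-filtered batches:

// Hypothetical alternative only: consume pre-filtered batches so the kernel
// never has to materialize a filtered copy before handing data to the writer.
// fn write_parquet_file(
//     &self,
//     url: url::Url,
//     data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send>,
// ) -> DeltaResult<()>;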

}
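
A minimal usage sketch, assuming an engine instance and a batch of EngineData are already in hand and that the handler is obtained through the Engine trait; the helper name and URL are illustrative only.

// Minimal usage sketch; write_example and the URL are illustrative.
fn write_example(engine: &dyn Engine, data: Box<dyn EngineData>) -> DeltaResult<()> {
    let url = url::Url::parse("s3://bucket/path/file.parquet")
        .map_err(|e| Error::generic(format!("invalid url: {e}")))?;
    engine.parquet_handler().write_parquet_file(url, data)
}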

/// The `Engine` trait encapsulates all the functionality an engine or connector needs to provide