feat: Add write_parquet_file to ParquetHandler
#1392
Conversation
Codecov Report

@@            Coverage Diff             @@
##             main    #1392      +/-   ##
==========================================
+ Coverage   84.65%   84.75%   +0.09%
==========================================
  Files         115      115
  Lines       29557    29858     +301
  Branches    29557    29858     +301
==========================================
+ Hits        25021    25305     +284
  Misses       3329     3329
- Partials     1207     1224      +17
kernel/src/lib.rs
Outdated
/// # Returns
///
/// A [`DeltaResult`] indicating success or failure.
fn write_parquet_file(&self, url: url::Url, data: Box<dyn EngineData>) -> DeltaResult<()>;
Would it make sense for the API to take in write options, e.g. compression, row group size, etc.?
Compression would make sense to me, but row group size is often more complex: some writers take the number of rows, while others take the size in bytes. Alternatively, we could let the engine decide on this.
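For illustration, an options parameter could be a small struct so it can grow later without breaking the signature. This is purely a sketch, and ParquetWriteOptions / Compression are hypothetical names, not existing kernel types:

// Hypothetical sketch only -- not part of this PR. The names ParquetWriteOptions
// and Compression are illustrative, not existing kernel types.
#[derive(Debug, Default, Clone)]
pub struct ParquetWriteOptions {
    /// Compression codec applied to all columns; None means "engine default".
    pub compression: Option<Compression>,
    // Row-group sizing is deliberately omitted: writers disagree on whether it
    // should be expressed in rows or bytes, so it stays an engine decision.
}

#[derive(Debug, Clone, Copy)]
pub enum Compression {
    Uncompressed,
    Snappy,
    Zstd,
}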
kernel/src/lib.rs
Outdated
/// # Returns
///
/// A [`DeltaResult`] indicating success or failure.
fn write_parquet_file(&self, url: url::Url, data: Box<dyn EngineData>) -> DeltaResult<()>;
Shouldn't data be an iterator of FilteredEngineData, since that is what the checkpoint producer produces?
Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>>>
After digging a bit more into the code, I think this makes sense. Having the writer do the filtering was not immediately obvious to me, but it looks like we are also delegating that to the engine.
Yeah, it avoids a copy in cases where the kernel has to filter out some rows. Also consistent with the existing JSON write API.
Yes, I agree. I think it would be nice to have a convenience From implementation to convert EngineData into FilteredEngineData: #1397
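A rough sketch of what such a conversion could look like; this assumes, purely for illustration, that FilteredEngineData has data and selection_vector fields and that an empty selection vector means "keep all rows" (the actual shape and semantics are whatever #1397 lands):

// Illustrative only -- the real change lives in #1397. Field names and the
// "empty selection vector means keep everything" convention are assumptions.
impl From<Box<dyn EngineData>> for FilteredEngineData {
    fn from(data: Box<dyn EngineData>) -> Self {
        FilteredEngineData {
            data,
            // No rows filtered out.
            selection_vector: vec![],
        }
    }
}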
    read_files(files, schema, predicate, try_create_from_parquet)
}

fn write_parquet_file(
A lot of the machinery in this and the default client looks the same; could we pull it out into a pub(crate) fn so they share the same logic? Or is there a reason they are separate?
I think they are different, and it would be good to keep them separate:
- SyncParquetHandler writes directly to a file, which makes sense since it only supports the local FS.
- DefaultParquetHandler first buffers everything in memory, and then pushes it to the object store.
They look pretty similar today, but I think they might diverge more in the future when we start doing more optimizations (a rough sketch of the difference follows below).
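To make the distinction concrete, here is a simplified sketch of the two shapes. It is not the actual kernel code; it assumes an Arrow RecordBatch on the engine side and a recent object_store where put takes a PutPayload:

// Simplified illustration of the two write paths described above -- not the
// actual kernel implementations.
use std::fs::File;
use std::sync::Arc;

use arrow_array::RecordBatch;
use object_store::{path::Path, ObjectStore, PutPayload};
use parquet::arrow::ArrowWriter;

// Sync handler shape: stream straight into a local file.
fn write_local(path: &std::path::Path, batch: &RecordBatch) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create(path)?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
    writer.write(batch)?;
    writer.close()?; // writes the Parquet footer
    Ok(())
}

// Default handler shape: encode into an in-memory buffer, then a single put()
// to the object store.
async fn write_object_store(
    store: Arc<dyn ObjectStore>,
    location: Path,
    batch: &RecordBatch,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)?;
    writer.write(batch)?;
    writer.close()?;
    store.put(&location, PutPayload::from(buffer)).await?;
    Ok(())
}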
kernel/src/lib.rs
Outdated
fn write_parquet_file(
    &self,
    url: url::Url,
    data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
Why not just data: Box<dyn EngineData>?
- Why Iterator?
- Why FilteredEngineData?
- Why Iterator?
For future proofing: you could write chunks of data that are larger than memory. By having an iterator, you can stream these into the Parquet file. Arrow has a similar concept with ChunkedArray, where each chunk becomes a row group. I think we want to mimic that a bit here (see the sketch after this list).
- Why FilteredEngineData?
This was also not my first thought (see #1392 (comment)). But it nicely aligns with the JSON API. To make the syntax a bit more friendly and reduce the visual noise, I've suggested implementing the From trait to easily convert EngineData into FilteredEngineData: #1397.
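A minimal sketch of that streaming idea, assuming an Arrow-based writer on the engine side (illustrative only, not the kernel's implementation; flush() ends the current row group):

// Illustrative sketch: stream an iterator of batches into one Parquet file,
// one row group per batch.
use parquet::arrow::ArrowWriter;

fn write_streaming(
    file: std::fs::File,
    mut batches: impl Iterator<Item = arrow_array::RecordBatch>,
) -> parquet::errors::Result<()> {
    let first = match batches.next() {
        Some(b) => b,
        None => return Ok(()), // nothing to write
    };
    let mut writer = ArrowWriter::try_new(file, first.schema(), None)?;
    writer.write(&first)?;
    writer.flush()?; // close the current row group before the next batch
    for batch in batches {
        writer.write(&batch)?;
        writer.flush()?;
    }
    writer.close()?; // write the footer
    Ok(())
}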
Maybe I'm missing something, but I think we still need to go through all batches to identify the final schema; we're not guaranteed that it will be the same for all actions. Also, I believe you are still collecting everything into memory here: https://github.com/delta-io/delta-kernel-rs/pull/1392/files#diff-e05e7b3b94c5637bfc367192986135a7a8a3986c34dc1b22cfd4961647ce7664R64, so we still haven't addressed the potential problem of "it might not fit into memory".
With FilteredEngineData, I see we can use "selection_vector" – this is a good feature, I agree. Are we aware of cases where we can use it currently?
With FilteredEngineData, I see we can use "selection_vector" – this is a good feature, I agree. Are we aware of cases where we can use it currently?
We use it in the proposed remove_files PR (#1390)
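For context, applying a selection vector before a write could look roughly like this on the Arrow side (illustrative only; the filter_to_record_batch helper quoted further down is where this PR actually does it):

// Illustrative only: drop unselected rows before the write with Arrow's
// filter kernel. Not the PR's actual helper.
use arrow_array::{BooleanArray, RecordBatch};
use arrow_schema::ArrowError;
use arrow_select::filter::filter_record_batch;

fn apply_selection(batch: &RecordBatch, selection: &[bool]) -> Result<RecordBatch, ArrowError> {
    // true keeps the row, false drops it; assumes one entry per row.
    let mask = BooleanArray::from(selection.to_vec());
    filter_record_batch(batch, &mask)
}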
Thanks for the pointer. For JSON the schema can change per row, but this can't be the case for Parquet. I've updated the code to remove the iterator for now. @anoopj WDYT?
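Presumably the signature then collapses to a single FilteredEngineData per file, something along these lines (a sketch based on the signatures quoted above, not necessarily the final API):

/// Write data to a Parquet file at the specified URL.
fn write_parquet_file(
    &self,
    url: url::Url,
    data: FilteredEngineData,
) -> DeltaResult<()>;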
    .try_collect()
    .unwrap();

assert_eq!(data.len(), 1);
I think we also need to verify that field IDs are populated and that we can project based on field IDs for column indirection? Will this be a follow-up?
I'm not a delta-kernel-rs maintainer, but from my POV this PR looks good.
    predicate: Option<PredicateRef>,
) -> DeltaResult<FileDataReadResultIterator>;

/// Write data to a Parquet file at the specified URL.
We should specify the semantics around what to do if the file already exists.
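For example, the doc comment could state it explicitly. This is only a sketch of the kind of wording; whether the contract is fail-if-exists or overwrite is exactly the open question, and the data parameter follows the single-FilteredEngineData shape discussed above:

/// Write data to a Parquet file at the specified URL.
///
/// Sketch of the kind of contract to document here, e.g.: "If an object already
/// exists at `url`, this call returns an error and leaves the existing file
/// untouched" -- or, alternatively, "any existing object at `url` is overwritten".
fn write_parquet_file(&self, url: url::Url, data: FilteredEngineData) -> DeltaResult<()>;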
// Convert FilteredEngineData to RecordBatch, applying selection filter
let batch = filter_to_record_batch(data)?;

// We buffer it in the application first, and then push everything to the object-store.
We should use the async_writer
As they note on that page: object_store provides its native implementation of AsyncFileWriter via ParquetObjectWriter.
So you could do something like:
let path = Path::from_url_path(location.path())?;
let object_writer = ParquetObjectWriter::new(self.store.clone(), path);
let mut writer = AsyncArrowWriter::try_new(
    object_writer,
    batch.schema(),
    None, // could be some props if needed
)?;
// Block on the async write, then close to flush buffered data and write the footer
self.task_executor.block_on(async move {
    writer.write(&batch).await?;
    writer.close().await
})?;
What changes are proposed in this pull request?
Hey everyone, this is a first PR to start the discussion around writing Parquet files.
Currently, the way to write Parquet is to completely delegate this to the engine, for example here: https://github.com/dl-rs-private/delta-kernel-rs/blob/a096d013f876ed29beef9379cf4cd713e9febd90/kernel/src/checkpoint/mod.rs#L44
Some things to consider:
- In the DefaultParquetHandler there is already write_parquet (delta-kernel-rs/kernel/src/engine/default/parquet.rs, lines 142 to 151 in 29a934a). But that one is very much focused on writing DataFiles, which is not something we really need if we want to write generic Parquet (for example a checkpoint).
- () as a return type, so we can extend that later on. We could also return things like the size, but that would introduce another HEAD request, and we need to consider whether that's something we really need.
- A ParquetWriter that consumes batches of EngineData. For the snapshot, this is not a requirement.

Resolves #1376
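For illustration, a caller such as the checkpoint path could then hand the already filtered actions straight to the handler. The surrounding names here are hypothetical, and the data parameter reflects the single-FilteredEngineData shape discussed in the review:

// Hypothetical usage sketch -- function and variable names are illustrative,
// not taken from this PR.
fn write_checkpoint_file(
    parquet_handler: &dyn ParquetHandler,
    checkpoint_url: url::Url,
    actions: FilteredEngineData, // checkpoint actions with any selection vector already attached
) -> DeltaResult<()> {
    parquet_handler.write_parquet_file(checkpoint_url, actions)
}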
This PR affects the following public APIs
Introduces a new public API, and extends an existing trait.
How was this change tested?