merge does not use column stats to skip files? #2701
-
I have a huge table that does not fit into memory, and I need to upsert into it. Only the data for the last 10 days needs to be revised, yet delta-rs seems to load the entire table into memory whenever merge() is called (#2573). Is that intended? Is merge ever a streamable operation? I understand that a Parquet file cannot be modified in place because it is compressed, so changing a record means writing an entirely new file, but even with that in mind I don't see why the whole dataset must be loaded into memory for a merge.

A more general point about the Rust engine, which I believe merge() runs on: it does not seem to write anything in streaming mode at all (#1984). This can be worked around by using the pyarrow engine instead, which does stream writes correctly, but functionality such as append with schema evolution or merge is only available in the Rust engine, which means it does not work with data larger than memory.

Lastly, file skipping does not seem to be supported as the docs claim, either internally (as in the case of merge) or externally (as in the case of Polars, where pl.scan_delta() simply calls to_pyarrow_dataset(), which at best pushes predicates down to the PyArrow dataset and therefore does not use any file-level column stats from Delta, such as those from get_add_actions(), to skip Parquet files).

Given the not-really-usable merge, the under-supported file skipping, and a non-streaming Rust engine, delta-rs seems to be in a strange state where it pragmatically cannot write or merge larger-than-memory data; append() works fine, but only thanks to pyarrow. Can anyone please shed some light on this?
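For reference, here is a rough sketch of the kind of file-level skipping I mean, driven directly by the stats in get_add_actions(). The table path, the "event_date" column, and the cutoff value are all made up for illustration; the stats column names depend on your schema.

```python
# Sketch: manual file pruning from Delta add-action statistics.
# Assumes a hypothetical table at "path/to/table" with an "event_date"
# column stored as an ISO "YYYY-MM-DD" string; for timestamp columns,
# compare against a pandas Timestamp instead.
import pyarrow.dataset as ds
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# One row per data file, with flattened per-column min/max statistics.
actions = dt.get_add_actions(flatten=True).to_pandas()

# Keep only files whose max event_date could contain rows newer than the cutoff.
cutoff = "2024-07-01"  # hypothetical cutoff for "the last 10 days"
recent = actions[actions["max.event_date"] >= cutoff]

# Scan only the pruned files instead of the whole table.
# (Paths in add actions are relative to the table root; object stores would
# also need an explicit filesystem argument.)
dataset = ds.dataset(
    [f"path/to/table/{p}" for p in recent["path"]],
    format="parquet",
)
```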
Replies: 1 comment
-
Merge and write now support streamed execution since v0.25, which should improve memory pressure a lot! For write it's always enabled because there is no downside to it. For merge there is an opt-out by setting `streamed_exec=False`. The reason is that the existing merge implementation can't use the stats from the source data to prune down the target further when you do a streamed execution, since deriving stats would require you to materialize the table in memory. So when you have `streamed_exec=True`, it's important to be more explicit with your partition predicate (e.g. `partition in ['foo', 'bar']`).

#1984 you linked is about using a multipart writer and a potential refactor to use more DataFusion components. It's not directly related to the streamed execution I described above.

On your Polars note, this is simply wrong: scan_delta with pyarrow=True will push predicates into pyarrow expressions, which are then used to prune down file fragments of the pyarrow dataset.
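For illustration, a minimal sketch of what that can look like with the Python API. The table path, the "region" partition column, the ids/values, and the exact placement of `streamed_exec` as a merge() keyword argument are assumptions based on the description above, not copied from the docs.

```python
# Sketch: streamed merge with an explicit partition predicate so the target
# can still be pruned even though stats can't be derived from the source.
import pyarrow as pa
from deltalake import DeltaTable

# Hypothetical source covering only the rows to upsert (e.g. the last 10 days).
source = pa.table({
    "id": [1, 2],
    "region": ["foo", "bar"],
    "value": [10.0, 20.0],
})

dt = DeltaTable("path/to/table")  # hypothetical path

(
    dt.merge(
        source=source,
        # Explicit partition predicate ('foo'/'bar' are placeholder values).
        predicate=(
            "target.id = source.id "
            "AND target.region IN ('foo', 'bar')"
        ),
        source_alias="source",
        target_alias="target",
        streamed_exec=True,  # set False to opt out of streamed execution
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
```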