[zephyr] Use Arrow IPC for scatter chunks, fall back to pickle #5088
hsuhanooi wants to merge 1 commit into marin-community:main
Conversation
Replace the cloudpickle-only chunk format with Arrow IPC when items are Arrow-compatible (plain dicts with primitive/string values). Chunks that cannot be Arrow-encoded (frozenset, custom classes) fall back to cloudpickle sub-batches. A one-byte format tag at the start of each frame distinguishes the two paths on read, so both formats coexist in the same scatter file.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
🤖 Closing after benchmarking. Arrow IPC is 6-8x slower on read due to to_pylist() transposing columnar data back to Python dicts, and 20-40% slower on write. The format only wins on compressed size for wide numeric tables. Not a net gain for this use case.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ac090af849
```python
if fmt == _FRAME_FORMAT_ARROW:
    ipc_bytes = zstd.ZstdDecompressor().decompress(payload)
    reader = pa.ipc.open_stream(pa.py_buffer(ipc_bytes))
    yield from reader.read_all().to_pylist()
```
Stream Arrow rows instead of full-table materialization
The Arrow read path currently does reader.read_all().to_pylist(), which eagerly materializes an entire chunk before yielding any rows. With INTERMEDIATE_CHUNK_SIZE at 100,000, each iterator can hold a full chunk of Python objects at once; reducers often have many chunk iterators open, so this can cause large memory spikes/OOMs and defeat the existing _SUB_BATCH_SIZE-based streaming assumptions used by shuffle memory planning.
```python
        cloudpickle.dump(items[i : i + _SUB_BATCH_SIZE], zf, protocol=pickle.HIGHEST_PROTOCOL)
    return raw.getvalue()

try:
    table = pa.Table.from_pylist(items)
```
Restrict Arrow path to rows with stable dict schemas
Using pa.Table.from_pylist(items) on arbitrary dict rows can silently normalize row shape (for example, missing keys become present with None after to_pylist()), which changes payload semantics compared to the previous pickle-preserving behavior. This means sparse/plain dict records can be mutated in transit, and downstream transforms that depend on key presence or original Python structure may produce different results.