[zephyr] External sort spill: Parquet instead of pickle+zstd #4695
Conversation
Claude finished @rjpower's task in 3m 14s — PR Review: [zephyr] External sort spill: Parquet instead of pickle+zstd
The format swap from pickle+zstd to Parquet: I left 4 inline comments. Here's the summary:

Issues

1. `SpillWriter.close()` deadlock when the background thread has errored (inline).
2. When the … (inline)

Nits

3. Bare … silently swallows metadata read failures.
4. Minor robustness improvement — wrap the entire loop body, including the queue `get()`, in the error handler (inline).

Not flagged (looks good):
No unit tests for SpillWriter itself — it's exercised indirectly through shuffle/groupby/execution tests, but edge cases (empty writes, background thread error propagation, byte threshold boundaries) aren't covered. Worth adding, especially given the threading.
```python
def close(self) -> None:
    """Flush remaining accumulated data and wait for the background thread to finish."""
    if self._closed:
        return
    self._closed = True
    remaining = self._accumulator.flush()
    if remaining is not None:
        self._queue.put(remaining)
    self._queue.put(_SENTINEL)
    self._thread.join()
    self._writer.close()
    self._check_error()
```
Potential deadlock when background thread exits with an error and there's remaining data.
If `_background_writer_loop` catches a `BaseException` from `writer.write_table(item)` and returns early, the thread dies. Then `close()` runs:

1. `self._accumulator.flush()` returns remaining data
2. `self._queue.put(remaining)` — succeeds (the queue was empty after the failed item was dequeued)
3. `self._queue.put(_SENTINEL)` — blocks forever: with `maxsize=1`, the queue is full from step 2, and the thread is dead so nobody is draining it

The `_check_error()` call is at the end of `close()` and is never reached.
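The failure mode can be reproduced in isolation with a plain `queue.Queue` — a sketch where a string stands in for the sentinel and a no-op thread stands in for the dead writer thread:

```python
import queue
import threading

# The writer thread has already died (after appending to its error box),
# so nothing will ever drain the single-slot queue.
q: "queue.Queue[object]" = queue.Queue(maxsize=1)
dead_consumer = threading.Thread(target=lambda: None)
dead_consumer.start()
dead_consumer.join()

q.put("remaining")  # step 2: fills the only slot
try:
    # step 3: without a timeout this put() would block forever
    q.put("_SENTINEL", timeout=0.1)
except queue.Full:
    print("sentinel put would block: queue full, consumer dead")
```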
Suggestion: check for errors before putting to the queue, or use `put` with a timeout, or check `self._thread.is_alive()` before attempting puts:
```python
def close(self) -> None:
    if self._closed:
        return
    self._closed = True
    remaining = self._accumulator.flush()
    if remaining is not None and not self._error_box:
        self._queue.put(remaining)
    if not self._error_box:
        self._queue.put(_SENTINEL)
    self._thread.join()
    self._writer.close()
    self._check_error()
```

```python
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    self.close()
```
`__exit__` flushes and writes on error paths too.

When the `with SpillWriter(...)` block raises (e.g., `heapq.merge` hits a bad key), `__exit__` still calls `close()`, which flushes remaining accumulated data and writes it — wasted I/O for a partial run that will never be read. More importantly, this interacts with the deadlock above: if the write path errored, `close()` could hang on the queue put.
Consider short-circuiting when entering from an exception:
```python
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    if exc_type is not None:
        # Error path: don't flush remaining data, just tear down the thread.
        self._closed = True
        self._queue.put(_SENTINEL)
        self._thread.join(timeout=5)
        self._writer.close()
    else:
        self.close()
```
```python
def _background_writer_loop(
    write_queue: "queue.Queue[pa.Table | object]",
    writer: pq.ParquetWriter,
    error_box: list[BaseException],
) -> None:
    """Drain write_queue, writing each table as a row group. Stops on _SENTINEL."""
    while True:
        item = write_queue.get()
        if item is _SENTINEL:
            return
        try:
            writer.write_table(item)
        except BaseException as exc:
            error_box.append(exc)
            return
```
Minor: if `write_queue.get()` itself raises (e.g., the thread is interrupted), the exception propagates uncaught and the thread dies silently. The caller would then deadlock on `close()`. This is very unlikely in practice, but a catch-all around the entire loop body (including `get()`) would make it robust:
```python
def _background_writer_loop(write_queue, writer, error_box):
    while True:
        try:
            item = write_queue.get()
            if item is _SENTINEL:
                return
            writer.write_table(item)
        except BaseException as exc:
            error_box.append(exc)
            return
```
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2b1bf78812
```python
if remaining is not None:
    self._queue.put(remaining)
self._queue.put(_SENTINEL)
self._thread.join()
```
Prevent close from hanging after background write failure
`close()` unconditionally does blocking `put()` calls (remaining and then `_SENTINEL`) before checking whether the writer thread has already failed. If `writer.write_table()` raises in `_background_writer_loop`, the thread exits, and a subsequent `put()` can block forever on the single-slot queue because no consumer remains. In that failure mode (e.g., transient GCS/Parquet write error), external sort can hang instead of surfacing the exception.
Replaces the `.pkl.zst` spill files in `external_sort_merge` with Parquet files written through a new `SpillWriter` (byte-budgeted `ParquetWriter` with a background I/O thread). Items are cloudpickle-style serialized into a single `_zephyr_payload` binary column; the Python `heapq.merge` semantics on both passes are unchanged, so behavior is identical.

This is the minimal slice cherry-picked from #4178 (arrow-scatter-reduce) that removes raw pickle files from zephyr's shuffle data plane without touching the scatter envelope or reduce merge. Follow-ups will promote the sort key to a first-class column and move reduce to columnar Arrow merge.

Pass-2 read-batch-size estimation now reads row-group metadata directly from the parquet file instead of probing a pickled sample.
It's a minor improvement, though the focus was more on using the same file format everywhere.

Phase-1 results (~770MT, one shard, one run each). Step-level wall-clock from step_runner / stage completion logs:
As much as Parquet shuffle was an improvement, I've become increasingly convinced it's not the right path forward.[^1] Parquet adds aspects that are not easy to control (Arrow) and requirements that we don't care about (e.g. a consistent schema and row groups).
A simpler approach could be to use zstd chunks in a binary file with a shard -> range sidecar metadata. Effectively a deconstructed parquet, something tailored to our use-case? At least in my quick tests it's more efficient, less memory intensive than parquet while the code is less complicated (no schema dance, no parquet reader shenanigans).
Footnotes

[^1]: Unless maybe we go full Arrow/rust.
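The "deconstructed parquet" idea above — compressed chunks in one binary file plus a sidecar mapping chunk index to byte range — could look roughly like this. A sketch only: all names are illustrative, and `zlib` stands in for zstd to keep it stdlib-only (a real spill would use the `zstandard` module):

```python
import json
import pickle
import zlib  # stand-in for zstd compression

def write_spill(path: str, chunks: list) -> None:
    """Write compressed pickled chunks back-to-back, plus a JSON sidecar
    listing each chunk's (offset, length) byte range."""
    ranges = []
    with open(path, "wb") as f:
        for chunk in chunks:
            blob = zlib.compress(pickle.dumps(chunk))
            ranges.append((f.tell(), len(blob)))
            f.write(blob)
    with open(path + ".meta", "w") as f:
        json.dump(ranges, f)

def read_chunk(path: str, idx: int) -> list:
    """Seek straight to one chunk via the sidecar; no schema, no reader state."""
    with open(path + ".meta") as f:
        offset, length = json.load(f)[idx]
    with open(path, "rb") as f:
        f.seek(offset)
        return pickle.loads(zlib.decompress(f.read(length)))
```

The sidecar plays the role of Parquet's row-group index, but there is no schema dance and no Parquet reader in the loop.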
```python
    return sum(len(t) for t in self._tables)


def _background_writer_loop(
```
Should we reuse/abstract the `zephyr.writers.ThreadedBatchWriter`?
Hrm, I'm a bit loath to create our own file format. I agree with you that Parquet adds some additional complexity, but as you note, blocked zstd + sidecar is kind of like Parquet. I agree with you on the "full Parquet" concern - trying to infer a schema from arbitrary Python - but in this case our schema is a single column, so unlikely to be a fuss. How about we restructure this:

The rest of the Zephyr code won't see any schemas, just use the reader/writer interfaces.
@rjpower box around shuffle format sounds great!
Rename spill_writer.py -> spill.py. Make SpillWriter/SpillReader an opaque chunked row-format API: no pa.Schema, pa.Table, or parquet types in the public surface. Callers write Python items and iterate back items (or chunks of items); the Parquet single-payload-column backing is an implementation detail.

external_sort.py now imports only zephyr.spill and drops the _zephyr_payload / _SPILL_SCHEMA / _items_to_table plumbing. Pass-2 memory budgeting uses SpillReader.approx_item_bytes instead of reading parquet row-group metadata directly. Run files are now *.spill.

SpillWriter is rewritten on top of zephyr.writers.ThreadedBatchWriter (per ravwojdyla's PR review) instead of open-coding a background thread + single-slot queue + sentinel handshake. This drops ~50 lines of threading plumbing and picks up ThreadedBatchWriter's polling submit (fixes the P1 close-hang-after-write-error), error propagation on submit/close, and an error-path __exit__ that doesn't block.
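The "polling submit" that fixes the close-hang can be sketched as follows — an illustration of the idea, not zephyr's actual ThreadedBatchWriter API (the `submit` name and signature are hypothetical). Instead of blocking forever on a full queue, it alternates short blocking attempts with checks of the background thread's error box:

```python
import queue

def submit(q: queue.Queue, item, error_box: list, poll_s: float = 0.1) -> None:
    """Enqueue item, re-checking the background thread's error box between
    short blocking attempts instead of blocking indefinitely."""
    while True:
        if error_box:
            # The writer thread has failed; surface its error to the caller
            # instead of waiting on a queue nobody will ever drain.
            raise error_box[0]
        try:
            q.put(item, timeout=poll_s)
            return
        except queue.Full:
            continue  # slot still occupied; poll the error box again
```

If the writer thread dies after filling its error box, the next `submit` raises within one poll interval rather than deadlocking.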
Force-pushed 2b1bf78 to e18849b
Okay, rewrote so it's just the SpillWriter & SpillReader, and we can replace the internals with our own format at our leisure.

I thought about trying to use the same thing for the reducer spill & the scatter outputs, but decided it would be a bigger change than we wanted here. Probably better to do that at the "new file format" level instead, and just update the SpillWriter/Reader to do it.
Summary

- Replaces the `.pkl.zst` spill files in `external_sort_merge` with Parquet files written via a new `SpillWriter` (byte-budgeted `pq.ParquetWriter` with a background I/O thread).
- Items are serialized into a single `_zephyr_payload` binary column. Python `heapq.merge` semantics on both passes are unchanged, so behavior is identical — this is a format swap only.
- Pass 2 uses `pq.ParquetFile.iter_batches` and unpickles one row group at a time to feed the heap merge.

This is the minimal slice cherry-picked from #4178 that removes raw pickle files from zephyr's shuffle data plane. The scatter envelope and reduce merge are untouched; follow-ups will promote the sort key to a first-class column (Tier 2) and move reduce to columnar Arrow merge (Tier 3).

- `SpillWriter` is added as `lib/zephyr/src/zephyr/spill_writer.py` verbatim from #4178.
- `external_sort.py` is its only caller in this PR.