Commit 3c0d43b

zephyr: vortex^H^H^H^H^H^H parquet shuffle (#3482)
* Shuffle via Parquet, with pickle fallback. Closes: #3478
* Scatter operations now write intermediate shuffle data as Parquet columnar files instead of per-chunk pickle blobs (previously we wrote mapper × reducer × chunks files, which caused problems at scale). Each mapper shard writes Parquet file(s) [^1] with `shard_idx` and `chunk_idx` columns; reducers filter down to their respective chunks via predicate pushdown and k-merge the sorted chunks (as before)
* Add `ParquetDiskChunk` - references a slice of a shared Parquet file, filtered to the target shard and chunk
* Rename `DiskChunk` → `PickleDiskChunk` to clarify its role as fallback
* Fall back to pickle with a warning when an item is not Arrow-serializable

[^1]: A single mapper may produce more than one Parquet file iff the schema of the chunks changes; this is possible e.g. when a null field becomes a concrete type, i.e. evolves into an optional field.
1 parent 7afc138 commit 3c0d43b

File tree

6 files changed, +392 −102 lines

lib/zephyr/src/zephyr/__init__.py

Lines changed: 1 addition & 1 deletion

@@ -6,7 +6,7 @@
 import logging

 from zephyr.dataset import Dataset
-from zephyr.execution import DiskChunk, WorkerContext, ZephyrContext, zephyr_worker_ctx
+from zephyr.execution import WorkerContext, ZephyrContext, zephyr_worker_ctx
 from zephyr.expr import Expr, col, lit
 from zephyr.plan import ExecutionHint, compute_plan
 from zephyr.readers import InputFileSpec, load_file, load_jsonl, load_parquet, load_vortex, load_zip_members

lib/zephyr/src/zephyr/dataset.py

Lines changed: 3 additions & 0 deletions

@@ -759,6 +759,9 @@ def group_by(
     The reducer receives (key, iterator_of_items) and returns a single result or an iterator of
     results for that group.

+    Incoming records are strongly encouraged to be Arrow-serializable (dicts, lists, scalars, etc.).
+    Custom dataclasses and arbitrary objects will have degraded performance (serde via pickle).
+
     Args:
         key: Function extracting grouping key from item (must be hashable)
         reducer: Function from (key, Iterator[items]) -> result
