
[zephyr] Use msgspec msgpack for scatter chunks, fall back to pickle #5091

Closed
hsuhanooi wants to merge 2 commits into marin-community:main from hsuhanooi:zephyr-msgpack-shuffle

Conversation

@hsuhanooi
Contributor

Replace cloudpickle sub-batches with msgspec msgpack for scatter chunk serialization. msgpack is 2-5x faster on write and ~1.5x faster on read for plain dicts with primitive values. A one-byte format tag prefixes each frame so readers dispatch to msgpack or pickle. Items containing frozenset or set are detected before encoding and routed to the pickle path, preventing silent data loss from msgspec's frozenset-to-list coercion.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

@chatgpt-codex-connector (bot) left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 8d462891b4


Comment thread: lib/zephyr/src/zephyr/shuffle.py (outdated)
The previous implementation called _has_set_types only on items[0],
which silently corrupted heterogeneous chunks where later items contained
frozenset or set values (msgspec coerces them to list without raising).
Scan every item instead, and remove the list[:5] slice on nested lists so
frozensets inside longer nested containers are not missed.
Add a regression test for the heterogeneous-chunk case.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@hsuhanooi
Contributor Author

🤖 Micro-benchmark: msgpack vs cloudpickle serde (isolated, no I/O)

Setup: 5 runs each, N items per chunk, zstd level 3.

| Dataset | Write msgpack | Write pickle | Write speedup | Read msgpack | Read pickle | Read speedup | Size msgpack | Size pickle |
|---|---|---|---|---|---|---|---|---|
| 3-int cols (N=10K) | 4.48M items/s | 1.99M items/s | 2.3x | 2.33M items/s | 1.40M items/s | 1.7x | 24.7 KB | 26.4 KB |
| int+float+str (N=10K) | 3.76M items/s | 0.79M items/s | 4.8x | 1.89M items/s | 1.64M items/s | 1.2x | 44.6 KB | 45.0 KB |
| 10-col wide (N=5K) | 1.37M items/s | 0.76M items/s | 1.8x | 0.63M items/s | 0.43M items/s | 1.5x | 60.7 KB | 58.7 KB |
| 3-str cols (N=5K) | 4.29M items/s | 1.58M items/s | 2.7x | 1.69M items/s | 1.37M items/s | 1.2x | 9.4 KB | 8.2 KB |

The string-heavy dataset is the only case where msgpack produces larger output (+15%), because cloudpickle can intern repeated short strings. For numeric and mixed data msgpack is equal or smaller.

@hsuhanooi
Contributor Author

🤖 End-to-end benchmark: benchmark_shuffle.py (8 input shards x 50K items x 200 bytes, 8 output shards, 3 warm repeats, local executor)

| Format | Repeat | Elapsed | Items/sec | MB/sec |
|---|---|---|---|---|
| pickle (origin/main) | 0 | 14.79 s | 27,046 | 5.2 |
| pickle | 1 | 14.28 s | 28,017 | 5.3 |
| pickle | 2 | 13.77 s | 29,045 | 5.5 |
| msgpack (this PR) | 0 | 13.31 s | 30,051 | 5.7 |
| msgpack | 1 | 12.61 s | 31,725 | 6.1 |
| msgpack | 2 | 12.59 s | 31,776 | 6.1 |

Warmed throughput: 31.8K vs 29.0K items/s → +9.6% end-to-end. The serde gain is larger in isolation (2–5x) but scatter/reduce is only one slice of total pipeline time alongside item generation, file I/O, coordinator scheduling, and k-way merge.

@hsuhanooi
Contributor Author

🤖 The local benchmark shows +10% end-to-end but the benchmark data is 97% string bytes by volume (3 int routing keys + 168-char random payload per item), which is the worst case for msgpack: the micro-benchmark showed +15% compressed size for string-heavy chunks and only 1.2x read speedup.

On GCS that size penalty is not free. For a string-heavy chunk of 50K items x 200 bytes (~10 MB uncompressed), pickle compresses to roughly 5 MB and msgpack to ~5.75 MB. At 100-200 MB/s GCS throughput that is 4-7 ms extra per chunk on both the scatter write and reduce read sides. At 1,000 mapper chunks in a large text job that is 8-14 seconds of extra network time, which exceeds the entire CPU-side serde gain measured locally.
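The back-of-envelope arithmetic above can be checked quickly; this sketch uses only the figures quoted in the text (5 MB pickle baseline, +15% msgpack size, 100-200 MB/s GCS throughput, 1,000 chunks, write + read):

```python
# All inputs are taken from the estimate in the text above.
pickle_mb = 5.0                    # compressed string-heavy chunk with pickle
msgpack_mb = pickle_mb * 1.15      # +15% size penalty for msgpack
extra_mb = msgpack_mb - pickle_mb  # 0.75 MB extra per chunk

# Extra transfer time per chunk, per side, at the stated GCS throughput range
per_chunk_ms = {mbps: extra_mb / mbps * 1000 for mbps in (100, 200)}
# ~7.5 ms at 100 MB/s, ~3.75 ms at 200 MB/s

# Paid on both the scatter write and the reduce read, over 1,000 chunks
chunks, sides = 1_000, 2
total_s = {mbps: sides * chunks * extra_mb / mbps for mbps in (100, 200)}
# ~15 s at 100 MB/s, ~7.5 s at 200 MB/s of extra network time,
# roughly the 8-14 s range quoted above (the text rounds per-chunk first)
```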

Summary by workload type:

  • Numeric (token IDs, embeddings): 2-5x write speedup, size -6% to -26%. Clear win.
  • Mixed int+float+str: 4.8x write speedup, size ~same. Likely win.
  • String-heavy (HTML, JSONL text): 2.7x write speedup, size +15%. Likely neutral or negative on GCS.

Marin's actual shuffle workloads fall almost entirely into the third category. I recommend keeping the format-tag infrastructure (the \x00/\x01 frame prefix and dispatch in _iter_chunk) for future use, but not defaulting to msgpack until there is a numeric-heavy pipeline to target. Closing in favor of merging the OOM-proof scatter work first.

@hsuhanooi
Contributor Author

Yeah, depending on the input, I'm not sure this is worthwhile

@ravwojdyla
Contributor

ravwojdyla commented Apr 22, 2026

Nice experiment! Thanks @hsuhanooi !

> …not default to msgpack until there is a numeric-heavy pipeline to target. Closing in favor of merging the OOM-proof scatter work first.

Agreed. Would prefer to not introduce extra/non-default knobs at this time to avoid complexity 🙇

@hsuhanooi
Contributor Author

🤖 Closing — the format-tag infrastructure (\x00/\x01 frame prefix and dispatch in _iter_chunk) is a useful foundation, but defaulting to msgpack is not net-positive for Marin's string-heavy workloads on GCS. See benchmark comments above.

@hsuhanooi hsuhanooi closed this Apr 22, 2026