[zephyr] Eliminate coordinator-side scatter manifest consolidation#4853

Open
ravwojdyla-agent wants to merge 1 commit into main from rav/zephyr-sidecar-direct-read

Conversation

@ravwojdyla-agent
Contributor

Summary

  • Removes the coordinator's scatter-manifest consolidation step — reducers now read per-mapper .scatter_meta sidecars directly via a 32-thread pool
  • Eliminates a scaling bottleneck where the coordinator spent significant time and memory reading all sidecars, serializing a multi-MB JSON manifest, and writing it to GCS while reducers waited
  • The coordinator was also a single point of failure during this phase (OOM or preemption killed the manifest write and lost all scatter progress)

Motivation

During nemotron-v1 normalization (1.3B records, 6307 output shards), the coordinator:

  1. Read ~4000 sidecar files (via a 256-thread pool, but still bottlenecked on a single machine)
  2. Serialized them into a single JSON manifest
  3. Wrote the manifest to GCS
  4. Only then could reducers start

With the coordinator on preemptible VMs, this phase was the #1 cause of pipeline restarts — each preemption lost all scatter progress and forced a full redo.

Changes

  • execution.py: _regroup_result_refs passes scatter paths directly via MemChunk instead of writing a manifest
  • shuffle.py: ScatterReader.from_sidecars() replaces from_manifest(); _read_sidecars_parallel() does the 32-thread fan-out; removed _write_scatter_manifest, _read_scatter_manifest, _build_scatter_shard_from_manifest
  • plan.py: reduce stage calls ScatterShard.from_sidecars()
  • test_shuffle.py: tests use direct sidecar paths
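The reducer-side fan-out above can be sketched roughly as follows. This is illustrative only: `read_sidecars_parallel`, the `SIDECAR_THREADS` constant, and the sidecar JSON layout are stand-ins, not the actual zephyr `ScatterReader.from_sidecars()` implementation.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

SIDECAR_THREADS = 32  # fan-out width per reducer (matches the PR's 32-thread pool)


def _read_sidecar(path: str) -> dict:
    # Each mapper writes a small .scatter_meta JSON sidecar next to its data
    # file; the layout here (a "shard" key) is a hypothetical example.
    return json.loads(Path(path).read_text())


def read_sidecars_parallel(paths: list[str], target_shard: int) -> list[dict]:
    """Read all sidecars concurrently, keeping entries for one target shard."""
    with ThreadPoolExecutor(max_workers=SIDECAR_THREADS) as pool:
        metas = list(pool.map(_read_sidecar, paths))
    # Each reducer filters for its own target shard, so no consolidated
    # manifest is ever needed.
    return [m for m in metas if m["shard"] == target_shard]
```

The sidecar I/O cost is thereby spread across all reducers instead of being paid serially on the coordinator.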

Test plan

  • 265 zephyr tests pass (including 13 shuffle-specific tests)
  • Validated on nemotron-v1 medium-low/actual split (1.3B records) — normalize completed with 0 empty shards, peak memory 12.93 GB on 16 GB workers

🤖 Generated with Claude Code

The coordinator no longer reads per-mapper .scatter_meta sidecars or
writes a consolidated scatter_metadata manifest. Instead, the list of
scatter data-file paths is passed directly to every reducer. Each
reducer reads the sidecars itself via a 32-thread pool and filters for
its own target shard.

This eliminates a scaling bottleneck: with thousands of mappers the
coordinator spent significant time and memory reading all sidecars,
serializing a multi-MB JSON manifest, and writing it to GCS — all
while reducers waited. The coordinator was also a single point of
failure during this phase (OOM or preemption killed the manifest
write and lost all scatter progress).

Now the coordinator's only scatter-stage responsibility is collecting
the list of data-file paths (already in memory from task results) and
fanning them out. Sidecar I/O is distributed across all reducer
workers in parallel.
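A minimal sketch of the coordinator side of this, assuming a simplified stand-in for `MemChunk` and for the shape of mapper task results (both hypothetical here, not zephyr's real types):

```python
from dataclasses import dataclass


@dataclass
class MemChunk:
    """In-memory payload fanned out to every reducer task (illustrative)."""
    scatter_paths: list[str]


def regroup_result_refs(mapper_results: list[dict]) -> MemChunk:
    # The coordinator already holds each mapper's data-file path in its task
    # result, so collecting them is cheap: no sidecar reads, no JSON
    # serialization, and no manifest write to GCS happen here.
    paths = [r["data_path"] for r in mapper_results]
    return MemChunk(scatter_paths=paths)
```

Because the paths are already in coordinator memory, a coordinator preemption during this step loses only a trivial list-build rather than an in-flight manifest write.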

Changes:
- execution.py: _regroup_result_refs passes scatter paths directly
  via MemChunk instead of writing a manifest
- shuffle.py: ScatterReader.from_sidecars() replaces from_manifest();
  _read_sidecars_parallel() does the 32-thread fan-out;
  _write_scatter_manifest, _read_scatter_manifest, and
  _build_scatter_shard_from_manifest removed
- plan.py: reduce stage calls ScatterShard.from_sidecars()
- test_shuffle.py: tests use direct sidecar paths

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@ravwojdyla-agent ravwojdyla-agent added the agent-generated Created by automation/agent label Apr 17, 2026
@ravwojdyla ravwojdyla requested a review from rjpower April 17, 2026 00:35


- # Per-worker caches for sidecar + manifest reads.
+ # Per-worker cache for sidecar reads.
Collaborator

Why do we need these caches? Doesn't this belong in the ScatterReader?

Collaborator

If we need a cache let's put it in the worker itself, e.g. maybe a scatter reader can be reused.
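One way to read this suggestion, as a rough sketch: hang the cache off the worker object itself so a `ScatterReader` built for one set of sidecar paths can be reused by later tasks on the same worker, instead of keeping module-level caches. All names below are illustrative, not zephyr's actual API.

```python
class ScatterReader:
    """Placeholder for a reader that would parallelize sidecar reads."""

    def __init__(self, paths: tuple[str, ...]):
        self.paths = paths


class Worker:
    """Each worker process owns one cache; no cross-task module globals."""

    def __init__(self) -> None:
        self._reader_cache: dict[tuple[str, ...], ScatterReader] = {}

    def get_reader(self, paths: tuple[str, ...]) -> ScatterReader:
        # Reuse the reader (and any sidecar metadata it has already loaded)
        # when multiple reduce tasks on this worker share the same path set.
        if paths not in self._reader_cache:
            self._reader_cache[paths] = ScatterReader(paths)
        return self._reader_cache[paths]
```

Scoping the cache to the worker also means its lifetime and eviction follow the worker's, rather than leaking across unrelated jobs.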

@ravwojdyla
Contributor

For posterity: this change is based on @rjpower's benchmark from #4782 (review).

I'm re-running normalization of a nemotron split to confirm this change does not cause regressions at scale. Will update.
