
[zephyr] OOM-proof scatter write buffer with byte-based flush budget#5055

Open
hsuhanooi wants to merge 8 commits into marin-community:main from hsuhanooi:zephyr-alive-workers

Conversation

@hsuhanooi
Contributor

Replace the fixed 100K-row-per-shard flush threshold in ScatterWriter with a total byte budget (default 256 MB) across all shard buffers. When the estimated total exceeds the budget, the largest shard buffer is flushed immediately. This bounds write-side RSS regardless of item size or output shard count — previously, large items or many output shards could accumulate unbounded memory before close() flushed everything at once.
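The mechanism can be sketched as follows. This is a minimal illustration, not the actual zephyr implementation: class shape, attribute names, and the flush bookkeeping are all assumptions; only the budget-then-flush-largest behavior comes from the description above.

```python
import pickle

class ScatterWriter:
    """Sketch: buffer rows per output shard under a total byte budget."""

    DEFAULT_BUDGET = 256 * 1024 * 1024  # 256 MB across all shard buffers

    def __init__(self, n_shards, buffer_limit_bytes=DEFAULT_BUDGET):
        self._buffers = {s: [] for s in range(n_shards)}
        self._buffer_bytes = {s: 0 for s in range(n_shards)}
        self._limit = buffer_limit_bytes
        self.flushes = []  # (shard, n_rows) log, for the sketch only

    def write(self, shard, item):
        data = pickle.dumps(item)
        self._buffers[shard].append(data)
        self._buffer_bytes[shard] += len(data)
        # When the total across ALL shards exceeds the budget, flush the
        # largest buffer: the greedy choice frees the most memory per flush.
        if sum(self._buffer_bytes.values()) > self._limit:
            biggest = max(self._buffer_bytes, key=self._buffer_bytes.get)
            self._flush(biggest)

    def _flush(self, shard):
        self.flushes.append((shard, len(self._buffers[shard])))
        self._buffers[shard].clear()  # a real writer would persist here
        self._buffer_bytes[shard] = 0

    def close(self):
        for shard in self._buffers:
            if self._buffers[shard]:
                self._flush(shard)
```

The key difference from the row-count threshold is that the check is global: a single hot shard or many cold shards both hit the same ceiling.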

_wait_for_stage and _log_status each did an O(n_workers) scan of
_worker_states to count alive workers on every wakeup. With many workers
this scan runs under the coordinator lock on every shard completion event.

Add _set_worker_state() to centralise all 9 WorkerState transition sites
and maintain _alive_workers as a running count. _wait_for_stage and
_log_status now read a single int under the lock instead of scanning the
full dict.
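The counter-maintenance idea can be sketched like this. The state names, the set of states counted as "alive", and the class shape are illustrative assumptions; the point is that every transition goes through one method that keeps a running count.

```python
import threading
from enum import Enum

class WorkerState(Enum):
    STARTING = "starting"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"

# Which states count as "alive" is an assumption for this sketch.
_ALIVE = {WorkerState.STARTING, WorkerState.RUNNING}

class Coordinator:
    def __init__(self, worker_ids):
        self._lock = threading.Lock()
        self._worker_states = {w: WorkerState.STARTING for w in worker_ids}
        self._alive_workers = len(worker_ids)  # running count, O(1) to read

    def _set_worker_state(self, worker, new_state):
        # Single transition point: adjust the count by the alive-ness delta
        # instead of rescanning the whole dict under the lock.
        with self._lock:
            old = self._worker_states[worker]
            self._alive_workers += (new_state in _ALIVE) - (old in _ALIVE)
            self._worker_states[worker] = new_state

    def alive_workers(self):
        with self._lock:
            return self._alive_workers  # previously an O(n_workers) scan
```

Because the delta is computed from both the old and new state, repeated transitions (e.g. RUNNING back to STARTING) cannot drift the counter.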

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 285d467f28


Comment thread on lib/zephyr/src/zephyr/shuffle.py (outdated)
@ravwojdyla ravwojdyla self-requested a review April 22, 2026 17:59
…d close logging

Three tuning fixes for the byte-budget scatter writer:

1. Measure the first item with pickle.dumps before buffering starts, replacing
   the hardcoded 512-byte default. A static guess can be off by orders of
   magnitude for large documents, allowing millions of rows to accumulate
   before the first budget check fires.

2. Derive the default buffer budget from the cgroup memory limit (25% of
   container memory) rather than a fixed 256 MB, so the budget scales with
   the worker size. Falls back to 256 MB when the limit cannot be read.

3. Log tuning diagnostics at close: mid-write vs close-time flush counts,
   first-item estimate vs 100-item measured average, peak buffered row count,
   and effective budget in MB.
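Point 2 can be sketched as follows. The PR derives the limit via TaskResources.from_environment(); this illustration instead reads the standard cgroup v2/v1 interface files directly, which is an assumption about how that helper might work, not its actual implementation.

```python
_FALLBACK_BUDGET = 256 * 1024 * 1024  # 256 MB when no limit can be read

def default_buffer_budget(fraction=0.25):
    """Budget as a fraction of the container's cgroup memory limit.

    Falls back to 256 MB outside a container, or when the limit file
    reads "max" (cgroup v2's value for "unlimited").
    """
    candidates = [
        "/sys/fs/cgroup/memory.max",                    # cgroup v2
        "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
    ]
    for path in candidates:
        try:
            raw = open(path).read().strip()
        except OSError:
            continue  # file absent on this host; try the next layout
        if raw.isdigit():
            return int(int(raw) * fraction)
    return _FALLBACK_BUDGET
```

Scaling the budget with the container keeps the 25% share meaningful on both small and large workers, rather than pinning every worker to the same 256 MB.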
@ravwojdyla
Contributor

@hsuhanooi thanks - please let me know when this is good to review.

…sh()

The previous flush-time EMA was a closed loop: if the estimate was too low
no flush fired, so the EMA never ran, and the estimate stayed low. Skewed
datasets (small items early, large items later) could accumulate unbounded
memory without any flush triggering.

Fix: sample one item's pickle size every 10 writes and apply EMA directly in
write(), independent of whether any flush has occurred. The flush-time sample
(100 items first flush, 10 items ongoing) still runs for higher-quality
multi-item measurements when flushes do happen.

Adds a test confirming that mid-write flushes fire when large items arrive after
a run of small items.
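The write-time sampling described above can be sketched as follows. The 10-item interval and α=0.3 come from the thread; the class name and structure are illustrative.

```python
import pickle

SAMPLE_INTERVAL = 10  # measure one item's pickle size every 10 writes
ALPHA = 0.3           # EMA smoothing factor

class ItemSizeEstimator:
    """Sketch: sampling in write() keeps the EMA updating even when no
    flush ever fires, breaking the closed loop where a low estimate
    suppresses the very flushes that would have corrected it."""

    def __init__(self):
        self._estimate = None  # bytes; seeded by the first item
        self._writes = 0

    def observe(self, item):
        self._writes += 1
        if self._estimate is None:
            # Layer 1: measure the first item instead of a static guess.
            self._estimate = len(pickle.dumps(item))
        elif self._writes % SAMPLE_INTERVAL == 0:
            # Layer 2: periodic sample, blended in, flush-independent.
            sample = len(pickle.dumps(item))
            self._estimate = ALPHA * sample + (1 - ALPHA) * self._estimate

    @property
    def estimate(self):
        return self._estimate
```

On a skewed stream (small items first, large items later) the estimate now rises within a few sample intervals of the shift, so the budget check starts firing instead of staying silent.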
@hsuhanooi
Contributor Author

@ravwojdyla I think this is ready now. Just wanted to make the estimator a little more sophisticated.

Layer 1 — first item (before any buffering)
write() measures the first item's pickle size and uses it as the initial _item_bytes_estimate. This prevents the bootstrap window where a hardcoded default (e.g. 512 bytes) could let millions of large-document rows accumulate before any check fires.

Layer 2 — interval sampling in write() (every 10 items)
Every 10th item gets measured and fed into an EMA (α=0.3). This is the critical layer for tracking drift — it runs regardless of whether any flush has occurred, breaking the closed loop where the flush-time EMA could never update because the estimate was too low to trigger a flush.

Layer 3 — flush-time sampling
On the first flush, 100 items are sampled to establish a high-quality baseline. On subsequent flushes, 10 items are sampled and folded into the same EMA. This provides better multi-item accuracy when flushes are already firing regularly.

The budget itself
buffer_limit_bytes defaults to 25% of the container's cgroup memory limit (via TaskResources.from_environment()), falling back to 256 MB if the limit can't be read. When _total_buffer_rows × _item_bytes_estimate exceeds the budget, the largest shard buffer is flushed — not the one that just received the item, but whichever is biggest, which is the greedy choice for freeing the most memory per flush.

What it doesn't bound
The budget is an estimate, not a hard cap. Between two write-time samples (up to 9 items), the actual buffered bytes can exceed the budget by up to 9 × max_item_size. For 100 KB items and a 10-item interval, that's ~900 KB of overshoot tolerance. If that matters for a particular workload, _ESTIMATE_WRITE_SAMPLE_INTERVAL can be tightened.
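The overshoot bound above reduces to simple arithmetic. The constant name mirrors the one in the comment; treating its value as 10 follows from the "10-item interval" stated there.

```python
# Between two write-time samples, up to (interval - 1) items can arrive
# unmeasured, each potentially as large as the biggest item seen.
ESTIMATE_WRITE_SAMPLE_INTERVAL = 10

def worst_case_overshoot(max_item_bytes, interval=ESTIMATE_WRITE_SAMPLE_INTERVAL):
    """Bytes by which actual buffered size can exceed the budget."""
    return (interval - 1) * max_item_bytes

# The comment's example: 100 KB items with a 10-item interval.
print(worst_case_overshoot(100 * 1024) / 1024)  # prints 900.0 (KB)
```

Tightening the interval shrinks the bound linearly, at the cost of more pickle.dumps calls per write.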

@ravwojdyla
Contributor

@hsuhanooi nice - this is looking great and I think it will help, especially in some degenerate cases. Before we merge this we need to confirm this doesn't introduce regression at scale. Do you have access to the Iris cluster? If not I can trigger some job(s) to test this.

@hsuhanooi
Contributor Author

I do not have access to Iris. Yeah, agreed: this definitely needs more testing, but at a small scale it looks reasonable.

@ravwojdyla
Contributor

FYI @hsuhanooi I should have some results on this tomorrow. Will keep you posted!
