
Use shared ts.Transaction for metadata consolidation#4105

Merged
yonromai merged 1 commit into main from fix/consolidate-metadata-transaction
Mar 25, 2026

Conversation

@yonromai
Contributor

Summary

  • Replace per-shard asyncio.run + per-shard ts.Transaction in consolidate_shard_caches with a single shared ts.Transaction wrapping all metadata writes.
  • Delete dead _extend_cache_metadata_with_other and _virtual_offset.
  • Use info["ledger"].total_num_rows (already in memory) instead of redundant source.async_len() R2 reads.
  • Eager numpy reads for source offsets/shapes instead of lazy ts.virtual_chunked views (avoids weak-reference lifetime issues across loop iterations).
  • Fix unsliced shapes write: source_array.shapes[:source_num_rows] instead of bare source_array.shapes.
  • Preserve rate-limit retry with exponential backoff on the shared transaction.
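The retry behavior in the last bullet can be sketched as follows; `commit_with_backoff` and `RateLimitError` are hypothetical stand-ins for illustration, not the actual helper in the PR:

```python
import random
import time

class RateLimitError(Exception):
    """Stand-in for the rate-limit error raised by the storage backend."""

def commit_with_backoff(commit, max_attempts=6, base_delay=0.5):
    """Call `commit` until it succeeds, backing off exponentially on rate limits."""
    for attempt in range(max_attempts):
        try:
            return commit()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise
            # Delays of 0.5 s, 1 s, 2 s, ... plus up to 100 ms of jitter.
            time.sleep(base_delay * 2**attempt + random.uniform(0, 0.1))
```

With a shared transaction there is only one commit to retry, so the backoff wraps a single call instead of one per shard.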

Why: each of N shards committed its own transaction on the same zarr3 write chunks → O(N) read-modify-write cycles (tensorstore#202). With 2755 shards: 4-11 hours on R2. A single transaction coalesces to ~6 chunk writes.
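The coalescing effect can be illustrated with a toy model (this is not the tensorstore API; `ChunkedStore` is a counting mock): every commit pays one read-modify-write per chunk it touches, so N per-shard commits pay O(N) cycles while one shared commit pays O(number of touched chunks).

```python
class ChunkedStore:
    """Counting mock of a chunked array store (not tensorstore)."""

    def __init__(self, chunk_size):
        self.chunk_size = chunk_size
        self.data = {}
        self.rmw_cycles = 0

    def commit(self, writes):
        """Apply a batch of (index, value) writes; one RMW per touched chunk."""
        touched = {i // self.chunk_size for i, _ in writes}
        self.rmw_cycles += len(touched)
        self.data.update(writes)

num_shards, chunk_size = 2755, 1000
writes = [(i, f"shard-{i}") for i in range(num_shards)]

per_shard = ChunkedStore(chunk_size)
for w in writes:                 # one transaction per shard: O(N) RMW cycles
    per_shard.commit([w])

shared = ChunkedStore(chunk_size)
shared.commit(writes)            # one shared transaction: O(num_chunks)

# per_shard.rmw_cycles == 2755, shared.rmw_cycles == 3
```

In the toy model a single metadata array spans three write chunks; the real cache has a few such arrays, which is where the "~6 chunk writes" figure comes from.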

Test plan

  • test_consolidate_metadata_flat — round-trip with single rank-1 field (no shapes metadata)
  • test_consolidate_metadata_shaped — round-trip with multi-field exemplar including rank-2 leaf (exercises shapes metadata path)
  • Existing test_tree_store.py (17 tests) and test_new_cache.py (10 tests) pass
  • Run on a real multi-shard tokenization job on R2 to validate wall-time improvement

Closes #4100

🤖 Generated with Claude Code

@yonromai yonromai added the agent-generated (Created by automation/agent) label Mar 24, 2026
@yonromai
Contributor Author

cc: @dlwh, it's not a large PR, but there are quite a few changes.

FYI I'm looking more into the deets and testing this patch on my tokenize nemotron job.

Replace per-shard asyncio.run + per-shard transactions in
consolidate_shard_caches with a single shared ts.Transaction that
coalesces all metadata writes. Delete the now-unused per-shard
_extend_cache_metadata_with_other.

- O(num_shards) read-modify-write cycles → O(num_write_chunks)
- Use info["ledger"].total_num_rows instead of redundant async_len() reads
- Fix unsliced shapes write: shapes[:source_num_rows] vs bare shapes

Closes #4100

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@yonromai
Contributor Author

Empirical validation: 2755-shard tokenization job on R2

Ran a full Nemotron CC hq_actual tokenization pipeline (2755 shards, R2 backend via CoreWeave) with this PR deployed. The job completed end-to-end successfully:

train pipeline complete: 746,497,814 docs, 537,620,495,269 tokens in 8130.0s

Stats written to s3://marin-na/marin/tokenized/nemotron_cc/hq_actual-5af4cc/train/.stats.json.

Before/after: metadata consolidation

| | Before #4105 | After #4105 |
| --- | --- | --- |
| Time | 4-11 hours (5-15 s/shard × 2755 shards, measured via py-spy) | Approximately seconds (see caveat below) |
| Outcome | Killed at shard 857/2755 after running for hours | Completed twice (tokenization + cache-copy) without issue |

Measurement caveat

We did not have precise timing instrumentation deployed on the cluster (the import time; logger.info(...) patch was only in the local worktree, not in the deployed image). The "approximately seconds" estimate is inferred from log timestamp gaps:

  • Tokenization metadata consolidation: py-spy showed data_size processing shard 2710 at ~01:01; the cache-copy pipeline started at 01:02:12. Metadata consolidation completed within that ~1 min window (which also includes some pipeline setup overhead).
  • Cache-copy metadata consolidation: AWS config errors from TensorStore opens appear at ~02:25; final stats logged at ~02:25:50. Consolidation fits within that ~50 s window.

Both consolidation passes (tokenization phase and cache-copy phase) completed within their respective timestamp gaps, consistent with the single-transaction approach finishing in seconds rather than hours.

Supporting log evidence

Tokenization phase completion:

  • py-spy sample at ~01:01 shows data_size on shard 2710/2755 (near-final shard)
  • Cache-copy pipeline log: 01:02:12 start timestamp
  • Metadata consolidation ran between these two points

Cache-copy phase completion:

  • TensorStore AWS config errors at ~02:25 (initial opens during consolidation)
  • Final stats written at ~02:25:50
  • Metadata consolidation ran within this window

Previous run (before #4105):

  • py-spy profile showed the coordinator stuck in consolidate_shard_caches, committing one ts.Transaction per shard
  • Measured 5-15 s per shard; at 2755 shards that extrapolates to 4-11 hours
  • Run was manually killed at shard 857 after several hours of no progress
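As a quick sanity check on the extrapolation above:

```python
shards = 2755
low_hours = 5 * shards / 3600    # ≈ 3.8 h at 5 s/shard
high_hours = 15 * shards / 3600  # ≈ 11.5 h at 15 s/shard
```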

Other observations

@yonromai yonromai marked this pull request as ready for review March 25, 2026 02:32
@yonromai yonromai requested review from dlwh and rjpower March 25, 2026 02:32

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3da360eb3e


```python
row = merged[info["row_offset"]]
assert row["input_ids"][0] == i, f"shard {i} data mismatch"

return merged
```


P2: Keep merged store reads inside TemporaryDirectory scope

_build_and_consolidate returns a live TreeStore handle from inside a with tempfile.TemporaryDirectory(...) block, so the backing directory is deleted before callers use it. In test_consolidate_metadata_shaped, merged[0] is read after return; this can fail or become flaky when TensorStore needs to fetch uncached data from disk. This makes the new test unreliable across environments and cache behavior.
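The lifetime issue the comment describes is generic and easy to reproduce with the standard library; `build_store_buggy` and `build_store` below are illustrative stand-ins, not the actual `_build_and_consolidate`:

```python
import os
import tempfile

def build_store_buggy():
    # Bug pattern flagged by the review: the directory is deleted when the
    # `with` block exits, so the returned path dangles.
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "store")
        os.makedirs(path)
        return path  # tmpdir is removed here!

def build_store(tmpdir):
    # Fix: the caller owns the directory's lifetime, so reads after the
    # build step still see the backing files.
    path = os.path.join(tmpdir, "store")
    os.makedirs(path)
    return path

dangling = build_store_buggy()
assert not os.path.exists(dangling)

with tempfile.TemporaryDirectory() as tmpdir:
    live = build_store(tmpdir)
    assert os.path.exists(live)  # safe to read while tmpdir is alive
```

Moving all reads of the merged store inside the caller-managed directory scope avoids the flakiness when uncached data must be fetched from disk.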


@yonromai yonromai merged commit 7b93810 into main Mar 25, 2026
49 checks passed
@yonromai yonromai deleted the fix/consolidate-metadata-transaction branch March 25, 2026 13:49
yonromai added a commit that referenced this pull request Mar 25, 2026
## Summary

- Parallelizes `TreeStore.open` + `data_size` reads across shards using
`ThreadPoolExecutor(max_workers=32)` in `consolidate_shard_caches`
- These reads are independent and dominated by remote-storage round-trip
latency; parallelizing them removes the main bottleneck in the pre-copy
phase
- Cumulative offset computation remains serial (order-dependent)

Stacked on #4105. Refs #4100.

Co-authored-by: yoblin <268258002+yoblin@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
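The parallel-read phase that commit describes can be sketched like this; `read_size` stands in for the `TreeStore.open` + `data_size` round trip, which the real code performs against remote storage:

```python
from concurrent.futures import ThreadPoolExecutor

def consolidate_sizes(shard_paths, read_size, max_workers=32):
    """Read per-shard sizes in parallel, then compute offsets serially."""
    # The reads are independent and latency-bound, so a thread pool helps;
    # the cumulative offsets depend on shard order, so that part stays serial.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        sizes = list(pool.map(read_size, shard_paths))
    offsets, total = [], 0
    for size in sizes:           # order-dependent: must stay serial
        offsets.append(total)
        total += size
    return offsets, total
```

`ThreadPoolExecutor.map` preserves input order, so the serial offset pass sees sizes in shard order even though the reads complete out of order.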
Helw150 pushed a commit that referenced this pull request Apr 8, 2026
Helw150 pushed a commit that referenced this pull request Apr 8, 2026

Labels

agent-generated Created by automation/agent


Development

Successfully merging this pull request may close these issues.

Serial metadata copy in consolidate_shard_caches takes hours for large tokenization jobs

4 participants