consolidate: map-side merge join + flat kwargs api #4758
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fd3b67e212
```python
attr_paths = [_resolve_attribute_path(input_path, inp, filt, filetype) for inp in input_paths]
attr_paths = [p for p in attr_paths if p is not None]
```
Fail on missing attribute shards for percentile thresholds
When keep_fraction is used, missing attribute files are silently removed from attr_paths and the threshold is computed from only the remaining shards. In the case of a bad/missing shard path, this produces a biased global threshold and can over/under-filter the rest of the dataset without surfacing an error. The previous implementation would fail on missing paths, so this change introduces a silent correctness regression for percentile-based filtering.
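The fail-fast behavior the review asks for could be sketched as follows. This is illustrative, not the PR's code: `resolve` stands in for the real resolver (e.g. a partial over `_resolve_attribute_path`), and the helper name is hypothetical.

```python
def resolve_attribute_paths_strict(input_paths, resolve):
    """Resolve attribute shard paths, failing fast on any missing shard.

    `resolve` returns the attribute path for an input shard, or None if
    the shard is missing (hypothetical stand-in for _resolve_attribute_path).
    """
    attr_paths, missing = [], []
    for inp in input_paths:
        path = resolve(inp)
        if path is None:
            missing.append(inp)
        else:
            attr_paths.append(path)
    if missing:
        # A silently biased percentile threshold is worse than a hard error:
        # surface the missing shards instead of dropping them from the sample.
        raise FileNotFoundError(
            f"{len(missing)} attribute shard(s) missing, e.g. {missing[:3]}"
        )
    return attr_paths
```

With this shape, `keep_fraction` thresholds are only ever computed over the full set of shards, restoring the old fail-on-missing behavior.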
```python
# Shard has __iter__, iterate to get the single file path
input_path = next(iter(shard))
corpus_type = "dclm" if "dclm" in input_path else "default"

class _PeekableIter:
```
This seems fine, though I didn't quite follow the use.
Using a one-element list as the peek buffer would get rid of the separate `has_peek`/`_peeked` state and probably be a bit simpler.
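The one-element-buffer idea suggested here could look like this. A hypothetical alternative to the PR's `_PeekableIter`, not the code in the diff:

```python
class PeekableIter:
    """One-slot lookahead iterator: the buffer list itself records whether
    a value has been peeked, so no separate has_peek flag is needed."""

    def __init__(self, iterable):
        self._it = iter(iterable)
        self._buf = []  # empty: nothing buffered; [x]: x was peeked

    def peek(self, default=None):
        if not self._buf:
            try:
                self._buf.append(next(self._it))
            except StopIteration:
                return default
        return self._buf[0]

    def __iter__(self):
        return self

    def __next__(self):
        # Serve the peeked value first, then fall through to the source.
        if self._buf:
            return self._buf.pop()
        return next(self._it)
```

The list's emptiness doubles as the has-peek flag, which is the simplification the comment is pointing at.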
* Switch consolidate from a per-shard in-memory hash join to a streaming sorted-merge join. Datakit guarantees attribute files share the input file partitioning (1:1 file pairing, sorted by id), so each shard can join with constant memory per filter regardless of row count. Works for all filter types (CLASSIFY, REMOVE_DOC, REMOVE_SPANS).
* Replace the ConsolidateConfig dataclass with flat keyword arguments on consolidate() and calculate_percentile_thresholds() for a lighter API.
* Update the datakit ferry, the consolidate unit tests, and the marin integration test (switching the consolidate step from ExecutorStep+config to StepSpec+kwargs).

Validated on the marin Iris cluster: tests/integration_test.py runs through html-to-md -> dedup_exact_paragraph -> consolidate (REMOVE_SPANS via map-side join) -> tokenize -> train end-to-end. The consolidate step in the smoke ferry went from 379s to ~104s on FineWeb-Edu sample/10BT (-73%).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
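The streaming sorted-merge join the commit describes can be sketched as a standalone generator. This is an illustrative version under the PR's stated invariant (both sides sorted ascending by id, at most one attribute row per doc), not Zephyr's actual `sorted_merge_join`:

```python
def sorted_merge_left_join(docs, attrs, key=lambda r: r["id"]):
    """Left-join two iterators sorted ascending by `key`, streaming with
    O(1) memory. Yields (doc, attr) pairs; attr is None when unmatched.
    Assumes at most one attr row per doc id (1:1 file pairing)."""
    attrs = iter(attrs)
    cur = next(attrs, None)
    for doc in docs:
        k = key(doc)
        # Advance the attribute side until it catches up with the doc key.
        while cur is not None and key(cur) < k:
            cur = next(attrs, None)
        yield doc, (cur if (cur is not None and key(cur) == k) else None)
```

Because neither side is ever materialized, memory stays constant per filter no matter how many rows a shard holds, which is what replaces the per-shard hash table.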
Replace the custom streaming merge walk inside process_file_shard with N chained Zephyr sorted_merge_join ops, one left join per filter, each with its keep/mutate/drop logic encoded in the combiner. A filter between joins drops rows the combiner rejected (as None) so the next join's key extractor never sees them. The empty-text check for REMOVE_SPANS moves into the REMOVE_SPANS combiner so all per-filter semantics live in one place.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
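The chained shape described above can be sketched in plain Python. All names here are illustrative (not the Zephyr API), and the inline merge join is a stand-in for the framework op:

```python
def merge_left_join(left_rows, right_rows, key):
    """Streaming left join over two iterators sorted by `key` (illustrative)."""
    right_rows = iter(right_rows)
    cur = next(right_rows, None)
    for row in left_rows:
        while cur is not None and key(cur) < key(row):
            cur = next(right_rows, None)
        yield row, (cur if (cur is not None and key(cur) == key(row)) else None)


def _stage(stream, attrs, combine, key):
    # A generator function (not a genexp in the loop below) so `combine` is
    # bound per stage; the None check here is the "filter between joins".
    for left, right in merge_left_join(stream, attrs, key):
        out = combine(left, right)
        if out is not None:
            yield out


def chain_filters(rows, attr_streams, combiners, key=lambda r: r["id"]):
    """Chain one left join per filter; each combiner keeps, mutates, or
    drops (returns None) its row, and dropped rows never reach later joins."""
    stream = rows
    for attrs, combine in zip(attr_streams, combiners):
        stream = _stage(stream, attrs, combine, key)
    return stream
```

Binding each stage through a generator function matters: building the stages with generator expressions inside the loop would capture `combine` late and apply the last combiner to every stage.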
Force-pushed from fd3b67e to 73a2b3a
CLASSIFY was only exercised by tests and a broken quickstart yaml (no CLI backs it). The two real callers — datakit_ferry and the integration test — use REMOVE_DOC and REMOVE_SPANS. Removing the dead path drops ddsketch and the percentile threshold code with it.
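For illustration, the two surviving filter types could be expressed as combiners along these lines. This is a hedged sketch: the attribute field names (`remove`, `spans`) and the function names are assumptions, not the PR's actual schema:

```python
def remove_doc_combiner(doc, attr):
    """REMOVE_DOC: drop the whole doc when the attribute row flags it.
    (`remove` is a hypothetical flag field.)"""
    if attr is not None and attr.get("remove"):
        return None
    return doc


def remove_spans_combiner(doc, attr):
    """REMOVE_SPANS: strip flagged [start, end) character spans from the
    text, and drop the doc entirely if nothing non-blank remains.
    (`spans` is a hypothetical list-of-pairs field.)"""
    if attr is None:
        return doc
    pieces, pos, text = [], 0, doc["text"]
    for start, end in sorted(attr.get("spans", [])):
        pieces.append(text[pos:start])
        pos = max(pos, end)  # tolerate overlapping spans
    pieces.append(text[pos:])
    stripped = "".join(pieces)
    if not stripped.strip():
        return None  # empty-text check lives in the combiner, per the PR
    return {**doc, "text": stripped}
```

Keeping the empty-text check inside the REMOVE_SPANS combiner means each filter's full keep/mutate/drop semantics live in one function, as the commit message describes.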
Datakit docs all use a flat "id" field, so the corpus_type dispatch (default vs. dclm nested WARC-Record-ID) was dead configurability. Dropping it lets the join key directly on r["id"]. The function also now returns the execution result so callers can inspect written shards instead of parsing log output.
- Chained `sorted_merge_join` ops: one left join per filter, no shuffle, keyed on `id`¹
- CLASSIFY/REMOVE_DOC combiners: return `left` or `None`
- REMOVE_SPANS combiner: return the span-stripped doc, or `None` if text was stripped to empty
- `filter(lambda r: r is not None)` between joins drops rejected rows so the next join's key extractor never sees `None`
- Removed `ConsolidateConfig`; `consolidate(...)` and `calculate_percentile_thresholds(...)` now take flat kwargs (`input_path`, `output_path`, `filters`, `filetype`, `worker_resources`)
- Updated `experiments/ferries/datakit_ferry.py`, `tests/processing/classification/test_consolidate.py`, and `tests/integration_test.py`; the consolidate step moved from `ExecutorStep` + config to `StepSpec` + kwargs
- Validated on the marin Iris cluster via `tests/integration_test.py` (html-to-md → dedup → consolidate (REMOVE_SPANS) → tokenize → train)
- Consolidate step time on FineWeb-Edu `sample/10BT`: -73%²

Footnotes
1. Previously the per-shard hash join already relied on this implicitly via `rebase_file_path` path pairing; `sorted_merge_join` makes the invariant explicit at the framework level and swaps the hash table for a streaming merge. ↩
2. Measured on a prior iteration that used an inline streaming merge walk instead of `sorted_merge_join`; expected to be similar since both are map-side. ↩