
Commit 2b80c94

ravwojdyla-agent, ravwojdyla, and claude authored and committed
Bootstrap datakit - consolidate and cleanup downloads (#4142)
- Move all download modules from marin.download to marin.datakit.download
- Delete old marin.download directory entirely (no shims)
- Add StepSpec factory functions (download_hf_step, ar5iv_step, etc.) for each downloader
- StepSpec.output_path auto-prefixes relative override paths with marin_prefix
- Extract pretraining downloads (simple.py) to use download_hf_step directly
- Extract nemotron_v2 dataset definitions into datakit/download/nemotron_v2.py
- Extract dolma and dolmino downloads into datakit/download/dolma.py and dolmino.py
- Add download_wikipedia_step with override pointing at existing data
- Wire Wikipedia and ar5iv download+transform in exp934 as StepSpec deps
- Remove unused modules: stream_remove_columns, filesystem, dclm_hq, normalize, tokenize
- Remove unused config dataclasses (NemotronIngressConfig, DCLMHQDownloadConfig, TransferConfig)
- Remove draccus CLI wrappers with zero callers
- Remove global HfFileSystem() instance
- Move upload_gcs_to_hf to marin.utilities (not a download step)
- Move tests to tests/datakit/download/ mirroring source layout
- Update all imports across experiments and lib to canonical paths
- Update design doc to use Parquet instead of Vortex (notes vortex#6905)

Part of #2355

---------

Co-authored-by: Rafal Wojdyla <ravwojdyla@gmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cbc9f58 commit 2b80c94

52 files changed

Lines changed: 878 additions & 1331 deletions


docs/design/2355_datakit.md

Lines changed: 219 additions & 0 deletions
@@ -0,0 +1,219 @@

Marin has most of the pieces for end-to-end data processing - download, dedup, filtering, classification, decontamination, tokenization - but the code is scattered across `experiments/` and `lib/marin/` with inconsistent formats, ad-hoc ID handling, and unclear provenance.

We propose consolidating this into **datakit**: a set of composable pipeline stages with standardized formats and conventions, living in `lib/marin/datakit/`. Dataset-specific wiring (e.g., "for Arxiv, apply these transforms") lives in `experiments/` or reference configurations.

Links:

* [marin#2355](https://github.com/marin-community/marin/issues/2355)
* [gdoc](https://docs.google.com/document/d/1kDSzONg32zv2VnCO4FJiMP0fcjRSjgP0uTDpI4_C4O0)

# Golden Path

The canonical pipeline for getting a dataset from source to training:

`Download → Normalize → Embed → Classify/Filter → Dedup → Tokenize`

Notably, datakit in its proposed form does not include **data mixing** or **training**.

## 1. Download

Download the raw dataset from Hugging Face (or other sources). Raw downloads are preserved as-is in their original format and directory structure.
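For concreteness, a minimal sketch of what this step amounts to, using the public `huggingface_hub` API rather than the repo's own `download_hf` / `download_hf_step` entry points; the dataset id, revision, and output path are placeholders:

```py
# Illustrative only: raw snapshot download via huggingface_hub, preserving
# the source layout. The repo's real entry point is
# marin.datakit.download.huggingface.download_hf.
from huggingface_hub import snapshot_download


def download_raw(dataset_id: str, revision: str, output_path: str) -> str:
    """Fetch the dataset files as-is, keeping the original directory structure."""
    return snapshot_download(
        repo_id=dataset_id,
        repo_type="dataset",
        revision=revision,
        local_dir=output_path,  # files land here untouched
    )


# download_raw("HuggingFaceFW/fineweb", "abc1234", "/data/raw/fineweb")
```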
## 2. Normalize to Standard Format

Convert raw data into the **datakit standard format**:

* **File format**: Parquet - columnar, widely supported, with pushdown filters and column projection.
* **Mandatory columns**:
  * `id` - unique document identifier (see [ID Column](#id-column) below)
  * `text` - primary text content; we enforce UTF-8
* **Arbitrary additional columns**: any fields present in the raw data are preserved
* **Directory structure**: preserve the original directory structure
* **Partition structure**: the partition layout from the source does NOT need to be preserved at this point - and in most cases it will not be
  * We may want to introduce a more efficient partitioning at this stage and preserve the new partitioning until tokenization
  * Partitions must follow the `part-x-of-y` suffix naming convention
* **Sort invariant**: each partition is sorted by `id`
* **Typed output**: in code, the data has a typed representation via `Artifact`

This is the "intake" step - all downstream stages operate on normalized Parquet datasets.
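A minimal sketch of the intake conversion, assuming JSONL input and plain `pyarrow`; the real `datakit/normalize.py` (see Execution Plan) may differ, and the paths and helper name are placeholders:

```py
# Sketch: one raw JSONL shard -> one normalized Parquet partition with the
# mandatory columns, the source_id rename, and the sort-by-id invariant.
import hashlib

import pyarrow as pa
import pyarrow.json as pj
import pyarrow.parquet as pq


def normalize_partition(jsonl_path: str, out_path: str) -> None:
    table = pj.read_json(jsonl_path)

    # Preserve a pre-existing id column as source_id for provenance.
    if "id" in table.column_names:
        table = table.rename_columns(
            ["source_id" if c == "id" else c for c in table.column_names]
        )

    # Deterministic content-hash id: re-running produces the same ids.
    ids = pa.array(
        [hashlib.sha256(t.encode("utf-8")).hexdigest() for t in table.column("text").to_pylist()]
    )
    table = table.append_column("id", ids)

    # Sort invariant: each partition is sorted by id.
    pq.write_table(table.sort_by("id"), out_path)


# Output files follow the part-x-of-y convention, e.g.:
# normalize_partition("raw/shard_000.jsonl", "normalized/part-0-of-128.parquet")
```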
39+
40+
## 3\. Embed
41+
42+
Produce vector embeddings for each document. Output is an **attributes dataset** (see [Attributes Datasets](#attributes-datasets)) with embedding vectors keyed by `id`.
43+
44+
## 4\. Quality Classification, Topic Assignment
45+
46+
Each classifier produces an **attributes dataset** containing scores/labels keyed by `id`.
47+
48+
## 5\. Deduplication
49+
50+
Produces an **attributes dataset** marking duplicate spans or documents.
51+
52+
## 7\. Consolidation
53+
54+
Join attributes datasets back to the source documents and apply filters:
55+
56+
* Filter by classifier thresholds (e.g., quality score \> 0.8)
57+
* Remove duplicate spans/documents
58+
59+
Output is a clean, filtered Parquet dataset \- still sorted by `id`, still co-partitioned.
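A per-partition sketch of what consolidation does, assuming co-partitioned, `id`-sorted shards; the actual implementation is expected to go through Zephyr, and the column and path names are illustrative:

```py
# Sketch: join one source partition with its quality and dedup attributes
# shards, apply the filters, and write a clean, still-sorted partition.
import pyarrow.compute as pc
import pyarrow.parquet as pq


def consolidate_partition(source_path: str, quality_path: str, dedup_path: str,
                          out_path: str, quality_threshold: float = 0.8) -> None:
    source = pq.read_table(source_path)    # id, text, ...
    quality = pq.read_table(quality_path)  # id, quality_score
    dedup = pq.read_table(dedup_path)      # id, is_duplicate

    # Co-partitioning means all three shards cover the same id range,
    # so the join stays local to this partition (no shuffle needed).
    joined = source.join(quality, keys="id").join(dedup, keys="id")

    kept = joined.filter(
        pc.and_(
            pc.greater(joined["quality_score"], quality_threshold),
            pc.invert(joined["is_duplicate"]),
        )
    )
    pq.write_table(kept.sort_by("id"), out_path)  # keep the sort invariant
```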
## 7. Tokenize

Convert clean text into the tokenized Levanter cache format.

**Tokenization is the boundary where per-document structure ends.** The tokenizer concatenates documents into fixed-size token sequences for efficient training. Partition structure from earlier stages does not carry through - the output is sharded Levanter TreeStore caches with a `.stats.json` summary.
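To make the boundary concrete, a conceptual sketch of packing (not Levanter's actual TreeStore cache writer); the tokenizer name matches the example wiring further below:

```py
# Conceptual only: documents are concatenated into one token stream and then
# sliced into fixed-size sequences, so per-document structure ends here.
from transformers import AutoTokenizer


def pack_documents(texts: list[str], seq_len: int = 4096) -> list[list[int]]:
    tok = AutoTokenizer.from_pretrained("meta-llama/Llama-3.1-8B")
    stream: list[int] = []
    for text in texts:
        stream.extend(tok.encode(text) + [tok.eos_token_id])
    # Document boundaries (and the partition layout) no longer matter downstream.
    return [stream[i : i + seq_len] for i in range(0, len(stream) - seq_len + 1, seq_len)]
```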
# Core Design Decisions

## Parquet as the Standard Format

All intermediate datasets (from normalization through consolidation) use the Parquet columnar format. Benefits:

* Column projection (only read the columns you need)
* Filter pushdown
* Efficient sorted merge joins via Zephyr
* Mature ecosystem with broad tooling support

NOTE: We initially considered Vortex for its pushdown and lookup capabilities, but encountered blocking issues with Zephyr pipeline integration (see [vortex#6905](https://github.com/vortex-data/vortex/issues/6905)). Parquet provides the same columnar benefits with a proven ecosystem. If Vortex matures, we can revisit.
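As a small illustration of projection and pushdown (plain `pyarrow.dataset`, illustrative paths):

```py
import pyarrow.dataset as ds

# Read only what is needed from a quality attributes dataset.
attrs = ds.dataset("attributes/fineweb/quality/", format="parquet")

# Projection: only the id column is materialized. Pushdown: row groups whose
# statistics cannot satisfy the predicate are skipped without being decoded.
high_quality_ids = attrs.to_table(
    columns=["id"],
    filter=ds.field("quality_score") > 0.8,
)
```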
## ID Column {#id-column}

* **Preserve existing IDs** when present in the raw data (e.g., WARC-Record-ID in DCLM, HF row indices). These carry provenance meaning and aid debugging.
  * Rename the preserved column to `source_id`.
* **Generate deterministic IDs** via content hash, in a column named `id`. Deterministic hashing ensures reproducibility - re-running the pipeline produces the same IDs, which preserves caching and diffing.
## Co-Partitioning Invariant

The key invariant that enables efficient joins: **attributes datasets must have the same number of shards and the same key-range partitioning as their source dataset.**

This means:

* The normalization step determines the partition structure
* All downstream stages (embed, classify, dedup) preserve this structure - same shard count, same ID ranges per shard
* Consolidation can use Zephyr's `sorted_merge_join` without a costly `group_by` shuffle

This is enforced by convention: each processing stage reads source partitions 1:1 and writes output partitions with matching structure.
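A skeleton of that convention; `process_fn` and the path layout are placeholders, not a real datakit API:

```py
# Each stage reads part-i-of-n from its source and writes the matching
# part-i-of-n of its output, so shard count and id ranges are preserved.
from pathlib import Path


def run_stage(source_dir: str, out_dir: str, process_fn) -> None:
    for src in sorted(Path(source_dir).glob("part-*-of-*.parquet")):
        # Same shard name in, same shard name out.
        process_fn(src, Path(out_dir) / src.name)
```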
## Attributes Datasets {#attributes-datasets}

Processing stages (embed, classify, dedup) produce **attributes datasets** - lightweight Parquet files containing:

* `id` — matching the source document ID
* Stage-specific output columns (e.g., `quality_score`, `is_duplicate`, `topic_label`)

Attributes datasets:

* Use Parquet format
* Are co-partitioned with the source (same shard count and key ranges)
* Are sorted by `id` within each partition
* Can be joined back to source documents via `sorted_merge_join`

Multiple attribute datasets from different stages can be joined together during consolidation to apply compound filters.
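For example, a classify stage might emit one attributes shard per source partition along these lines (the scoring function is a placeholder):

```py
# Sketch: an attributes shard carries only id plus the stage's output column,
# named like its source partition and kept in source order.
import pyarrow as pa
import pyarrow.parquet as pq


def classify_partition(source_part: str, out_part: str, score_fn) -> None:
    docs = pq.read_table(source_part, columns=["id", "text"])
    attrs = pa.table({
        "id": docs["id"],
        "quality_score": pa.array([score_fn(t) for t in docs["text"].to_pylist()]),
    })
    # Rows stay in source order, so the shard inherits the sort-by-id and
    # co-partitioning invariants from the source partition.
    pq.write_table(attrs, out_part)
```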
## Step Orchestration via StepSpec

Datakit builds on `StepSpec` - the pure-data step descriptor that captures identity and dependencies. Each datakit stage (normalize, classify, dedup, etc.) is a `StepSpec` with:

* **`name`**: human-readable stage name (e.g., `"fineweb/normalize"`)
* **`deps`**: upstream `StepSpec`s whose `output_path` this stage reads from
* **`hash_attrs`**: configuration values that affect output (model name, thresholds, etc.) — changes invalidate the cache
* **`fn`**: the callable that performs the work, receiving `output_path` as its argument

`StepSpec` gives us automatic cache invalidation (via `hash_id` derived from name + attrs + dep paths), dependency tracking, and deterministic output paths. The step runner handles locking, heartbeats, and status - datakit stages just describe what to run.
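One plausible derivation, shown only to make the idea concrete (not necessarily how `StepSpec` computes it):

```py
# Hypothetical content-based step hash over name, hash_attrs, and dep paths.
import hashlib
import json


def step_hash_id(name: str, hash_attrs: dict, dep_output_paths: list[str]) -> str:
    payload = json.dumps(
        {"name": name, "attrs": hash_attrs, "deps": sorted(dep_output_paths)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:8]

# Any change to an attribute or an upstream output path yields a new hash,
# which gives the step a new output path and forces recomputation.
```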

Example wiring:

```py
download = StepSpec(
    name="fineweb/download",
    fn=lambda output_path: download_hf(output_path=output_path, dataset_id="HuggingFaceFW/fineweb"),
    hash_attrs={"dataset_id": "HuggingFaceFW/fineweb", "revision": "abc1234"},
)

normalize = StepSpec(
    name="fineweb/normalize",
    deps=[download],
    fn=lambda output_path: normalize_to_parquet(
        input_path=download.output_path, output_path=output_path, text_field="text",
    ),
    hash_attrs={"text_field": "text"},
)

quality = StepSpec(
    name="fineweb/quality",
    deps=[normalize],
    fn=lambda output_path: classify(
        input_path=normalize.output_path, output_path=output_path, model="fasttext-quality-v1",
    ),
    hash_attrs={"model": "fasttext-quality-v1"},
)

dedup = StepSpec(
    name="fineweb/dedup",
    deps=[normalize],
    fn=lambda output_path: deduplicate(
        input_path=normalize.output_path, output_path=output_path, mode="fuzzy_document",
    ),
    hash_attrs={"mode": "fuzzy_document"},
)

consolidated = StepSpec(
    name="fineweb/consolidated",
    deps=[normalize, quality, dedup],
    fn=lambda output_path: consolidate(
        source_path=normalize.output_path,
        attribute_paths=[quality.output_path, dedup.output_path],
        output_path=output_path,
        quality_threshold=0.8,
    ),
    hash_attrs={"quality_threshold": 0.8},
)

tokenized = StepSpec(
    name="fineweb/tokenized",
    deps=[consolidated],
    fn=lambda output_path: tokenize(
        input_path=consolidated.output_path, output_path=output_path,
        tokenizer="meta-llama/Llama-3.1-8B",
    ),
    hash_attrs={"tokenizer": "meta-llama/Llama-3.1-8B"},
)
```

# API Surface

## `lib/marin/datakit/`

Core primitives — the reusable building blocks:

```
lib/marin/datakit/
  normalize     # Raw format -> standard Parquet (id, text, ...)
  embed         # Document embedding
  classify      # Quality/topic classification
  dedup         # Deduplication (exact + fuzzy)
  consolidate   # Join attributes + apply filters
```

## `experiments/` (or reference configurations)

Dataset-specific wiring - which transforms to apply for a given dataset, expressed as `StepSpec` DAGs.

# Execution Plan

* Implement `datakit/normalize.py` - standard schema definitions, ID generation, raw-format-to-Parquet conversion with mandatory columns
* Integration tests for the normalize step
* Integration tests covering download, normalize, dedup, and tokenize at a reasonable scale
* Update Grug/ferry experiment definitions to consume datakit pipeline outputs directly

# Non-Goals

* **Replacing the mixing or training APIs** - datakit standardizes everything upstream of tokenization.
* **Supporting non-text modalities** - the initial scope is text datasets with a mandatory `text` field. Multimodal support can be added later by relaxing this constraint.

# Open Questions

1. **ID uniqueness enforcement**: Per-partition validation is cheap and will be the default. Should we also support global uniqueness checks? What's the failure mode — warn or error?
2. **Non-text datasets**: Code datasets, structured data - do we need a configurable primary field, or is `text` always sufficient?
3. **Versioning**: How do we version datakit outputs so that downstream consumers (Grug) can pin to a specific processing run? `StepSpec.hash_id` provides content-based versioning, but do we need human-readable version tags as well?

experiments/common_pile/tokenize_common_pile.py

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
 
 from experiments.defaults import default_tokenize
 from experiments.llama import llama3_tokenizer
-from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+from marin.datakit.download.huggingface import DownloadConfig, download_hf
 from marin.execution.executor import ExecutorStep, executor_main, this_output_path
 from marin.processing.tokenize.data_configs import TokenizerStep, lm_mixture_data_config
 

experiments/defaults.py

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@
 from experiments.simple_sft_config import SimpleSFTConfig
 from experiments.simple_train_config import SimpleTrainConfig
 from levanter.utils.mesh import MeshConfig
-from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+from marin.datakit.download.huggingface import DownloadConfig, download_hf
 from marin.evaluation.evaluation_config import EvalTaskConfig
 from marin.execution.executor import (
     ExecutorStep,

experiments/eval_datasets.py

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
 import dataclasses
 
-from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+from marin.datakit.download.huggingface import DownloadConfig, download_hf
 from marin.execution.executor import ExecutorStep, executor_main, this_output_path, versioned
 from marin.transform.huggingface.dataset_to_eval import DatasetConversionConfig, OutputFormatOptions, hf_dataset_to_jsonl
 

experiments/evals/exp1600_uncheatable_evals.py

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@
 from experiments.models import ModelConfig as HFModelConfig, download_model_step
 from fray.cluster import ResourceConfig
 from levanter.compat.hf_checkpoints import HFCheckpointConverter
-from marin.download.uncheatable_eval.download import make_uncheatable_eval_step
+from marin.datakit.download.uncheatable_eval import make_uncheatable_eval_step
 from marin.evaluation.log_probs import default_lm_log_probs
 from marin.execution.executor import ExecutorStep, executor_main, output_path_of
 from marin.processing.tokenize import TokenizeConfig

experiments/exp934_hq_vs_pt.py

Lines changed: 48 additions & 32 deletions
@@ -8,7 +8,10 @@
 datasets used by various training experiments.
 """
 
+from marin.datakit.download.ar5iv import ar5iv_step
+from marin.datakit.download.wikipedia import download_wikipedia_step
 from marin.execution.executor import ExecutorStep, mirrored, this_output_path, versioned
+from marin.execution.step_spec import StepSpec
 from marin.schemas.web.convert import HtmlToMarkdownConfig, ResiliparseConfig
 from marin.schemas.web.selectors import ARXIV_BLACKLISTED_SELECTORS, WIKI_BLACKLISTED_SELECTORS
 from marin.transform.ar5iv.transform_ar5iv import Ar5ivExtractionConfig, process_ar5iv_dump
@@ -42,48 +45,61 @@
     ),
 ).with_output_path("documents/stackexchange-resiliparse-custom-fork-ab41ad")
 
-# Wikipedia resiliparse custom fork step (data already exists at hardcoded path)
-wikipedia_resiliparse_custom_fork = (
-    ExecutorStep(
-        name="documents/wikipedia-resiliparse-custom-fork",
-        fn=process_wiki_dump,
-        config=WikiExtractionConfig(
-            input_path=mirrored("raw/wikipedia-a7dad0/20241201", budget_gb=1),
-            revision=versioned("20241201"),
-            output_path=this_output_path(),
+_wikipedia_download = download_wikipedia_step()
+
+# Wikipedia resiliparse custom fork step
+_wikipedia_transform = StepSpec(
+    name="documents/wikipedia-resiliparse-custom-fork",
+    fn=lambda output_path: process_wiki_dump(
+        WikiExtractionConfig(
+            input_path=f"{_wikipedia_download.output_path}/20241201",
+            revision="20241201",
+            output_path=output_path,
             extract_method="resiliparse",
             extract_config=ResiliparseConfig(
                 links=False,
                 skip_elements=WIKI_BLACKLISTED_SELECTORS,
                 markdownify_config=HtmlToMarkdownConfig(include_images=False, include_links=False),
             ),
-            remove_reference_section=versioned(True),
-            digit_threshold=versioned(50),
-            word_threshold=versioned(70),
-            special_char_threshold=versioned(50),
-        ),
-    )
-    .with_output_path("documents/wikipedia-resiliparse-custom-fork-2569de")
-    .cd("20241201")
+            remove_reference_section=True,
+            digit_threshold=50,
+            word_threshold=70,
+            special_char_threshold=50,
+        )
+    ),
+    deps=[_wikipedia_download],
+    hash_attrs={"revision": "20241201", "extract_method": "resiliparse"},
+    override_output_path="documents/wikipedia-resiliparse-custom-fork-2569de",
 )
+wikipedia_resiliparse_custom_fork = _wikipedia_transform.as_executor_step().cd("20241201")
 
-# ar5iv resiliparse custom fork step (data already exists at hardcoded path)
-ar5iv_no_problem_resiliparse_custom_fork = ExecutorStep(
+_ar5iv_download = ar5iv_step(
+    input_path="gs://marin-us-central2/raw/ar5iv/ar5iv-04-2024-no-problem.zip",
+    override_output_path="raw/ar5iv/ar5iv-04-2024-no-problem-49c4e3",
+)
+
+# ar5iv resiliparse custom fork step
+_ar5iv_transform = StepSpec(
     name="documents/ar5iv/ar5iv-04-2024-no-problem",
-    fn=process_ar5iv_dump,
-    config=Ar5ivExtractionConfig(
-        input_path=mirrored("raw/ar5iv/ar5iv-04-2024-no-problem-49c4e3/202404", budget_gb=1),
-        revision="042024",
-        output_path=this_output_path("resiliparse-custom-fork"),
-        extract_method=versioned("resiliparse"),
-        extract_config=ResiliparseConfig(
-            links=versioned(False),
-            prepend_title=True,
-            skip_elements=ARXIV_BLACKLISTED_SELECTORS,
-        ),
-        remove_reference_section=versioned(True),
+    fn=lambda output_path: process_ar5iv_dump(
+        Ar5ivExtractionConfig(
+            input_path=f"{_ar5iv_download.output_path}/202404",
+            revision="042024",
+            output_path=output_path,
+            extract_method="resiliparse",
+            extract_config=ResiliparseConfig(
+                links=False,
+                prepend_title=True,
+                skip_elements=ARXIV_BLACKLISTED_SELECTORS,
+            ),
+            remove_reference_section=True,
+        )
     ),
-).with_output_path("documents/ar5iv/ar5iv-04-2024-no-problem-3971f")
+    deps=[_ar5iv_download],
+    hash_attrs={"revision": "042024", "extract_method": "resiliparse"},
+    override_output_path="documents/ar5iv/ar5iv-04-2024-no-problem-3971f",
+)
+ar5iv_no_problem_resiliparse_custom_fork = _ar5iv_transform.as_executor_step()
 
 # MMLU Science QA tokenization
 medu_mmlu_science_qa_tokenized = default_tokenize(
experiments/midtraining_datasets.py

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 from experiments.common_pile.tokenize_common_pile import stackv2_edu_filtered
 from experiments.defaults import default_download, default_tokenize
 from experiments.llama import llama3_tokenizer
-from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+from marin.datakit.download.huggingface import DownloadConfig, download_hf
 from marin.execution import versioned
 from marin.execution.executor import ExecutorStep, this_output_path
 from marin.processing.tokenize import lm_mixture_data_config

experiments/models.py

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@
 
 from dataclasses import dataclass
 
-from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+from marin.datakit.download.huggingface import DownloadConfig, download_hf
 from marin.execution.executor import ExecutorStep, this_output_path, versioned
 from marin.utils import get_directory_friendly_name
 

experiments/multilingual_fineweb2_hq/download_and_tokenize_fineweb2_hq.py

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@
 
 from experiments.llama import llama3_tokenizer
 from experiments.multilingual_fineweb2_hq.constants import FINEWEB2_DATASETS
-from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+from marin.datakit.download.huggingface import DownloadConfig, download_hf
 from marin.execution.executor import ExecutorStep, executor_main, output_path_of, this_output_path, versioned
 from marin.processing.tokenize import TokenizeConfig, tokenize
 from marin.processing.tokenize.data_configs import TokenizerStep

experiments/paloma.py

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
 
 import os.path
 
-from marin.download.huggingface.download_hf import DownloadConfig as HfDownloadConfig, download_hf
+from marin.datakit.download.huggingface import DownloadConfig as HfDownloadConfig, download_hf
 
 # cyclic dependency
 # from experiments.llama import llama3_tokenizer
