Merged
56 commits
7a24229
Stage datakit design doc
ravwojdyla Mar 9, 2026
2d4ab40
Add datakit download, normalize, and tokenize modules
ravwojdyla Mar 23, 2026
3f78ea6
StepSpec: auto-prefix relative override_output_path with marin_prefix
ravwojdyla Mar 25, 2026
4f84f71
Convert datakit/download to package and move HF download modules
ravwojdyla Mar 25, 2026
0a1413a
Move specialized downloaders to datakit/download/ with StepSpec facto…
ravwojdyla Mar 25, 2026
df63b15
Convert marin.download.* to backward-compat re-export shims
ravwojdyla Mar 25, 2026
23bf3c4
Extract pretraining download definitions into datakit/download/pretra…
ravwojdyla Mar 25, 2026
6ffda47
Fix mock targets in download tests to use canonical module paths
ravwojdyla Mar 25, 2026
3aa8bcc
Migrate all imports from marin.download.* to marin.datakit.download.*
ravwojdyla Mar 25, 2026
ef02bd8
Migrate imports to canonical paths and simplify download functions
ravwojdyla Mar 25, 2026
4c8f38f
Remove unused config dataclasses from download functions
ravwojdyla Mar 25, 2026
16f5c20
Update datakit design doc: use Parquet instead of Vortex
ravwojdyla Mar 25, 2026
5af3272
Remove global HfFileSystem() instance from stream_remove_columns
ravwojdyla Mar 25, 2026
f6959ec
Inline pretraining downloads into simple.py, delete pretraining.py
ravwojdyla Mar 25, 2026
ddb2037
Delete old marin.download/ shim directory
ravwojdyla Mar 25, 2026
f4f7cab
Remove unused stackexchange data files
ravwojdyla Mar 25, 2026
64fd456
Revert "Remove unused stackexchange data files"
ravwojdyla Mar 25, 2026
d603b25
Move upload_gcs_to_hf from datakit/download/ to utilities/
ravwojdyla Mar 25, 2026
642b699
Convert ar5iv into a package with its JSON data file alongside
ravwojdyla Mar 25, 2026
9242092
Delete unused filesystem transfer module
ravwojdyla Mar 25, 2026
cd65de5
Move ar5iv logic from __init__.py to download.py within the package
ravwojdyla Mar 25, 2026
7601f45
Delete unused stream_remove_columns module and its test
ravwojdyla Mar 25, 2026
64f5c48
Remove unused dclm_hq_step function
ravwojdyla Mar 25, 2026
76e3336
Simplify nemotron_cc_step to download_nemotron_cc_step
ravwojdyla Mar 25, 2026
90b9b65
Rename nemotron_cc.py to nemotron_v1.py
ravwojdyla Mar 25, 2026
1760f2f
Extract nemotron_v2 download definitions into datakit/download/nemotr…
ravwojdyla Mar 25, 2026
d03a03a
Rename nemotron step functions for consistency
ravwojdyla Mar 25, 2026
39fe0d1
Remove unnecessary __all__ from uncheatable_eval module
ravwojdyla Mar 25, 2026
c53407f
Remove unused wikipedia_step function
ravwojdyla Mar 25, 2026
a85e541
Flatten wikipedia download to plain parameters, remove draccus CLI
ravwojdyla Mar 25, 2026
e7ac5be
Remove unused draccus CLI from huggingface download module
ravwojdyla Mar 25, 2026
0c7587a
Remove backward-compat aliases from datakit/download/__init__.py
ravwojdyla Mar 25, 2026
aa2252d
Remove output_path_prefix from download_hf_step
ravwojdyla Mar 25, 2026
5556576
Remove unused datakit/tokenize.py module
ravwojdyla Mar 25, 2026
f2983ba
Remove unused datakit/normalize.py module
ravwojdyla Mar 25, 2026
764e117
Move tests/download/ to tests/datakit/download/
ravwojdyla Mar 25, 2026
05d58e2
Restore tests/datakit/__init__.py and test_datakit.py
ravwojdyla Mar 25, 2026
bed1015
Replace nemotron downloads dict with nemotron_cc_download variable
ravwojdyla Mar 25, 2026
debf1fe
Replace nemotron_cc_download global with a function
ravwojdyla Mar 25, 2026
6ba73d5
Fix typo in nemotron_v1 comment
ravwojdyla Mar 25, 2026
4b86e36
Rename huggingface.py to huggingface_utils.py, update all imports
ravwojdyla Mar 25, 2026
e2ac4de
Add override_output_path to nemotron_v2 datasets
ravwojdyla Mar 25, 2026
7c2ab1e
Revert huggingface_utils.py rename back to huggingface.py
ravwojdyla Mar 25, 2026
63de1bf
Delete unused dclm_hq download and transform modules
ravwojdyla Mar 25, 2026
38f6474
Extract dolmino download into datakit/download/dolmino.py
ravwojdyla Mar 25, 2026
83a01f8
Extract dolma download into datakit/download/dolma.py
ravwojdyla Mar 26, 2026
76b3ae8
Move DOLMA_OLMO_MIXTURE_WEIGHTS back to experiment file
ravwojdyla Mar 26, 2026
213d250
Add download_wikipedia_step with override pointing at existing data
ravwojdyla Mar 26, 2026
80b8a23
Simplify download_wikipedia_step and remove revision param
ravwojdyla Mar 26, 2026
3f85e10
Wire download_wikipedia_step into exp934 as StepSpec dependency
ravwojdyla Mar 26, 2026
5032fc6
Fix Wikipedia download override path to wikipedia-9273e1
ravwojdyla Mar 26, 2026
d99a392
Restore revision parameter in download_wikipedia
ravwojdyla Mar 26, 2026
845739f
Make revision required in download_wikipedia_step
ravwojdyla Mar 26, 2026
d549990
Wire ar5iv_step into exp934 as StepSpec dependency
ravwojdyla Mar 26, 2026
eae62fa
Remove unused download entries from simple.py
ravwojdyla Mar 26, 2026
6e75084
Address PR review comments
ravwojdyla Mar 26, 2026
219 changes: 219 additions & 0 deletions docs/design/2355_datakit.md
@@ -0,0 +1,219 @@
Marin has most of the pieces for end-to-end data processing - download, dedup, filtering, classification, decontamination, tokenization - but the code is scattered across `experiments/` and `lib/marin/` with inconsistent formats, ad-hoc ID handling, and unclear provenance.

We propose consolidating this into **datakit**: a set of composable pipeline stages with standardized formats and conventions, living in `lib/marin/datakit/`. Dataset-specific wiring (e.g., "for Arxiv, apply these transforms") lives in `experiments/` or reference configurations.


Links:
* [marin#2355](https://github.com/marin-community/marin/issues/2355)
* [gdoc](https://docs.google.com/document/d/1kDSzONg32zv2VnCO4FJiMP0fcjRSjgP0uTDpI4_C4O0)

# Golden Path

The canonical pipeline for getting a dataset from source to training:

`Download → Normalize → Embed → Classify/Filter → Dedup → Tokenize`

Notably, datakit in its proposed form does not include **data mixing** or **training**.

## 1. Download

Download raw dataset from Hugging Face (or other sources). Raw downloads are preserved as-is in their original format and directory structure.

## 2. Normalize to Standard Format

Convert raw data into the **datakit standard format**:

* **File format**: Parquet - columnar, widely supported, with pushdown filters and column projection.
* **Mandatory columns**:
  * `id` - unique document identifier (see [ID Column](#id-column) below)
  * `text` - primary text content; UTF-8 is enforced
* **Arbitrary additional columns**: any fields present in the raw data are preserved
* **Directory structure**: preserve the original directory structure
* **Partition structure**: the partition layout from the source does NOT need to be preserved at this point - and in most cases it will not be
  * We may want to introduce a more efficient partitioning at this stage and preserve the new partitioning until tokenization
  * Partitions must follow the `part-x-of-y` suffix naming convention
* **Sort invariant**: each partition is sorted by `id`
* **Typed output**: in code, the data has a typed representation via `Artifact`

This is the "intake" step - all downstream stages operate on normalized Parquet datasets.
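
To make the invariants concrete, here is a minimal sketch of what a normalize step could look like. It is illustrative only, assuming `pyarrow`; `normalize_partition` and the SHA-256 ID scheme are hypothetical, not the datakit API:

```py
# A minimal sketch, not the datakit API: `normalize_partition` and the
# SHA-256 ID scheme are illustrative assumptions.
import hashlib

import pyarrow as pa
import pyarrow.parquet as pq


def normalize_partition(records: list[dict], output_path: str) -> None:
    """Write one partition of raw records in the standard format."""
    rows = []
    for rec in records:
        text = rec["text"]
        if isinstance(text, bytes):
            text = text.decode("utf-8")  # the `text` column is enforced UTF-8
        extra = {k: v for k, v in rec.items() if k not in ("id", "text")}
        if "id" in rec:
            extra["source_id"] = rec["id"]  # keep original IDs for provenance
        rows.append({"id": hashlib.sha256(text.encode()).hexdigest(), "text": text, **extra})
    rows.sort(key=lambda r: r["id"])  # sort invariant: each partition sorted by id
    pq.write_table(pa.Table.from_pylist(rows), output_path)


# Output files follow the part-x-of-y naming convention:
normalize_partition(
    [{"id": "row-0", "text": "hello world", "url": "https://example.com"}],
    "part-00000-of-00001.parquet",
)
```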

## 3. Embed

Produce vector embeddings for each document. Output is an **attributes dataset** (see [Attributes Datasets](#attributes-datasets)) with embedding vectors keyed by `id`.

## 4. Quality Classification, Topic Assignment

Each classifier produces an **attributes dataset** containing scores/labels keyed by `id`.

## 5. Deduplication

Produces an **attributes dataset** marking duplicate spans or documents.

## 6. Consolidation

Join attributes datasets back to the source documents and apply filters:

* Filter by classifier thresholds (e.g., quality score > 0.8)
* Remove duplicate spans/documents

Output is a clean, filtered Parquet dataset - still sorted by `id`, still co-partitioned.

## 7. Tokenize

Convert clean text into tokenized Levanter cache format.

**Tokenization is the boundary where per-document structure ends.** The tokenizer concatenates documents into fixed-size token sequences for efficient training. Partition structure from earlier stages does not carry through - the output is sharded Levanter TreeStore caches with a `.stats.json` summary.
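
As a rough illustration of what "per-document structure ends" means, packing amounts to something like the following sketch (simplified; not Levanter's actual cache writer, which also handles sharding and stats):

```py
from collections.abc import Iterable, Iterator


def pack_documents(token_streams: Iterable[list[int]], seq_len: int) -> Iterator[list[int]]:
    """Concatenate per-document token lists into fixed-size training sequences."""
    buffer: list[int] = []
    for tokens in token_streams:
        buffer.extend(tokens)
        while len(buffer) >= seq_len:
            yield buffer[:seq_len]  # document boundaries do not survive this step
            buffer = buffer[seq_len:]
    # the final partial sequence is padded or dropped; this sketch drops it


sequences = list(pack_documents([[1, 2, 3], [4, 5], [6, 7, 8, 9]], seq_len=4))
assert sequences == [[1, 2, 3, 4], [5, 6, 7, 8]]
```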

# Core Design Decisions

## Parquet as the Standard Format

All intermediate datasets (from normalization through consolidation) use the Parquet columnar format. Benefits:

* Column projection (only read the columns you need)
* Filter pushdown
* Efficient sorted merge joins via Zephyr
* Mature ecosystem with broad tooling support

NOTE: We initially considered Vortex for its pushdown and lookup capabilities, but encountered blocking issues with Zephyr pipeline integration (see [vortex#6905](https://github.com/vortex-data/vortex/issues/6905)). Parquet provides the same columnar benefits with a proven ecosystem. If Vortex matures, we can revisit.
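
For a concrete sense of the first two benefits, both are one-liners with `pyarrow` (paths, columns, and the predicate below are hypothetical):

```py
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Column projection: read only the columns you need from disk.
table = pq.read_table("normalized/part-00000-of-00010.parquet", columns=["id", "text"])

# Filter pushdown: row groups whose statistics rule out the predicate are skipped.
dataset = ds.dataset("attrs/quality/", format="parquet")
high_quality = dataset.to_table(filter=ds.field("quality_score") > 0.8)
```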

## ID Column {#id-column}

* **Preserve existing IDs** when present in the raw data (e.g., WARC-Record-ID in DCLM, HF row indices). These carry provenance meaning and aid debugging.
  * The preserved column is renamed to `source_id`.
* **Generate deterministic IDs** via content hash, stored in the `id` column. Deterministic hashing ensures reproducibility - re-running the pipeline produces the same IDs, which preserves caching and diffing (see the sketch below).
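
A minimal sketch of the deterministic scheme, assuming a SHA-256 hash over the UTF-8 text (the exact hash function and inputs are an implementation detail):

```py
import hashlib


def make_id(text: str) -> str:
    """Content-hash ID: identical text always yields an identical id."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


# Re-running the pipeline reproduces the same IDs, so caches and diffs stay valid.
assert make_id("hello world") == make_id("hello world")
```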

## Co-Partitioning Invariant

The key invariant that enables efficient joins: **Attributes datasets must have the same number of shards and the same key-range partitioning as their source dataset.**

This means:

* The normalization step determines the partition structure
* All downstream stages (embed, classify, dedup) preserve this structure \- same shard count, same ID ranges per shard
* Consolidation can use Zephyr's `sorted_merge_join` without a costly `group_by` shuffle

This is enforced by convention: each processing stage reads source partitions 1:1 and writes output partitions with matching structure.
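
A sketch of that convention for a hypothetical quality stage; `score_quality` is a placeholder, not a real classifier:

```py
import pyarrow as pa
import pyarrow.parquet as pq


def score_quality(text: str) -> float:
    return min(len(text) / 1000.0, 1.0)  # stand-in heuristic for a real model


def quality_partition(src: str, dst: str) -> None:
    table = pq.read_table(src, columns=["id", "text"])
    scores = [score_quality(t) for t in table.column("text").to_pylist()]
    # Same shard index, same id range, rows still sorted by id.
    pq.write_table(pa.table({"id": table.column("id"), "quality_score": scores}), dst)


# Partition i of the source maps 1:1 to partition i of the attributes dataset.
n = 10
for i in range(n):
    part = f"part-{i:05d}-of-{n:05d}.parquet"
    quality_partition(f"normalized/{part}", f"attrs/quality/{part}")
```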

## Attributes Datasets {#attributes-datasets}

Processing stages (embed, classify, dedup) produce **attributes datasets** - lightweight Parquet files containing:

* `id` — matching the source document ID
* Stage-specific output columns (e.g., `quality_score`, `is_duplicate`, `topic_label`)

Attributes datasets:

* Use Parquet format
* Are co-partitioned with the source (same shard count and key ranges)
* Are sorted by `id` within each partition
* Can be joined back to source documents via `sorted_merge_join`

Multiple attribute datasets from different stages can be joined together during consolidation to apply compound filters.
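
To show why the sort and co-partitioning invariants matter, here is a self-contained merge join over one pair of matching shards. This is only an illustration of the access pattern; in datakit the join is Zephyr's `sorted_merge_join`:

```py
from collections.abc import Iterator


def merge_join(source: list[dict], attrs: list[dict]) -> Iterator[dict]:
    """Both inputs are sorted by `id`; attrs holds at most one row per id."""
    j = 0
    for row in source:
        while j < len(attrs) and attrs[j]["id"] < row["id"]:
            j += 1
        if j < len(attrs) and attrs[j]["id"] == row["id"]:
            yield {**row, **attrs[j]}  # single forward pass, no shuffle needed


docs = [{"id": "a1", "text": "..."}, {"id": "b2", "text": "..."}]
quality = [{"id": "a1", "quality_score": 0.93}, {"id": "b2", "quality_score": 0.41}]
kept = [r for r in merge_join(docs, quality) if r["quality_score"] > 0.8]
assert [r["id"] for r in kept] == ["a1"]
```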

## Step Orchestration via StepSpec

Datakit builds on `StepSpec` - the pure-data step descriptor that captures a step's identity and dependencies. Each datakit stage (normalize, classify, dedup, etc.) is a `StepSpec` with:

* **`name`**: human-readable stage name (e.g., `"fineweb/normalize"`)
* **`deps`**: upstream `StepSpec`s whose `output_path` this stage reads from
* **`hash_attrs`**: configuration values that affect output (model name, thresholds, etc.) — changes invalidate the cache
* **`fn`**: the callable that performs the work, receiving `output_path` as its argument

`StepSpec` gives us automatic cache invalidation (via `hash_id` derived from name + attrs + dep paths), dependency tracking, and deterministic output paths. The step runner handles locking, heartbeats, and status - datakit stages just describe what to run.

Example wiring:

```py
download = StepSpec(
    name="fineweb/download",
    fn=lambda output_path: download_hf(output_path=output_path, dataset_id="HuggingFaceFW/fineweb"),
    hash_attrs={"dataset_id": "HuggingFaceFW/fineweb", "revision": "abc1234"},
)

normalize = StepSpec(
    name="fineweb/normalize",
    deps=[download],
    fn=lambda output_path: normalize_to_parquet(
        input_path=download.output_path, output_path=output_path, text_field="text",
    ),
    hash_attrs={"text_field": "text"},
)

quality = StepSpec(
    name="fineweb/quality",
    deps=[normalize],
    fn=lambda output_path: classify(
        input_path=normalize.output_path, output_path=output_path, model="fasttext-quality-v1",
    ),
    hash_attrs={"model": "fasttext-quality-v1"},
)

dedup = StepSpec(
    name="fineweb/dedup",
    deps=[normalize],
    fn=lambda output_path: deduplicate(
        input_path=normalize.output_path, output_path=output_path, mode="fuzzy_document",
    ),
    hash_attrs={"mode": "fuzzy_document"},
)

consolidated = StepSpec(
    name="fineweb/consolidated",
    deps=[normalize, quality, dedup],
    fn=lambda output_path: consolidate(
        source_path=normalize.output_path,
        attribute_paths=[quality.output_path, dedup.output_path],
        output_path=output_path,
        quality_threshold=0.8,
    ),
    hash_attrs={"quality_threshold": 0.8},
)

tokenized = StepSpec(
    name="fineweb/tokenized",
    deps=[consolidated],
    fn=lambda output_path: tokenize(
        input_path=consolidated.output_path, output_path=output_path,
        tokenizer="meta-llama/Llama-3.1-8B",
    ),
    hash_attrs={"tokenizer": "meta-llama/Llama-3.1-8B"},
)
```

# API Surface

## `lib/marin/datakit/`

Core primitives — the reusable building blocks:

```
lib/marin/datakit/
  download     # Raw dataset acquisition (HF and other sources)
  normalize    # Raw format -> standard Parquet (id, text, ...)
  embed        # Document embedding
  classify     # Quality/topic classification
  dedup        # Deduplication (exact + fuzzy)
  consolidate  # Join attributes + apply filters
```

## `experiments/` (or reference configurations)

Dataset-specific wiring - which transforms to apply for a given dataset, expressed as `StepSpec` DAGs.

# Execution Plan

* Implement `datakit/normalize.py` - standard schema definitions, ID generation, raw format to Parquet conversion with mandatory columns
* Integration tests for the normalize step
* Integration tests covering download, normalize, dedup and tokenize at reasonable scale
* Update Grug/ferry experiment definitions to consume datakit pipeline outputs directly

# Non-Goals

* **Replacing the mixing or training APIs** - datakit standardizes everything upstream of tokenization.
* **Supporting non-text modalities** - the initial scope is text datasets with a mandatory `text` field. Multimodal support can be added later by relaxing this constraint.

# Open Questions

1. **ID uniqueness enforcement**: Per-partition validation is cheap and will be the default. Should we also support global uniqueness checks? What's the failure mode — warn or error?
2. **Non-text datasets**: Code datasets, structured data - do we need a configurable primary field, or is `text` always sufficient?
3. **Versioning**: How do we version datakit outputs so that downstream consumers (Grug) can pin to a specific processing run? `StepSpec.hash_id` provides content-based versioning, but do we need human-readable version tags as well?
2 changes: 1 addition & 1 deletion experiments/common_pile/tokenize_common_pile.py
@@ -5,7 +5,7 @@

from experiments.defaults import default_tokenize
from experiments.llama import llama3_tokenizer
- from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+ from marin.datakit.download.huggingface import DownloadConfig, download_hf
from marin.execution.executor import ExecutorStep, executor_main, this_output_path
from marin.processing.tokenize.data_configs import TokenizerStep, lm_mixture_data_config

2 changes: 1 addition & 1 deletion experiments/defaults.py
@@ -46,7 +46,7 @@
from experiments.simple_sft_config import SimpleSFTConfig
from experiments.simple_train_config import SimpleTrainConfig
from levanter.utils.mesh import MeshConfig
- from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+ from marin.datakit.download.huggingface import DownloadConfig, download_hf
from marin.evaluation.evaluation_config import EvalTaskConfig
from marin.execution.executor import (
ExecutorStep,
2 changes: 1 addition & 1 deletion experiments/eval_datasets.py
@@ -3,7 +3,7 @@

import dataclasses

- from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+ from marin.datakit.download.huggingface import DownloadConfig, download_hf
from marin.execution.executor import ExecutorStep, executor_main, this_output_path, versioned
from marin.transform.huggingface.dataset_to_eval import DatasetConversionConfig, OutputFormatOptions, hf_dataset_to_jsonl

2 changes: 1 addition & 1 deletion experiments/evals/exp1600_uncheatable_evals.py
@@ -22,7 +22,7 @@
from experiments.models import ModelConfig as HFModelConfig, download_model_step
from fray.cluster import ResourceConfig
from levanter.compat.hf_checkpoints import HFCheckpointConverter
- from marin.download.uncheatable_eval.download import make_uncheatable_eval_step
+ from marin.datakit.download.uncheatable_eval import make_uncheatable_eval_step
from marin.evaluation.log_probs import default_lm_log_probs
from marin.execution.executor import ExecutorStep, executor_main, output_path_of
from marin.processing.tokenize import TokenizeConfig
80 changes: 48 additions & 32 deletions experiments/exp934_hq_vs_pt.py
@@ -8,7 +8,10 @@
datasets used by various training experiments.
"""

from marin.datakit.download.ar5iv import ar5iv_step
from marin.datakit.download.wikipedia import download_wikipedia_step
from marin.execution.executor import ExecutorStep, mirrored, this_output_path, versioned
from marin.execution.step_spec import StepSpec
from marin.schemas.web.convert import HtmlToMarkdownConfig, ResiliparseConfig
from marin.schemas.web.selectors import ARXIV_BLACKLISTED_SELECTORS, WIKI_BLACKLISTED_SELECTORS
from marin.transform.ar5iv.transform_ar5iv import Ar5ivExtractionConfig, process_ar5iv_dump
@@ -42,48 +45,61 @@
),
).with_output_path("documents/stackexchange-resiliparse-custom-fork-ab41ad")

# Wikipedia resiliparse custom fork step (data already exists at hardcoded path)
wikipedia_resiliparse_custom_fork = (
ExecutorStep(
name="documents/wikipedia-resiliparse-custom-fork",
fn=process_wiki_dump,
config=WikiExtractionConfig(
input_path=mirrored("raw/wikipedia-a7dad0/20241201", budget_gb=1),
revision=versioned("20241201"),
output_path=this_output_path(),
_wikipedia_download = download_wikipedia_step()

# Wikipedia resiliparse custom fork step
_wikipedia_transform = StepSpec(
name="documents/wikipedia-resiliparse-custom-fork",
fn=lambda output_path: process_wiki_dump(
WikiExtractionConfig(
input_path=f"{_wikipedia_download.output_path}/20241201",
revision="20241201",
output_path=output_path,
extract_method="resiliparse",
extract_config=ResiliparseConfig(
links=False,
skip_elements=WIKI_BLACKLISTED_SELECTORS,
markdownify_config=HtmlToMarkdownConfig(include_images=False, include_links=False),
),
remove_reference_section=versioned(True),
digit_threshold=versioned(50),
word_threshold=versioned(70),
special_char_threshold=versioned(50),
),
)
.with_output_path("documents/wikipedia-resiliparse-custom-fork-2569de")
.cd("20241201")
remove_reference_section=True,
digit_threshold=50,
word_threshold=70,
special_char_threshold=50,
)
),
deps=[_wikipedia_download],
hash_attrs={"revision": "20241201", "extract_method": "resiliparse"},
override_output_path="documents/wikipedia-resiliparse-custom-fork-2569de",
)
wikipedia_resiliparse_custom_fork = _wikipedia_transform.as_executor_step().cd("20241201")

# ar5iv resiliparse custom fork step (data already exists at hardcoded path)
ar5iv_no_problem_resiliparse_custom_fork = ExecutorStep(
_ar5iv_download = ar5iv_step(
input_path="gs://marin-us-central2/raw/ar5iv/ar5iv-04-2024-no-problem.zip",
override_output_path="raw/ar5iv/ar5iv-04-2024-no-problem-49c4e3",
)

# ar5iv resiliparse custom fork step
_ar5iv_transform = StepSpec(
name="documents/ar5iv/ar5iv-04-2024-no-problem",
fn=process_ar5iv_dump,
config=Ar5ivExtractionConfig(
input_path=mirrored("raw/ar5iv/ar5iv-04-2024-no-problem-49c4e3/202404", budget_gb=1),
revision="042024",
output_path=this_output_path("resiliparse-custom-fork"),
extract_method=versioned("resiliparse"),
extract_config=ResiliparseConfig(
links=versioned(False),
prepend_title=True,
skip_elements=ARXIV_BLACKLISTED_SELECTORS,
),
remove_reference_section=versioned(True),
fn=lambda output_path: process_ar5iv_dump(
Ar5ivExtractionConfig(
input_path=f"{_ar5iv_download.output_path}/202404",
revision="042024",
output_path=output_path,
extract_method="resiliparse",
extract_config=ResiliparseConfig(
links=False,
prepend_title=True,
skip_elements=ARXIV_BLACKLISTED_SELECTORS,
),
remove_reference_section=True,
)
),
).with_output_path("documents/ar5iv/ar5iv-04-2024-no-problem-3971f")
deps=[_ar5iv_download],
hash_attrs={"revision": "042024", "extract_method": "resiliparse"},
override_output_path="documents/ar5iv/ar5iv-04-2024-no-problem-3971f",
)
ar5iv_no_problem_resiliparse_custom_fork = _ar5iv_transform.as_executor_step()

# MMLU Science QA tokenization
medu_mmlu_science_qa_tokenized = default_tokenize(
2 changes: 1 addition & 1 deletion experiments/midtraining_datasets.py
@@ -4,7 +4,7 @@
from experiments.common_pile.tokenize_common_pile import stackv2_edu_filtered
from experiments.defaults import default_download, default_tokenize
from experiments.llama import llama3_tokenizer
- from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+ from marin.datakit.download.huggingface import DownloadConfig, download_hf
from marin.execution import versioned
from marin.execution.executor import ExecutorStep, this_output_path
from marin.processing.tokenize import lm_mixture_data_config
Expand Down
2 changes: 1 addition & 1 deletion experiments/models.py
@@ -18,7 +18,7 @@

from dataclasses import dataclass

- from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+ from marin.datakit.download.huggingface import DownloadConfig, download_hf
from marin.execution.executor import ExecutorStep, this_output_path, versioned
from marin.utils import get_directory_friendly_name

@@ -13,7 +13,7 @@

from experiments.llama import llama3_tokenizer
from experiments.multilingual_fineweb2_hq.constants import FINEWEB2_DATASETS
- from marin.download.huggingface.download_hf import DownloadConfig, download_hf
+ from marin.datakit.download.huggingface import DownloadConfig, download_hf
from marin.execution.executor import ExecutorStep, executor_main, output_path_of, this_output_path, versioned
from marin.processing.tokenize import TokenizeConfig, tokenize
from marin.processing.tokenize.data_configs import TokenizerStep
Expand Down
2 changes: 1 addition & 1 deletion experiments/paloma.py
@@ -9,7 +9,7 @@

import os.path

- from marin.download.huggingface.download_hf import DownloadConfig as HfDownloadConfig, download_hf
+ from marin.datakit.download.huggingface import DownloadConfig as HfDownloadConfig, download_hf

# cyclic dependency
# from experiments.llama import llama3_tokenizer