diff --git a/docs/design/2355_datakit.md b/docs/design/2355_datakit.md new file mode 100644 index 0000000000..0cb3ac8327 --- /dev/null +++ b/docs/design/2355_datakit.md @@ -0,0 +1,219 @@ +Marin has most of the pieces for end-to-end data processing \- download, dedup, filtering, classification, decontamination, tokenization \- but the code is scattered across `experiments/` and `lib/marin/` with inconsistent formats, ad-hoc ID handling, and unclear provenance. + +We propose consolidating this into **datakit**: a set of composable pipeline stages with standardized formats and conventions, living in `lib/marin/datakit/`. Dataset-specific wiring (e.g., "for Arxiv, apply these transforms") lives in `experiments/` or reference configurations. + + +Links: + * [marin\#2355](https://github.com/marin-community/marin/issues/2355) + * [gdoc](https://docs.google.com/document/d/1kDSzONg32zv2VnCO4FJiMP0fcjRSjgP0uTDpI4_C4O0) + +# Golden Path + +The canonical pipeline for getting a dataset from source to training: + +`Download → Normalize → Embed → Classify/Filter → Dedup → Tokenize` + +Notably, datakit in the proposed form doesn’t include **data mixing** or **training**. + +## 1\. Download + +Download the raw dataset from Hugging Face (or other sources). Raw downloads are preserved as-is in their original format and directory structure. + +## 2\. Normalize to Standard Format + +Convert raw data into the **datakit standard format**: + +* **File format**: Parquet \- columnar, widely supported, with pushdown filters and column projection. +* **Mandatory columns**: + * `id` \- unique document identifier (see [ID Column](#id-column) below) + * `text` \- primary text content \- we enforce UTF-8 +* **Arbitrary additional columns**: any fields present in the raw data are preserved +* **Directory structure**: preserve the original directory structure +* **Partition structure**: partition layout from the source does NOT need to be preserved at this point \- and in most cases it will not be + * We may want to introduce a more efficient partitioning at this stage and preserve the new partitioning until tokenization + * Partitions must follow the `part-x-of-y` suffix naming convention +* **Sort invariant**: each partition is sorted by `id` +* **Typed output**: in code, the data has a typed representation via `Artifact` + +This is the "intake" step \- all downstream stages operate on normalized Parquet datasets (a minimal sketch follows at the end of this section). + +## 3\. Embed + +Produce vector embeddings for each document. Output is an **attributes dataset** (see [Attributes Datasets](#attributes-datasets)) with embedding vectors keyed by `id`. + +## 4\. Quality Classification, Topic Assignment + +Each classifier produces an **attributes dataset** containing scores/labels keyed by `id`. + +## 5\. Deduplication + +Produces an **attributes dataset** marking duplicate spans or documents. + +## 6\. Consolidation + +Join attributes datasets back to the source documents and apply filters: + +* Filter by classifier thresholds (e.g., quality score \> 0.8) +* Remove duplicate spans/documents + +Output is a clean, filtered Parquet dataset \- still sorted by `id`, still co-partitioned. + +## 7\. Tokenize + +Convert clean text into tokenized Levanter cache format. + +**Tokenization is the boundary where per-document structure ends.** The tokenizer concatenates documents into fixed-size token sequences for efficient training. Partition structure from earlier stages does not carry through \- the output is sharded Levanter TreeStore caches with a `.stats.json` summary.
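To make the normalization conventions concrete, here is a minimal sketch of what normalizing a single raw shard into the standard format could look like. It is illustrative only: the real implementation will live in `lib/marin/datakit/normalize`, the helper names are hypothetical, and the SHA-256 hash and the exact zero-padding of the `part-x-of-y` suffix are assumptions rather than settled choices.

```py
# Hypothetical sketch of the normalize step; not the actual datakit implementation.
import hashlib
import json

import pyarrow as pa
import pyarrow.parquet as pq


def content_id(text: str) -> str:
    """Deterministic document ID from a content hash, so re-runs produce identical IDs."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def normalize_shard(raw_jsonl_path: str, output_dir: str, part: int, num_parts: int) -> None:
    """Convert one raw JSONL shard into a standard-format Parquet partition."""
    rows = []
    with open(raw_jsonl_path, encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            text = record.pop("text")
            # mandatory columns first; all remaining raw fields are preserved as-is
            rows.append({"id": content_id(text), "text": text, **record})

    rows.sort(key=lambda r: r["id"])  # sort invariant: each partition is sorted by `id`
    table = pa.Table.from_pylist(rows)
    # partitions follow the part-x-of-y naming convention
    pq.write_table(table, f"{output_dir}/part-{part:05d}-of-{num_parts:05d}.parquet")
```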
+ +# Core Design Decisions + +## Parquet as the Standard Format + +All intermediate datasets (from normalization through consolidation) use the Parquet columnar format. Benefits: + +* Column projection (only read the columns you need) +* Filter pushdown +* Efficient sorted merge joins via Zephyr +* Mature ecosystem with broad tooling support + +NOTE: We initially considered Vortex for its pushdown and lookup capabilities, but encountered blocking issues with Zephyr pipeline integration (see [vortex\#6905](https://github.com/vortex-data/vortex/issues/6905)). Parquet provides the same columnar benefits with a proven ecosystem. If Vortex matures, we can revisit. + +## ID Column {#id-column} + +* **Preserve existing IDs** when present in the raw data (e.g., WARC-Record-ID in DCLM, HF row indices). These carry provenance meaning and aid debugging. + * But rename the column to `source_id` +* **Generate deterministic IDs** via content hash. Column named `id`. Deterministic hashing ensures reproducibility \- re-running the pipeline produces the same IDs, which preserves caching and diffing. + +## Co-Partitioning Invariant + +The key invariant that enables efficient joins: **Attributes datasets must have the same number of shards and the same key-range partitioning as their source dataset.** + +This means: + +* The normalization step determines the partition structure +* All downstream stages (embed, classify, dedup) preserve this structure \- same shard count, same ID ranges per shard +* Consolidation can use Zephyr's `sorted_merge_join` without a costly `group_by` shuffle + +This is enforced by convention: each processing stage reads source partitions 1:1 and writes output partitions with matching structure. + +## Attributes Datasets {#attributes-datasets} + +Processing stages (embed, classify, dedup) produce **attributes datasets** \- lightweight Parquet files containing: + +* `id` — matching the source document ID +* Stage-specific output columns (e.g., `quality_score`, `is_duplicate`, `topic_label`) + +Attributes datasets: + +* Use Parquet format +* Are co-partitioned with the source (same shard count and key ranges) +* Are sorted by `id` within each partition +* Can be joined back to source documents via `sorted_merge_join` + +Multiple attributes datasets from different stages can be joined together during consolidation to apply compound filters. + +## Step Orchestration via StepSpec + +Datakit builds on `StepSpec` \- the pure-data step descriptor that captures a step's identity and dependencies. Each datakit stage (normalize, classify, dedup, etc.) is a `StepSpec` with: + +* **`name`**: human-readable stage name (e.g., `"fineweb/normalize"`) +* **`deps`**: upstream `StepSpec`s whose `output_path` this stage reads from +* **`hash_attrs`**: configuration values that affect output (model name, thresholds, etc.) — changes invalidate the cache +* **`fn`**: the callable that performs the work, receiving `output_path` as its argument + +`StepSpec` gives us automatic cache invalidation (via `hash_id` derived from name \+ attrs \+ dep paths), dependency tracking, and deterministic output paths. The step runner handles locking, heartbeats, and status \- datakit stages just describe what to run.
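For orientation, here is a simplified sketch of the shape of such a descriptor. The field names follow the list above, but the actual `StepSpec` in `marin.execution.step_spec` has more machinery (runner integration, output path prefixes, `as_executor_step()`), and the `hash_id` derivation shown here is only indicative.

```py
# Simplified, hypothetical sketch of a StepSpec-like descriptor; not the real implementation.
import hashlib
import json
from collections.abc import Callable
from dataclasses import dataclass, field


@dataclass
class StepSketch:
    name: str                     # human-readable stage name, e.g. "fineweb/normalize"
    fn: Callable[[str], None]     # performs the work; receives the step's output_path
    deps: list["StepSketch"] = field(default_factory=list)
    hash_attrs: dict = field(default_factory=dict)  # config values that affect the output

    @property
    def hash_id(self) -> str:
        # cache key derived from name + hash_attrs + dependency output paths
        payload = json.dumps(
            {"name": self.name, "attrs": self.hash_attrs, "deps": [d.output_path for d in self.deps]},
            sort_keys=True,
            default=str,
        )
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:8]

    @property
    def output_path(self) -> str:
        return f"{self.name}-{self.hash_id}"

    def run(self) -> None:
        # a real runner also handles locking, heartbeats, status, and skipping cached outputs
        for dep in self.deps:
            dep.run()
        self.fn(self.output_path)
```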
+ +Example wiring: + +```py +download = StepSpec( + name="fineweb/download", + fn=lambda output_path: download_hf(output_path=output_path, dataset_id="HuggingFaceFW/fineweb"), + hash_attrs={"dataset_id": "HuggingFaceFW/fineweb", "revision": "abc1234"}, +) + +normalize = StepSpec( + name="fineweb/normalize", + deps=[download], + fn=lambda output_path: normalize_to_parquet( + input_path=download.output_path, output_path=output_path, text_field="text", + ), + hash_attrs={"text_field": "text"}, +) + +quality = StepSpec( + name="fineweb/quality", + deps=[normalize], + fn=lambda output_path: classify( + input_path=normalize.output_path, output_path=output_path, model="fasttext-quality-v1", + ), + hash_attrs={"model": "fasttext-quality-v1"}, +) + +dedup = StepSpec( + name="fineweb/dedup", + deps=[normalize], + fn=lambda output_path: deduplicate( + input_path=normalize.output_path, output_path=output_path, mode="fuzzy_document", + ), + hash_attrs={"mode": "fuzzy_document"}, +) + +consolidated = StepSpec( + name="fineweb/consolidated", + deps=[normalize, quality, dedup], + fn=lambda output_path: consolidate( + source_path=normalize.output_path, + attribute_paths=[quality.output_path, dedup.output_path], + output_path=output_path, + quality_threshold=0.8, + ), + hash_attrs={"quality_threshold": 0.8}, +) + +tokenized = StepSpec( + name="fineweb/tokenized", + deps=[consolidated], + fn=lambda output_path: tokenize( + input_path=consolidated.output_path, output_path=output_path, + tokenizer="meta-llama/Llama-3.1-8B", + ), + hash_attrs={"tokenizer": "meta-llama/Llama-3.1-8B"}, +) +``` + +# API Surface + +## `lib/marin/datakit/` + +Core primitives — the reusable building blocks: + +``` +lib/marin/datakit/ + normalize # Raw format -> standard Parquet (id, text, ...) + embed # Document embedding + classify # Quality/topic classification + dedup # Deduplication (exact + fuzzy) + consolidate # Join attributes + apply filters +``` + +## `experiments/` (or reference configurations) + +Dataset-specific wiring \- which transforms to apply for a given dataset, expressed as `StepSpec` DAGs. + +# Execution Plan + +* Implement `datakit/normalize.py` \- standard schema definitions, ID generation, raw format to Parquet conversion with mandatory columns +* Integration tests for the normalize step +* Integration tests covering download, normalize, dedup and tokenize at reasonable scale +* Update Grug/ferry experiment definitions to consume datakit pipeline outputs directly + +# Non-Goals + +* **Replacing the mixing or training APIs** \- datakit standardizes everything upstream of tokenization. +* **Supporting non-text modalities** \- the initial scope is text datasets with a mandatory `text` field. Multimodal support can be added later by relaxing this constraint. + +# Open Questions + +1. **ID uniqueness enforcement**: Per-partition validation is cheap and will be the default. Should we also support global uniqueness checks? What's the failure mode — warn or error? +2. **Non-text datasets**: Code datasets, structured data \- do we need a configurable primary field, or is `text` always sufficient? +3. **Versioning**: How do we version datakit outputs so that downstream consumers (Grug) can pin to a specific processing run? `StepSpec.hash_id` provides content-based versioning, but do we need human-readable version tags as well? 
diff --git a/experiments/common_pile/tokenize_common_pile.py b/experiments/common_pile/tokenize_common_pile.py index cf90e364ee..faee07fc76 100644 --- a/experiments/common_pile/tokenize_common_pile.py +++ b/experiments/common_pile/tokenize_common_pile.py @@ -5,7 +5,7 @@ from experiments.defaults import default_tokenize from experiments.llama import llama3_tokenizer -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig, download_hf from marin.execution.executor import ExecutorStep, executor_main, this_output_path from marin.processing.tokenize.data_configs import TokenizerStep, lm_mixture_data_config diff --git a/experiments/defaults.py b/experiments/defaults.py index 2636c945b5..01e9583442 100644 --- a/experiments/defaults.py +++ b/experiments/defaults.py @@ -46,7 +46,7 @@ from experiments.simple_sft_config import SimpleSFTConfig from experiments.simple_train_config import SimpleTrainConfig from levanter.utils.mesh import MeshConfig -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig, download_hf from marin.evaluation.evaluation_config import EvalTaskConfig from marin.execution.executor import ( ExecutorStep, diff --git a/experiments/eval_datasets.py b/experiments/eval_datasets.py index 1a79a4a994..f55df8b3fc 100644 --- a/experiments/eval_datasets.py +++ b/experiments/eval_datasets.py @@ -3,7 +3,7 @@ import dataclasses -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig, download_hf from marin.execution.executor import ExecutorStep, executor_main, this_output_path, versioned from marin.transform.huggingface.dataset_to_eval import DatasetConversionConfig, OutputFormatOptions, hf_dataset_to_jsonl diff --git a/experiments/evals/exp1600_uncheatable_evals.py b/experiments/evals/exp1600_uncheatable_evals.py index 50f57d63df..e2787a3351 100644 --- a/experiments/evals/exp1600_uncheatable_evals.py +++ b/experiments/evals/exp1600_uncheatable_evals.py @@ -22,7 +22,7 @@ from experiments.models import ModelConfig as HFModelConfig, download_model_step from fray.cluster import ResourceConfig from levanter.compat.hf_checkpoints import HFCheckpointConverter -from marin.download.uncheatable_eval.download import make_uncheatable_eval_step +from marin.datakit.download.uncheatable_eval import make_uncheatable_eval_step from marin.evaluation.log_probs import default_lm_log_probs from marin.execution.executor import ExecutorStep, executor_main, output_path_of from marin.processing.tokenize import TokenizeConfig diff --git a/experiments/exp934_hq_vs_pt.py b/experiments/exp934_hq_vs_pt.py index 56a385cb32..d062454ae3 100644 --- a/experiments/exp934_hq_vs_pt.py +++ b/experiments/exp934_hq_vs_pt.py @@ -8,7 +8,10 @@ datasets used by various training experiments. 
""" +from marin.datakit.download.ar5iv import ar5iv_step +from marin.datakit.download.wikipedia import download_wikipedia_step from marin.execution.executor import ExecutorStep, mirrored, this_output_path, versioned +from marin.execution.step_spec import StepSpec from marin.schemas.web.convert import HtmlToMarkdownConfig, ResiliparseConfig from marin.schemas.web.selectors import ARXIV_BLACKLISTED_SELECTORS, WIKI_BLACKLISTED_SELECTORS from marin.transform.ar5iv.transform_ar5iv import Ar5ivExtractionConfig, process_ar5iv_dump @@ -42,48 +45,61 @@ ), ).with_output_path("documents/stackexchange-resiliparse-custom-fork-ab41ad") -# Wikipedia resiliparse custom fork step (data already exists at hardcoded path) -wikipedia_resiliparse_custom_fork = ( - ExecutorStep( - name="documents/wikipedia-resiliparse-custom-fork", - fn=process_wiki_dump, - config=WikiExtractionConfig( - input_path=mirrored("raw/wikipedia-a7dad0/20241201", budget_gb=1), - revision=versioned("20241201"), - output_path=this_output_path(), +_wikipedia_download = download_wikipedia_step() + +# Wikipedia resiliparse custom fork step +_wikipedia_transform = StepSpec( + name="documents/wikipedia-resiliparse-custom-fork", + fn=lambda output_path: process_wiki_dump( + WikiExtractionConfig( + input_path=f"{_wikipedia_download.output_path}/20241201", + revision="20241201", + output_path=output_path, extract_method="resiliparse", extract_config=ResiliparseConfig( links=False, skip_elements=WIKI_BLACKLISTED_SELECTORS, markdownify_config=HtmlToMarkdownConfig(include_images=False, include_links=False), ), - remove_reference_section=versioned(True), - digit_threshold=versioned(50), - word_threshold=versioned(70), - special_char_threshold=versioned(50), - ), - ) - .with_output_path("documents/wikipedia-resiliparse-custom-fork-2569de") - .cd("20241201") + remove_reference_section=True, + digit_threshold=50, + word_threshold=70, + special_char_threshold=50, + ) + ), + deps=[_wikipedia_download], + hash_attrs={"revision": "20241201", "extract_method": "resiliparse"}, + override_output_path="documents/wikipedia-resiliparse-custom-fork-2569de", ) +wikipedia_resiliparse_custom_fork = _wikipedia_transform.as_executor_step().cd("20241201") -# ar5iv resiliparse custom fork step (data already exists at hardcoded path) -ar5iv_no_problem_resiliparse_custom_fork = ExecutorStep( +_ar5iv_download = ar5iv_step( + input_path="gs://marin-us-central2/raw/ar5iv/ar5iv-04-2024-no-problem.zip", + override_output_path="raw/ar5iv/ar5iv-04-2024-no-problem-49c4e3", +) + +# ar5iv resiliparse custom fork step +_ar5iv_transform = StepSpec( name="documents/ar5iv/ar5iv-04-2024-no-problem", - fn=process_ar5iv_dump, - config=Ar5ivExtractionConfig( - input_path=mirrored("raw/ar5iv/ar5iv-04-2024-no-problem-49c4e3/202404", budget_gb=1), - revision="042024", - output_path=this_output_path("resiliparse-custom-fork"), - extract_method=versioned("resiliparse"), - extract_config=ResiliparseConfig( - links=versioned(False), - prepend_title=True, - skip_elements=ARXIV_BLACKLISTED_SELECTORS, - ), - remove_reference_section=versioned(True), + fn=lambda output_path: process_ar5iv_dump( + Ar5ivExtractionConfig( + input_path=f"{_ar5iv_download.output_path}/202404", + revision="042024", + output_path=output_path, + extract_method="resiliparse", + extract_config=ResiliparseConfig( + links=False, + prepend_title=True, + skip_elements=ARXIV_BLACKLISTED_SELECTORS, + ), + remove_reference_section=True, + ) ), -).with_output_path("documents/ar5iv/ar5iv-04-2024-no-problem-3971f") + 
deps=[_ar5iv_download], + hash_attrs={"revision": "042024", "extract_method": "resiliparse"}, + override_output_path="documents/ar5iv/ar5iv-04-2024-no-problem-3971f", +) +ar5iv_no_problem_resiliparse_custom_fork = _ar5iv_transform.as_executor_step() # MMLU Science QA tokenization medu_mmlu_science_qa_tokenized = default_tokenize( diff --git a/experiments/midtraining_datasets.py b/experiments/midtraining_datasets.py index f96217880d..2706f8a4e9 100644 --- a/experiments/midtraining_datasets.py +++ b/experiments/midtraining_datasets.py @@ -4,7 +4,7 @@ from experiments.common_pile.tokenize_common_pile import stackv2_edu_filtered from experiments.defaults import default_download, default_tokenize from experiments.llama import llama3_tokenizer -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig, download_hf from marin.execution import versioned from marin.execution.executor import ExecutorStep, this_output_path from marin.processing.tokenize import lm_mixture_data_config diff --git a/experiments/models.py b/experiments/models.py index 9e2a2db79b..972ca4f753 100644 --- a/experiments/models.py +++ b/experiments/models.py @@ -18,7 +18,7 @@ from dataclasses import dataclass -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig, download_hf from marin.execution.executor import ExecutorStep, this_output_path, versioned from marin.utils import get_directory_friendly_name diff --git a/experiments/multilingual_fineweb2_hq/download_and_tokenize_fineweb2_hq.py b/experiments/multilingual_fineweb2_hq/download_and_tokenize_fineweb2_hq.py index 6c93fba71a..a3fd2ae82a 100644 --- a/experiments/multilingual_fineweb2_hq/download_and_tokenize_fineweb2_hq.py +++ b/experiments/multilingual_fineweb2_hq/download_and_tokenize_fineweb2_hq.py @@ -13,7 +13,7 @@ from experiments.llama import llama3_tokenizer from experiments.multilingual_fineweb2_hq.constants import FINEWEB2_DATASETS -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig, download_hf from marin.execution.executor import ExecutorStep, executor_main, output_path_of, this_output_path, versioned from marin.processing.tokenize import TokenizeConfig, tokenize from marin.processing.tokenize.data_configs import TokenizerStep diff --git a/experiments/paloma.py b/experiments/paloma.py index 74bd98e25f..24c1a536df 100644 --- a/experiments/paloma.py +++ b/experiments/paloma.py @@ -9,7 +9,7 @@ import os.path -from marin.download.huggingface.download_hf import DownloadConfig as HfDownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig as HfDownloadConfig, download_hf # cyclic dependency # from experiments.llama import llama3_tokenizer diff --git a/experiments/posttrain/preference_datasets.py b/experiments/posttrain/preference_datasets.py index e93e94a61b..105722d2af 100644 --- a/experiments/posttrain/preference_datasets.py +++ b/experiments/posttrain/preference_datasets.py @@ -22,7 +22,7 @@ from collections.abc import Sequence from dataclasses import dataclass, field -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig, download_hf from marin.execution.executor import ( ExecutorStep, executor_main, diff --git a/experiments/pretraining_datasets/__init__.py 
b/experiments/pretraining_datasets/__init__.py index 090a298498..2e0f6cc004 100644 --- a/experiments/pretraining_datasets/__init__.py +++ b/experiments/pretraining_datasets/__init__.py @@ -19,12 +19,12 @@ # Import downloads and tokenized dicts from each module from experiments.pretraining_datasets.dolma import ( - DOLMA_DATASETS, DOLMA_LLAMA3_OVERRIDES, DOLMA_OLMO_MIXTURE_WEIGHTS, downloads as dolma_downloads, tokenize_dolma, ) +from marin.datakit.download.dolma import DOLMA_DATASETS from experiments.pretraining_datasets.dolmino import ( DOLMINO_DATASETS, DOLMINO_LLAMA3_OVERRIDES, @@ -37,7 +37,7 @@ NEMOTRON_DATASETS, NEMOTRON_LLAMA3_OVERRIDES, NEMOTRON_WEIGHTS, - downloads as nemotron_downloads, + nemotron_cc_download, nemotron_mix, nemotron_mix_block_shuffle, tokenize_nemotron, @@ -119,7 +119,7 @@ }, "nemotron_cc": { "subsets": list(NEMOTRON_DATASETS.keys()), - "download": nemotron_downloads["nemotron_cc"], + "download": nemotron_cc_download(), "tokenize_fn": tokenize_nemotron, }, "dolma": { @@ -130,7 +130,7 @@ # Nemotron v2 datasets (from nvidia/Nemotron-Pre-Training-Datasets collection) **{ family: { - "subsets": list(info["subsets"].keys()), + "subsets": list(info.subsets.keys()), "download": nemotron_v2_downloads[family], "tokenize_fn": lambda f=family: tokenize_nemotron_v2_family(f), } diff --git a/experiments/pretraining_datasets/dolma.py b/experiments/pretraining_datasets/dolma.py index 5c176c01f7..256ea0b58e 100644 --- a/experiments/pretraining_datasets/dolma.py +++ b/experiments/pretraining_datasets/dolma.py @@ -1,82 +1,41 @@ # Copyright The Marin Authors # SPDX-License-Identifier: Apache-2.0 -""" -DOLMA 1.7 dataset definitions and tokenization. - -This module defines the raw DOLMA dataset download and tokenization -logic for all 15 splits. 
-""" +"""DOLMA 1.7 dataset definitions and tokenization.""" import os.path -from marin.download.huggingface.download_hf import DownloadConfig, download_hf -from marin.execution.executor import ExecutorStep, this_output_path, versioned, InputName +from marin.datakit.download.dolma import DOLMA_DATASETS, download_dolma_step +from marin.execution.executor import ExecutorStep, InputName, this_output_path, versioned from marin.processing.tokenize import TokenizeConfig, tokenize from marin.processing.tokenize.data_configs import TokenizerStep -# Raw dataset download step -downloads = { - "dolma": ExecutorStep( - name="raw/dolma", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="allenai/dolma", - revision="7f48140", - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - override_output_path="raw/dolma", - ) -} - - -# For dolma 1.7, we hardcode the path since it was added before versioning -_DOLMA_V1_7_PATH = InputName.hardcoded("raw/dolma/v1.7") +_dolma_download = download_dolma_step().as_executor_step() +# Backward compat — some consumers import this +downloads = {"dolma": _dolma_download} # Sampling proportion comes from https://huggingface.co/datasets/allenai/dolma DOLMA_OLMO_MIXTURE_WEIGHTS = { - "dolma/algebraic-stack": 12.6, # 12.6 * 1.0 - "dolma/arxiv": 28.0, # 28.0 * 1.0 - "dolma/gutenberg": 5.3, # 5.3 * 1.0 - "dolma/c4": 124.95, # 249.9 * 0.5 - "dolma/cc": 597.75, # 1,195.5 * 0.5 - "dolma/cc-news": 14.3, # 1.0 - "dolma/falcon": 456.4, # 1.0, refined web - "dolma/megawika": 4.6, # 1.0 - "dolma/open-web-math": 12.6, # 1.0 - "dolma/pes2o": 57.2, # 1.0 - "dolma/reddit": 79.9, # 1.0 - "dolma/stackexchange": 19.6, # 1.0 - "dolma/starcoder": 263.8, # 1.0 - "dolma/flan": 16.5, # 6.5 * 1.0 - "dolma/wiki": 7.4, # 3.7 * 2.0 + "dolma/algebraic-stack": 12.6, + "dolma/arxiv": 28.0, + "dolma/gutenberg": 5.3, + "dolma/c4": 124.95, + "dolma/cc": 597.75, + "dolma/cc-news": 14.3, + "dolma/falcon": 456.4, + "dolma/megawika": 4.6, + "dolma/open-web-math": 12.6, + "dolma/pes2o": 57.2, + "dolma/reddit": 79.9, + "dolma/stackexchange": 19.6, + "dolma/starcoder": 263.8, + "dolma/flan": 16.5, + "dolma/wiki": 7.4, } -DOLMA_DATASETS = { - "algebraic-stack": ["algebraic-stack-train-{0000..0015}.json.gz"], - "arxiv": ["arxiv-{0000..0099}.json.gz"], - "gutenberg": ["books-{0000..0002}.json.gz"], - "c4": ["c4-{0000..0170}.json.gz"], - "cc": [ - "cc_en_head-{0000..0274}.json.gz", - "cc_en_middle-{0000..0238}.json.gz", - "cc_en_middle-{0240..0379}.json.gz", - "cc_en_tail-{0000..0152}.json.gz", - "cc_en_tail-{0154..0444}.json.gz", - ], - "cc-news": ["cc_news_head-{0000..0004}.json.gz", "cc_news_middle-{0000..0002}.json.gz", "cc_news_tail-0000.json.gz"], - "falcon": ["falcon-{0000..0499}.json.gz"], - "megawika": ["megawika-{0000..0261}.json.gz"], - "open-web-math": ["open-web-math-train-{0000..0012}.json.gz"], - "pes2o": ["pes2o-{0000..0025}.json.gz"], - "reddit": ["reddit-{0000..0077}.json.gz"], - "stackexchange": ["stackexchange-{0000..0025}.json.gz"], - "starcoder": ["starcoder-{0000..0048}.json.gz"], - "flan": ["tulu_flan-{0000..0065}.json.gz"], - "wiki": ["wiki-{0000..0001}.json.gz"], -} +# For dolma 1.7, we hardcode the path since it was added before versioning +_DOLMA_V1_7_PATH = InputName.hardcoded("raw/dolma/v1.7") # NB: we changed how hashes were computed for this corpus and we'd like to avoid recomputing them DOLMA_LLAMA3_OVERRIDES = { @@ -118,7 +77,6 @@ def tokenize_dolma(*, tokenizer: str | None = None) -> dict[str, TokenizerStep]: ), ) - # Check if we need to use override path for 
llama3 if tokenizer == llama3_tokenizer and dataset in DOLMA_LLAMA3_OVERRIDES: step = step.with_output_path(DOLMA_LLAMA3_OVERRIDES[dataset]) dolma_steps[os.path.join("dolma", dataset)] = step diff --git a/experiments/pretraining_datasets/dolmino.py b/experiments/pretraining_datasets/dolmino.py index 414e0e28dc..8126ded701 100644 --- a/experiments/pretraining_datasets/dolmino.py +++ b/experiments/pretraining_datasets/dolmino.py @@ -5,46 +5,16 @@ import os.path -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.dolmino import DOLMINO_DATASETS, download_dolmino_step from marin.execution.executor import ExecutorStep, this_output_path, versioned from marin.processing.tokenize import TokenizeConfig, tokenize from marin.processing.tokenize.data_configs import TokenizerStep -# Raw dataset download step -downloads = { - "dolmino": ( - ExecutorStep( - name="raw/dolmino-mix-1124", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="allenai/dolmino-mix-1124", - revision="bb54cab", - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - ) - .with_output_path("raw/dolmino-mix-1124-157960") - .cd("bb54cab") - ) -} +_dolmino_download = download_dolmino_step().as_executor_step() +_dolmino_base_dir = _dolmino_download.cd("bb54cab").cd("data") -_dolmino_base_dir = downloads["dolmino"].cd("data") - -# The following dataset splits define file patterns for each split. -DOLMINO_DATASETS = { - "dclm": ["**/*.json.zst"], - "flan": ["**/*.json.gz"], - "math/codesearchnet-owmfilter": ["**/*.jsonl.gz"], - "math/dolmino_math_synth": ["**/*.jsonl"], - "math/gsm8k": ["**/*.jsonl.zst"], - "math/mathcoder2-synthmath": ["**/*.jsonl"], - "math/metamath-owmfilter": ["**/*.jsonl.gz"], - "math/tinyGSM-MIND": ["**/*.jsonl.gz"], - "math/tulu_math": ["**/*.jsonl"], - "pes2o": ["**/*.json.gz"], - "stackexchange": ["**/*.json.gz"], - "wiki": ["**/*.json.gz"], -} +# Backward compat — some consumers import this +downloads = {"dolmino": _dolmino_download.cd("bb54cab")} # NB: we changed how hashes were computed for this corpus and we'd like to avoid recomputing them DOLMINO_LLAMA3_OVERRIDES = { diff --git a/experiments/pretraining_datasets/nemotron.py b/experiments/pretraining_datasets/nemotron.py index 52c9d17d69..d822c4dd5b 100644 --- a/experiments/pretraining_datasets/nemotron.py +++ b/experiments/pretraining_datasets/nemotron.py @@ -8,23 +8,15 @@ from experiments.defaults import DEFAULT_NEW_RUN_DATA_SHUFFLE from experiments.pretraining_datasets.dclm import dclm_components_llama3 -from marin.download.nemotron_cc.download_nemotron_cc import NemotronIngressConfig, download_nemotron_cc +from marin.datakit.download.nemotron_v1 import download_nemotron_v1_step from marin.execution.executor import ExecutorStep, output_path_of, this_output_path, versioned from marin.processing.tokenize import TokenizeConfig, lm_mixture_data_config, tokenize from marin.processing.tokenize.data_configs import TokenizerStep -# Raw dataset download step -downloads = { - "nemotron_cc": ExecutorStep( - name="raw/nemotro-cc", - fn=download_nemotron_cc, - config=NemotronIngressConfig( - output_path=this_output_path(), - ), - ) -} -_nemotron_cc_path = output_path_of(downloads["nemotron_cc"], "contrib/Nemotron/Nemotron-CC/data-jsonl/") +def nemotron_cc_download() -> ExecutorStep: + return download_nemotron_v1_step().as_executor_step() + NEMOTRON_DATASETS = { "hq_actual": ["quality=high/kind=actual/**/*.jsonl.*"], @@ -61,8 +53,8 @@ def _get_nemotron_split_paths(split: str): """Helper 
to get file paths for a nemotron split.""" - patterns = NEMOTRON_DATASETS[split] - return [_nemotron_cc_path / pattern for pattern in patterns] + base = output_path_of(nemotron_cc_download(), "contrib/Nemotron/Nemotron-CC/data-jsonl/") + return [base / pattern for pattern in NEMOTRON_DATASETS[split]] def tokenize_nemotron( diff --git a/experiments/pretraining_datasets/nemotron_v2.py b/experiments/pretraining_datasets/nemotron_v2.py index 66d618ad53..980b5edc90 100644 --- a/experiments/pretraining_datasets/nemotron_v2.py +++ b/experiments/pretraining_datasets/nemotron_v2.py @@ -2,134 +2,26 @@ # SPDX-License-Identifier: Apache-2.0 """ -Nemotron v2 pre-training dataset definitions and tokenization. +Nemotron v2 pre-training dataset tokenization. -These datasets come from the nvidia/Nemotron-Pre-Training-Datasets collection -on HuggingFace. They are additive to the original Nemotron-CC (v1) dataset -defined in nemotron.py. - -Most of these datasets are gated and require HF_TOKEN at download time. -All use parquet format with a "text" field. +Download definitions live in marin.datakit.download.nemotron_v2. +This file wires them into tokenization steps for experiment pipelines. """ import os.path -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.nemotron_v2 import NEMOTRON_V2_DATASETS, download_nemotron_v2_step from marin.execution.executor import ExecutorStep, this_output_path, versioned from marin.processing.tokenize import TokenizeConfig, tokenize from marin.processing.tokenize.data_configs import TokenizerStep -# ============================================================================ -# DATASET DEFINITIONS -# ============================================================================ - -# Each entry: (hf_id, revision, subsets_dict) -# subsets_dict maps subset_name -> glob pattern for parquet files within the download - -NEMOTRON_V2_DATASETS = { - "nemotron_cc_v2": { - "hf_dataset_id": "nvidia/Nemotron-CC-v2", - "revision": "229a2e7", - "subsets": { - "diverse_qa": "Diverse-QA/**/*.parquet", - "high_quality": "High-Quality/**/*.parquet", - "high_quality_synthetic": "High-Quality-Synthetic/**/*.parquet", - "medium_high_quality": "Medium-High-Quality/**/*.parquet", - "medium_quality": "Medium-Quality/**/*.parquet", - "translated_diverse_qa": "Translated-Diverse-QA/**/*.parquet", - }, - }, - "nemotron_cc_v2_1": { - "hf_dataset_id": "nvidia/Nemotron-CC-v2.1", - "revision": "ba6f2aa", - "subsets": { - "high_quality": "High-Quality/**/*.parquet", - "high_quality_dqa": "High-Quality-DQA/**/*.parquet", - "high_quality_synthetic": "High-Quality-Synthetic/**/*.parquet", - "high_quality_translated": "High-Quality-Translated-To-English/**/*.parquet", - "high_quality_translated_synthetic": "High-Quality-Translated-To-English-Synthetic/**/*.parquet", - "medium_high_quality": "Medium-High-Quality/**/*.parquet", - "medium_high_quality_synthetic": "Medium-High-Quality-Synthetic/**/*.parquet", - "medium_high_quality_translated": "Medium-High-Quality-Translated-To-English/**/*.parquet", - "medium_quality": "Medium-Quality/**/*.parquet", - }, - }, - "nemotron_cc_code_v1": { - "hf_dataset_id": "nvidia/Nemotron-CC-Code-v1", - "revision": "5c5bebc", - "subsets": { - "all": "data/**/*.parquet", - }, - }, - "nemotron_cc_math_v1": { - "hf_dataset_id": "nvidia/Nemotron-CC-Math-v1", - "revision": "397a250", - "subsets": { - "3": "3/**/*.parquet", - "4plus": "4plus/**/*.parquet", - "4plus_mind": "4plus_MIND/**/*.parquet", - }, - }, - 
"nemotron_pretraining_code_v1": { - "hf_dataset_id": "nvidia/Nemotron-Pretraining-Code-v1", - "revision": "01393d3", - "subsets": { - "synthetic_code": "Synthetic-Code/**/*.parquet", - "code_metadata": "Nemotron-Code-Metadata/**/*.parquet", - }, - }, - "nemotron_pretraining_code_v2": { - "hf_dataset_id": "nvidia/Nemotron-Pretraining-Code-v2", - "revision": "7b1a453", - "subsets": { - "code_metadata": "Nemotron-Code-Metadata/**/*.parquet", - "synthetic_question_answering": "Synthetic-Question-Answering/**/*.parquet", - "synthetic_student_teacher": "Synthetic-Student-Teacher/**/*.parquet", - "synthetic_code_review": "Synthetic-Code-Review/**/*.parquet", - "synthetic_rewriting": "Synthetic-Rewriting/**/*.parquet", - "synthetic_transpilation": "Synthetic-Transpilation/**/*.parquet", - }, - }, - "nemotron_pretraining_specialized_v1": { - "hf_dataset_id": "nvidia/Nemotron-Pretraining-Specialized-v1", - "revision": "9ed3718", - "subsets": { - "wiki_rewrite": "Nemotron-Pretraining-Wiki-Rewrite/**/*.parquet", - "math_textbooks": "Nemotron-Pretraining-Math-Textbooks/**/*.parquet", - "stem_sft": "Nemotron-Pretraining-STEM-SFT/**/*.parquet", - "scientific_coding": "Nemotron-Pretraining-Scientific-Coding/**/*.parquet", - "rqa": "Nemotron-Pretraining-RQA/**/*.parquet", - "infinibyte_reasoning": "Nemotron-Pretraining-InfiniByte-Reasoning/**/*.parquet", - }, - }, - "nemotron_pretraining_sft_v1": { - "hf_dataset_id": "nvidia/Nemotron-Pretraining-SFT-v1", - "revision": "3f1a5b8", - "subsets": { - "sft_code": "Nemotron-SFT-Code/**/*.parquet", - "sft_general": "Nemotron-SFT-General/**/*.parquet", - "sft_math": "Nemotron-SFT-MATH/**/*.parquet", - }, - }, -} - - # ============================================================================ # RAW DATASET DOWNLOADS # ============================================================================ -downloads: dict[str, ExecutorStep] = {} -for _family, _info in NEMOTRON_V2_DATASETS.items(): - downloads[_family] = ExecutorStep( - name=f"raw/{_family}", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id=_info["hf_dataset_id"], - revision=versioned(_info["revision"]), - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - ) +downloads: dict[str, ExecutorStep] = { + family: download_nemotron_v2_step(family).as_executor_step() for family in NEMOTRON_V2_DATASETS +} # ============================================================================ @@ -152,7 +44,7 @@ def tokenize_nemotron_v2_family( download_step = downloads[family] steps: dict[str, ExecutorStep[TokenizeConfig]] = {} - for subset, glob_pattern in info["subsets"].items(): + for subset, glob_pattern in info.subsets.items(): output_name = os.path.join("tokenized", family, subset) step = ExecutorStep( name=output_name, diff --git a/experiments/pretraining_datasets/simple.py b/experiments/pretraining_datasets/simple.py index 7f51364735..1e3e4ff6e7 100644 --- a/experiments/pretraining_datasets/simple.py +++ b/experiments/pretraining_datasets/simple.py @@ -12,8 +12,8 @@ from levanter.data.text import TextLmDatasetFormat from levanter.store.cache import CacheOptions -from marin.download.huggingface.download_hf import DownloadConfig, download_hf -from marin.execution.executor import ExecutorStep, this_output_path, versioned +from marin.datakit.download.huggingface import download_hf_step +from marin.execution.executor import ExecutorStep, InputName, this_output_path, versioned from marin.processing.tokenize import TokenizeConfig, tokenize from experiments.llama import llama3_tokenizer @@ -25,7 
+25,7 @@ def _tokenize_simple( name: str, - raw_dataset: ExecutorStep, + raw_dataset: ExecutorStep | InputName, tokenizer: str | None = None, override_path: str | None = None, text_format: TextLmDatasetFormat = TextLmDatasetFormat(), @@ -53,157 +53,65 @@ def _tokenize_simple( return step +def _dl( + name: str, hf_dataset_id: str, revision: str, output_path: str, *, append_sha_to_path: bool = False +) -> ExecutorStep: + """Create a download ExecutorStep from a StepSpec.""" + return download_hf_step( + name, + hf_dataset_id=hf_dataset_id, + revision=revision, + append_sha_to_path=append_sha_to_path, + override_output_path=output_path, + ).as_executor_step() + + # ============================================================================ # RAW DATASET DOWNLOADS # ============================================================================ -downloads = { - "fineweb": ExecutorStep( - name="raw/fineweb", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="HuggingFaceFW/fineweb", - revision="cd85054", - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - override_output_path="raw/fineweb", - ), - "fineweb_edu": ( - ( - fineweb_edu_base_step := ExecutorStep( - name="raw/fineweb-edu", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="HuggingFaceFW/fineweb-edu", - revision=versioned((revision := "87f0914")), - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - override_output_path=f"raw/fineweb-edu-{revision}", + +def _build_downloads() -> dict[str, ExecutorStep | InputName]: + fineweb_edu_base = _dl("raw/fineweb-edu", "HuggingFaceFW/fineweb-edu", "87f0914", "raw/fineweb-edu-87f0914") + + return { + "fineweb_edu": fineweb_edu_base.cd("data"), + "fineweb_edu_sample_10bt": fineweb_edu_base.cd("sample/10BT"), + "fineweb_edu_sample_100bt": fineweb_edu_base.cd("sample/100BT"), + "fineweb_edu_sample_350bt": fineweb_edu_base.cd("sample/350BT"), + "slimpajama": ( + _dl("raw/SlimPajama-627B", "cerebras/SlimPajama-627B", "2d0accd", "raw/SlimPajama-627B-262830").cd( + "2d0accd/huggingface.co/datasets/cerebras/SlimPajama-627B/resolve/2d0accd" ) - ).cd("data") - ), - "fineweb_edu_sample_10bt": fineweb_edu_base_step.cd("sample/10BT"), - "fineweb_edu_sample_100bt": fineweb_edu_base_step.cd("sample/100BT"), - "fineweb_edu_sample_350bt": fineweb_edu_base_step.cd("sample/350BT"), - "slimpajama": ( - ExecutorStep( - name="raw/SlimPajama-627B", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="cerebras/SlimPajama-627B", - revision="2d0accd", - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - override_output_path="raw/SlimPajama-627B-262830", - ).cd("2d0accd/huggingface.co/datasets/cerebras/SlimPajama-627B/resolve/2d0accd") - ), - "slimpajama_6b": ( - ExecutorStep( - name="raw/SlimPajama-6B", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="DKYoon/SlimPajama-6B", - revision="b5f90f4", - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - override_output_path="raw/SlimPajama-6B-be35b7", - ).cd("data") - ), - "dolma3_mix_150b_1025": ( - ExecutorStep( - name="raw/dolma3_mix-150B-1025", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="allenai/dolma3_mix-150B-1025", - revision="15d04ee", - gcs_output_path=this_output_path(), - wait_for_completion=True, + ), + "slimpajama_6b": ( + _dl("raw/SlimPajama-6B", "DKYoon/SlimPajama-6B", "b5f90f4", "raw/SlimPajama-6B-be35b7").cd("data") + ), + "dolma3_mix_150b_1025": ( + _dl( + "raw/dolma3_mix-150B-1025", + "allenai/dolma3_mix-150B-1025", + "15d04ee", + 
"raw/dolma3_mix-150B-1025-15d04ee", append_sha_to_path=True, - ), - override_output_path="raw/dolma3_mix-150B-1025-15d04ee", - ).cd("15d04ee") - ), - "dclm_baseline_wrong": ExecutorStep( - name="raw/dclm-baseline-1.0", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="mlfoundations/dclm-baseline-1.0", - revision="a3b142c", - gcs_output_path=this_output_path(), - wait_for_completion=True, + ).cd("15d04ee") ), - override_output_path="raw/dclm_WRONG_20250211/", - ), - "dclm_baseline": ( - ExecutorStep( - name="raw/dclm-baseline-1.0", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="mlfoundations/dclm-baseline-1.0", - revision="a3b142c", - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - override_output_path="raw/dclm", - ).cd("a3b142c") - ), - "the_stack_dedup": ( - ExecutorStep( - name="raw/the-stack-dedup", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="bigcode/the-stack-dedup", - revision="17cad72", - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - override_output_path="raw/the-stack-dedup-4ba450", - ).cd("17cad72") - ), - "proofpile_2": ( - ExecutorStep( - name="raw/proof-pile-2", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="EleutherAI/proof-pile-2", - revision="901a927", - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - override_output_path="raw/proof-pile-2-f1b1d8", - ).cd("901a927/huggingface.co/datasets/EleutherAI/proof-pile-2/resolve/901a927") - ), - "the_pile_openwebtext2": ( - ExecutorStep( - name="raw/the_pile_openwebtext2", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="vietgpt/the_pile_openwebtext2", - revision="1de27c6", - gcs_output_path=this_output_path(), - wait_for_completion=True, - ), - override_output_path="raw/the_pile_openwebtext2", - ).cd("1de27c6/huggingface.co/datasets/vietgpt/the_pile_openwebtext2/resolve/1de27c6") - ), - # TODO: Earlier datasets were stored in gcs_output_path/ instead of gcs_output_path. - # Migrate the dataset and cd can be removed. 
- "starcoderdata": ExecutorStep( - name="raw/starcoderdata", - fn=download_hf, - config=DownloadConfig( - hf_dataset_id="bigcode/starcoderdata", - revision="9fc30b5", - gcs_output_path=this_output_path(), - wait_for_completion=True, + "dclm_baseline_wrong": _dl( + "raw/dclm-baseline-1.0", "mlfoundations/dclm-baseline-1.0", "a3b142c", "raw/dclm_WRONG_20250211/" ), - override_output_path="raw/starcoderdata-720c8c", - ), -} + "dclm_baseline": ( + _dl("raw/dclm-baseline-1.0", "mlfoundations/dclm-baseline-1.0", "a3b142c", "raw/dclm").cd("a3b142c") + ), + "proofpile_2": ( + _dl("raw/proof-pile-2", "EleutherAI/proof-pile-2", "901a927", "raw/proof-pile-2-f1b1d8").cd( + "901a927/huggingface.co/datasets/EleutherAI/proof-pile-2/resolve/901a927" + ) + ), + "starcoderdata": _dl("raw/starcoderdata", "bigcode/starcoderdata", "9fc30b5", "raw/starcoderdata-720c8c"), + } + + +downloads = _build_downloads() # ============================================================================ diff --git a/experiments/tootsie/exp1063_upload_tootsie.py b/experiments/tootsie/exp1063_upload_tootsie.py index 55d06ec875..c23d5de683 100644 --- a/experiments/tootsie/exp1063_upload_tootsie.py +++ b/experiments/tootsie/exp1063_upload_tootsie.py @@ -25,7 +25,7 @@ from dataclasses import dataclass, field -from marin.download.huggingface.upload_gcs_to_hf import UploadConfig, upload_gcs_to_hf +from marin.utilities.upload_gcs_to_hf import UploadConfig, upload_gcs_to_hf from marin.execution.executor import ExecutorStep, executor_main diff --git a/experiments/train_test_overlap/eval_datasets_overlap.py b/experiments/train_test_overlap/eval_datasets_overlap.py index c6e7469221..b7df8679aa 100644 --- a/experiments/train_test_overlap/eval_datasets_overlap.py +++ b/experiments/train_test_overlap/eval_datasets_overlap.py @@ -1,7 +1,7 @@ # Copyright The Marin Authors # SPDX-License-Identifier: Apache-2.0 -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig, download_hf from marin.execution.executor import ExecutorStep, executor_main, this_output_path, versioned from marin.transform.huggingface.dataset_to_eval import DatasetConversionConfig, OutputFormatOptions, hf_dataset_to_jsonl diff --git a/experiments/train_test_overlap/train_test_total.py b/experiments/train_test_overlap/train_test_total.py index e08dbfb4f2..af280c552b 100644 --- a/experiments/train_test_overlap/train_test_total.py +++ b/experiments/train_test_overlap/train_test_total.py @@ -37,7 +37,7 @@ from experiments.midtraining_datasets import finemath_3_plus from experiments.pretraining_datasets.simple import downloads from experiments.pretraining_datasets.dolmino import downloads as dolmino_downloads -from experiments.pretraining_datasets.nemotron import downloads as nemotron_downloads +from experiments.pretraining_datasets.nemotron import nemotron_cc_download from experiments.train_test_overlap.eval_datasets_overlap import EVAL_DATASET_STEPS logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") @@ -81,7 +81,7 @@ def run_train_test_overlap(config: DeconConfig) -> str: DatasetConfig(name="starcoder", path=downloads["starcoderdata"], text_field="content"), DatasetConfig(name="proofpile", path=downloads["proofpile_2"]), DatasetConfig(name="dolmino", path=dolmino_downloads["dolmino"]), - DatasetConfig(name="nemotron_cc", path=nemotron_downloads["nemotron_cc"]), + DatasetConfig(name="nemotron_cc", path=nemotron_cc_download()), ] diff --git 
a/experiments/two_stage/data.py b/experiments/two_stage/data.py index 9aeca84456..c78daf0ab1 100644 --- a/experiments/two_stage/data.py +++ b/experiments/two_stage/data.py @@ -6,7 +6,7 @@ from experiments.midtraining_datasets import finemath_3_plus_tokenized from experiments.pretraining_datasets import tokenize_dolma from experiments.pretraining_datasets.simple import tokenized -from marin.download.huggingface.download_hf import DownloadConfig, download_hf +from marin.datakit.download.huggingface import DownloadConfig, download_hf from marin.execution.executor import ExecutorStep, this_output_path dolma_components = tokenize_dolma() diff --git a/lib/marin/src/marin/datakit/__init__.py b/lib/marin/src/marin/datakit/__init__.py new file mode 100644 index 0000000000..ec8bc038b7 --- /dev/null +++ b/lib/marin/src/marin/datakit/__init__.py @@ -0,0 +1,2 @@ +# Copyright The Marin Authors +# SPDX-License-Identifier: Apache-2.0 diff --git a/lib/marin/src/marin/datakit/download/__init__.py b/lib/marin/src/marin/datakit/download/__init__.py new file mode 100644 index 0000000000..ec8bc038b7 --- /dev/null +++ b/lib/marin/src/marin/datakit/download/__init__.py @@ -0,0 +1,2 @@ +# Copyright The Marin Authors +# SPDX-License-Identifier: Apache-2.0 diff --git a/lib/marin/src/marin/download/ar5iv/download.py b/lib/marin/src/marin/datakit/download/ar5iv.py similarity index 83% rename from lib/marin/src/marin/download/ar5iv/download.py rename to lib/marin/src/marin/datakit/download/ar5iv.py index 9483370c71..86498e12e1 100644 --- a/lib/marin/src/marin/download/ar5iv/download.py +++ b/lib/marin/src/marin/datakit/download/ar5iv.py @@ -19,6 +19,7 @@ import draccus from iris.marin_fs import open_url +from marin.execution.step_spec import StepSpec from zephyr import Dataset, ZephyrContext from zephyr.writers import atomic_rename from iris.logging import configure_logging @@ -27,7 +28,7 @@ @dataclass -class DownloadConfig: +class Ar5ivDownloadConfig: input_path: str output_path: str max_files: int | None = None # Maximum number of shards to process @@ -63,7 +64,7 @@ def process_shard(shard_task: dict) -> dict: return {"shard_id": shard_id, "num_files": len(file_list), "output_path": gcs_path} -def download(cfg: DownloadConfig) -> None: +def download(cfg: Ar5ivDownloadConfig) -> None: """ Download and process Ar5iv dataset from a zip file in GCS. 
@@ -127,8 +128,32 @@ def download(cfg: DownloadConfig) -> None: logger.info("Transfer completed successfully!") +def ar5iv_step( + name: str = "raw/ar5iv", + *, + input_path: str, + max_files: int | None = None, + deps: list[StepSpec] | None = None, + output_path_prefix: str | None = None, + override_output_path: str | None = None, +) -> StepSpec: + """Create a StepSpec that downloads and processes the Ar5iv dataset from a zip file.""" + + def _run(output_path: str) -> None: + download(Ar5ivDownloadConfig(input_path=input_path, output_path=output_path, max_files=max_files)) + + return StepSpec( + name=name, + fn=_run, + deps=deps or [], + hash_attrs={"input_path": input_path, "max_files": max_files}, + output_path_prefix=output_path_prefix, + override_output_path=override_output_path, + ) + + @draccus.wrap() -def main(cfg: DownloadConfig) -> None: +def main(cfg: Ar5ivDownloadConfig) -> None: """CLI entrypoint for downloading and processing Ar5iv dataset.""" configure_logging(level=logging.INFO) diff --git a/lib/marin/src/marin/datakit/download/dolma.py b/lib/marin/src/marin/datakit/download/dolma.py new file mode 100644 index 0000000000..7e9ac26e3b --- /dev/null +++ b/lib/marin/src/marin/datakit/download/dolma.py @@ -0,0 +1,41 @@ +# Copyright The Marin Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Dolma 1.7 dataset download definition and split metadata.""" + +from marin.datakit.download.huggingface import download_hf_step +from marin.execution.step_spec import StepSpec + +DOLMA_DATASETS = { + "algebraic-stack": ["algebraic-stack-train-{0000..0015}.json.gz"], + "arxiv": ["arxiv-{0000..0099}.json.gz"], + "gutenberg": ["books-{0000..0002}.json.gz"], + "c4": ["c4-{0000..0170}.json.gz"], + "cc": [ + "cc_en_head-{0000..0274}.json.gz", + "cc_en_middle-{0000..0238}.json.gz", + "cc_en_middle-{0240..0379}.json.gz", + "cc_en_tail-{0000..0152}.json.gz", + "cc_en_tail-{0154..0444}.json.gz", + ], + "cc-news": ["cc_news_head-{0000..0004}.json.gz", "cc_news_middle-{0000..0002}.json.gz", "cc_news_tail-0000.json.gz"], + "falcon": ["falcon-{0000..0499}.json.gz"], + "megawika": ["megawika-{0000..0261}.json.gz"], + "open-web-math": ["open-web-math-train-{0000..0012}.json.gz"], + "pes2o": ["pes2o-{0000..0025}.json.gz"], + "reddit": ["reddit-{0000..0077}.json.gz"], + "stackexchange": ["stackexchange-{0000..0025}.json.gz"], + "starcoder": ["starcoder-{0000..0048}.json.gz"], + "flan": ["tulu_flan-{0000..0065}.json.gz"], + "wiki": ["wiki-{0000..0001}.json.gz"], +} + + +def download_dolma_step() -> StepSpec: + """Download the Dolma 1.7 dataset from HuggingFace.""" + return download_hf_step( + "raw/dolma", + hf_dataset_id="allenai/dolma", + revision="7f48140", + override_output_path="raw/dolma", + ) diff --git a/lib/marin/src/marin/datakit/download/dolmino.py b/lib/marin/src/marin/datakit/download/dolmino.py new file mode 100644 index 0000000000..0e1b063cf2 --- /dev/null +++ b/lib/marin/src/marin/datakit/download/dolmino.py @@ -0,0 +1,32 @@ +# Copyright The Marin Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Dolmino dataset download definition and split metadata.""" + +from marin.datakit.download.huggingface import download_hf_step +from marin.execution.step_spec import StepSpec + +DOLMINO_DATASETS = { + "dclm": ["**/*.json.zst"], + "flan": ["**/*.json.gz"], + "math/codesearchnet-owmfilter": ["**/*.jsonl.gz"], + "math/dolmino_math_synth": ["**/*.jsonl"], + "math/gsm8k": ["**/*.jsonl.zst"], + "math/mathcoder2-synthmath": ["**/*.jsonl"], + "math/metamath-owmfilter": ["**/*.jsonl.gz"], + 
"math/tinyGSM-MIND": ["**/*.jsonl.gz"], + "math/tulu_math": ["**/*.jsonl"], + "pes2o": ["**/*.json.gz"], + "stackexchange": ["**/*.json.gz"], + "wiki": ["**/*.json.gz"], +} + + +def download_dolmino_step() -> StepSpec: + """Download the dolmino-mix-1124 dataset from HuggingFace.""" + return download_hf_step( + "raw/dolmino-mix-1124", + hf_dataset_id="allenai/dolmino-mix-1124", + revision="bb54cab", + override_output_path="raw/dolmino-mix-1124-157960", + ) diff --git a/lib/marin/src/marin/download/huggingface/download_hf.py b/lib/marin/src/marin/datakit/download/huggingface.py similarity index 88% rename from lib/marin/src/marin/download/huggingface/download_hf.py rename to lib/marin/src/marin/datakit/download/huggingface.py index 089ef63e0c..f6ee228cd5 100644 --- a/lib/marin/src/marin/download/huggingface/download_hf.py +++ b/lib/marin/src/marin/datakit/download/huggingface.py @@ -14,13 +14,13 @@ import time from dataclasses import dataclass, field -import draccus import huggingface_hub from huggingface_hub import HfFileSystem from iris.marin_fs import open_url, url_to_fs from huggingface_hub.errors import HfHubHTTPError from packaging.version import Version from marin.execution.executor import THIS_OUTPUT_PATH +from marin.execution.step_spec import StepSpec from marin.utilities.validation_utils import write_provenance_json from zephyr import Dataset, ZephyrContext from zephyr.writers import atomic_rename @@ -343,11 +343,57 @@ def download_hf(cfg: DownloadConfig) -> None: logger.info(f"Streamed all files and wrote provenance JSON; check {output_path}.") -@draccus.wrap() -def main(cfg: DownloadConfig) -> None: - """Download HuggingFace dataset.""" - download_hf(cfg) +def download_hf_step( + name: str, + *, + hf_dataset_id: str, + revision: str, + hf_urls_glob: list[str] | None = None, + append_sha_to_path: bool = False, + zephyr_max_parallelism: int = 8, + deps: list[StepSpec] | None = None, + override_output_path: str | None = None, +) -> StepSpec: + """Create a StepSpec that downloads a HuggingFace dataset. + The raw download is preserved as-is in its original format and directory structure. -if __name__ == "__main__": - main() + Args: + name: Step name (e.g. "raw/fineweb"). + hf_dataset_id: HuggingFace dataset identifier (e.g. "HuggingFaceFW/fineweb"). + revision: Commit hash from the HF dataset repo. + hf_urls_glob: Glob patterns to select specific files. Empty means all files. + append_sha_to_path: If True, write outputs under ``output_path/``. + zephyr_max_parallelism: Maximum download parallelism. + deps: Optional upstream dependencies. + override_output_path: Override the computed output path entirely. + + Returns: + A StepSpec whose output_path contains the raw downloaded files. 
+ """ + resolved_glob = hf_urls_glob or [] + + def _run(output_path: str) -> None: + download_hf( + DownloadConfig( + hf_dataset_id=hf_dataset_id, + revision=revision, + hf_urls_glob=resolved_glob, + gcs_output_path=output_path, + append_sha_to_path=append_sha_to_path, + zephyr_max_parallelism=zephyr_max_parallelism, + ) + ) + + return StepSpec( + name=name, + fn=_run, + deps=deps or [], + hash_attrs={ + "hf_dataset_id": hf_dataset_id, + "revision": revision, + "hf_urls_glob": resolved_glob, + "append_sha_to_path": append_sha_to_path, + }, + override_output_path=override_output_path, + ) diff --git a/lib/marin/src/marin/download/nemotron_cc/download_nemotron_cc.py b/lib/marin/src/marin/datakit/download/nemotron_v1.py similarity index 78% rename from lib/marin/src/marin/download/nemotron_cc/download_nemotron_cc.py rename to lib/marin/src/marin/datakit/download/nemotron_v1.py index 77c9d82cf5..3a4f9a0a98 100644 --- a/lib/marin/src/marin/download/nemotron_cc/download_nemotron_cc.py +++ b/lib/marin/src/marin/datakit/download/nemotron_v1.py @@ -1,25 +1,17 @@ # Copyright The Marin Authors # SPDX-License-Identifier: Apache-2.0 -""" -Download and process Nemotron-CC dataset from Common Crawl. - -Example Usage: -uv run zephyr --backend=ray --max-parallelism=100 --memory=4GB \ - lib/marin/src/marin/download/nemotron_cc/download_nemotron_cc.py \ - --output_path gs://bucket/nemotron-output -""" +"""Download and process Nemotron-CC dataset from Common Crawl""" import json import logging import os from collections.abc import Iterator -from dataclasses import dataclass import requests import zstandard from iris.marin_fs import open_url -from marin.execution import THIS_OUTPUT_PATH +from marin.execution.step_spec import StepSpec from marin.utils import fsspec_exists from requests.adapters import HTTPAdapter from urllib3.util import Retry @@ -84,13 +76,10 @@ def download_single_nemotron_path(input_file_path: str, output_file_path: str) - return {"input_file": input_file_path, "output_file": output_file_path, "num_records": num_records} -@dataclass -class NemotronIngressConfig: - output_path: str = THIS_OUTPUT_PATH - +def download_nemotron_cc(output_path: str) -> None: + """Download and process Nemotron-CC dataset from Common Crawl.""" -def download_nemotron_cc(cfg: NemotronIngressConfig): - paths_file_path = os.path.join(cfg.output_path, "data-jsonl.paths") + paths_file_path = os.path.join(output_path, "data-jsonl.paths") logger.info(f"Downloading Nemotron CC path file {paths_file_path}") with open_url(NCC_PATH_FILE_URL, "rb") as f, open_url(paths_file_path, "wb") as f_out: @@ -101,7 +90,7 @@ def download_nemotron_cc(cfg: NemotronIngressConfig): with open_url(paths_file_path, "r", compression="gzip") as f: for line in f: file = line.strip() - output_file_path = os.path.join(cfg.output_path, file).replace("jsonl.zstd", "jsonl.zst") + output_file_path = os.path.join(output_path, file).replace("jsonl.zstd", "jsonl.zst") all_files.append((file, output_file_path)) logger.info(f"Processing {len(all_files)} Nemotron CC files") @@ -110,10 +99,21 @@ def download_nemotron_cc(cfg: NemotronIngressConfig): Dataset.from_list(all_files) .filter(lambda file_info: not fsspec_exists(file_info[1])) .map(lambda file_info: download_single_nemotron_path(*file_info)) - .write_jsonl(os.path.join(cfg.output_path, ".metrics/download-{shard:05d}.jsonl"), skip_existing=True) + .write_jsonl(os.path.join(output_path, ".metrics/download-{shard:05d}.jsonl"), skip_existing=True) ) ctx = ZephyrContext(name="download-nemotron-cc") 
ctx.execute(pipeline) - logger.info(f"Downloaded Nemotron CC files to {cfg.output_path}") + logger.info(f"Downloaded Nemotron CC files to {output_path}") + + +def download_nemotron_v1_step() -> StepSpec: + """Create a StepSpec that downloads the Nemotron-CC dataset from Common Crawl.""" + + return StepSpec( + name="raw/nemotron_v1", + fn=lambda output_path: download_nemotron_cc(output_path=output_path), + # NOTE: use the existing output to avoid re-downloading. Yes this is missing the `n`. + override_output_path="raw/nemotro-cc-eeb783", + ) diff --git a/lib/marin/src/marin/datakit/download/nemotron_v2.py b/lib/marin/src/marin/datakit/download/nemotron_v2.py new file mode 100644 index 0000000000..91b644730b --- /dev/null +++ b/lib/marin/src/marin/datakit/download/nemotron_v2.py @@ -0,0 +1,133 @@ +# Copyright The Marin Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Nemotron v2 pre-training dataset download definitions. + +These datasets come from the nvidia/Nemotron-Pre-Training-Datasets collection +on HuggingFace. They are additive to the original Nemotron-CC (v1) dataset. + +Most of these datasets are gated and require HF_TOKEN at download time. +All use parquet format with a "text" field. +""" + +from dataclasses import dataclass, field + +from marin.datakit.download.huggingface import download_hf_step +from marin.execution.step_spec import StepSpec + + +@dataclass(frozen=True) +class NemotronV2Dataset: + """Metadata for a single Nemotron v2 HuggingFace dataset.""" + + hf_dataset_id: str + revision: str + subsets: dict[str, str] = field(default_factory=dict) + """Maps subset_name -> glob pattern for parquet files within the download.""" + override_output_path: str | None = None + """Allow to point at existing download output to avoid re-downloading""" + + +NEMOTRON_V2_DATASETS: dict[str, NemotronV2Dataset] = { + "nemotron_cc_v2": NemotronV2Dataset( + hf_dataset_id="nvidia/Nemotron-CC-v2", + revision="229a2e7", + subsets={ + "diverse_qa": "Diverse-QA/**/*.parquet", + "high_quality": "High-Quality/**/*.parquet", + "high_quality_synthetic": "High-Quality-Synthetic/**/*.parquet", + "medium_high_quality": "Medium-High-Quality/**/*.parquet", + "medium_quality": "Medium-Quality/**/*.parquet", + "translated_diverse_qa": "Translated-Diverse-QA/**/*.parquet", + }, + override_output_path="raw/nemotron_cc_v2-674913", + ), + "nemotron_cc_v2_1": NemotronV2Dataset( + hf_dataset_id="nvidia/Nemotron-CC-v2.1", + revision="ba6f2aa", + subsets={ + "high_quality": "High-Quality/**/*.parquet", + "high_quality_dqa": "High-Quality-DQA/**/*.parquet", + "high_quality_synthetic": "High-Quality-Synthetic/**/*.parquet", + "high_quality_translated": "High-Quality-Translated-To-English/**/*.parquet", + "high_quality_translated_synthetic": "High-Quality-Translated-To-English-Synthetic/**/*.parquet", + "medium_high_quality": "Medium-High-Quality/**/*.parquet", + "medium_high_quality_synthetic": "Medium-High-Quality-Synthetic/**/*.parquet", + "medium_high_quality_translated": "Medium-High-Quality-Translated-To-English/**/*.parquet", + "medium_quality": "Medium-Quality/**/*.parquet", + }, + override_output_path="raw/nemotron_cc_v2_1-a7afb6", + ), + "nemotron_cc_code_v1": NemotronV2Dataset( + hf_dataset_id="nvidia/Nemotron-CC-Code-v1", + revision="5c5bebc", + subsets={"all": "data/**/*.parquet"}, + override_output_path="raw/nemotron_cc_code_v1-c55cd9", + ), + "nemotron_cc_math_v1": NemotronV2Dataset( + hf_dataset_id="nvidia/Nemotron-CC-Math-v1", + revision="397a250", + subsets={ + "3": "3/**/*.parquet", + "4plus": 
"4plus/**/*.parquet", + "4plus_mind": "4plus_MIND/**/*.parquet", + }, + override_output_path="raw/nemotron_cc_math_v1-322fe4", + ), + "nemotron_pretraining_code_v1": NemotronV2Dataset( + hf_dataset_id="nvidia/Nemotron-Pretraining-Code-v1", + revision="01393d3", + subsets={ + "synthetic_code": "Synthetic-Code/**/*.parquet", + "code_metadata": "Nemotron-Code-Metadata/**/*.parquet", + }, + override_output_path="raw/nemotron_pretraining_code_v1-175b37", + ), + "nemotron_pretraining_code_v2": NemotronV2Dataset( + hf_dataset_id="nvidia/Nemotron-Pretraining-Code-v2", + revision="7b1a453", + subsets={ + "code_metadata": "Nemotron-Code-Metadata/**/*.parquet", + "synthetic_question_answering": "Synthetic-Question-Answering/**/*.parquet", + "synthetic_student_teacher": "Synthetic-Student-Teacher/**/*.parquet", + "synthetic_code_review": "Synthetic-Code-Review/**/*.parquet", + "synthetic_rewriting": "Synthetic-Rewriting/**/*.parquet", + "synthetic_transpilation": "Synthetic-Transpilation/**/*.parquet", + }, + override_output_path="raw/nemotron_pretraining_code_v2-d15a24", + ), + "nemotron_pretraining_specialized_v1": NemotronV2Dataset( + hf_dataset_id="nvidia/Nemotron-Pretraining-Specialized-v1", + revision="9ed3718", + subsets={ + "wiki_rewrite": "Nemotron-Pretraining-Wiki-Rewrite/**/*.parquet", + "math_textbooks": "Nemotron-Pretraining-Math-Textbooks/**/*.parquet", + "stem_sft": "Nemotron-Pretraining-STEM-SFT/**/*.parquet", + "scientific_coding": "Nemotron-Pretraining-Scientific-Coding/**/*.parquet", + "rqa": "Nemotron-Pretraining-RQA/**/*.parquet", + "infinibyte_reasoning": "Nemotron-Pretraining-InfiniByte-Reasoning/**/*.parquet", + }, + override_output_path="raw/nemotron_pretraining_specialized_v1-a31fae", + ), + "nemotron_pretraining_sft_v1": NemotronV2Dataset( + hf_dataset_id="nvidia/Nemotron-Pretraining-SFT-v1", + revision="3f1a5b8", + subsets={ + "sft_code": "Nemotron-SFT-Code/**/*.parquet", + "sft_general": "Nemotron-SFT-General/**/*.parquet", + "sft_math": "Nemotron-SFT-MATH/**/*.parquet", + }, + override_output_path="raw/nemotron_pretraining_sft_v1-10f77e", + ), +} + + +def download_nemotron_v2_step(family: str) -> StepSpec: + """Create a download StepSpec for a Nemotron v2 dataset family.""" + info = NEMOTRON_V2_DATASETS[family] + return download_hf_step( + f"raw/{family}", + hf_dataset_id=info.hf_dataset_id, + revision=info.revision, + override_output_path=info.override_output_path, + ) diff --git a/lib/marin/src/marin/download/stackexchange/README.md b/lib/marin/src/marin/datakit/download/stackexchange/README.md similarity index 100% rename from lib/marin/src/marin/download/stackexchange/README.md rename to lib/marin/src/marin/datakit/download/stackexchange/README.md diff --git a/lib/marin/src/marin/download/stackexchange/stackexchange-urls.tsv b/lib/marin/src/marin/datakit/download/stackexchange/stackexchange-urls.tsv similarity index 100% rename from lib/marin/src/marin/download/stackexchange/stackexchange-urls.tsv rename to lib/marin/src/marin/datakit/download/stackexchange/stackexchange-urls.tsv diff --git a/lib/marin/src/marin/download/uncheatable_eval/download.py b/lib/marin/src/marin/datakit/download/uncheatable_eval.py similarity index 87% rename from lib/marin/src/marin/download/uncheatable_eval/download.py rename to lib/marin/src/marin/datakit/download/uncheatable_eval.py index b77195ed63..f009ba158c 100644 --- a/lib/marin/src/marin/download/uncheatable_eval/download.py +++ b/lib/marin/src/marin/datakit/download/uncheatable_eval.py @@ -16,7 +16,8 @@ import requests from 
iris.marin_fs import open_url -from marin.execution import THIS_OUTPUT_PATH, ExecutorStep, VersionedValue, ensure_versioned, this_output_path +from marin.execution import THIS_OUTPUT_PATH, ExecutorStep, VersionedValue +from marin.execution.step_spec import StepSpec from marin.utils import fsspec_mkdirs from requests.adapters import HTTPAdapter from urllib3.util import Retry @@ -353,6 +354,52 @@ def download_latest_uncheatable_eval(cfg: UncheatableEvalDownloadConfig) -> dict return {"success": True, "processed": metadata_records} +def uncheatable_eval_step( + name: str = "raw/uncheatable-eval/latest", + *, + repo_owner: str = "ziqing-huang", + repo_name: str = "uncheatable_eval", + data_path: str = "data", + branch: str = "master", + max_concurrent_downloads: int = 8, + request_timeout: int = 120, + github_token: str | None = None, + skip_existing: bool = True, + deps: list[StepSpec] | None = None, + output_path_prefix: str | None = None, + override_output_path: str | None = None, +) -> StepSpec: + """Create a StepSpec that downloads the latest Uncheatable Eval dumps.""" + + def _run(output_path: str) -> dict: + cfg = UncheatableEvalDownloadConfig( + output_path=output_path, + repo_owner=repo_owner, + repo_name=repo_name, + data_path=data_path, + branch=branch, + max_concurrent_downloads=max_concurrent_downloads, + request_timeout=request_timeout, + github_token=github_token, + skip_existing=skip_existing, + ) + return download_latest_uncheatable_eval(cfg) + + return StepSpec( + name=name, + fn=_run, + deps=deps or [], + hash_attrs={ + "repo_owner": repo_owner, + "repo_name": repo_name, + "data_path": data_path, + "branch": branch, + }, + output_path_prefix=output_path_prefix, + override_output_path=override_output_path, + ) + + def make_uncheatable_eval_step( *, name: str = "raw/uncheatable-eval/latest", @@ -364,31 +411,19 @@ def make_uncheatable_eval_step( request_timeout: int = 120, github_token: str | None = None, skip_existing: bool = True, -) -> ExecutorStep[UncheatableEvalDownloadConfig]: - """Create an :class:`ExecutorStep` that downloads the latest Uncheatable Eval dumps.""" - - config = UncheatableEvalDownloadConfig( - output_path=this_output_path(), - repo_owner=ensure_versioned(repo_owner), - repo_name=ensure_versioned(repo_name), - data_path=ensure_versioned(data_path), - branch=ensure_versioned(branch), +) -> ExecutorStep: + """Create an ExecutorStep that downloads the latest Uncheatable Eval dumps. + + Backward-compat wrapper around uncheatable_eval_step(). 
+ """ + return uncheatable_eval_step( + name=name, + repo_owner=repo_owner, + repo_name=repo_name, + data_path=data_path, + branch=branch, max_concurrent_downloads=max_concurrent_downloads, request_timeout=request_timeout, github_token=github_token, skip_existing=skip_existing, - ) - - return ExecutorStep( - name=name, - fn=download_latest_uncheatable_eval, - config=config, - ) - - -__all__ = [ - "UncheatableEvalDataset", - "UncheatableEvalDownloadConfig", - "download_latest_uncheatable_eval", - "make_uncheatable_eval_step", -] + ).as_executor_step() diff --git a/lib/marin/src/marin/download/wikipedia/download.py b/lib/marin/src/marin/datakit/download/wikipedia.py similarity index 77% rename from lib/marin/src/marin/download/wikipedia/download.py rename to lib/marin/src/marin/datakit/download/wikipedia.py index 552e546bf9..a989b1ea97 100644 --- a/lib/marin/src/marin/download/wikipedia/download.py +++ b/lib/marin/src/marin/datakit/download/wikipedia.py @@ -2,8 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 """ -wikipedia/download.py - Download script for the Wikipedia raw HTML data, provided by Wikimedia. Home Page: https://dumps.wikimedia.org/other/enterprise_html/runs/ @@ -11,14 +9,14 @@ Example Usage (production, large dataset): ENWIKI=https://dumps.wikimedia.org/other/enterprise_html/runs/20250320/enwiki-NS0-20250320-ENTERPRISE-HTML.json.tar.gz uv run zephyr --backend=ray --max-parallelism=10 \ - lib/marin/src/marin/download/wikipedia/download.py \ + lib/marin/src/marin/datakit/download/wikipedia.py \ --input_urls $ENWIKI \ --revision 20250320 --output_path gs://path/to/output Example Usage (local testing, small dataset): SIMPLEWIKI=https://dumps.wikimedia.org/other/enterprise_html/runs/20250320/simplewiki-NS0-20250320-ENTERPRISE-HTML.json.tar.gz -uv run zephyr --backend=threadpool --max-parallelism=4 --entry-point=download \ - lib/marin/src/marin/download/wikipedia/download.py \ +uv run zephyr --backend=threadpool --max-parallelism=4 --entry-point=main \ + lib/marin/src/marin/datakit/download/wikipedia.py \ --input_urls "[$SIMPLEWIKI]" \ --revision 20250320 --output_path /tmp/wikipedia_test @@ -30,11 +28,10 @@ import os import tarfile from collections.abc import Iterable -from dataclasses import dataclass -import draccus import requests from iris.marin_fs import open_url +from marin.execution.step_spec import StepSpec from marin.utils import fsspec_size from tqdm_loggable.auto import tqdm from zephyr import Dataset, ZephyrContext, atomic_rename, load_jsonl @@ -42,14 +39,7 @@ logger = logging.getLogger(__name__) -@dataclass -class DownloadConfig: - input_urls: list[str] - revision: str - output_path: str - - -def download_tar(url: str, output_prefix) -> str: +def download_tar(url: str, output_prefix: str) -> str: shard_filename = url.split("/")[-1] output_filename = os.path.join(output_prefix, shard_filename) logger.info(f"Downloading URL: {url} to {output_filename}") @@ -100,15 +90,14 @@ def process_file(input_file: str, output_path: str) -> Iterable[str]: raise e -@draccus.wrap() -def download(cfg: DownloadConfig) -> None: +def download_wikipedia(input_urls: list[str], revision: str, output_path: str) -> None: """Download and process Wikipedia data.""" logger.info("Starting transfer of Wikipedia dump...") - output_base = os.path.join(cfg.output_path, cfg.revision) + output_base = os.path.join(output_path, revision) ctx = ZephyrContext(name="download-wikipedia") download_metrics = ctx.execute( - Dataset.from_list(cfg.input_urls) + Dataset.from_list(input_urls) .map(lambda url: 
download_tar(url, output_base)) .write_jsonl(f"{output_base}/.metrics/download-{{shard:05d}}.jsonl", skip_existing=True), ) @@ -123,3 +112,24 @@ def download(cfg: DownloadConfig) -> None: ) logger.info("Wikipedia dump transfer complete, wrote: %s", list(extracted)) + + +def download_wikipedia_step( + *, + input_urls: list[str] | None = None, + revision: str | None = None, +) -> StepSpec: + """Download Wikipedia HTML dumps.""" + + def _run(output_path: str) -> None: + assert input_urls is not None, "input_urls must be provided to download Wikipedia data" + assert revision is not None, "revision must be provided to download Wikipedia data" + download_wikipedia(input_urls, revision, output_path) + + return StepSpec( + name="raw/wikipedia", + fn=_run, + hash_attrs={"input_urls": input_urls, "revision": revision}, + # NOTE: if no inputs are provided, use the previously downloaded 2024-12-01 data + override_output_path="raw/wikipedia-9273e1" if input_urls is None else None, + ) diff --git a/lib/marin/src/marin/download/__init__.py b/lib/marin/src/marin/download/__init__.py deleted file mode 100644 index b5a56a002d..0000000000 --- a/lib/marin/src/marin/download/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# Copyright The Marin Authors -# SPDX-License-Identifier: Apache-2.0 - -from .huggingface.download_hf import DownloadConfig as HfDownloadConfig -from .huggingface.download_hf import download_hf -from .huggingface.download_hf import download_hf as download_hf_ungated diff --git a/lib/marin/src/marin/download/ar5iv/ar5iv-v04-2024.json b/lib/marin/src/marin/download/ar5iv/ar5iv-v04-2024.json deleted file mode 100644 index 7c178afb61..0000000000 --- a/lib/marin/src/marin/download/ar5iv/ar5iv-v04-2024.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "dataset": "ar5iv", - "version": "v04.2024", - "links": [ - { - "name": "C-UDA-1.0.md", - "url": "", - "checksum": {"type": "md5", "encoding": "hex", "hash": "0476ea786ce0e3291f6eaaabc43e250e"} - }, - { - "name": "ar5iv-04-2024-errors.zip", - "url": "", - "checksum": {"type": "md5", "encoding": "hex", "hash": "9178d9635085a657956402077b4f8301"} - }, - { - "name": "ar5iv-04-2024-no-problem.zip", - "url": "", - "checksum": {"type": "md5", "encoding": "hex", "hash": "6ffa80fa273f29716527db36e1841abf"} - }, - { - "name": "ar5iv-04-2024-warnings.zip", - "url": "", - "checksum": {"type": "md5", "encoding": "hex", "hash": "51582b218f55286e5fe08431eb5e299d"} - } - ] -} diff --git a/lib/marin/src/marin/download/dclm_hq/download_dclm_hq_html.py b/lib/marin/src/marin/download/dclm_hq/download_dclm_hq_html.py deleted file mode 100644 index 9250ede43d..0000000000 --- a/lib/marin/src/marin/download/dclm_hq/download_dclm_hq_html.py +++ /dev/null @@ -1,208 +0,0 @@ -# Copyright The Marin Authors -# SPDX-License-Identifier: Apache-2.0 - -""" -Download DCLM HQ HTML data by fetching HTML content from Common Crawl. - -Processes DCLM HQ JSONL files and enriches them with HTML content fetched from Common Crawl -via a custom index server. Uses zephyr for parallel processing with flattened parallelism. 
- -Example Usage: -uv run zephyr --backend=ray --max-parallelism=800 --memory=2GB \ - lib/marin/src/marin/download/dclm_hq/download_dclm_hq_html.py \ - --input_path gs://marin-us-central2/raw/dclm-baseline-1.0-parquet/global/ \ - --output_path gs://marin-data/processed/dclm-hq-html/ -""" - -import io -import json -import logging -import os -import re -from dataclasses import dataclass - -import requests -from iris.marin_fs import open_url -import warcio -from marin.utils import fsspec_glob -from tqdm import tqdm -from zephyr import Dataset, ZephyrContext -from zephyr.writers import ensure_parent_dir - -CC_IDX_HOST_URL = "http://34.72.201.218:8080" -logger = logging.getLogger(__name__) - - -@dataclass -class DCLMHQDownloadConfig: - input_path: str - output_path: str - - -@dataclass -class FileTask: - """Represents a single file processing task.""" - - input_file_path: str - output_file_path: str - - -def fetch_warc_from_cc(s3_warc_path: str, length: int, offset: int) -> str: - """ - Fetch a WARC record from Common Crawl S3 bucket using byte range requests we get - from the CC index via `find_html_in_cc`. - Args: - s3_warc_path: Path to WARC file in S3 bucket - length: Length of the record in bytes - offset: Byte offset of the record in the WARC file - Returns: - The WARC record content as a string - """ - # Convert string values to integers - offset = int(offset) - length = int(length) - - # Make range request to CommonCrawl - response = requests.get( - f"https://data.commoncrawl.org/{s3_warc_path}", headers={"Range": f"bytes={offset}-{offset + length - 1}"} - ) - response.raise_for_status() - - # Parse WARC record and extract HTML content - with io.BytesIO(response.content) as stream: - for record in warcio.ArchiveIterator(stream): - content = record.content_stream().read() - return content.decode(errors="ignore") - - raise ValueError(f"No WARC records found in response from {s3_warc_path}") - - -def find_html_in_cc(split_id: str, target_uri: str) -> str | None: - """ - We host our own index of the Common Crawl over GCP which we use in this function. - For each call we receive a list of chunks that contain the HTML content for the given target URI. - We then fetch each chunk and concatenate them together to form the complete HTML content. - Args: - split_id: The split ID of the Common Crawl - target_uri: The target URI to find the HTML content for - Returns: - The HTML content as a string - """ - resp = requests.get(f"{CC_IDX_HOST_URL}/{split_id}-index?url={target_uri}&output=json") - - resp.raise_for_status() - - chunks = [json.loads(chunk) for chunk in resp.text.split("\n") if chunk] - sorted_chunks = sorted(chunks, key=lambda x: x["offset"]) - - html_content = "" - - for chunk in sorted_chunks: - warc_path = chunk["filename"] - length = chunk["length"] - offset = chunk["offset"] - - warc_record = fetch_warc_from_cc(warc_path, length, offset) - - html_content += warc_record - - return html_content - - -def process_file(task: FileTask) -> None: - """Process a single DCLM file, fetching HTML from Common Crawl. 
- - Args: - task: FileTask containing input and output file paths - """ - logger.info(f"Starting processing of file {task.input_file_path}") - logger.info(f"Source: {task.input_file_path}") - logger.info(f"Destination: {task.output_file_path}") - try: - ensure_parent_dir(task.output_file_path) - with ( - open_url(task.input_file_path, compression="zstd") as source, - open_url(task.output_file_path, "wt", compression="gzip") as output, - ): - text_wrapper = io.TextIOWrapper(source, encoding="utf-8") - - for line in tqdm(text_wrapper, desc="Processing lines"): - row = json.loads(line.strip()) - - # We need to extract the split from where the record was for querying the index - # The only place we have this information is in the warcinfo key in DCLM HQ - # The format is: - # warc-type: WARC/1.1 - # ... - # isPartOf: CC-MAIN-2024-01 - # This however is a string and not a key-value pair, so we need to extract - # the split from it via regex pattern `isPartOf:\s*(CC-MAIN-\d{4}-\d{2})`. - # This pattern groups the value of the key `isPartOf` that is of the form - # `CC-MAIN-xxxx-xx` where `xxxx` is a year and `xx` is a month. - match = re.search(r"isPartOf:\s*(CC-MAIN-\d{4}-\d{2})", row["metadata"]["warcinfo"]) - if match is None: - logger.error(f"No split found for record ID: {row['metadata']['WARC-Record-ID']}") - continue - - is_part_of = match.group(1) - - try: - html_string = find_html_in_cc(is_part_of, row["metadata"]["WARC-Target-URI"]) - - if html_string is None: - logger.error(f"No HTML found for record ID: {row['metadata']['WARC-Record-ID']}") - continue - - if "text" in row: - row.pop("text") - - row["html"] = html_string - - print(json.dumps(row), file=output) - except Exception as e: - logger.exception(f"Error processing line: {e}") - continue - - logger.info("\nProcessing completed successfully!") - logger.info(f"File available at: {task.output_file_path}") - - except Exception as e: - logger.error(f"Error during processing: {e}") - raise - - -def extract_dclm_hq_dump(cfg: DCLMHQDownloadConfig) -> None: - """Process the DCLM HQ dump in the input path and save the results to the output path. - - Flattens the nested directory structure (shards → files) into a single list of files - and processes them in parallel using zephyr. 
- """ - logger.info(f"Starting processing of DCLM HQ dump in {cfg.input_path}") - - # Flatten nested structure: discover all files upfront - all_files = [] - paths = [i.split("/")[-1] for i in fsspec_glob(os.path.join(cfg.input_path, "*"))] - - logger.info(f"Found {len(paths)} shards to process") - - for path in paths: - input_path = os.path.join(cfg.input_path, path) - shard_paths = fsspec_glob(os.path.join(input_path, "*.json.zst")) - - for shard_path in shard_paths: - input_file_path = shard_path - output_file_path = os.path.join(cfg.output_path, path, os.path.basename(shard_path)).replace( - ".json.zst", ".jsonl.gz" - ) - - all_files.append(FileTask(input_file_path=input_file_path, output_file_path=output_file_path)) - - logger.info(f"Found {len(all_files)} files to process") - - # Single-level parallelism over all files - pipeline = Dataset.from_list(all_files).map(process_file) - - ctx = ZephyrContext(name="download-dclm-html") - ctx.execute(pipeline) - - logger.info("Processing completed successfully!") diff --git a/lib/marin/src/marin/download/filesystem/transfer.py b/lib/marin/src/marin/download/filesystem/transfer.py deleted file mode 100644 index e28a6667d8..0000000000 --- a/lib/marin/src/marin/download/filesystem/transfer.py +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright The Marin Authors -# SPDX-License-Identifier: Apache-2.0 - -import os -import random -import time -from dataclasses import dataclass - -from iris.marin_fs import url_to_fs -from zephyr import Dataset, ZephyrContext - -from marin.utils import fsspec_exists, fsspec_glob - - -@dataclass -class TransferConfig: - input_path: str - output_path: str - - # Selectively choose the number of random files to transfer. None means all files - num_random_files: int | None = None - filetype: str = "jsonl.zst" - - -def transfer_files(config: TransferConfig) -> None: - """Transfers files from the input path to the output path. - - When num_random_files is None, copies the entire directory recursively. - When num_random_files is specified, randomly samples that many files and - copies them in parallel using zephyr. 
- """ - if config.input_path.endswith("/"): - input_path = config.input_path[:-1] - else: - input_path = config.input_path - - print(f"Downloading {input_path} from GCS.") - start_time: float = time.time() - fs, _ = url_to_fs(input_path) - if not fs.exists(input_path): - raise FileNotFoundError(f"{input_path} does not exist.") - - # Glob all matching files - filenames = fsspec_glob(os.path.join(input_path, f"**/*.{config.filetype}")) - - # Select files: either random sample or all files - if config.num_random_files is None: - selected_files = filenames - else: - random.seed(42) - random.shuffle(filenames) - selected_files = filenames[: config.num_random_files] - - def copy_file(filename: str) -> None: - """Copy a single file if it doesn't already exist at destination.""" - output_filename = os.path.join(config.output_path, os.path.basename(filename)) - if not fsspec_exists(output_filename): - # Ensure output directory exists - fs.makedirs(config.output_path, exist_ok=True) - fs.copy(filename, output_filename) - - # Always use parallel copying via zephyr - pipeline = Dataset.from_list(selected_files).map(copy_file) - ctx = ZephyrContext(name="fs-transfer") - ctx.execute(pipeline) - - elapsed_time_seconds: float = time.time() - start_time - print(f"Downloaded {input_path} to {config.output_path} ({elapsed_time_seconds}s).") diff --git a/lib/marin/src/marin/download/huggingface/stream_remove_columns.py b/lib/marin/src/marin/download/huggingface/stream_remove_columns.py deleted file mode 100644 index b16e3a1f1b..0000000000 --- a/lib/marin/src/marin/download/huggingface/stream_remove_columns.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright The Marin Authors -# SPDX-License-Identifier: Apache-2.0 - -"""Remove unnecessary columns while streaming data from huggingface.""" - -import logging -import os -from dataclasses import dataclass - -import pandas as pd -import pyarrow.parquet as pq -from huggingface_hub import HfFileSystem -from tqdm import tqdm -from zephyr import Dataset, ZephyrContext - -hf_fs = HfFileSystem() -logger = logging.getLogger(__name__) - - -def prune_stream_and_save(input_file: str, output_file: str, keep_columns: list[str]): - """ - Prunes and saves a parquet file by removing un-specified columns. - - Reads the input parquet file in batches, removes columns not in keep_columns, - and writes the result to output_file. Processing in batches avoids memory issues. - - Args: - input_file (str): Path to input parquet file on HuggingFace - output_file (str): Path where pruned parquet file will be saved - keep_columns (list[str]): List of column names to retain - """ - parquet_file = pq.ParquetFile(hf_fs.open(input_file)) - - full_df_list = [] - for batch in tqdm(parquet_file.iter_batches(batch_size=10000), desc=f"Processing {input_file}"): - df = batch.to_pandas() - - drop_columns = [col for col in df.columns if col not in keep_columns] - df = df.drop(columns=drop_columns) - - full_df_list.append(df) - - full_df = pd.concat(full_df_list) - logger.info(f"Saving pruned dataset of shape {full_df.shape} to {output_file}") - full_df.to_parquet(output_file, index=False) - - -def get_file_tasks(hf_path: str, output_path: str, keep_columns: list[str]): - """ - Generate file processing tasks for a HuggingFace subset. 
- - Args: - hf_path (str): The HuggingFace dataset path to load - output_path (str): The output path to save the pruned dataset - keep_columns (list[str]): The columns to keep in the pruned dataset - - Yields: - Dict with input_file, output_file, and keep_columns for each parquet file - """ - logger.info(f"Loading dataset from {hf_path}") - parquet_list = hf_fs.glob(f"{hf_path}/*.parquet") - - for file in parquet_list: - output_file = os.path.join(output_path, os.path.basename(file)) - yield {"input_file": file, "output_file": output_file, "keep_columns": keep_columns} - - -@dataclass -class DatasetConfig: - hf_repo_id: str - hf_revision: str - hf_paths: list[str] - output_path: str - keep_columns: list[str] - - -def prune_hf_dataset(cfg: DatasetConfig): - logger.info(f"Starting dataset pruning for {cfg.hf_paths}") - - # Build list of subset paths to process - subset_tasks = [] - for path in cfg.hf_paths: - # HF Path form: hf://[][@]/ - hf_path = f"hf://datasets/{cfg.hf_repo_id}@{cfg.hf_revision}/{path}" - logger.info(f"Processing subset {hf_path}") - output_path = os.path.join(cfg.output_path, path) - subset_tasks.append({"hf_path": hf_path, "output_path": output_path}) - - # Build pipeline with nested parallelism: - # - Outer level: process subsets (MAX_CONCURRENT_WORKERS=1) - # - Inner level: process files within each subset - pipeline = ( - Dataset.from_list(subset_tasks) - .flat_map(lambda task: get_file_tasks(task["hf_path"], task["output_path"], cfg.keep_columns)) - .map(lambda task: prune_stream_and_save(task["input_file"], task["output_file"], cfg.keep_columns)) - ) - - logger.info("Executing pipeline") - ctx = ZephyrContext(name="hf-remove-columns") - ctx.execute(pipeline) - logger.info("Successfully processed all subsets") diff --git a/lib/marin/src/marin/execution/step_spec.py b/lib/marin/src/marin/execution/step_spec.py index 76ef153aaa..bed4db725d 100644 --- a/lib/marin/src/marin/execution/step_spec.py +++ b/lib/marin/src/marin/execution/step_spec.py @@ -10,10 +10,18 @@ from dataclasses import dataclass from functools import cached_property from typing import Any +from urllib.parse import urlparse from iris.marin_fs import marin_prefix +def _is_relative_path(url_or_path: str) -> bool: + """Return True if the path is relative (not a URL and doesn't start with /).""" + if urlparse(url_or_path).scheme: + return False + return not url_or_path.startswith("/") + + @dataclass(frozen=True) class _StepSpecMigrationConfig: """Temporary config used by ``StepSpec.as_executor_step()`` during the @@ -86,11 +94,17 @@ def name_with_hash(self) -> str: @cached_property def output_path(self) -> str: - """Output path of the step""" - if self.override_output_path is not None: - return self.override_output_path + """Output path of the step. + If ``override_output_path`` is set and relative (no URL scheme, doesn't + start with ``/``), it is automatically prefixed with ``output_path_prefix`` + or ``marin_prefix()``. 
+ """ prefix = self.output_path_prefix or marin_prefix() + if self.override_output_path is not None: + if _is_relative_path(self.override_output_path): + return f"{prefix}/{self.override_output_path}" + return self.override_output_path return f"{prefix}/{self.name_with_hash}" def as_executor_step(self) -> ExecutorStep: # noqa: F821 diff --git a/lib/marin/src/marin/processing/tokenize/download_pretokenized.py b/lib/marin/src/marin/processing/tokenize/download_pretokenized.py index 91fdaca495..cab2433bec 100644 --- a/lib/marin/src/marin/processing/tokenize/download_pretokenized.py +++ b/lib/marin/src/marin/processing/tokenize/download_pretokenized.py @@ -18,7 +18,7 @@ ) from levanter.store.cache import CacheOptions -from marin.download.huggingface.download_hf import ( +from marin.datakit.download.huggingface import ( DownloadConfig as HfDownloadConfig, download_hf as hf_download_logic, ) diff --git a/lib/marin/src/marin/speedrun/paloma_local_download.py b/lib/marin/src/marin/speedrun/paloma_local_download.py index c7335a52c5..e2ee68f766 100644 --- a/lib/marin/src/marin/speedrun/paloma_local_download.py +++ b/lib/marin/src/marin/speedrun/paloma_local_download.py @@ -8,8 +8,8 @@ """ from experiments.paloma import paloma_tokenized -from marin.download import HfDownloadConfig -from marin.download.huggingface.download_hf import download_hf +from marin.datakit.download.huggingface import DownloadConfig as HfDownloadConfig +from marin.datakit.download.huggingface import download_hf from marin.execution.executor import ExecutorStep, executor_main, this_output_path, versioned llama3_tokenizer = "meta-llama/Meta-Llama-3.1-8B" diff --git a/lib/marin/src/marin/transform/dolmino/transform_dclm_hq.py b/lib/marin/src/marin/transform/dolmino/transform_dclm_hq.py deleted file mode 100644 index 773cb3242a..0000000000 --- a/lib/marin/src/marin/transform/dolmino/transform_dclm_hq.py +++ /dev/null @@ -1,155 +0,0 @@ -# Copyright The Marin Authors -# SPDX-License-Identifier: Apache-2.0 - -""" -marin/transform/dolmino/transform_dclm_hq.py - -Performs HTML->Text/MD conversion using the specified tools over a DCLM HQ dump save in DOLMA format. 
- -Example Usage (production, large dataset): -uv run zephyr --backend=ray --max-parallelism=200 --memory=2GB \ - lib/marin/src/marin/transform/dolmino/transform_dclm_hq.py \ - --entry-point=process_dclm_hq_dump \ - --input_hf_path "hf://datasets/allenai/dolmino-mix-1124@main/data/dclm" \ - --output_path gs://bucket/processed/dclm-hq \ - --extract_method resiliparse \ - --extract_config.type resiliparse \ - --hf_repo_id "allenai/dolmino-mix-1124" \ - --hf_revision "main" \ - --hf_paths '["data/dclm"]' - -Example Usage (local testing, small dataset): -uv run zephyr --backend=threadpool --max-parallelism=2 --entry-point=process_dclm_hq_dump \ - lib/marin/src/marin/transform/dolmino/transform_dclm_hq.py \ - --input_hf_path "hf://datasets/allenai/dolmino-mix-1124@main/data/dclm" \ - --output_path /tmp/dclm_hq_test \ - --extract_method trafilatura \ - --extract_config.type trafilatura \ - --extract_config.favor_precision false \ - --extract_config.favor_recall true \ - --hf_repo_id "allenai/dolmino-mix-1124" \ - --hf_revision "main" \ - --hf_paths '["data/dclm"]' \ - --max_split 1 -""" - -import json -import logging -import os -from dataclasses import dataclass - -import draccus -from iris.marin_fs import open_url, url_to_fs -from marin.download.dclm_hq.download_dclm_hq_html import find_html_in_cc -from marin.download.huggingface.stream_remove_columns import hf_fs -from marin.schemas.web.convert import ExtractionConfig -from marin.web.convert import convert_page -from tqdm import tqdm -from zephyr import Dataset, ZephyrContext -from zephyr.writers import atomic_rename - -logger = logging.getLogger(__name__) - - -@dataclass -class DCLMHQExtractionConfig: - input_hf_path: str - output_path: str - extract_method: str - extract_config: ExtractionConfig - hf_repo_id: str - hf_revision: str - hf_paths: list[str] - max_split: int | None = None - - -def process_file( - input_file_path: str, - output_file_path: str, - extract_method: str, - extract_config: ExtractionConfig, -) -> None: - logger.info(f"Starting processing of file {input_file_path}") - logger.info(f"Source: {input_file_path}") - logger.info(f"Destination: {output_file_path}") - - with atomic_rename(output_file_path) as temp_path: - with ( - open_url(input_file_path, "rt", compression="zstd") as source, - open_url(temp_path, "wt", compression="gzip") as output, - ): - for line in tqdm(source, desc="Processing lines"): - row = json.loads(line) - - try: - html_string = find_html_in_cc(row["metadata"]["WARC-Record-ID"], row["metadata"]["WARC-Target-URI"]) - - if html_string is None: - logger.error(f"No HTML found for record ID: {row['metadata']['WARC-Record-ID']}") - continue - - content = convert_page(html_string, extract_method=extract_method, config=extract_config)["content"] - - if content is None: - continue - - out_dict = { - "id": row["id"], - "source": row["source"], - "metadata": row["metadata"], - "text": content, - } - - print(json.dumps(out_dict), file=output) # Without this line, the JSON file will be corrupted - except Exception as e: - logger.exception(f"Error processing line: {e}") - continue - - logger.info("\nProcessing completed successfully!") - logger.info(f"File available at: {output_file_path}") - - -@draccus.wrap() -def process_dclm_hq_dump(cfg: DCLMHQExtractionConfig) -> None: - logger.info(f"Starting processing of DCLM HQ dump in {cfg.input_hf_path}") - - # Glob all files across all shards upfront - all_files = [] - paths = [i.split("/")[-1] for i in hf_fs.ls(cfg.input_hf_path, detail=False)] - paths = paths[: 
cfg.max_split] if cfg.max_split else paths - - logger.info(f"Found {len(paths)} shards to process") - - for path in paths: - input_path = os.path.join(cfg.input_hf_path, path) - shard_paths = [i.split("/")[-1] for i in hf_fs.glob(os.path.join(input_path, "*.json.zst"))] - - for shard_path in shard_paths: - input_file_path = os.path.join(input_path, shard_path) - output_file_path = os.path.join(cfg.output_path, path, shard_path).replace(".json.zst", ".jsonl.gz") - all_files.append( - { - "input": input_file_path, - "output": output_file_path, - "extract_method": cfg.extract_method, - "extract_config": cfg.extract_config, - } - ) - - logger.info(f"Total files to process: {len(all_files)}") - - pipeline = ( - Dataset.from_list(all_files) - .filter(lambda f: not url_to_fs(f["output"])[0].exists(f["output"])) - .map( - lambda f: process_file( - input_file_path=f["input"], - output_file_path=f["output"], - extract_method=f["extract_method"], - extract_config=f["extract_config"], - ) - ) - ) - - ctx = ZephyrContext(name="transform-dclm-hq") - ctx.execute(pipeline) diff --git a/lib/marin/src/marin/download/huggingface/upload_gcs_to_hf.py b/lib/marin/src/marin/utilities/upload_gcs_to_hf.py similarity index 100% rename from lib/marin/src/marin/download/huggingface/upload_gcs_to_hf.py rename to lib/marin/src/marin/utilities/upload_gcs_to_hf.py diff --git a/tests/datakit/__init__.py b/tests/datakit/__init__.py new file mode 100644 index 0000000000..ec8bc038b7 --- /dev/null +++ b/tests/datakit/__init__.py @@ -0,0 +1,2 @@ +# Copyright The Marin Authors +# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/datakit/download/__init__.py b/tests/datakit/download/__init__.py new file mode 100644 index 0000000000..ec8bc038b7 --- /dev/null +++ b/tests/datakit/download/__init__.py @@ -0,0 +1,2 @@ +# Copyright The Marin Authors +# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/download/conftest.py b/tests/datakit/download/conftest.py similarity index 100% rename from tests/download/conftest.py rename to tests/datakit/download/conftest.py diff --git a/tests/download/test_ar5iv.py b/tests/datakit/download/test_ar5iv.py similarity index 97% rename from tests/download/test_ar5iv.py rename to tests/datakit/download/test_ar5iv.py index 442d557637..570fb706e3 100644 --- a/tests/download/test_ar5iv.py +++ b/tests/datakit/download/test_ar5iv.py @@ -7,7 +7,7 @@ import pytest -from marin.download.ar5iv.download import DownloadConfig, download +from marin.datakit.download.ar5iv import Ar5ivDownloadConfig as DownloadConfig, download @pytest.fixture diff --git a/tests/download/test_huggingface.py b/tests/datakit/download/test_huggingface.py similarity index 75% rename from tests/download/test_huggingface.py rename to tests/datakit/download/test_huggingface.py index 1019c83633..f055cc94ca 100644 --- a/tests/download/test_huggingface.py +++ b/tests/datakit/download/test_huggingface.py @@ -7,19 +7,14 @@ import json from unittest.mock import MagicMock, Mock, patch -import pandas as pd import pytest -from marin.download.huggingface.download_hf import ( +from marin.datakit.download.huggingface import ( DownloadConfig, _relative_path_in_source, download_hf, stream_file_to_fsspec, ) -from marin.download.huggingface.stream_remove_columns import ( - DatasetConfig, - prune_hf_dataset, -) @pytest.fixture @@ -81,7 +76,7 @@ def test_download_hf_basic(mock_hf_fs, tmp_path): ) # Mock HfFileSystem creation - with patch("marin.download.huggingface.download_hf.HfFileSystem", return_value=hf_fs): + with 
patch("marin.datakit.download.huggingface.HfFileSystem", return_value=hf_fs): download_hf(cfg) # Verify files were downloaded @@ -123,7 +118,7 @@ def test_download_hf_appends_sha_when_configured(mock_hf_fs, tmp_path): append_sha_to_path=True, ) - with patch("marin.download.huggingface.download_hf.HfFileSystem", return_value=hf_fs): + with patch("marin.datakit.download.huggingface.HfFileSystem", return_value=hf_fs): download_hf(cfg) target_output = base_output_path / revision @@ -155,50 +150,6 @@ def test_download_hf_bucket_requires_newer_huggingface_hub(tmp_path): download_hf(cfg) -def test_prune_hf_dataset(tmp_path): - """Test full dataset pruning pipeline.""" - # Create test parquet data - test_data = pd.DataFrame( - { - "id": [1, 2], - "text": ["hello", "world"], - "unwanted": ["a", "b"], - } - ) - - # Create multiple buffers since each call needs a fresh one - def create_buffer(): - buffer = io.BytesIO() - test_data.to_parquet(buffer, index=False) - buffer.seek(0) - return buffer - - cfg = DatasetConfig( - hf_repo_id="test-org/test-dataset", - hf_revision="main", - hf_paths=["data"], - output_path=str(tmp_path / "output"), - keep_columns=["id", "text"], - ) - - # Create output directory structure - output_dir = tmp_path / "output" / "data" - output_dir.mkdir(parents=True) - - mock_fs = MagicMock() - mock_fs.glob = Mock(return_value=["hf://datasets/test-org/test-dataset@main/data/file.parquet"]) - mock_fs.open = Mock(side_effect=lambda path, mode="rb": create_buffer()) - - with patch("marin.download.huggingface.stream_remove_columns.hf_fs", mock_fs): - prune_hf_dataset(cfg) - - # Verify output - output_file = tmp_path / "output" / "data" / "file.parquet" - assert output_file.exists() - result_df = pd.read_parquet(output_file) - assert list(result_df.columns) == ["id", "text"] - - def test_stream_file_to_fsspec_retries_on_timeout(tmp_path): """A socket timeout while reading should trigger retry and then succeed.""" file_path = "datasets/test-org/test-dataset/data/file1.txt" @@ -229,8 +180,8 @@ def read(self, chunk_size): hf_fs.open.side_effect = lambda path, mode="rb", **_kwargs: FlakyReader() with ( - patch("marin.download.huggingface.download_hf.HfFileSystem", return_value=hf_fs), - patch("marin.download.huggingface.download_hf.time.sleep", return_value=None), + patch("marin.datakit.download.huggingface.HfFileSystem", return_value=hf_fs), + patch("marin.datakit.download.huggingface.time.sleep", return_value=None), ): result = stream_file_to_fsspec( str(output_path), diff --git a/tests/download/test_nemotron_cc.py b/tests/datakit/download/test_nemotron_cc.py similarity index 91% rename from tests/download/test_nemotron_cc.py rename to tests/datakit/download/test_nemotron_cc.py index 6f3bdff56c..e8ed0e2de1 100644 --- a/tests/download/test_nemotron_cc.py +++ b/tests/datakit/download/test_nemotron_cc.py @@ -9,10 +9,10 @@ import pytest import zstandard as zstd from iris.marin_fs import open_url as _real_open_url -from marin.download.nemotron_cc.download_nemotron_cc import NemotronIngressConfig, download_nemotron_cc +from marin.datakit.download.nemotron_v1 import download_nemotron_cc -_OPEN_URL_TARGET = "marin.download.nemotron_cc.download_nemotron_cc.open_url" -_REQUESTS_SESSION_TARGET = "marin.download.nemotron_cc.download_nemotron_cc.requests.Session" +_OPEN_URL_TARGET = "marin.datakit.download.nemotron_v1.open_url" +_REQUESTS_SESSION_TARGET = "marin.datakit.download.nemotron_v1.requests.Session" SAMPLE_NEMOTRON_RECORDS = [ { @@ -114,8 +114,7 @@ def 
test_download_nemotron_cc_pipeline(tmp_path, mock_paths_open): patch(_OPEN_URL_TARGET, side_effect=mock_paths_open(paths)), patch(_REQUESTS_SESSION_TARGET, _mock_session_for({"file1": file1_data, "file2": file2_data})), ): - cfg = NemotronIngressConfig(output_path=str(output_dir)) - download_nemotron_cc(cfg) + download_nemotron_cc(str(output_dir)) all_records = read_all_jsonl_zst(output_dir / "contrib" / "Nemotron") @@ -152,8 +151,7 @@ def test_download_nemotron_cc_dolma_format(tmp_path, mock_paths_open): patch(_OPEN_URL_TARGET, side_effect=mock_paths_open(paths)), patch(_REQUESTS_SESSION_TARGET, _mock_session_for({"test": compressed_data})), ): - cfg = NemotronIngressConfig(output_path=str(output_dir)) - download_nemotron_cc(cfg) + download_nemotron_cc(str(output_dir)) records = read_all_jsonl_zst(output_dir / "contrib" / "Nemotron") assert len(records) == 1 @@ -188,8 +186,7 @@ def test_download_nemotron_cc_skips_existing(tmp_path, mock_paths_open): patch(_OPEN_URL_TARGET, side_effect=mock_paths_open(paths)), patch(_REQUESTS_SESSION_TARGET) as mock_session, ): - cfg = NemotronIngressConfig(output_path=str(output_dir)) - download_nemotron_cc(cfg) + download_nemotron_cc(str(output_dir)) mock_session.return_value.get.assert_not_called() assert existing_output.read_text() == "existing" diff --git a/tests/datakit/test_datakit.py b/tests/datakit/test_datakit.py new file mode 100644 index 0000000000..8a9286763b --- /dev/null +++ b/tests/datakit/test_datakit.py @@ -0,0 +1,61 @@ +# Copyright The Marin Authors +# SPDX-License-Identifier: Apache-2.0 + +"""Integration test for the datakit pipeline: download → tokenize, wired as StepSpecs.""" + +from pathlib import Path + +import numpy as np +import pytest +from levanter.store.cache import CacheLedger, TreeCache + +from marin.datakit.download.huggingface import download_hf_step +from marin.execution.step_runner import StepRunner +from marin.execution.step_spec import StepSpec +from marin.processing.tokenize.tokenize import TokenizeConfig, tokenize + + +@pytest.mark.slow +def test_download_and_tokenize(tmp_path): + """Download → tokenize as a StepSpec DAG via StepRunner.""" + + dl = download_hf_step( + "datakit/download", + hf_dataset_id="wikitext", + revision="main", + hf_urls_glob=["wikitext-2-v1/test-*.parquet"], + override_output_path=str(tmp_path / "raw"), + ) + + tok = StepSpec( + name="datakit/tokenize", + fn=lambda output_path: tokenize( + TokenizeConfig( + train_paths=[dl.output_path], + validation_paths=[], + cache_path=output_path, + tokenizer="gpt2", + allow_test_in_train=True, + ) + ), + deps=[dl], + hash_attrs={"tokenizer": "gpt2"}, + override_output_path=str(tmp_path / "tokenized"), + ) + + StepRunner().run([dl, tok]) + + # -- Verify download output -- + raw_files = [f for f in Path(dl.output_path).rglob("*") if f.is_file() and not f.name.startswith(".")] + assert len(raw_files) >= 1 + + # -- Verify tokenize output -- + train_dir = Path(tok.output_path) / "train" + ledger = CacheLedger.load(str(train_dir)) + assert ledger.is_finished + assert ledger.total_num_rows > 0 + + exemplar = {"input_ids": np.array([0], dtype=np.int32)} + cache = TreeCache.load(str(train_dir), exemplar=exemplar) + assert len(cache) == ledger.total_num_rows + assert len(cache[0]["input_ids"]) > 0 diff --git a/tests/download/test_dclm_hq.py b/tests/download/test_dclm_hq.py deleted file mode 100644 index 1636f3c34d..0000000000 --- a/tests/download/test_dclm_hq.py +++ /dev/null @@ -1,197 +0,0 @@ -# Copyright The Marin Authors -# SPDX-License-Identifier: Apache-2.0 - 
-"""Tests for DCLM HQ download script that fetches HTML from Common Crawl.""" - -import json -from unittest.mock import patch - -import zstandard as zstd -from marin.download.dclm_hq.download_dclm_hq_html import DCLMHQDownloadConfig, extract_dclm_hq_dump - -SAMPLE_DCLM_RECORDS = [ - { - "id": "test-doc-001", - "source": "common-crawl", - "text": "This is the original text that should be removed.", - "metadata": { - "WARC-Record-ID": "", - "WARC-Target-URI": "http://example.com/test-page", - "warcinfo": ( - "warc-type: WARC/1.1\nWARC-Date: 2024-01-15T10:30:00Z\nisPartOf: CC-MAIN-2024-01\ndescription: Test WARC" - ), - }, - }, - { - "id": "test-doc-002", - "source": "common-crawl", - "text": "This is another original text.", - "metadata": { - "WARC-Record-ID": "", - "WARC-Target-URI": "http://example.com/another-page", - "warcinfo": ( - "warc-type: WARC/1.1\nWARC-Date: 2024-01-15T11:30:00Z\nisPartOf: CC-MAIN-2024-01\ndescription: Test WARC" - ), - }, - }, - { - "id": "test-doc-003", - "source": "common-crawl", - "text": "Third document text.", - "metadata": { - "WARC-Record-ID": "", - "WARC-Target-URI": "http://example.com/third-page", - "warcinfo": ( - "warc-type: WARC/1.1\nWARC-Date: 2024-02-10T09:00:00Z\nisPartOf: CC-MAIN-2024-10\ndescription: Test WARC" - ), - }, - }, -] - -SAMPLE_WARC_HTML = { - "http://example.com/test-page": ( - """ - -Test Page - -

-<h1>Test Article</h1>
-<p>This is test content from Common Crawl.</p>
-</body>
-</html>
-"""
-    ),
-    "http://example.com/another-page": (
-        """
-<html>
-<head><title>Another Page</title></head>
-<body>
-<h1>Another Article</h1>
-<p>Different content here.</p>
-</body>
-</html>
-"""
-    ),
-    "http://example.com/third-page": (
-        """
-<html>
-<head><title>Third Page</title></head>
-<body>
-<h1>Third Article</h1>
-<p>More content.</p>

- -""" - ), -} - - -def create_warc_bytes(html_content: str) -> bytes: - """Create minimal WARC record bytes for testing.""" - http_response = ( - "HTTP/1.1 200 OK\r\n" - "Content-Type: text/html\r\n" - f"Content-Length: {len(html_content.encode())}\r\n" - "\r\n" - f"{html_content}" - ) - - warc_header = ( - "WARC/1.0\r\n" - "WARC-Type: response\r\n" - "WARC-Record-ID: \r\n" - "WARC-Target-URI: http://example.com/test\r\n" - "Content-Type: application/http; msgtype=response\r\n" - f"Content-Length: {len(http_response.encode())}\r\n" - "\r\n" - ) - - full_warc = warc_header + http_response + "\r\n\r\n" - return full_warc.encode() - - -def create_zstd_compressed_jsonl(records: list[dict]) -> bytes: - """Create zstd compressed JSONL content.""" - jsonl_content = "\n".join(json.dumps(record) for record in records) + "\n" - jsonl_bytes = jsonl_content.encode("utf-8") - cctx = zstd.ZstdCompressor() - return cctx.compress(jsonl_bytes) - - -def test_extract_dclm_hq_pipeline(tmp_path, read_all_jsonl_gz): - """Test full DCLM HQ download pipeline with zephyr integration.""" - output_dir = tmp_path / "output" - output_dir.mkdir() - - # Create input files in nested structure - shard1_dir = tmp_path / "input" / "shard1" - shard2_dir = tmp_path / "input" / "shard2" - shard1_dir.mkdir(parents=True) - shard2_dir.mkdir(parents=True) - - file1_data = create_zstd_compressed_jsonl([SAMPLE_DCLM_RECORDS[0]]) - file2_data = create_zstd_compressed_jsonl(SAMPLE_DCLM_RECORDS[1:]) - - file1_path = shard1_dir / "file1.json.zst" - file2_path = shard2_dir / "file2.json.zst" - - file1_path.write_bytes(file1_data) - file2_path.write_bytes(file2_data) - - def mock_requests_get(url, **kwargs): - from unittest.mock import Mock - - # Mock CC index server responses - if "CC-MAIN-2024-01-index" in url: - response = Mock() - response.status_code = 200 - if "test-page" in url: - response.text = json.dumps({"filename": "test.warc.gz", "offset": "0", "length": "1000"}) - else: # another-page - response.text = json.dumps({"filename": "test2.warc.gz", "offset": "0", "length": "1000"}) - response.raise_for_status = Mock() - return response - elif "CC-MAIN-2024-10-index" in url: - response = Mock() - response.status_code = 200 - response.text = json.dumps({"filename": "test3.warc.gz", "offset": "0", "length": "1000"}) - response.raise_for_status = Mock() - return response - # Mock Common Crawl WARC fetches - elif "data.commoncrawl.org" in url: - response = Mock() - response.status_code = 200 - # Determine which HTML to return based on the WARC file - if "test.warc.gz" in url: - html_content = SAMPLE_WARC_HTML["http://example.com/test-page"] - elif "test2.warc.gz" in url: - html_content = SAMPLE_WARC_HTML["http://example.com/another-page"] - else: # test3.warc.gz - html_content = SAMPLE_WARC_HTML["http://example.com/third-page"] - response.content = create_warc_bytes(html_content) - response.raise_for_status = Mock() - return response - - raise ValueError(f"Unexpected URL: {url}") - - with patch("marin.download.dclm_hq.download_dclm_hq_html.requests.get", side_effect=mock_requests_get): - cfg = DCLMHQDownloadConfig(input_path=str(tmp_path / "input"), output_path=str(output_dir)) - extract_dclm_hq_dump(cfg) - - # Verify output files were created in nested structure - shard1_output = output_dir / "shard1" - shard2_output = output_dir / "shard2" - - assert shard1_output.exists() - assert shard2_output.exists() - - # Read all records - all_records = [] - all_records.extend(read_all_jsonl_gz(shard1_output, "*.jsonl.gz")) - 
all_records.extend(read_all_jsonl_gz(shard2_output, "*.jsonl.gz")) - - assert len(all_records) == 3 - - # Verify records have HTML and no text - for record in all_records: - assert "id" in record - assert "html" in record - assert "text" not in record - assert "metadata" in record - assert len(record["html"]) > 0 diff --git a/tests/test_hfdataset_spec.py b/tests/test_hfdataset_spec.py index 7bdd0d535c..14ad782471 100644 --- a/tests/test_hfdataset_spec.py +++ b/tests/test_hfdataset_spec.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 from experiments.defaults import default_download, default_tokenize -from marin.download.huggingface.download_hf import DownloadConfig +from marin.datakit.download.huggingface import DownloadConfig from marin.processing.tokenize import HfDatasetSpec from marin.processing.tokenize.tokenize import HfTokenizeConfig, TokenizeConfig