Skip to content

Commit 1c759e1

Browse files
ravwojdyla authored and claude committed
Add datakit normalize steps for nsf_awards and nemotron v1/v2
Extend the convention established by common_corpus and starcoder2-extras (#4626): expose a normalize_<dataset>_step factory in each datakit download module and wire experiments through download -> normalize -> tokenize. Since normalize now processes a single directory (#4886), datasets with multiple sub-datasets (nemotron v1/v2) get one normalize step per split: - nsf_awards: one step, id_field="awd_id", file_extensions=(".parquet",) - nemotron_v1: one step per quality/kind split (7 splits defined in NEMOTRON_V1_SPLITS), file_extensions=(".jsonl.zst",) - nemotron_v2: one step per (family, subset) pair, file_extensions=(".parquet",) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 40c88f1 commit 1c759e1

5 files changed

Lines changed: 104 additions & 13 deletions

File tree

experiments/pretraining_datasets/nemotron_v2.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,36 @@
55
Nemotron v2 pre-training dataset tokenization.
66
77
Download definitions live in marin.datakit.download.nemotron_v2.
8-
This file wires them into tokenization steps for experiment pipelines.
8+
This file wires them into normalize + tokenization steps for experiment
9+
pipelines.
910
"""
1011

1112
import os.path
1213

13-
from marin.datakit.download.nemotron_v2 import NEMOTRON_V2_DATASETS, download_nemotron_v2_step
14+
from marin.datakit.download.nemotron_v2 import (
15+
NEMOTRON_V2_DATASETS,
16+
download_nemotron_v2_step,
17+
normalize_nemotron_v2_step,
18+
)
1419
from marin.execution.executor import ExecutorStep, this_output_path, versioned
1520
from marin.processing.tokenize import TokenizeConfig, tokenize
1621
from marin.processing.tokenize.data_configs import TokenizerStep
1722

1823
# ============================================================================
19-
# RAW DATASET DOWNLOADS
24+
# RAW DATASET DOWNLOADS AND NORMALIZED OUTPUTS
2025
# ============================================================================
2126

22-
downloads: dict[str, ExecutorStep] = {
23-
family: download_nemotron_v2_step(family).as_executor_step() for family in NEMOTRON_V2_DATASETS
24-
}
27+
_downloads = {family: download_nemotron_v2_step(family) for family in NEMOTRON_V2_DATASETS}
28+
29+
downloads: dict[str, ExecutorStep] = {family: step.as_executor_step() for family, step in _downloads.items()}
30+
31+
# One normalize step per (family, subset) — normalize now processes a single directory.
32+
normalized: dict[str, dict[str, ExecutorStep]] = {}
33+
for _family, _dl in _downloads.items():
34+
normalized[_family] = {
35+
subset: normalize_nemotron_v2_step(_dl, family=_family, subset=subset).as_executor_step()
36+
for subset in NEMOTRON_V2_DATASETS[_family].subsets
37+
}
2538

2639

2740
# ============================================================================
@@ -34,23 +47,28 @@ def tokenize_nemotron_v2_family(
3447
*,
3548
tokenizer: str | None = None,
3649
) -> dict[str, TokenizerStep]:
37-
"""Generate tokenization steps for all subsets of a Nemotron HF dataset family."""
50+
"""Generate tokenization steps for all subsets of a Nemotron HF dataset family.
51+
52+
Each subset has its own normalize step; tokenize reads from its
53+
``outputs/main/`` directory.
54+
"""
3855
if tokenizer is None:
3956
from experiments.llama import llama3_tokenizer
4057

4158
tokenizer = llama3_tokenizer
4259

4360
info = NEMOTRON_V2_DATASETS[family]
44-
download_step = downloads[family]
61+
family_normalized = normalized[family]
4562

4663
steps: dict[str, ExecutorStep[TokenizeConfig]] = {}
47-
for subset, glob_pattern in info.subsets.items():
64+
for subset in info.subsets:
4865
output_name = os.path.join("tokenized", family, subset)
66+
normalized_step = family_normalized[subset]
4967
step = ExecutorStep(
5068
name=output_name,
5169
fn=tokenize,
5270
config=TokenizeConfig(
53-
train_paths=[download_step / glob_pattern],
71+
train_paths=[normalized_step / "outputs/main/**/*.parquet"],
5472
validation_paths=versioned([]),
5573
cache_path=this_output_path(),
5674
tokenizer=versioned(tokenizer),

experiments/pretraining_datasets/nsf_awards.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
# Copyright The Marin Authors
22
# SPDX-License-Identifier: Apache-2.0
33

4-
"""NSF awards dataset download and tokenization."""
4+
"""NSF awards dataset download, normalization, and tokenization."""
55

66
from experiments.marin_models import marin_tokenizer
7-
from marin.datakit.download.nsf_awards import download_nsf_awards_step
7+
from marin.datakit.download.nsf_awards import download_nsf_awards_step, normalize_nsf_awards_step
88
from marin.execution.executor import ExecutorStep, output_path_of, this_output_path, versioned
99
from marin.processing.tokenize import TokenizeConfig, tokenize
1010

1111
nsf_awards_download = download_nsf_awards_step().as_executor_step()
12+
nsf_awards_normalized = normalize_nsf_awards_step(nsf_awards_download).as_executor_step()
1213

1314
nsf_awards_tokenized = ExecutorStep(
1415
name="tokenized/nsf_awards",
1516
fn=tokenize,
1617
config=TokenizeConfig(
17-
train_paths=[output_path_of(nsf_awards_download, "*.parquet")],
18+
train_paths=[output_path_of(nsf_awards_normalized, "outputs/main/*.parquet")],
1819
validation_paths=versioned([]),
1920
cache_path=this_output_path(),
2021
tokenizer=versioned(marin_tokenizer),

lib/marin/src/marin/datakit/download/nemotron_v1.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import requests
1212
import zstandard
1313
from rigging.filesystem import open_url
14+
from marin.datakit.normalize import normalize_step
1415
from marin.execution.step_spec import StepSpec
1516
from marin.utils import fsspec_exists
1617
from fray.cluster import ResourceConfig
@@ -122,3 +123,40 @@ def download_nemotron_v1_step() -> StepSpec:
122123
# NOTE: use the existing output to avoid re-downloading. Yes this is missing the `n`.
123124
override_output_path="raw/nemotro-cc-eeb783",
124125
)
126+
127+
128+
# Root of the Nemotron-CC v1 data within the download step's output;
# every per-split subdirectory referenced below lives beneath it.
_NEMOTRON_V1_DATA_ROOT = "contrib/Nemotron/Nemotron-CC/data-jsonl"

# Maps split name → relative path under data-jsonl/ that the normalize
# step should point at. Each split gets its own normalize StepSpec because
# normalize now processes a single directory (no subdirectory grouping).
NEMOTRON_V1_SPLITS: dict[str, str] = {
    "hq_actual": "quality=high/kind=actual",
    "hq_synth": "quality=high/kind=synthetic",
    "medium_high": "quality=medium-high",
    "medium": "quality=medium",
    "medium_low": "quality=medium-low",
    "low_actual": "quality=low/kind=actual",
    "low_synth": "quality=low/kind=synthetic",
}
142+
143+
144+
def normalize_nemotron_v1_step(download: StepSpec, *, split: str) -> StepSpec:
    """Build the normalize step for a single Nemotron-CC v1 split.

    The download step writes dolma-format records ``{id, text, source,
    format, metadata}`` as ``.jsonl.zst`` files nested under
    ``quality=<x>/kind=<y>/`` directories, so each split maps to exactly
    one subdirectory and gets its own normalize step.

    Args:
        download: The Nemotron-CC v1 download step whose output to normalize.
        split: One of the keys of ``NEMOTRON_V1_SPLITS``.

    Raises:
        ValueError: If ``split`` is not a known split name.
    """
    try:
        rel_path = NEMOTRON_V1_SPLITS[split]
    except KeyError:
        raise ValueError(f"Unknown split {split!r}. Choose from: {sorted(NEMOTRON_V1_SPLITS)}") from None
    split_dir = f"{download.output_path}/{_NEMOTRON_V1_DATA_ROOT}/{rel_path}"
    return normalize_step(
        name=f"normalized/nemotron_v1/{split}",
        download=download,
        text_field="text",
        id_field="id",
        file_extensions=(".jsonl.zst",),
        input_path=split_dir,
    )

lib/marin/src/marin/datakit/download/nemotron_v2.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from dataclasses import dataclass, field
1414

1515
from marin.datakit.download.huggingface import download_hf_step
16+
from marin.datakit.normalize import normalize_step
1617
from marin.execution.step_spec import StepSpec
1718

1819

@@ -131,3 +132,24 @@ def download_nemotron_v2_step(family: str) -> StepSpec:
131132
revision=info.revision,
132133
override_output_path=info.override_output_path,
133134
)
135+
136+
137+
def normalize_nemotron_v2_step(download: StepSpec, *, family: str, subset: str) -> StepSpec:
    """Normalize one subset of a Nemotron v2 family.

    Each subset gets its own normalize step because normalize now processes a
    single directory. The subset's glob pattern (e.g. ``Diverse-QA/**/*.parquet``)
    is used to derive the input subdirectory under the family download.

    Args:
        download: The family's download step whose output to normalize.
        family: Key into ``NEMOTRON_V2_DATASETS``.
        subset: Key into that family's ``subsets`` mapping.

    Raises:
        ValueError: If ``family`` or ``subset`` is unknown (mirrors the
            validation style of ``normalize_nemotron_v1_step``).
    """
    # Validate eagerly with a descriptive message, consistent with the
    # nemotron_v1 factory, instead of surfacing a bare KeyError.
    if family not in NEMOTRON_V2_DATASETS:
        raise ValueError(f"Unknown family {family!r}. Choose from: {sorted(NEMOTRON_V2_DATASETS)}")
    info = NEMOTRON_V2_DATASETS[family]
    if subset not in info.subsets:
        raise ValueError(f"Unknown subset {subset!r} for family {family!r}. Choose from: {sorted(info.subsets)}")
    glob_pattern = info.subsets[subset]
    # Keep only the leading path components that contain no glob
    # metacharacters (e.g. "Diverse-QA/**/*.parquet" -> "Diverse-QA").
    # This is more robust than splitting on a literal "/**", which would
    # pass a pattern like "Diverse-QA/*.parquet" through untruncated.
    literal_parts: list[str] = []
    for part in glob_pattern.split("/"):
        if any(ch in part for ch in "*?["):
            break
        literal_parts.append(part)
    subset_dir = "/".join(literal_parts)
    return normalize_step(
        name=f"normalized/{family}/{subset}",
        download=download,
        text_field="text",
        id_field="id",
        file_extensions=(".parquet",),
        input_path=f"{download.output_path}/{subset_dir}",
    )

lib/marin/src/marin/datakit/download/nsf_awards.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from io import BytesIO
1616

1717
import requests
18+
from marin.datakit.normalize import normalize_step
1819
from marin.execution.step_spec import StepSpec
1920
from zephyr import Dataset, ZephyrContext, counters
2021
from zephyr.writers import write_parquet_file
@@ -151,3 +152,14 @@ def _run(output_path: str) -> None:
151152
fn=_run,
152153
hash_attrs={"min_year": min_year, "max_year": max_year},
153154
)
155+
156+
157+
def normalize_nsf_awards_step(download: StepSpec) -> StepSpec:
    """Build the normalize step for the NSF awards download.

    Normalization generates content-hash IDs while preserving the original
    ``awd_id`` column as ``source_id``.
    """
    normalize_kwargs = dict(
        name="normalized/nsf-awards",
        download=download,
        text_field="text",
        id_field="awd_id",
        file_extensions=(".parquet",),
    )
    return normalize_step(**normalize_kwargs)

0 commit comments

Comments
 (0)