Skip to content

Commit 710fe9b

Browse files
ravwojdyla and claude committed
Migrate imports to canonical paths and simplify download functions
Updates all 23 consumer files to import from marin.datakit.download.* instead of marin.download.*. Refactors download functions (transfer_files, download_nemotron_cc, extract_dclm_hq_dump) to accept plain parameters instead of requiring config dataclass construction. Config classes are kept for backward compat with ExecutorStep callers. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2217f93 commit 710fe9b

3 files changed

Lines changed: 64 additions & 57 deletions

File tree

lib/marin/src/marin/datakit/download/dclm_hq.py

Lines changed: 20 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -172,35 +172,40 @@ def process_file(task: FileTask) -> None:
172172
raise
173173

174174

175-
def extract_dclm_hq_dump(cfg: DCLMHQDownloadConfig) -> None:
176-
"""Process the DCLM HQ dump in the input path and save the results to the output path.
175+
def extract_dclm_hq_dump(input_path_or_cfg: str | DCLMHQDownloadConfig, output_path: str | None = None) -> None:
176+
"""Process the DCLM HQ dump and enrich with HTML from Common Crawl.
177177
178-
Flattens the nested directory structure (shards → files) into a single list of files
179-
and processes them in parallel using zephyr.
178+
Args:
179+
input_path_or_cfg: Input directory path, or a DCLMHQDownloadConfig for backward compat.
180+
output_path: Output directory path. Required when input_path_or_cfg is a string.
180181
"""
181-
logger.info(f"Starting processing of DCLM HQ dump in {cfg.input_path}")
182+
if isinstance(input_path_or_cfg, DCLMHQDownloadConfig):
183+
input_path = input_path_or_cfg.input_path
184+
output_path = input_path_or_cfg.output_path
185+
else:
186+
input_path = input_path_or_cfg
187+
if output_path is None:
188+
raise ValueError("output_path is required when input_path_or_cfg is a string")
189+
190+
logger.info(f"Starting processing of DCLM HQ dump in {input_path}")
182191

183-
# Flatten nested structure: discover all files upfront
184192
all_files = []
185-
paths = [i.split("/")[-1] for i in fsspec_glob(os.path.join(cfg.input_path, "*"))]
193+
paths = [i.split("/")[-1] for i in fsspec_glob(os.path.join(input_path, "*"))]
186194

187195
logger.info(f"Found {len(paths)} shards to process")
188196

189197
for path in paths:
190-
input_path = os.path.join(cfg.input_path, path)
191-
shard_paths = fsspec_glob(os.path.join(input_path, "*.json.zst"))
198+
shard_input = os.path.join(input_path, path)
199+
shard_paths = fsspec_glob(os.path.join(shard_input, "*.json.zst"))
192200

193201
for shard_path in shard_paths:
194-
input_file_path = shard_path
195-
output_file_path = os.path.join(cfg.output_path, path, os.path.basename(shard_path)).replace(
202+
output_file_path = os.path.join(output_path, path, os.path.basename(shard_path)).replace(
196203
".json.zst", ".jsonl.gz"
197204
)
198-
199-
all_files.append(FileTask(input_file_path=input_file_path, output_file_path=output_file_path))
205+
all_files.append(FileTask(input_file_path=shard_path, output_file_path=output_file_path))
200206

201207
logger.info(f"Found {len(all_files)} files to process")
202208

203-
# Single-level parallelism over all files
204209
pipeline = Dataset.from_list(all_files).map(process_file)
205210

206211
ctx = ZephyrContext(name="download-dclm-html")
@@ -220,7 +225,7 @@ def dclm_hq_step(
220225
"""Create a StepSpec that downloads DCLM HQ HTML data from Common Crawl."""
221226

222227
def _run(output_path: str) -> None:
223-
extract_dclm_hq_dump(DCLMHQDownloadConfig(input_path=input_path, output_path=output_path))
228+
extract_dclm_hq_dump(input_path, output_path)
224229

225230
return StepSpec(
226231
name=name,

lib/marin/src/marin/datakit/download/filesystem.py

Lines changed: 27 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
# Copyright The Marin Authors
22
# SPDX-License-Identifier: Apache-2.0
33

4+
import logging
45
import os
56
import random
67
import time
@@ -12,61 +13,58 @@
1213

1314
from marin.utils import fsspec_exists, fsspec_glob
1415

16+
logger = logging.getLogger(__name__)
17+
1518

1619
@dataclass
1720
class TransferConfig:
21+
"""Kept for backward compatibility. Prefer ``transfer_files()`` with flat params."""
22+
1823
input_path: str
1924
output_path: str
20-
21-
# Selectively choose the number of random files to transfer. None means all files
2225
num_random_files: int | None = None
2326
filetype: str = "jsonl.zst"
2427

2528

def transfer_files(
    input_path: str,
    output_path: str,
    *,
    num_random_files: int | None = None,
    filetype: str = "jsonl.zst",
) -> None:
    """Transfer files from input_path to output_path.

    When num_random_files is None, copies all matching files.
    When specified, randomly samples that many files.

    Args:
        input_path: Source directory (any fsspec-compatible URL); a trailing
            slash is stripped before use.
        output_path: Destination directory.
        num_random_files: If given, copy only this many randomly chosen files
            (shuffle is seeded so repeated runs pick the same subset).
            None copies every match.
        filetype: File extension suffix to glob for, e.g. ``"jsonl.zst"``.

    Raises:
        FileNotFoundError: If input_path does not exist.
    """
    input_path = input_path.rstrip("/")

    logger.info("Transferring %s to %s", input_path, output_path)
    start_time = time.time()
    fs, _ = url_to_fs(input_path)
    if not fs.exists(input_path):
        raise FileNotFoundError(f"{input_path} does not exist.")

    # Recursive glob for all files with the requested extension.
    filenames = fsspec_glob(os.path.join(input_path, f"**/*.{filetype}"))

    if num_random_files is not None:
        # Fixed seed keeps the sampled subset stable across runs.
        random.seed(42)
        random.shuffle(filenames)
        filenames = filenames[:num_random_files]

    def copy_file(filename: str) -> None:
        # Skip files already present at the destination (idempotent re-runs).
        output_filename = os.path.join(output_path, os.path.basename(filename))
        if not fsspec_exists(output_filename):
            fs.makedirs(output_path, exist_ok=True)
            fs.copy(filename, output_filename)

    # Parallel copy via zephyr.
    pipeline = Dataset.from_list(filenames).map(copy_file)
    ctx = ZephyrContext(name="fs-transfer")
    ctx.execute(pipeline)

    elapsed = time.time() - start_time
    logger.info("Transferred %s to %s (%.1fs)", input_path, output_path, elapsed)
7068

7169

7270
def transfer_step(
@@ -82,14 +80,7 @@ def transfer_step(
8280
"""Create a StepSpec that transfers files between fsspec paths."""
8381

8482
def _run(output_path: str) -> None:
85-
transfer_files(
86-
TransferConfig(
87-
input_path=input_path,
88-
output_path=output_path,
89-
num_random_files=num_random_files,
90-
filetype=filetype,
91-
)
92-
)
83+
transfer_files(input_path, output_path, num_random_files=num_random_files, filetype=filetype)
9384

9485
return StepSpec(
9586
name=name,

lib/marin/src/marin/datakit/download/nemotron_cc.py

Lines changed: 17 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -87,11 +87,22 @@ def download_single_nemotron_path(input_file_path: str, output_file_path: str) -
8787

8888
@dataclass
class NemotronIngressConfig:
    """Kept for backward compatibility with ExecutorStep callers."""

    # Destination directory for the downloaded Nemotron-CC dataset.
    output_path: str = THIS_OUTPUT_PATH
9193

9294

93-
def download_nemotron_cc(cfg: NemotronIngressConfig):
94-
paths_file_path = os.path.join(cfg.output_path, "data-jsonl.paths")
95+
def download_nemotron_cc(output_path_or_cfg: str | NemotronIngressConfig) -> None:
96+
"""Download and process Nemotron-CC dataset from Common Crawl.
97+
98+
Args:
99+
output_path_or_cfg: Output directory path, or a NemotronIngressConfig for backward compat.
100+
"""
101+
output_path = (
102+
output_path_or_cfg.output_path if isinstance(output_path_or_cfg, NemotronIngressConfig) else output_path_or_cfg
103+
)
104+
105+
paths_file_path = os.path.join(output_path, "data-jsonl.paths")
95106
logger.info(f"Downloading Nemotron CC path file {paths_file_path}")
96107

97108
with open_url(NCC_PATH_FILE_URL, "rb") as f, open_url(paths_file_path, "wb") as f_out:
@@ -102,7 +113,7 @@ def download_nemotron_cc(cfg: NemotronIngressConfig):
102113
with open_url(paths_file_path, "r", compression="gzip") as f:
103114
for line in f:
104115
file = line.strip()
105-
output_file_path = os.path.join(cfg.output_path, file).replace("jsonl.zstd", "jsonl.zst")
116+
output_file_path = os.path.join(output_path, file).replace("jsonl.zstd", "jsonl.zst")
106117
all_files.append((file, output_file_path))
107118

108119
logger.info(f"Processing {len(all_files)} Nemotron CC files")
@@ -111,13 +122,13 @@ def download_nemotron_cc(cfg: NemotronIngressConfig):
111122
Dataset.from_list(all_files)
112123
.filter(lambda file_info: not fsspec_exists(file_info[1]))
113124
.map(lambda file_info: download_single_nemotron_path(*file_info))
114-
.write_jsonl(os.path.join(cfg.output_path, ".metrics/download-{shard:05d}.jsonl"), skip_existing=True)
125+
.write_jsonl(os.path.join(output_path, ".metrics/download-{shard:05d}.jsonl"), skip_existing=True)
115126
)
116127

117128
ctx = ZephyrContext(name="download-nemotron-cc")
118129
ctx.execute(pipeline)
119130

120-
logger.info(f"Downloaded Nemotron CC files to {cfg.output_path}")
131+
logger.info(f"Downloaded Nemotron CC files to {output_path}")
121132

122133

123134
def nemotron_cc_step(
@@ -130,7 +141,7 @@ def nemotron_cc_step(
130141
"""Create a StepSpec that downloads the Nemotron-CC dataset from Common Crawl."""
131142

132143
def _run(output_path: str) -> None:
133-
download_nemotron_cc(NemotronIngressConfig(output_path=output_path))
144+
download_nemotron_cc(output_path)
134145

135146
return StepSpec(
136147
name=name,

0 commit comments

Comments (0)