Skip to content

Commit 9ff9161

Browse files
authored
Dedupe pipeline on 10BT fineweb-edu sample (#2129)
## Description

Re: #2096

In this PR we "use more data and more resources":

* Add the `fineweb_edu_small_10bt` dataset.
* Request more resources: 1024 parallelism by default.
* Guard `dupekit` to "protect" the Marin driver.
* Clean up the dedupe code slightly; it is still not clean enough.
* We will break the dedupe utils out soon.
* Compute dedupe stats/counts.
* Reuse more code between the paragraph and doc exact modes.
1 parent 5308ed9 commit 9ff9161

File tree

3 files changed

+168
-78
lines changed

3 files changed

+168
-78
lines changed

experiments/dedup/dedup.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@
5050
),
5151
)
5252

53+
fineweb_edu_small_10bt = ExecutorStep(
54+
name="raw_fineweb_edu_small_10bt",
55+
fn=download_hf,
56+
config=DownloadConfig(
57+
hf_dataset_id="HuggingFaceFW/fineweb-edu",
58+
revision="3c452cb",
59+
hf_urls_glob=["sample/10BT/*.parquet"],
60+
),
61+
)
62+
5363
# N-gram configuration for train-test overlap detection
5464
DEFAULT_NGRAM_CONFIG = NGramConfig(
5565
ngram_length=[5, 10, 15],
@@ -61,14 +71,25 @@
6171
@ray.remote(runtime_env={"env_vars": {"JAX_PLATFORMS": "cpu", "PJRT_DEVICE": "cpu"}})
6272
def run_dedup(config: DedupeConfig) -> str:
6373
logger.info(f"Starting dedupe with config: {config}")
74+
6475
dedupe(config)
76+
6577
logger.info(f"Dedupe completed! Results written to {config.output_path}")
6678
return config.output_path
6779

6880

69-
def build_dedup_step(dataset: InputName) -> ExecutorStep:
81+
def build_dedup_step(dataset: InputName, max_parallelism: int) -> ExecutorStep:
82+
"""
83+
Builds a deduplication step for the given dataset.
84+
85+
Args:
86+
dataset: The input dataset to deduplicate.
87+
max_parallelism: Maximum parallelism for Zephyr tasks.
88+
"""
89+
input_path = dataset.cd("sample/10BT")
90+
7091
config = DedupeConfig(
71-
input_path=dataset.cd("sample/10BT"), attribute_name="is_duplicate", mode=DedupMode.EXACT_DOC_DEDUPLICATE
92+
input_path=input_path, attribute_name="is_duplicate", mode=DedupMode.DEDUPLICATE, processes=max_parallelism
7293
)
7394

7495
return ExecutorStep(
@@ -80,7 +101,11 @@ def build_dedup_step(dataset: InputName) -> ExecutorStep:
80101
)
81102

82103

83-
STEPS = [build_dedup_step(fineweb_edu_small_1), build_dedup_step(fineweb_edu_small_2)]
104+
STEPS = [
105+
build_dedup_step(fineweb_edu_small_1, max_parallelism=7),
106+
build_dedup_step(fineweb_edu_small_2, max_parallelism=7),
107+
build_dedup_step(fineweb_edu_small_10bt, max_parallelism=1024),
108+
]
84109

85110
if __name__ == "__main__":
86111
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

0 commit comments

Comments
 (0)