|
32 | 32 | from marin.schemas.web.convert import ResiliparseConfig |
33 | 33 | from marin.training.training import TrainLmOnPodConfig, run_levanter_train_lm |
34 | 34 | from marin.transform.simple_html_to_md.process import SimpleHtmlToMdConfig, html_to_md |
| 35 | +from marin.utils import fsspec_glob |
35 | 36 |
|
36 | 37 | from iris.logging import configure_logging |
37 | 38 |
|
@@ -68,6 +69,61 @@ def _setup_log_tee(log_path: str) -> None: |
68 | 69 | _tee_fd(sys.stderr.fileno(), log_fh) |
69 | 70 |
|
70 | 71 |
|
@dataclasses.dataclass(frozen=True)
class ValidateExactDedupConfig:
    """Config for validate_exact_dedup_output: where the exact-dedup output lives and what counts to expect."""

    # Root directory of the exact-dedup output; searched recursively for *.vortex files.
    data_path: str
    # Expected total row count summed across all vortex files.
    expected_rows: int
    # Expected duplicate count; exact paragraph dedup output contains only rows with
    # dup_spans, so this should equal expected_rows.
    expected_dups: int
def validate_exact_dedup_output(config: ValidateExactDedupConfig):
    """Sanity-check the exact-dedup output directory.

    Asserts that vortex files exist under ``config.data_path`` and that the
    combined row count equals both ``expected_rows`` and ``expected_dups``
    (exact paragraph dedup emits only rows carrying dup_spans, so the two
    counts coincide).
    """
    import vortex

    vortex_files = fsspec_glob(os.path.join(config.data_path, "**/*.vortex"))
    assert vortex_files, f"No vortex files found in {config.data_path}"

    # Sum row counts across every vortex file found under data_path.
    total_rows = sum(len(vortex.open(path).to_arrow().read_all()) for path in vortex_files)

    # Every row in exact paragraph dedup output has dup_spans (non-dups are omitted)
    assert total_rows == config.expected_rows, f"Expected {config.expected_rows} rows, got {total_rows}"
    assert total_rows == config.expected_dups, f"Expected {config.expected_dups} dups, got {total_rows}"
    logger.info(f"Validated exact dedup: {total_rows} rows in {len(vortex_files)} vortex files")
| 96 | + |
@dataclasses.dataclass(frozen=True)
class ValidateFuzzyDedupConfig:
    """Config for validate_fuzzy_dedup_output: where the fuzzy-dedup output lives and how many dups to expect."""

    # Root directory of the fuzzy-dedup output; searched recursively for *.jsonl.gz files.
    data_path: str
    # Expected number of rows whose attributes mark them as fuzzy-document duplicates.
    expected_dups: int
def validate_fuzzy_dedup_output(config: ValidateFuzzyDedupConfig):
    """Sanity-check the fuzzy-dedup output directory.

    Asserts that jsonl.gz files exist under ``config.data_path`` and that the
    number of rows flagged with the fuzzy-document dedup attribute matches
    ``expected_dups``.
    """
    import gzip
    import json

    jsonl_files = fsspec_glob(os.path.join(config.data_path, "**/*.jsonl.gz"))
    assert jsonl_files, f"No jsonl.gz files found in {config.data_path}"

    # Attribute key under which fuzzy-document dedup marks duplicates.
    dup_attr = str(DedupMode.FUZZY_DOCUMENT)
    total_rows = 0
    total_dups = 0
    for path in jsonl_files:
        with gzip.open(path, "rt") as fh:
            for raw_line in fh:
                record = json.loads(raw_line)
                total_rows += 1
                if record.get("attributes", {}).get(dup_attr, False):
                    total_dups += 1

    assert total_dups == config.expected_dups, f"Expected {config.expected_dups} dups, got {total_dups}"
    logger.info(
        f"Validated fuzzy dedup: {total_dups} dups out of {total_rows} rows in {len(jsonl_files)} jsonl.gz files"
    )
| 126 | + |
71 | 127 | def create_steps(prefix: str, synth_data: str) -> list[ExecutorStep]: |
72 | 128 | # ############################################################ |
73 | 129 | # Transform HTML to text |
@@ -176,6 +232,28 @@ def create_steps(prefix: str, synth_data: str) -> list[ExecutorStep]: |
176 | 232 | ), |
177 | 233 | ) |
178 | 234 |
|
| 235 | + ############################################################ |
| 236 | + # Validate dedup outputs |
| 237 | + |
| 238 | + validate_exact_dedup_step = ExecutorStep( |
| 239 | + name=os.path.join(prefix, "validate_exact_dedup"), |
| 240 | + fn=validate_exact_dedup_output, |
| 241 | + config=ValidateExactDedupConfig( |
| 242 | + data_path=dedup_exact_paragraph_step.cd("data"), |
| 243 | + expected_rows=2, |
| 244 | + expected_dups=2, |
| 245 | + ), |
| 246 | + ) |
| 247 | + |
| 248 | + validate_fuzzy_dedup_step = ExecutorStep( |
| 249 | + name=os.path.join(prefix, "validate_fuzzy_dedup"), |
| 250 | + fn=validate_fuzzy_dedup_output, |
| 251 | + config=ValidateFuzzyDedupConfig( |
| 252 | + data_path=dedup_fuzzy_document_step.cd("data"), |
| 253 | + expected_dups=2, |
| 254 | + ), |
| 255 | + ) |
| 256 | + |
179 | 257 | ############################################################ |
180 | 258 | # Consolidate |
181 | 259 |
|
@@ -267,6 +345,8 @@ def create_steps(prefix: str, synth_data: str) -> list[ExecutorStep]: |
267 | 345 | inference_lq_step, |
268 | 346 | dedup_exact_paragraph_step, |
269 | 347 | dedup_fuzzy_document_step, |
| 348 | + validate_exact_dedup_step, |
| 349 | + validate_fuzzy_dedup_step, |
270 | 350 | consolidate_step, |
271 | 351 | tokenize_step, |
272 | 352 | train_step, |
|
0 commit comments