1616
1717import logging
1818import os
19+ import re
1920from collections .abc import Callable , Iterator
2021from concurrent .futures import ThreadPoolExecutor , as_completed
2122from typing import Any
2425from rigging .filesystem import url_to_fs
2526from marin .execution .step_spec import StepSpec
2627from fray .v2 import ResourceConfig
27- from zephyr import Dataset , ZephyrContext
28+ from zephyr import Dataset , ZephyrContext , counters
2829from zephyr .readers import SUPPORTED_EXTENSIONS , load_file
2930
3031logger = logging .getLogger (__name__ )
3132
# Default cap on the longest consecutive whitespace run in a document.
# Runs exceeding this are compacted to this length at normalization time.
# Pathologically long whitespace runs (e.g. multi-MB runs from broken
# HTML→text extraction, cf. #4588) can OOM downstream tokenization.
DEFAULT_MAX_WHITESPACE_RUN_CHARS = 128

# Zephyr counter name for documents that had whitespace runs compacted,
# incremented once per affected record by the normalization pipeline.
COMPACTED_WHITESPACE_COUNTER = "datakit_normalize_compacted_whitespace"
41+
3242
3343def generate_id (text : str ) -> str :
3444 """Generate a deterministic document ID from text content.
@@ -134,12 +144,34 @@ def _compute_total_bytes(file_paths: list[str]) -> int:
134144 return total
135145
136146
147+ def _make_whitespace_compactor (max_whitespace_run_chars : int ) -> Callable [[dict [str , Any ]], dict [str , Any ]]:
148+ """Return a map function that compacts consecutive whitespace runs exceeding the limit.
149+
150+ Any run of whitespace longer than *max_whitespace_run_chars* is truncated to
151+ that length (preserving the original whitespace characters). Affected records
152+ are counted via the ``COMPACTED_WHITESPACE_COUNTER`` Zephyr counter, and the
153+ ``id`` is recomputed to reflect the new text.
154+ """
155+ pattern = re .compile (r"\s{" + str (max_whitespace_run_chars + 1 ) + r",}" )
156+
157+ def compact (record : dict [str , Any ]) -> dict [str , Any ]:
158+ text = record ["text" ]
159+ compacted = pattern .sub (lambda m : m .group (0 )[:max_whitespace_run_chars ], text )
160+ if len (compacted ) != len (text ):
161+ counters .increment (COMPACTED_WHITESPACE_COUNTER )
162+ record = {** record , "text" : compacted , "id" : generate_id (compacted )}
163+ return record
164+
165+ return compact
166+
167+
137168def _build_pipeline (
138169 files : list [str ],
139170 output_dir : str ,
140171 num_shards : int ,
141172 text_field : str ,
142173 id_field : str | None ,
174+ max_whitespace_run_chars : int | None ,
143175) -> Dataset :
144176 """Build a single Zephyr pipeline for one subdirectory."""
145177 normalize_record = _make_normalize_fn (text_field , id_field )
@@ -153,20 +185,17 @@ def dedup_and_sort(_key: int, items: Iterator[dict[str, Any]]) -> Iterator[dict[
153185 prev_id = rid
154186 yield record
155187
156- return (
157- Dataset .from_list (files )
158- .flat_map (load_file )
159- .map (normalize_record )
160- .group_by (
161- key = lambda r : int (r ["id" ], 16 ) % num_shards ,
162- reducer = dedup_and_sort ,
163- sort_by = lambda r : r ["id" ],
164- num_output_shards = num_shards ,
165- )
166- .write_parquet (
167- f"{ output_dir } /part-{{shard:05d}}-of-{{total:05d}}.parquet" ,
168- skip_existing = True ,
169- )
188+ pipeline = Dataset .from_list (files ).flat_map (load_file ).map (normalize_record )
189+ if max_whitespace_run_chars is not None :
190+ pipeline = pipeline .map (_make_whitespace_compactor (max_whitespace_run_chars ))
191+ return pipeline .group_by (
192+ key = lambda r : int (r ["id" ], 16 ) % num_shards ,
193+ reducer = dedup_and_sort ,
194+ sort_by = lambda r : r ["id" ],
195+ num_output_shards = num_shards ,
196+ ).write_parquet (
197+ f"{ output_dir } /part-{{shard:05d}}-of-{{total:05d}}.parquet" ,
198+ skip_existing = True ,
170199 )
171200
172201
@@ -177,6 +206,7 @@ def normalize_to_parquet(
177206 text_field : str = "text" ,
178207 id_field : str = "id" ,
179208 target_partition_bytes : int = 256 * 1024 * 1024 ,
209+ max_whitespace_run_chars : int | None = DEFAULT_MAX_WHITESPACE_RUN_CHARS ,
180210 worker_resources : ResourceConfig | None = None ,
181211) -> None :
182212 """Normalize raw downloaded data to the datakit standard Parquet format.
@@ -196,6 +226,13 @@ def normalize_to_parquet(
196226 silently skipped.
197227 target_partition_bytes: Target size in bytes per output partition.
198228 Used to compute the number of output shards per subdirectory.
229+ max_whitespace_run_chars: Compact any consecutive whitespace run
230+ longer than this many characters down to this length.
231+ Pathologically long whitespace runs (e.g. multi-MB runs from
232+ broken HTML→text extraction, cf. #4588) can OOM downstream
233+ tokenization. Affected records are counted via the
234+ ``datakit_normalize_compacted_whitespace`` Zephyr counter.
235+ Pass ``None`` to disable compaction.
199236 worker_resources: Per-worker resource request for the Zephyr pipeline.
200237 Defaults to 2 CPU / 16GB RAM / 10GB disk, sized for
201238 ``target_partition_bytes`` of 256MB. Scale up when increasing
@@ -223,7 +260,7 @@ def _run_subdir(subdir: str, files: list[str]) -> None:
223260 num_shards ,
224261 )
225262
226- pipeline = _build_pipeline (files , output_dir , num_shards , text_field , id_field )
263+ pipeline = _build_pipeline (files , output_dir , num_shards , text_field , id_field , max_whitespace_run_chars )
227264 ctx = ZephyrContext (
228265 name = f"normalize-{ subdir .replace ('/' , '-' ) if subdir else 'all' } " ,
229266 resources = resources ,
@@ -246,6 +283,7 @@ def normalize_step(
246283 text_field : str = "text" ,
247284 id_field : str = "id" ,
248285 target_partition_bytes : int = 256 * 1024 * 1024 ,
286+ max_whitespace_run_chars : int | None = DEFAULT_MAX_WHITESPACE_RUN_CHARS ,
249287 worker_resources : ResourceConfig | None = None ,
250288 override_output_path : str | None = None ,
251289 input_path : str | None = None ,
@@ -274,13 +312,15 @@ def normalize_step(
274312 text_field = text_field ,
275313 id_field = id_field ,
276314 target_partition_bytes = target_partition_bytes ,
315+ max_whitespace_run_chars = max_whitespace_run_chars ,
277316 worker_resources = worker_resources ,
278317 ),
279318 deps = [download ],
280319 hash_attrs = {
281320 "text_field" : text_field ,
282321 "id_field" : id_field ,
283322 "target_partition_bytes" : target_partition_bytes ,
323+ "max_whitespace_run_chars" : max_whitespace_run_chars ,
284324 "input_path" : resolved_input ,
285325 },
286326 override_output_path = override_output_path ,
0 commit comments