1919from collections .abc import Callable , Iterator
2020from concurrent .futures import ThreadPoolExecutor , as_completed
2121from dataclasses import dataclass , field
22+ from enum import StrEnum
2223from typing import Any
2324
2425import dupekit
3233logger = logging .getLogger (__name__ )
3334
3435
36+ class DedupMode (StrEnum ):
37+ """How aggressively to deduplicate records during normalization.
38+
39+ ``EXACT`` drops records with duplicate ``id`` (i.e. byte-identical text)
40+ within each output shard. ``NONE`` skips the dedup pass entirely.
41+ """
42+
43+ NONE = "none"
44+ EXACT = "exact"
45+
46+
3547@dataclass
3648class NormalizeSubdirResult :
3749 """Per-subdirectory outcome of :func:`normalize_to_parquet`.
@@ -177,7 +189,7 @@ def _build_pipeline(
177189 num_shards : int ,
178190 text_field : str ,
179191 id_field : str | None ,
180- exact_dedup : bool ,
192+ dedup_mode : DedupMode ,
181193) -> Dataset :
182194 """Build a single Zephyr pipeline for one subdirectory."""
183195 normalize_record = _make_normalize_fn (text_field , id_field )
@@ -192,7 +204,7 @@ def dedup(_key: int, items: Iterator[dict[str, Any]]) -> Iterator[dict[str, Any]
192204 yield record
193205
194206 def passthrough (_key : int , items : Iterator [dict [str , Any ]]) -> Iterator [dict [str , Any ]]:
195- """Yield items unchanged; used when exact dedup is disabled."""
207+ """Yield items unchanged; used when dedup is disabled."""
196208 yield from items
197209
198210 def has_text (record : dict [str , Any ]) -> bool :
@@ -202,14 +214,16 @@ def has_text(record: dict[str, Any]) -> bool:
202214 return False
203215 return True
204216
217+ reducers : dict [DedupMode , Callable ] = {DedupMode .EXACT : dedup , DedupMode .NONE : passthrough }
218+
205219 return (
206220 Dataset .from_list (files )
207221 .flat_map (load_file )
208222 .filter (has_text )
209223 .map (normalize_record )
210224 .group_by (
211225 key = lambda r : int (r ["id" ], 16 ) % num_shards ,
212- reducer = dedup if exact_dedup else passthrough ,
226+ reducer = reducers [ dedup_mode ] ,
213227 sort_by = lambda r : r ["id" ],
214228 num_output_shards = num_shards ,
215229 )
@@ -229,15 +243,15 @@ def normalize_to_parquet(
229243 target_partition_bytes : int = 256 * 1024 * 1024 ,
230244 worker_resources : ResourceConfig | None = None ,
231245 file_extensions : tuple [str , ...] | None = None ,
232- exact_dedup : bool = True ,
246+ dedup_mode : DedupMode = DedupMode . EXACT ,
233247) -> NormalizeResult :
234248 """Normalize raw downloaded data to the datakit standard Parquet format.
235249
236250 Discovers all data files under *input_path*, groups them by subdirectory,
237251 and launches one Zephyr pipeline per subdirectory concurrently. Each
238252 pipeline normalizes records (``id``, ``text``, preserves all other columns),
239- optionally exact- deduplicates by content, sorts by ``id``, and writes
240- Parquet partitions sized by *target_partition_bytes*.
253+ optionally deduplicates by content per *dedup_mode* , sorts by ``id``, and
254+ writes Parquet partitions sized by *target_partition_bytes*.
241255
242256 Args:
243257 input_path: Root directory containing raw downloaded data.
@@ -255,9 +269,10 @@ def normalize_to_parquet(
255269 file_extensions: Tuple of file extensions to include (e.g.
256270 ``(".parquet",)``). Defaults to all extensions supported by
257271 ``zephyr.readers.load_file``.
258- exact_dedup: If True (the default), drop records with duplicate ``id``
259- values (i.e. exact text duplicates) within each output shard. Set
260- to False to skip the dedup pass and preserve all input records.
272+ dedup_mode: How to deduplicate records within each output shard.
273+ ``EXACT`` (the default) drops records with duplicate ``id`` values
274+ (i.e. byte-identical text). ``NONE`` skips dedup and preserves
275+ all input records.
261276
262277 Returns:
263278 A :class:`NormalizeResult` describing the output files and zephyr
@@ -285,7 +300,7 @@ def _run_subdir(subdir: str, files: list[str]) -> NormalizeSubdirResult:
285300 num_shards ,
286301 )
287302
288- pipeline = _build_pipeline (files , output_dir , num_shards , text_field , id_field , exact_dedup )
303+ pipeline = _build_pipeline (files , output_dir , num_shards , text_field , id_field , dedup_mode )
289304 ctx = ZephyrContext (
290305 name = f"normalize-{ subdir .replace ('/' , '-' ) if subdir else 'all' } " ,
291306 resources = resources ,
@@ -332,7 +347,7 @@ def normalize_step(
332347 override_output_path : str | None = None ,
333348 input_path : str | None = None ,
334349 file_extensions : tuple [str , ...] | None = None ,
335- exact_dedup : bool = True ,
350+ dedup_mode : DedupMode = DedupMode . EXACT ,
336351) -> StepSpec :
337352 """Create a StepSpec that normalizes downloaded data to Parquet.
338353
@@ -350,8 +365,8 @@ def normalize_step(
350365 file_extensions: Tuple of file extensions to include (e.g.
351366 ``(".parquet",)``). Defaults to all extensions supported by
352367 ``zephyr.readers.load_file``.
353- exact_dedup: If True (the default), drop records with duplicate ``id``
354- values (i.e. exact text duplicates) within each output shard .
368+ dedup_mode: How to deduplicate records within each output shard.
369+ Defaults to ``DedupMode.EXACT``; use ``DedupMode.NONE`` to skip .
355370 """
356371 resolved_input = input_path or download .output_path
357372
@@ -365,7 +380,7 @@ def normalize_step(
365380 target_partition_bytes = target_partition_bytes ,
366381 worker_resources = worker_resources ,
367382 file_extensions = file_extensions ,
368- exact_dedup = exact_dedup ,
383+ dedup_mode = dedup_mode ,
369384 ),
370385 deps = [download ],
371386 hash_attrs = {
@@ -374,7 +389,7 @@ def normalize_step(
374389 "target_partition_bytes" : target_partition_bytes ,
375390 "input_path" : resolved_input ,
376391 "file_extensions" : file_extensions ,
377- "exact_dedup " : exact_dedup ,
392+ "dedup_mode " : dedup_mode ,
378393 },
379394 override_output_path = override_output_path ,
380395 )
0 commit comments