Skip to content

Commit 2892e39

Browse files
rjpower
authored and yonromai committed
[dedup] Replace side-effect mutation with reduce in fuzzy dedup map (#3938)
_load_fuzzy_dupe_map_shard built a dict by mutating a closure variable via .map() side effect, which only works with LocalClient (shared memory). Replace with .reduce() so the dict flows through the Zephyr pipeline as a proper return value. Also removes a stale comment from _compute_fuzzy_dedup_stats. Co-authored-by: Romain Yon <yonromai@users.noreply.github.com>
1 parent c18c986 commit 2892e39

1 file changed

Lines changed: 9 additions & 11 deletions

File tree

  • lib/marin/src/marin/processing/classification/deduplication

lib/marin/src/marin/processing/classification/deduplication/fuzzy.py

Lines changed: 9 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,6 @@ def _compute_fuzzy_dedup_stats(shards: list[str] | Sequence[str], method: str, l
3535
result: DupCounters = ctx.execute( # type: ignore[bad-assignment]
3636
Dataset.from_list(shards)
3737
.load_parquet(columns=["component_id"])
38-
# Compute the per-component statistics and then roll them up into a single counter group
3938
.group_by(
4039
key=lambda r: r["component_id"],
4140
reducer=lambda _, items: DupCounters(
@@ -56,19 +55,18 @@ def _load_fuzzy_dupe_map_shard(shards: list[str]) -> dict[str, bool]:
5655
logger.warning("No fuzzy duplicate documents found.")
5756
return {}
5857

59-
# Map record ID -> is duplicate (bool)
60-
shard_dup_map = {}
61-
62-
def add_to_dup_map(record: dict):
63-
shard_dup_map[record["id"]] = record["fuzzy_duplicate"]
64-
6558
with log_time(f"Load fuzzy duplicate map from {len(shards)} shards"):
6659
ctx = ZephyrContext(client=LocalClient(), name="fuzzy-dup-map")
67-
ctx.execute(
68-
Dataset.from_list(shards).load_parquet().map(add_to_dup_map),
69-
)
60+
result: dict[str, bool] = ctx.execute( # type: ignore[bad-assignment]
61+
Dataset.from_list(shards)
62+
.load_parquet()
63+
.reduce(
64+
local_reducer=lambda items: {r["id"]: r["fuzzy_duplicate"] for r in items},
65+
global_reducer=lambda dicts: {k: v for d in dicts for k, v in d.items()},
66+
),
67+
)[0]
7068

71-
return shard_dup_map
69+
return result
7270

7371

7472
def dedup_fuzzy_document(

0 commit comments

Comments (0)