Skip to content

Commit 252acf5

Browse files
[nightshift] Remove dead code and deduplicate helpers in decon.py (#4086)
> *Stale branches pruned clean,*
> *shared roots drink from one well —*
> *less code, same harvest.*

## Summary

- **Removed dead `_init_wandb` function** from `decon.py` — defined in Dec 2025 but never called from anywhere in the codebase. This also removes its now-unused imports: `wandb`, `datetime`, `timezone`, `WANDB_PROJECT`, `WANDB_ENTITY`.
- **Deduplicated `_collect_input_files` and `_get_extension`** — both were copy-pasted into `decon.py` from `dedup_commons.py` with near-identical logic. Now `decon.py` imports and reuses the shared implementations in `dedup_commons.py`, using `DEFAULT_FILETYPES` for the file type list.

Net result: **−59 lines, +9 lines** (−50 net), no behavior change.
1 parent 2452845 commit 252acf5

1 file changed

Lines changed: 9 additions & 59 deletions

File tree

  • lib/marin/src/marin/processing/classification

lib/marin/src/marin/processing/classification/decon.py

Lines changed: 9 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,22 @@
1414
import os
1515
from collections.abc import Iterator
1616
from dataclasses import dataclass
17-
from datetime import datetime, timezone
1817
from enum import StrEnum, auto
1918
import dupekit
2019

2120
from marin.execution.executor import THIS_OUTPUT_PATH
2221
import draccus
2322
import msgspec
2423
from iris.marin_fs import url_to_fs
25-
import wandb
2624

27-
from marin.utilities.wandb_utils import WANDB_PROJECT, WANDB_ENTITY
28-
29-
from marin.utils import fsspec_glob, rebase_file_path
25+
from marin.processing.classification.deduplication.dedup_commons import (
26+
DEFAULT_FILETYPES,
27+
_collect_input_files,
28+
_get_extension,
29+
)
30+
from marin.utils import rebase_file_path
3031
from zephyr import Dataset, ZephyrContext
31-
from zephyr.readers import load_file, SUPPORTED_EXTENSIONS
32+
from zephyr.readers import load_file
3233
from iris.logging import configure_logging
3334

3435
logger = logging.getLogger(__name__)
@@ -115,50 +116,6 @@ def extract_features(text: str, ngram_config: NGramConfig | None) -> Iterator[st
115116
yield para
116117

117118

118-
def _collect_input_files(input_path: str | list[str]) -> list[str]:
119-
"""
120-
Given an input path or list of paths, collect all matching files (jsonl, parquet, etc).
121-
"""
122-
input_paths = input_path if isinstance(input_path, list) else [input_path]
123-
all_files = []
124-
for path in input_paths:
125-
logger.info(f"Collecting files from path: {path}")
126-
files = fsspec_glob(f"{path.rstrip('/')}/**/*.{{jsonl,jsonl.gz,jsonl.zst,parquet}}")
127-
if files:
128-
all_files.extend(files)
129-
else:
130-
if not path.endswith(("jsonl", "jsonl.gz", "jsonl.zst", "parquet")):
131-
raise FileNotFoundError(f"No files found in path: {path}")
132-
all_files.append(path) # Assume it's a single file
133-
assert all_files, "No input files found for deduplication."
134-
return all_files
135-
136-
137-
def _init_wandb(config: DeconConfig, tags: list[str] | None = None):
138-
"""
139-
Initialize wandb if configured.
140-
"""
141-
if "WANDB_API_KEY" not in os.environ:
142-
return
143-
144-
run_name = os.environ.get("WANDB_RUN_NAME")
145-
if not run_name:
146-
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
147-
run_name = f"{config.mode}-{timestamp}"
148-
149-
wandb.init(
150-
entity=WANDB_ENTITY,
151-
project=WANDB_PROJECT,
152-
name=run_name,
153-
tags=[str(config.mode)] + (tags or []),
154-
config={
155-
"mode": str(config.mode),
156-
"input_path": config.input_path,
157-
"processes": config.processes,
158-
},
159-
)
160-
161-
162119
def _record_id(record: dict) -> str:
163120
if "id" in record:
164121
return record["id"]
@@ -168,13 +125,6 @@ def _record_id(record: dict) -> str:
168125
return str(_bloom_hash(s))
169126

170127

171-
def _get_extension(file_path: str) -> str:
172-
for ext in sorted(SUPPORTED_EXTENSIONS, key=len, reverse=True):
173-
if file_path.endswith(ext):
174-
return ext
175-
raise ValueError(f"Unsupported extension: {file_path}.")
176-
177-
178128
def build_filter(
179129
input_path: str | list[str],
180130
bloom_path: str,
@@ -195,7 +145,7 @@ def build_shard_bloom(records: Iterator[dict], _) -> Iterator[bytes]:
195145

196146
yield bf.save_bytes()
197147

198-
all_files = _collect_input_files(input_path)
148+
all_files = _collect_input_files(input_paths=input_path, filetypes=DEFAULT_FILETYPES)
199149
logger.info(f"Building bloom filter from {all_files} into {bloom_path}")
200150

201151
def _merge_bloom(bloom_files: Iterator[str], _):
@@ -265,7 +215,7 @@ def mark_duplicates_bloom(
265215

266216
# Determine base path for rebasing
267217
base_path = input_path[0] if isinstance(input_path, list) else input_path
268-
all_files = _collect_input_files(input_path)
218+
all_files = _collect_input_files(input_paths=input_path, filetypes=DEFAULT_FILETYPES)
269219

270220
def process_shard_with_bloom(records: Iterator[dict], _) -> Iterator[dict]:
271221
"""Load bloom filter once per shard and mark duplicates."""

0 commit comments

Comments
 (0)