-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdeduplicate.py
More file actions
330 lines (272 loc) · 10.4 KB
/
deduplicate.py
File metadata and controls
330 lines (272 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# Find and merge duplicate papers in your local collection.
# A duplicate is defined as two records sharing the same DOI. This happens
# when the same paper is ingested once as an arXiv preprint and again as a
# published paper via ADS. The richer record is kept (ADS preferred), the
# weaker one is deleted.
# Usage (interactive):
#
# python deduplicate.py
#
# Usage (non-interactive):
#
# python deduplicate.py --dry-run
# python deduplicate.py --merge
import argparse
import asyncio
import sys
import asyncpg
from app.config import settings
async def get_pool() -> asyncpg.Pool:
"""Create a direct asyncpg connection pool to Postgres.
Args:
None
Returns:
asyncpg.Pool: Connected pool ready for queries.
"""
return await asyncpg.create_pool(settings.database_url)
async def find_duplicates(pool: asyncpg.Pool) -> list[dict]:
"""Find all pairs of records that share the same DOI. Queries
Postgres for any DOI that appears in more than one row. Returns
each duplicate group with both records so we can decide which
one to keep.
Args:
pool (asyncpg.Pool): The database connection pool.
Returns:
list[dict]: Each entry contains the shared doi and both records
as 'keeper' and 'duplicate', with the ADS record
preferred as keeper.
"""
async with pool.acquire() as conn:
# Find all DOIs that appear more than once
rows = await conn.fetch(
"""
SELECT doi, array_agg(identifier) AS identifiers,
array_agg(identifier_type) AS types,
array_agg(source) AS sources,
array_agg(citation_count) AS citation_counts,
array_agg(abstract IS NOT NULL) AS has_abstract
FROM papers
WHERE doi IS NOT NULL AND doi != ''
GROUP BY doi
HAVING COUNT(*) > 1
"""
)
duplicates = []
for row in rows:
identifiers = row["identifiers"]
types = row["types"]
sources = row["sources"]
citation_counts = row["citation_counts"]
has_abstracts = row["has_abstract"]
# Build a list of candidates with their metadata
candidates = [
{
"identifier": identifiers[i],
"type": types[i],
"source": sources[i],
"citation_count": citation_counts[i],
"has_abstract": has_abstracts[i],
}
for i in range(len(identifiers))
]
# Pick the default; prefer ADS, then whichever has citation count,
# then whichever has an abstract
def keeper_score(c: dict) -> tuple:
return (
c["source"] == "ads",
c["citation_count"] is not None,
c["has_abstract"],
)
candidates.sort(key=keeper_score, reverse=True)
keeper = candidates[0]
to_delete = candidates[1:]
duplicates.append(
{
"doi": row["doi"],
"keeper": keeper,
"duplicates": to_delete,
}
)
return duplicates
async def find_arxiv_duplicates(pool: asyncpg.Pool) -> list[dict]:
"""Find pairs of records that share the same arXiv ID but were ingested
from different sources (e.g. once from arXiv, once from ADS). These are
missed by find_duplicates() because they may not share a DOI.
Args:
pool (asyncpg.Pool): The database connection pool.
Returns:
list[dict]: Each entry contains the shared arxiv_id and both records
as 'keeper' and 'duplicates', with ADS preferred.
"""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT arxiv_id, array_agg(identifier) AS identifiers,
array_agg(identifier_type) AS types,
array_agg(source) AS sources,
array_agg(citation_count) AS citation_counts,
array_agg(abstract IS NOT NULL) AS has_abstract
FROM papers
WHERE arxiv_id IS NOT NULL AND arxiv_id != ''
GROUP BY arxiv_id
HAVING COUNT(*) > 1
"""
)
duplicates = []
for row in rows:
identifiers = row["identifiers"]
types = row["types"]
sources = row["sources"]
citation_counts = row["citation_counts"]
has_abstracts = row["has_abstract"]
candidates = [
{
"identifier": identifiers[i],
"type": types[i],
"source": sources[i],
"citation_count": citation_counts[i],
"has_abstract": has_abstracts[i],
}
for i in range(len(identifiers))
]
def keeper_score(c: dict) -> tuple:
return (
c["source"] == "ads",
c["citation_count"] is not None,
c["has_abstract"],
)
candidates.sort(key=keeper_score, reverse=True)
keeper = candidates[0]
to_delete = candidates[1:]
duplicates.append(
{
"arxiv_id": row["arxiv_id"],
"keeper": keeper,
"duplicates": to_delete,
}
)
return duplicates
async def merge_duplicates(
pool: asyncpg.Pool, groups: list[dict], dry_run: bool
) -> None:
"""Delete the weaker record in each duplicate group. The keeper record
already has the best metadata. Deleting the weaker record is sufficient.
No field merging needed because ADS records are always more complete than
arXiv only records for published papers.
Args:
pool (asyncpg.Pool): The database connection pool.
groups (list[dict]): Duplicate groups from find_duplicates().
dry_run (bool): If True, report what would be deleted without making
any changes.
"""
total_deleted = 0
for group in groups:
doi = group["doi"]
keeper = group["keeper"]
to_delete = group["duplicates"]
print(f"\n DOI: {doi}")
print(
f" keeping : {keeper['identifier']} ({keeper['source']})"
f" citations: {keeper['citation_count']}, abstract: {keeper['has_abstract']}"
)
for dup in to_delete:
print(
f" removing : {dup['identifier']} ({dup['source']})"
f" citations: {dup['citation_count']}, abstract: {dup['has_abstract']}"
)
if not dry_run:
async with pool.acquire() as conn:
await conn.execute(
"DELETE FROM papers WHERE identifier = $1",
dup["identifier"],
)
total_deleted += 1
print(f"\n── Deduplication {'(dry run) ' if dry_run else ''}complete ─────────────")
print(f" Duplicate groups found : {len(groups)}")
print(
f" Records {'would be ' if dry_run else ''}deleted : "
f"{sum(len(g['duplicates']) for g in groups)}"
)
if not dry_run:
print(f" Actually deleted : {total_deleted}")
print("───────────────────────────────────────────────────\n")
async def run(dry_run: bool) -> None:
"""Main entry point: find and optionally merge duplicates.
Args:
dry_run (bool): If True, report without making changes.
"""
pool = await get_pool()
try:
print("\nScanning for duplicate DOIs...")
doi_groups = await find_duplicates(pool)
print(f" Found {len(doi_groups)} DOI duplicate group(s).")
print("\nScanning for duplicate arXiv IDs...")
arxiv_groups = await find_arxiv_duplicates(pool)
print(f" Found {len(arxiv_groups)} arXiv ID duplicate group(s).\n")
all_groups = doi_groups + arxiv_groups
if not all_groups:
print("\n No duplicates found...collection is clean.\n")
return
await merge_duplicates(pool, all_groups, dry_run)
finally:
await pool.close()
def build_parser() -> argparse.ArgumentParser:
"""Build the argument parser.
Returns:
argparse.ArgumentParser: Configured parser.
"""
parser = argparse.ArgumentParser(
description="Find and merge duplicate papers sharing the same DOI.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--dry-run",
action="store_true",
help="Show what would be merged without making any changes.",
)
group.add_argument(
"--merge",
action="store_true",
help="Find and merge duplicates immediately without prompting.",
)
return parser
def prompt(question: str, default: str) -> str:
"""Prompt the user with a default value shown in brackets.
Args:
question (str): The question to display.
default (str): The default value if the user presses Enter.
Returns:
str: The user's input or the default value.
"""
answer = input(f"{question} [{default}]: ").strip()
return answer if answer else default
if __name__ == "__main__":
parser = build_parser()
args = parser.parse_args()
try:
if args.dry_run:
asyncio.run(run(dry_run=True))
elif args.merge:
asyncio.run(run(dry_run=False))
else:
# Interactive mode
print("\n══ Heliophysics Duplicate Detector ══════════════════")
print(" Compares all stored papers by DOI.")
print(" Keeps the richer record (ADS preferred), deletes the rest.\n")
dry = prompt("Dry run first? (yes/no)", "yes").lower().startswith("y")
asyncio.run(run(dry_run=dry))
if dry:
proceed = (
prompt("\nProceed with merge? (yes/no)", "no")
.lower()
.startswith("y")
)
if proceed:
asyncio.run(run(dry_run=False))
else:
print("Cancelled — no changes made.")
except KeyboardInterrupt:
print("\nCancelled.")
sys.exit(0)