Skip to content

Commit 88c77ea

Browse files
authored
Merge pull request #79 from 19-84/fix/voat-dup-id-batch-drop
fix(import): dedupe IDs within COPY batch; relax export/enrich file requirement
2 parents 2be3693 + 2e58620 commit 88c77ea

5 files changed

Lines changed: 250 additions & 12 deletions

File tree

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,20 @@ All notable changes to Redd-Archiver will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [Unreleased]
9+
10+
### Fixed
11+
- Comment/post import dropped an entire COPY batch (up to ~1,000 rows) when the
12+
source contained a duplicate ID within that batch — the staging table's
13+
PRIMARY KEY aborted the whole COPY, not just the dup. Observed on the Voat
14+
searchvoat.co dump (overlapping exports repeat rows): a single `privacy`
15+
subverse lost ~850 comments. Duplicate IDs within a batch are now de-duped
16+
before COPY and reported; cross-batch repeats were already upserted.
17+
- `--export-from-database` (and the `--enrich*` modes) no longer require
18+
`--comments-file`/`--submissions-file` when a single community is named.
19+
Those modes read from the database / metadata dumps, not the source files,
20+
so the requirement was spurious.
21+
822
## [1.1.0] — 2026-06-12 — "Living Archive"
923

1024
The archive is no longer a snapshot: serve it three ways, keep it current

core/postgres_database.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,7 @@ def insert_posts_batch(
789789
successful = 0
790790
failed = 0
791791
skipped = 0 # Track posts without valid IDs
792+
duplicates = 0 # Track duplicate IDs within a batch (e.g. overlapping source dumps)
792793
failed_post_ids: set[str] = set() # Track which specific posts failed
793794
total_posts = len(posts)
794795
current_batch_size = initial_batch_size
@@ -813,6 +814,11 @@ def insert_posts_batch(
813814
# This eliminates 350MB buffer overhead per batch
814815
copy_buffer = StringIO()
815816
records_prepared = 0
817+
# Dedupe IDs within the batch: posts_staging has a PRIMARY KEY,
818+
# so a repeated ID (common when source dumps overlap) aborts the whole
819+
# COPY — not just the dup row. Cross-batch repeats are handled by the
820+
# ON CONFLICT upsert below; only same-batch collisions break COPY.
821+
seen_ids: set[str] = set()
816822

817823
for post in batch:
818824
# Validate post has a valid ID before attempting insertion
@@ -821,6 +827,12 @@ def insert_posts_batch(
821827
skipped += 1
822828
continue
823829

830+
id_key = str(post_id)
831+
if id_key in seen_ids:
832+
duplicates += 1
833+
continue
834+
seen_ids.add(id_key)
835+
824836
try:
825837
sanitized_post = self._sanitize_recursive(post)
826838
json_data = json.dumps(sanitized_post, allow_nan=False)
@@ -948,6 +960,9 @@ def insert_posts_batch(
948960
if skipped > 0:
949961
print_warning(f"Skipped {skipped} posts with missing or empty IDs")
950962

963+
if duplicates > 0:
964+
print_warning(f"Skipped {duplicates} duplicate post IDs within batches (overlapping source data)")
965+
951966
if failed > 0:
952967
print_warning(f"Failed to insert {failed} posts ({len(failed_post_ids)} unique IDs tracked)")
953968

@@ -986,6 +1001,7 @@ def insert_comments_batch(
9861001
successful = 0
9871002
failed = 0
9881003
skipped = 0 # Track comments without valid IDs
1004+
duplicates = 0 # Track duplicate IDs within a batch (e.g. overlapping source dumps)
9891005
total_comments = len(comments)
9901006
current_batch_size = initial_batch_size
9911007

@@ -1019,6 +1035,11 @@ def insert_comments_batch(
10191035
# Use PostgreSQL COPY protocol for true streaming (no buffering)
10201036
copy_buffer = StringIO()
10211037
records_prepared = 0
1038+
# Dedupe IDs within the batch: comments_staging has a PRIMARY KEY,
1039+
# so a repeated ID (common when source dumps overlap) aborts the whole
1040+
# COPY — not just the dup row. Cross-batch repeats are handled by the
1041+
# ON CONFLICT upsert below; only same-batch collisions break COPY.
1042+
seen_ids: set[str] = set()
10221043

10231044
for comment in batch:
10241045
# Validate comment has a valid ID before attempting insertion
@@ -1027,6 +1048,12 @@ def insert_comments_batch(
10271048
skipped += 1
10281049
continue
10291050

1051+
id_key = str(comment_id)
1052+
if id_key in seen_ids:
1053+
duplicates += 1
1054+
continue
1055+
seen_ids.add(id_key)
1056+
10301057
try:
10311058
# Extract post_id (parent thread ID)
10321059
# For multi-platform support, prefer post_id field from normalized data
@@ -1192,6 +1219,9 @@ def insert_comments_batch(
11921219
if skipped > 0:
11931220
print_warning(f"Skipped {skipped} comments with missing or empty IDs")
11941221

1222+
if duplicates > 0:
1223+
print_warning(f"Skipped {duplicates} duplicate comment IDs within batches (overlapping source data)")
1224+
11951225
if failed > 0:
11961226
print_warning(f"Failed to insert {failed} comments")
11971227

reddarc.py

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,25 @@ def detect_resume_state_and_files(
639639
return "start_fresh", None, {}
640640

641641

642+
def _import_needs_source_files(args: argparse.Namespace) -> bool:
    """Report whether this run reads --comments-file/--submissions-file.

    The source files are consumed only by a real import from the positional
    input. --export-from-database renders from the database and the --enrich*
    paths read from metadata dumps, so none of those modes may demand the
    comments/submissions files.

    Args:
        args: Parsed CLI namespace carrying the mode attributes listed below.

    Returns:
        False as soon as any export/enrich mode attribute is truthy,
        True when none of them is set.
    """
    file_free_modes = (
        "export_from_database",
        "enrich",
        "enrich_metadata",
        "enrich_rules",
        "enrich_wikis",
        "enrich_voat",
        "voat_thumbnails",
    )
    # Attributes are read lazily, in declaration order, stopping at the first
    # truthy one — the same evaluation order as a chained ``or``.
    for mode in file_free_modes:
        if getattr(args, mode):
            return False
    return True
659+
660+
642661
def main() -> None:
643662
parser = argparse.ArgumentParser(
644663
description="Generate multi-platform archive websites from Reddit, Voat, and Ruqqus data",
@@ -966,19 +985,23 @@ def main() -> None:
966985

967986
# Validate single community mode arguments (subreddit/subverse/guild)
968987
community_filter = args.subreddit or args.subverses or args.guilds
988+
# Source files are only consumed when importing from the positional input;
989+
# export/enrich modes read from the DB or metadata dumps instead.
990+
import_needs_files = _import_needs_source_files(args)
969991
if community_filter:
970-
if not (args.comments_file and args.submissions_file):
971-
platform_name = "subreddit" if args.subreddit else ("subverse" if args.subverses else "guild")
972-
print_error(f"Single {platform_name} mode requires both --comments-file and --submissions-file")
973-
print_info(
974-
f"Example: python reddarc.py /data --{platform_name} example --comments-file /data/example_comments.zst --submissions-file /data/example_submissions.zst"
975-
)
976-
return
977-
if not all(os.path.exists(f) for f in [args.comments_file, args.submissions_file]):
978-
print_error("One or both specified files do not exist:")
979-
print_error(f" Comments: {args.comments_file} (exists: {os.path.exists(args.comments_file)})")
980-
print_error(f" Submissions: {args.submissions_file} (exists: {os.path.exists(args.submissions_file)})")
981-
return
992+
if import_needs_files:
993+
if not (args.comments_file and args.submissions_file):
994+
platform_name = "subreddit" if args.subreddit else ("subverse" if args.subverses else "guild")
995+
print_error(f"Single {platform_name} mode requires both --comments-file and --submissions-file")
996+
print_info(
997+
f"Example: python reddarc.py /data --{platform_name} example --comments-file /data/example_comments.zst --submissions-file /data/example_submissions.zst"
998+
)
999+
return
1000+
if not all(os.path.exists(f) for f in [args.comments_file, args.submissions_file]):
1001+
print_error("One or both specified files do not exist:")
1002+
print_error(f" Comments: {args.comments_file} (exists: {os.path.exists(args.comments_file)})")
1003+
print_error(f" Submissions: {args.submissions_file} (exists: {os.path.exists(args.submissions_file)})")
1004+
return
9821005
prefix = "r/" if args.subreddit else ("v/" if args.subverses else "g/")
9831006
print_info(f"Single community mode: processing {prefix}{community_filter}")
9841007
elif args.comments_file or args.submissions_file:
@@ -1009,6 +1032,14 @@ def main() -> None:
10091032
process_export_only(args.input_dir or ".", args.output, {}, args)
10101033
return
10111034

1035+
# Export-only mode reads everything from the database — no source files or
1036+
# input-dir discovery required. Dispatch here, before the import path, so a
1037+
# plain `--export-from-database` does not demand --comments-file/
1038+
# --submissions-file or a populated input directory.
1039+
if args.export_from_database:
1040+
process_export_only(args.input_dir or ".", args.output, {}, args)
1041+
return
1042+
10121043
# Validate input directory
10131044
if not args.input_dir:
10141045
print_error("input_dir is required (directory containing .zst files)")

tests/test_cli_validation.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/usr/bin/env python
2+
"""
3+
ABOUTME: Unit tests for reddarc CLI argument validation helpers
4+
ABOUTME: Covers which run modes require --comments-file/--submissions-file
5+
"""
6+
7+
import argparse
8+
9+
import pytest
10+
11+
from reddarc import _import_needs_source_files
12+
13+
14+
def _args(**overrides) -> argparse.Namespace:
    """Return a namespace with every mode flag unset, then layer *overrides* on top."""
    unset_paths = dict.fromkeys(
        (
            "enrich",
            "enrich_metadata",
            "enrich_rules",
            "enrich_wikis",
            "enrich_voat",
            "voat_thumbnails",
        )
    )
    # Later keys win, so caller-supplied overrides replace the defaults.
    merged = {"export_from_database": False, **unset_paths, **overrides}
    return argparse.Namespace(**merged)
27+
28+
29+
@pytest.mark.unit
class TestImportNeedsSourceFiles:
    """Source files are demanded only when the run is an actual import."""

    def test_plain_import_requires_files(self):
        # Every export/enrich flag is left unset, so this is a normal import
        # that reads the source files.
        plain_import = _args()
        assert _import_needs_source_files(plain_import) is True

    def test_export_from_database_does_not_require_files(self):
        # Regression guard: `--export-from-database --subverse x` once demanded
        # --comments-file/--submissions-file although it renders from the DB.
        export_only = _args(export_from_database=True)
        assert _import_needs_source_files(export_only) is False

    @pytest.mark.parametrize(
        "flag",
        ["enrich", "enrich_metadata", "enrich_rules", "enrich_wikis", "enrich_voat", "voat_thumbnails"],
    )
    def test_enrich_modes_do_not_require_files(self, flag):
        # Enrichment works from metadata dumps / the DB rather than the
        # comments/submissions files.
        enrich_run = _args(**{flag: "/some/path"})
        assert _import_needs_source_files(enrich_run) is False

    def test_enrich_chained_with_export_still_does_not_require_files(self):
        # Two file-free modes combined must still not require the files.
        chained = _args(enrich_voat="/p", export_from_database=True)
        assert _import_needs_source_files(chained) is False

tests/test_postgres_database_extended.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,62 @@ def test_insert_posts_batch_with_duplicates(self, postgres_db):
115115
cur.execute("DELETE FROM posts WHERE subreddit = 'test_dup'")
116116
conn.commit()
117117

118+
def test_insert_posts_batch_within_batch_duplicate_ids(self, postgres_db):
    """A duplicate ID within a single batch must not abort the whole COPY.

    Overlapping source dumps repeat the same row inside one batch; the
    posts_staging PRIMARY KEY would otherwise fail the entire COPY (taking
    every other post in the batch down with it). The duplicate is dropped
    and the remaining rows still land.

    Cleanup runs in a ``finally`` block so a failing assertion cannot leave
    ``test_wbdup`` rows behind in the shared test database.
    """

    def _post(pid: str, title: str, created_utc: int, score: int) -> dict:
        # Build one post row; only the fields that differ between rows vary.
        return {
            "id": pid,
            "subreddit": "test_wbdup",
            "author": "a",
            "title": title,
            "created_utc": created_utc,
            "score": score,
            "permalink": "/r/test_wbdup/comments/" + pid + "/",
            "platform": "reddit",
        }

    posts = [
        _post("wb_dup_post", "First", 1640000000, 10),
        _post("wb_dup_post", "Duplicate", 1640000000, 99),  # same ID, same batch
        _post("wb_unique_post", "Unique", 1640000001, 5),
    ]

    try:
        _successful, failed, _failed_ids = postgres_db.insert_posts_batch(posts)

        # The whole batch must not be lost to the duplicate: both distinct IDs land.
        assert failed == 0
        with postgres_db.pool.get_connection() as conn, conn.cursor() as cur:
            cur.execute("SELECT COUNT(DISTINCT id) FROM posts WHERE subreddit = 'test_wbdup'")
            assert cur.fetchone()["count"] == 2
            cur.execute("SELECT COUNT(*) FROM posts WHERE id = 'wb_dup_post'")
            assert cur.fetchone()["count"] == 1
    finally:
        # Cleanup — always executed, even when an assertion above fails.
        with postgres_db.pool.get_connection() as conn, conn.cursor() as cur:
            cur.execute("DELETE FROM posts WHERE subreddit = 'test_wbdup'")
            conn.commit()
173+
118174
def test_insert_posts_batch_empty_list(self, postgres_db):
119175
"""Test empty batch insertion."""
120176
successful, failed, _failed_ids = postgres_db.insert_posts_batch([])
@@ -213,6 +269,62 @@ def test_insert_comments_batch_basic(self, postgres_db):
213269
cur.execute("DELETE FROM posts WHERE id = 'comment_parent_post'")
214270
conn.commit()
215271

272+
def test_insert_comments_batch_within_batch_duplicate_ids(self, postgres_db):
    """A duplicate comment ID within a single batch must not abort the whole COPY.

    This is the exact failure observed on the Voat searchvoat dump, where
    overlapping exports repeated a comment row: comments_staging's PRIMARY
    KEY failed the COPY and dropped the entire 1000-row batch. The duplicate
    is now skipped and the rest of the batch still lands.

    Cleanup runs in a ``finally`` block so a failing assertion cannot leave
    the ``test_wbcdup`` comments or their parent post behind in the shared
    test database.
    """
    parent_post = {
        "id": "wbc_parent_post",
        "subreddit": "test_wbcdup",
        "author": "post_author",
        "title": "Parent",
        "created_utc": 1640000000,
        "score": 100,
        "permalink": "/r/test_wbcdup/comments/wbc_parent_post/",
        "platform": "reddit",
    }

    def _comment(cid: str, score: int, body: str) -> dict:
        # Build one comment row attached to the parent post above.
        return {
            "id": cid,
            "subreddit": "test_wbcdup",
            "author": "commenter",
            "body": body,
            "created_utc": 1640000100,
            "score": score,
            "post_id": "wbc_parent_post",
            "link_id": "t3_wbc_parent_post",
            "parent_id": "t3_wbc_parent_post",
            "permalink": "/r/test_wbcdup/comments/wbc_parent_post/_/" + cid + "/",
            "platform": "reddit",
        }

    comments = [
        _comment("wbc_dup", 10, "first"),
        _comment("wbc_dup", 99, "duplicate in same batch"),
        _comment("wbc_unique", 5, "unique"),
    ]

    try:
        # The parent post is inserted inside the try so it is removed as well
        # if anything after this point raises.
        postgres_db.insert_posts_batch([parent_post])

        _successful, failed = postgres_db.insert_comments_batch(comments)

        assert failed == 0
        with postgres_db.pool.get_connection() as conn, conn.cursor() as cur:
            cur.execute("SELECT COUNT(DISTINCT id) FROM comments WHERE subreddit = 'test_wbcdup'")
            assert cur.fetchone()["count"] == 2
            cur.execute("SELECT COUNT(*) FROM comments WHERE id = 'wbc_dup'")
            assert cur.fetchone()["count"] == 1
    finally:
        # Cleanup — always executed, even when an assertion above fails.
        with postgres_db.pool.get_connection() as conn, conn.cursor() as cur:
            cur.execute("DELETE FROM comments WHERE subreddit = 'test_wbcdup'")
            cur.execute("DELETE FROM posts WHERE id = 'wbc_parent_post'")
            conn.commit()
327+
216328
def test_insert_comments_batch_empty_list(self, postgres_db):
217329
"""Test empty comment batch insertion."""
218330
successful, failed = postgres_db.insert_comments_batch([])

0 commit comments

Comments
 (0)