Commit f937e7c

rigorous dump cleanup, move logic to model, add tests, fixes #105
1 parent c6160b4

File tree

4 files changed: 363 additions & 33 deletions


poliloom/poliloom/cli.py

Lines changed: 41 additions & 33 deletions
@@ -18,6 +18,8 @@
     Country,
     CurrentImportEntity,
     CurrentImportStatement,
+    DownloadAlreadyCompleteError,
+    DownloadInProgressError,
     Language,
     Location,
     Position,
@@ -140,62 +142,68 @@ def dump_download(output, force):
             last_modified_str, "%a, %d %b %Y %H:%M:%S %Z"
         ).replace(tzinfo=timezone.utc)

-        # Check if we already have this dump (completed or in-progress) unless --force is used
+        # Prepare dump record (handles stale detection, force mode, etc.)
         with Session(get_engine()) as session:
-            existing_dump = (
-                session.query(WikidataDump)
-                .filter(WikidataDump.url == url)
-                .filter(WikidataDump.last_modified == last_modified)
-                .first()
-            )
+            try:
+                new_dump = WikidataDump.prepare_for_download(
+                    session, url, last_modified, force=force
+                )
+                session.commit()

-            if existing_dump and not force:
-                if existing_dump.downloaded_at:
+                if force:
                     click.echo(
-                        f"❌ Dump from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC already downloaded"
+                        f"⚠️ Forcing new download for dump from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC"
                     )
-                    click.echo("No new dump available. Use --force to download anyway.")
-                    raise SystemExit(1)
                 else:
                     click.echo(
-                        f"❌ Download for dump from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC already in progress"
-                    )
-                    click.echo(
-                        "Another download process is running. Use --force to start new download."
+                        f"📝 New dump found from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC"
                     )
-                raise SystemExit(1)
-            elif existing_dump and force:
+
+            except DownloadAlreadyCompleteError:
                 click.echo(
-                    f"⚠️ Forcing new download for dump from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC (bypassing existing check)"
+                    f"❌ Dump from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC already downloaded"
                 )
+                click.echo("No new dump available. Use --force to download anyway.")
+                raise SystemExit(1)

-            # Create new dump record
-            if not existing_dump:
+            except DownloadInProgressError as e:
                 click.echo(
-                    f"📝 New dump found from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC"
+                    f"❌ Download for dump from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC already in progress"
                 )
-                new_dump = WikidataDump(url=url, last_modified=last_modified)
-                session.add(new_dump)
-                session.commit()
+                click.echo(
+                    f" Started {e.hours_elapsed:.1f}h ago. Use --force to start new download."
+                )
+                raise SystemExit(1)

         # Download the file
         click.echo(f"⏳ Downloading Wikidata dump to {output}...")
         click.echo(
             "This is a large file (~100GB compressed) and may take several hours."
         )

-        StorageFactory.download_from_url(url, output)
+        try:
+            StorageFactory.download_from_url(url, output)

-        # Mark as downloaded
-        new_dump.downloaded_at = datetime.now(timezone.utc)
-        with Session(get_engine()) as session:
-            session.merge(new_dump)
-            session.commit()
+            # Mark as downloaded
+            with Session(get_engine()) as session:
+                new_dump.mark_downloaded(session)
+                session.commit()
+
+            click.echo(f"✅ Successfully downloaded dump to {output}")

-        click.echo(f"✅ Successfully downloaded dump to {output}")
+        except Exception as download_error:
+            # Clean up the dump record on failure to allow retries
+            click.echo(f"❌ Download failed: {download_error}")
+            click.echo(" Cleaning up dump record to allow retry...")
+            with Session(get_engine()) as session:
+                new_dump.cleanup_failed_download(session)
+                session.commit()
+            raise SystemExit(1)

+    except SystemExit:
+        raise
     except Exception as e:
-        click.echo(f"❌ Download failed: {e}")
+        click.echo(f"❌ Error: {e}")
         raise SystemExit(1)
poliloom/poliloom/models/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -17,6 +17,8 @@
 from .wikidata import (
     CurrentImportEntity,
     CurrentImportStatement,
+    DownloadAlreadyCompleteError,
+    DownloadInProgressError,
     WikidataEntity,
     WikidataEntityLabel,
     WikidataEntityMixin,
@@ -54,6 +56,8 @@
     # Wikidata
     "CurrentImportEntity",
     "CurrentImportStatement",
+    "DownloadAlreadyCompleteError",
+    "DownloadInProgressError",
     "WikidataEntity",
     "WikidataEntityLabel",
     "WikidataDump",

poliloom/poliloom/models/wikidata.py

Lines changed: 113 additions & 0 deletions
@@ -708,11 +708,29 @@ class WikidataRelation(Base, TimestampMixin, SoftDeleteMixin, UpsertMixin):
 )


+class DownloadAlreadyCompleteError(Exception):
+    """Raised when attempting to download a dump that's already been downloaded."""
+
+    pass
+
+
+class DownloadInProgressError(Exception):
+    """Raised when another download is already in progress for this dump."""
+
+    def __init__(self, message: str, hours_elapsed: float):
+        super().__init__(message)
+        self.hours_elapsed = hours_elapsed
+
+
 class WikidataDump(Base, TimestampMixin):
     """WikidataDump entity for tracking dump download and processing stages."""

     __tablename__ = "wikidata_dumps"

+    # Default stale threshold: downloads taking longer than 24 hours are considered failed
+    # (typical download time is ~10 hours)
+    STALE_THRESHOLD_HOURS = 24
+
     id = Column(
         UUID(as_uuid=True), primary_key=True, server_default=text("gen_random_uuid()")
     )
@@ -734,6 +752,101 @@ class WikidataDump(Base, TimestampMixin):
         DateTime, nullable=True
     )  # When politicians import completed

+    @classmethod
+    def prepare_for_download(
+        cls,
+        session: Session,
+        url: str,
+        last_modified: datetime,
+        force: bool = False,
+    ) -> "WikidataDump":
+        """Prepare a WikidataDump record for downloading.
+
+        Handles checking for existing downloads (completed or in-progress),
+        stale download detection, and record cleanup.
+
+        Args:
+            session: Database session
+            url: URL of the dump file
+            last_modified: Last-Modified timestamp from the server
+            force: If True, bypass existing download checks
+
+        Returns:
+            WikidataDump record ready for download
+
+        Raises:
+            DownloadAlreadyCompleteError: If dump was already downloaded (and not force)
+            DownloadInProgressError: If another download is in progress (and not force/stale)
+        """
+        from datetime import timedelta, timezone
+
+        existing_dump = (
+            session.query(cls)
+            .filter(cls.url == url)
+            .filter(cls.last_modified == last_modified)
+            .first()
+        )
+
+        if existing_dump and not force:
+            if existing_dump.downloaded_at:
+                raise DownloadAlreadyCompleteError(
+                    f"Dump from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC "
+                    "already downloaded"
+                )
+            else:
+                # Check if the download is stale
+                created_at_utc = existing_dump.created_at.replace(tzinfo=timezone.utc)
+                age = datetime.now(timezone.utc) - created_at_utc
+                hours_elapsed = age.total_seconds() / 3600
+
+                if age > timedelta(hours=cls.STALE_THRESHOLD_HOURS):
+                    # Stale download - clean up and allow retry
+                    session.delete(existing_dump)
+                    session.flush()
+                    existing_dump = None
+                else:
+                    raise DownloadInProgressError(
+                        f"Download for dump from {last_modified.strftime('%Y-%m-%d %H:%M:%S')} UTC "
+                        "already in progress",
+                        hours_elapsed=hours_elapsed,
+                    )
+        elif existing_dump and force:
+            # Force mode - delete existing record
+            session.delete(existing_dump)
+            session.flush()
+            existing_dump = None
+
+        # Create new dump record
+        new_dump = cls(url=url, last_modified=last_modified)
+        session.add(new_dump)
+        session.flush()
+
+        return new_dump
+
+    def mark_downloaded(self, session: Session) -> None:
+        """Mark this dump as successfully downloaded.
+
+        Args:
+            session: Database session
+        """
+        from datetime import timezone
+
+        self.downloaded_at = datetime.now(timezone.utc)
+        session.merge(self)
+        session.flush()
+
+    def cleanup_failed_download(self, session: Session) -> None:
+        """Clean up this dump record after a failed download.
+
+        Removes the record to allow future retry attempts.
+
+        Args:
+            session: Database session
+        """
+        session.merge(self)
+        session.delete(self)
+        session.flush()
+

 class CurrentImportEntity(Base):
     """Temporary tracking table for entities seen during current import."""
