Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 109 additions & 10 deletions backend/src/module/downloader/download_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import asyncio
import base64
import binascii
import logging
import re
from typing import Literal
from urllib.parse import parse_qs, urlparse

from module.conf import settings
from module.models import Bangumi, Torrent
Expand Down Expand Up @@ -155,6 +160,77 @@ async def resume_torrent(self, hashes: str):
await self.client.torrents_resume(hashes)

async def add_torrent(self, torrent: Torrent | list, bangumi: Bangumi) -> bool:
status = await self.add_torrent_with_status(torrent, bangumi)
return status == "added"

@staticmethod
def _extract_btih_hex(magnet_url: str) -> str | None:
"""Extract BTIH hash from magnet URL and normalize to 40-char lowercase hex."""
if not magnet_url or not magnet_url.startswith("magnet:?"):
return None

query = parse_qs(urlparse(magnet_url).query)
xt_values = query.get("xt", [])
btih_value = next(
(
xt.removeprefix("urn:btih:")
for xt in xt_values
if xt.startswith("urn:btih:")
),
None,
)
if not btih_value:
return None

btih_value = btih_value.strip()
if re.fullmatch(r"[0-9a-fA-F]{40}", btih_value):
return btih_value.lower()

# Some magnet links use 32-char base32 info-hash.
if re.fullmatch(r"[A-Z2-7a-z2-7]{32}", btih_value):
try:
return base64.b32decode(btih_value.upper()).hex()
except (binascii.Error, ValueError):
return None

return None

async def _torrent_exists_in_client(self, torrent: Torrent) -> bool:
"""Best-effort duplicate detection when add_torrents returns falsy."""
try:
existing = await self.client.torrents_info(
status_filter=None,
category=None,
tag=None,
)
except Exception as e:
logger.debug(
"[Downloader] Could not verify existing torrent for %s: %s",
torrent.name,
e,
)
return False

if not isinstance(existing, list) or not existing:
return False

target_hash = self._extract_btih_hex(torrent.url)
if target_hash:
for item in existing:
if str(item.get("hash", "")).lower() == target_hash:
return True

target_name = (torrent.name or "").strip()
if target_name:
for item in existing:
if str(item.get("name", "")).strip() == target_name:
return True

return False

async def add_torrent_with_status(
self, torrent: Torrent | list, bangumi: Bangumi
) -> Literal["added", "exists", "failed"]:
"""Download a torrent (or list of torrents) for the given bangumi entry.

Handles both magnet links and .torrent file URLs, fetching file bytes
Expand All @@ -169,7 +245,7 @@ async def add_torrent(self, torrent: Torrent | list, bangumi: Bangumi) -> bool:
logger.debug(
"[Downloader] No torrent found: %s", bangumi.official_title
)
return False
return "failed"
if "magnet" in torrent[0].url:
torrent_url = [t.url for t in torrent]
torrent_file = None
Expand All @@ -183,7 +259,7 @@ async def add_torrent(self, torrent: Torrent | list, bangumi: Bangumi) -> bool:
logger.warning(
f"[Downloader] Failed to fetch torrent files for: {bangumi.official_title}"
)
return False
return "failed"
torrent_url = None
else:
if "magnet" in torrent.url:
Expand All @@ -195,30 +271,53 @@ async def add_torrent(self, torrent: Torrent | list, bangumi: Bangumi) -> bool:
logger.warning(
f"[Downloader] Failed to fetch torrent file for: {bangumi.official_title}"
)
return False
return "failed"
torrent_url = None
# Create tag with bangumi_id for offset lookup during rename
tags = f"ab:{bangumi.id}" if bangumi.id else None
try:
if await self.client.add_torrents(
result = await self.client.add_torrents(
torrent_urls=torrent_url,
torrent_files=torrent_file,
save_path=bangumi.save_path,
category="Bangumi",
tags=tags,
):
)

# Optional tri-state return from downstream client.
if isinstance(result, str):
if result in ("added", "exists", "failed"):
return result
logger.debug(
"[Downloader] Unexpected add_torrents status %r, treating as failed",
result,
)
return "failed"

if result is True:
logger.debug("[Downloader] Add torrent: %s", bangumi.official_title)
return True
else:
return "added"

if isinstance(torrent, Torrent) and await self._torrent_exists_in_client(
torrent
):
logger.debug(
"[Downloader] Torrent added before: %s", bangumi.official_title
"[Downloader] Torrent already exists in client: %s",
torrent.name,
)
return False
return "exists"

# Keep retries for unknown falsy results.
logger.warning(
"[Downloader] add_torrents returned falsy for %s, treating as failed",
bangumi.official_title,
)
return "failed"
except Exception as e:
logger.error(
f"[Downloader] Failed to add torrent for {bangumi.official_title}: {e}"
)
return False
return "failed"

async def move_torrent(self, hashes, location):
await self.client.move_torrent(hashes=hashes, new_location=location)
Expand Down
58 changes: 54 additions & 4 deletions backend/src/module/rss/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,42 @@ def match_torrent(self, torrent: Torrent) -> Optional[Bangumi]:
matched: Bangumi = self.bangumi.match_torrent(torrent.name)
if matched:
if matched.filter == "":
torrent.bangumi_id = matched.id
return matched
pattern = self._get_filter_pattern(matched.filter)
if not pattern.search(torrent.name):
torrent.bangumi_id = matched.id
return matched
return None

@staticmethod
async def _add_torrent_with_compat(
client: DownloadClient, torrent: Torrent, matched_data: Bangumi
) -> str:
"""Add torrent with compatibility fallback.

Preferred path uses add_torrent_with_status() and expects one of:
- "added": accepted by downloader
- "exists": already exists in downloader
- "failed": add failed

If the client does not implement that API (or returns unexpected data),
fallback to legacy add_torrent() bool semantics.
"""
add_with_status = getattr(client, "add_torrent_with_status", None)
if callable(add_with_status):
status = await add_with_status(torrent, matched_data)
if status in ("added", "exists", "failed"):
return status
logger.debug(
"[Engine] add_torrent_with_status returned unexpected value %r, "
"falling back to add_torrent().",
status,
)

added = await client.add_torrent(torrent, matched_data)
return "added" if added else "failed"

async def refresh_rss(self, client: DownloadClient, rss_id: Optional[int] = None):
# Get All RSS Items
if not rss_id:
Expand All @@ -162,14 +191,35 @@ async def refresh_rss(self, client: DownloadClient, rss_id: Optional[int] = None
rss_item.last_checked_at = now
rss_item.last_error = error
self.add(rss_item)
torrents_to_persist: list[Torrent] = []
for torrent in new_torrents:
matched_data = self.match_torrent(torrent)
if matched_data:
if await client.add_torrent(torrent, matched_data):
logger.debug("[Engine] Add torrent %s to client", torrent.name)
if not matched_data:
torrents_to_persist.append(torrent)
continue

add_status = await self._add_torrent_with_compat(
client, torrent, matched_data
)
if add_status == "added":
logger.debug("[Engine] Add torrent %s to client", torrent.name)
torrent.downloaded = True
torrents_to_persist.append(torrent)
elif add_status == "exists":
logger.debug(
"[Engine] Torrent %s already exists in client", torrent.name
)
torrent.downloaded = True
torrents_to_persist.append(torrent)
else:
# Do not persist failed matched torrents.
# They should remain "new" and be retried on next refresh.
logger.warning(
"[Engine] Failed to add matched torrent %s, will retry later.",
torrent.name,
)
# Add all torrents to database
self.torrent.add_all(new_torrents)
self.torrent.add_all(torrents_to_persist)
self.commit()

async def download_bangumi(self, bangumi: Bangumi):
Expand Down
63 changes: 63 additions & 0 deletions backend/src/test/test_download_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,69 @@ async def test_client_rejects_returns_false(self, download_client, mock_qb_clien

assert result is False

async def test_client_rejects_status_is_failed(self, download_client, mock_qb_client):
"""When client.add_torrents returns False, tri-state API reports failed."""
mock_qb_client.add_torrents.return_value = False
mock_qb_client.torrents_info.return_value = []
torrent = make_torrent(url="magnet:?xt=urn:btih:def")
bangumi = make_bangumi()

with patch("module.downloader.download_client.RequestContent") as MockReq:
mock_req = AsyncMock()
MockReq.return_value.__aenter__ = AsyncMock(return_value=mock_req)
MockReq.return_value.__aexit__ = AsyncMock(return_value=False)

status = await download_client.add_torrent_with_status(torrent, bangumi)

assert status == "failed"

async def test_client_rejects_but_hash_exists_reports_exists(
self, download_client, mock_qb_client
):
"""Falsy add result with matching info-hash should be treated as exists."""
mock_qb_client.add_torrents.return_value = False
mock_qb_client.torrents_info.return_value = [
{"hash": "0123456789abcdef0123456789abcdef01234567", "name": "Other Name"}
]
torrent = make_torrent(
name="[Sub] Test Anime - 01 [1080p].mkv",
url="magnet:?xt=urn:btih:0123456789abcdef0123456789abcdef01234567",
)
bangumi = make_bangumi()

with patch("module.downloader.download_client.RequestContent") as MockReq:
mock_req = AsyncMock()
MockReq.return_value.__aenter__ = AsyncMock(return_value=mock_req)
MockReq.return_value.__aexit__ = AsyncMock(return_value=False)

status = await download_client.add_torrent_with_status(torrent, bangumi)

assert status == "exists"

async def test_client_rejects_but_name_exists_reports_exists(
self, download_client, mock_qb_client
):
"""Falsy add result with matching torrent name should be treated as exists."""
mock_qb_client.add_torrents.return_value = False
mock_qb_client.torrents_info.return_value = [
{"hash": "deadbeef", "name": "[Sub] Test Anime - 01 [1080p].mkv"}
]
torrent = make_torrent(
name="[Sub] Test Anime - 01 [1080p].mkv",
url="https://example.com/test-anime-01.torrent",
)
bangumi = make_bangumi()

with patch("module.downloader.download_client.RequestContent") as MockReq:
mock_req = AsyncMock()
mock_req.get_content = AsyncMock(return_value=b"torrent-file-data")
MockReq.return_value.__aenter__ = AsyncMock(return_value=mock_req)
MockReq.return_value.__aexit__ = AsyncMock(return_value=False)

status = await download_client.add_torrent_with_status(torrent, bangumi)

assert status == "exists"

async def test_generates_save_path_if_missing(self, download_client, mock_qb_client):
"""When bangumi.save_path is empty, generates one."""
torrent = make_torrent(url="magnet:?xt=urn:btih:abc")
Expand Down
Loading