diff --git a/backend/src/module/downloader/download_client.py b/backend/src/module/downloader/download_client.py index a587eec98..443a71361 100644 --- a/backend/src/module/downloader/download_client.py +++ b/backend/src/module/downloader/download_client.py @@ -1,5 +1,10 @@ import asyncio +import base64 +import binascii import logging +import re +from typing import Literal +from urllib.parse import parse_qs, urlparse from module.conf import settings from module.models import Bangumi, Torrent @@ -155,6 +160,77 @@ async def resume_torrent(self, hashes: str): await self.client.torrents_resume(hashes) async def add_torrent(self, torrent: Torrent | list, bangumi: Bangumi) -> bool: + status = await self.add_torrent_with_status(torrent, bangumi) + return status == "added" + + @staticmethod + def _extract_btih_hex(magnet_url: str) -> str | None: + """Extract BTIH hash from magnet URL and normalize to 40-char lowercase hex.""" + if not magnet_url or not magnet_url.startswith("magnet:?"): + return None + + query = parse_qs(urlparse(magnet_url).query) + xt_values = query.get("xt", []) + btih_value = next( + ( + xt.removeprefix("urn:btih:") + for xt in xt_values + if xt.startswith("urn:btih:") + ), + None, + ) + if not btih_value: + return None + + btih_value = btih_value.strip() + if re.fullmatch(r"[0-9a-fA-F]{40}", btih_value): + return btih_value.lower() + + # Some magnet links use 32-char base32 info-hash. + if re.fullmatch(r"[A-Z2-7a-z2-7]{32}", btih_value): + try: + return base64.b32decode(btih_value.upper()).hex() + except (binascii.Error, ValueError): + return None + + return None + + async def _torrent_exists_in_client(self, torrent: Torrent) -> bool: + """Best-effort duplicate detection when add_torrents returns falsy.""" + try: + existing = await self.client.torrents_info( + status_filter=None, + category=None, + tag=None, + ) + except Exception as e: + logger.debug( + "[Downloader] Could not verify existing torrent for %s: %s", + torrent.name, + e, + ) + return False + + if not isinstance(existing, list) or not existing: + return False + + target_hash = self._extract_btih_hex(torrent.url) + if target_hash: + for item in existing: + if str(item.get("hash", "")).lower() == target_hash: + return True + + target_name = (torrent.name or "").strip() + if target_name: + for item in existing: + if str(item.get("name", "")).strip() == target_name: + return True + + return False + + async def add_torrent_with_status( + self, torrent: Torrent | list, bangumi: Bangumi + ) -> Literal["added", "exists", "failed"]: """Download a torrent (or list of torrents) for the given bangumi entry. Handles both magnet links and .torrent file URLs, fetching file bytes @@ -169,7 +245,7 @@ async def add_torrent(self, torrent: Torrent | list, bangumi: Bangumi) -> bool: logger.debug( "[Downloader] No torrent found: %s", bangumi.official_title ) - return False + return "failed" if "magnet" in torrent[0].url: torrent_url = [t.url for t in torrent] torrent_file = None @@ -183,7 +259,7 @@ async def add_torrent(self, torrent: Torrent | list, bangumi: Bangumi) -> bool: logger.warning( f"[Downloader] Failed to fetch torrent files for: {bangumi.official_title}" ) - return False + return "failed" torrent_url = None else: if "magnet" in torrent.url: @@ -195,30 +271,53 @@ async def add_torrent(self, torrent: Torrent | list, bangumi: Bangumi) -> bool: logger.warning( f"[Downloader] Failed to fetch torrent file for: {bangumi.official_title}" ) - return False + return "failed" torrent_url = None # Create tag with bangumi_id for offset lookup during rename tags = f"ab:{bangumi.id}" if bangumi.id else None try: - if await self.client.add_torrents( + result = await self.client.add_torrents( torrent_urls=torrent_url, torrent_files=torrent_file, save_path=bangumi.save_path, category="Bangumi", tags=tags, - ): + ) + + # Optional tri-state return from downstream client. + if isinstance(result, str): + if result in ("added", "exists", "failed"): + return result + logger.debug( + "[Downloader] Unexpected add_torrents status %r, treating as failed", + result, + ) + return "failed" + + if result is True: logger.debug("[Downloader] Add torrent: %s", bangumi.official_title) - return True - else: + return "added" + + if isinstance(torrent, Torrent) and await self._torrent_exists_in_client( + torrent + ): logger.debug( - "[Downloader] Torrent added before: %s", bangumi.official_title + "[Downloader] Torrent already exists in client: %s", + torrent.name, ) - return False + return "exists" + + # Keep retries for unknown falsy results. + logger.warning( + "[Downloader] add_torrents returned falsy for %s, treating as failed", + bangumi.official_title, + ) + return "failed" except Exception as e: logger.error( f"[Downloader] Failed to add torrent for {bangumi.official_title}: {e}" ) - return False + return "failed" async def move_torrent(self, hashes, location): await self.client.move_torrent(hashes=hashes, new_location=location) diff --git a/backend/src/module/rss/engine.py b/backend/src/module/rss/engine.py index a9dc9dac5..858777fb3 100644 --- a/backend/src/module/rss/engine.py +++ b/backend/src/module/rss/engine.py @@ -135,6 +135,7 @@ def match_torrent(self, torrent: Torrent) -> Optional[Bangumi]: matched: Bangumi = self.bangumi.match_torrent(torrent.name) if matched: if matched.filter == "": + torrent.bangumi_id = matched.id return matched pattern = self._get_filter_pattern(matched.filter) if not pattern.search(torrent.name): @@ -142,6 +143,34 @@ def match_torrent(self, torrent: Torrent) -> Optional[Bangumi]: return matched return None + @staticmethod + async def _add_torrent_with_compat( + client: DownloadClient, torrent: Torrent, matched_data: Bangumi + ) -> str: + """Add torrent with compatibility fallback. + + Preferred path uses add_torrent_with_status() and expects one of: + - "added": accepted by downloader + - "exists": already exists in downloader + - "failed": add failed + + If the client does not implement that API (or returns unexpected data), + fallback to legacy add_torrent() bool semantics. + """ + add_with_status = getattr(client, "add_torrent_with_status", None) + if callable(add_with_status): + status = await add_with_status(torrent, matched_data) + if status in ("added", "exists", "failed"): + return status + logger.debug( + "[Engine] add_torrent_with_status returned unexpected value %r, " + "falling back to add_torrent().", + status, + ) + + added = await client.add_torrent(torrent, matched_data) + return "added" if added else "failed" + async def refresh_rss(self, client: DownloadClient, rss_id: Optional[int] = None): # Get All RSS Items if not rss_id: @@ -162,14 +191,35 @@ async def refresh_rss(self, client: DownloadClient, rss_id: Optional[int] = None rss_item.last_checked_at = now rss_item.last_error = error self.add(rss_item) + torrents_to_persist: list[Torrent] = [] for torrent in new_torrents: matched_data = self.match_torrent(torrent) - if matched_data: - if await client.add_torrent(torrent, matched_data): - logger.debug("[Engine] Add torrent %s to client", torrent.name) + if not matched_data: + torrents_to_persist.append(torrent) + continue + + add_status = await self._add_torrent_with_compat( + client, torrent, matched_data + ) + if add_status == "added": + logger.debug("[Engine] Add torrent %s to client", torrent.name) torrent.downloaded = True + torrents_to_persist.append(torrent) + elif add_status == "exists": + logger.debug( + "[Engine] Torrent %s already exists in client", torrent.name + ) + torrent.downloaded = True + torrents_to_persist.append(torrent) + else: + # Do not persist failed matched torrents. + # They should remain "new" and be retried on next refresh. + logger.warning( + "[Engine] Failed to add matched torrent %s, will retry later.", + torrent.name, + ) # Add all torrents to database - self.torrent.add_all(new_torrents) + self.torrent.add_all(torrents_to_persist) self.commit() async def download_bangumi(self, bangumi: Bangumi): diff --git a/backend/src/test/test_download_client.py b/backend/src/test/test_download_client.py index 2ce43af93..acb8e50d2 100644 --- a/backend/src/test/test_download_client.py +++ b/backend/src/test/test_download_client.py @@ -247,6 +247,69 @@ async def test_client_rejects_returns_false(self, download_client, mock_qb_clien assert result is False + async def test_client_rejects_status_is_failed(self, download_client, mock_qb_client): + """When client.add_torrents returns False, tri-state API reports failed.""" + mock_qb_client.add_torrents.return_value = False + mock_qb_client.torrents_info.return_value = [] + torrent = make_torrent(url="magnet:?xt=urn:btih:def") + bangumi = make_bangumi() + + with patch("module.downloader.download_client.RequestContent") as MockReq: + mock_req = AsyncMock() + MockReq.return_value.__aenter__ = AsyncMock(return_value=mock_req) + MockReq.return_value.__aexit__ = AsyncMock(return_value=False) + + status = await download_client.add_torrent_with_status(torrent, bangumi) + + assert status == "failed" + + async def test_client_rejects_but_hash_exists_reports_exists( + self, download_client, mock_qb_client + ): + """Falsy add result with matching info-hash should be treated as exists.""" + mock_qb_client.add_torrents.return_value = False + mock_qb_client.torrents_info.return_value = [ + {"hash": "0123456789abcdef0123456789abcdef01234567", "name": "Other Name"} + ] + torrent = make_torrent( + name="[Sub] Test Anime - 01 [1080p].mkv", + url="magnet:?xt=urn:btih:0123456789abcdef0123456789abcdef01234567", + ) + bangumi = make_bangumi() + + with patch("module.downloader.download_client.RequestContent") as MockReq: + mock_req = AsyncMock() + MockReq.return_value.__aenter__ = AsyncMock(return_value=mock_req) + MockReq.return_value.__aexit__ = AsyncMock(return_value=False) + + status = await download_client.add_torrent_with_status(torrent, bangumi) + + assert status == "exists" + + async def test_client_rejects_but_name_exists_reports_exists( + self, download_client, mock_qb_client + ): + """Falsy add result with matching torrent name should be treated as exists.""" + mock_qb_client.add_torrents.return_value = False + mock_qb_client.torrents_info.return_value = [ + {"hash": "deadbeef", "name": "[Sub] Test Anime - 01 [1080p].mkv"} + ] + torrent = make_torrent( + name="[Sub] Test Anime - 01 [1080p].mkv", + url="https://example.com/test-anime-01.torrent", + ) + bangumi = make_bangumi() + + with patch("module.downloader.download_client.RequestContent") as MockReq: + mock_req = AsyncMock() + mock_req.get_content = AsyncMock(return_value=b"torrent-file-data") + MockReq.return_value.__aenter__ = AsyncMock(return_value=mock_req) + MockReq.return_value.__aexit__ = AsyncMock(return_value=False) + + status = await download_client.add_torrent_with_status(torrent, bangumi) + + assert status == "exists" + async def test_generates_save_path_if_missing(self, download_client, mock_qb_client): """When bangumi.save_path is empty, generates one.""" torrent = make_torrent(url="magnet:?xt=urn:btih:abc") diff --git a/backend/src/test/test_integration.py b/backend/src/test/test_integration.py index b9f5ac711..9950b59f1 100644 --- a/backend/src/test/test_integration.py +++ b/backend/src/test/test_integration.py @@ -66,13 +66,13 @@ async def test_full_flow(self, db_engine): # 4. Mock download client mock_client = AsyncMock() - mock_client.add_torrent = AsyncMock(return_value=True) + mock_client.add_torrent_with_status = AsyncMock(return_value="added") # 5. Execute refresh_rss await engine.refresh_rss(mock_client) # 6. Verify: matched torrents were downloaded - assert mock_client.add_torrent.call_count == 2 + assert mock_client.add_torrent_with_status.call_count == 2 # 7. Verify: all torrents stored in DB all_torrents = engine.torrent.search_all() @@ -117,11 +117,11 @@ async def test_filtered_torrents_not_downloaded(self, db_engine): with patch.object(RSSEngine, "_get_torrents", new_callable=AsyncMock) as mock_get: mock_get.return_value = torrents mock_client = AsyncMock() - mock_client.add_torrent = AsyncMock(return_value=True) + mock_client.add_torrent_with_status = AsyncMock(return_value="added") await engine.refresh_rss(mock_client) # Only 1080p should be downloaded (720p is filtered) - assert mock_client.add_torrent.call_count == 1 + assert mock_client.add_torrent_with_status.call_count == 1 async def test_duplicate_torrents_not_reprocessed(self, db_engine): """Torrents already in the DB are not processed again.""" @@ -155,14 +155,92 @@ async def test_duplicate_torrents_not_reprocessed(self, db_engine): with patch.object(RSSEngine, "_get_torrents", new_callable=AsyncMock) as mock_get: mock_get.return_value = torrents mock_client = AsyncMock() - mock_client.add_torrent = AsyncMock(return_value=True) + mock_client.add_torrent_with_status = AsyncMock(return_value="added") await engine.refresh_rss(mock_client) # Only ep02 should be downloaded (ep01 already exists) - assert mock_client.add_torrent.call_count == 1 + assert mock_client.add_torrent_with_status.call_count == 1 all_torrents = engine.torrent.search_all() assert len(all_torrents) == 2 # original + new one + async def test_failed_add_torrent_is_retried_next_refresh(self, db_engine): + """Matched torrents that fail to add are retried in the next refresh cycle.""" + engine = RSSEngine(_engine=db_engine) + + rss_item = make_rss_item() + engine.rss.add(rss_item) + + bangumi = make_bangumi( + title_raw="Retry Anime", official_title="Retry Anime", filter="" + ) + engine.bangumi.add(bangumi) + + torrents = [ + Torrent( + name="[Sub] Retry Anime - 01 [1080p].mkv", + url="https://example.com/retry-ep01.torrent", + ) + ] + with patch.object( + RSSEngine, "_get_torrents", new_callable=AsyncMock + ) as mock_get: + mock_get.return_value = torrents + mock_client = AsyncMock() + # First cycle fails to add; second cycle succeeds. + mock_client.add_torrent_with_status = AsyncMock( + side_effect=["failed", "added"] + ) + + await engine.refresh_rss(mock_client) + + # Failed matched torrent should not be persisted yet. + assert mock_client.add_torrent_with_status.call_count == 1 + assert engine.torrent.search_all() == [] + + await engine.refresh_rss(mock_client) + + # The same torrent is retried and eventually persisted as downloaded. + assert mock_client.add_torrent_with_status.call_count == 2 + stored = engine.torrent.search_all() + assert len(stored) == 1 + assert stored[0].url == "https://example.com/retry-ep01.torrent" + assert stored[0].downloaded is True + + async def test_existing_in_client_is_persisted_without_retry(self, db_engine): + """If downloader reports torrent already exists, persist and avoid retry loops.""" + engine = RSSEngine(_engine=db_engine) + + rss_item = make_rss_item() + engine.rss.add(rss_item) + + bangumi = make_bangumi( + title_raw="Exists Anime", official_title="Exists Anime", filter="" + ) + engine.bangumi.add(bangumi) + + torrents = [ + Torrent( + name="[Sub] Exists Anime - 01 [1080p].mkv", + url="https://example.com/exists-ep01.torrent", + ) + ] + with patch.object( + RSSEngine, "_get_torrents", new_callable=AsyncMock + ) as mock_get: + mock_get.return_value = torrents + mock_client = AsyncMock() + mock_client.add_torrent_with_status = AsyncMock(return_value="exists") + + await engine.refresh_rss(mock_client) + await engine.refresh_rss(mock_client) + + # Persisted on first cycle, so second cycle should not retry same URL. + assert mock_client.add_torrent_with_status.call_count == 1 + stored = engine.torrent.search_all() + assert len(stored) == 1 + assert stored[0].url == "https://example.com/exists-ep01.torrent" + assert stored[0].downloaded is True + # --------------------------------------------------------------------------- # Rename Flow