Skip to content

Commit 5673d6d

Browse files
shinonomeowclaude
andcommitted
refactor: improve download system architecture and reliability
- Separate DownloadController from DownloadQueue for better separation of concerns - Add deduplication logic to prevent duplicate torrents in download queue - Improve qBittorrent host checking and error handling - Enhance RSS service timeout handling for download queue operations - Fix import paths and module organization - Add better logging for download operations 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent e05dfc2 commit 5673d6d

10 files changed

Lines changed: 179 additions & 119 deletions

File tree

backend/src/module/core/services/download_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import TYPE_CHECKING, Any
44
from typing_extensions import override
55

6-
from module.downloader.download_queue import DownloadController
6+
from module.downloader import DownloadController
77
from module.utils.events import ServiceException
88
from .base_services import BaseService
99

backend/src/module/core/services/rss_service.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,20 @@ async def execute(self) -> None:
4343
await self._engine.refresh_all()
4444

4545
if self._eps_complete_enabled:
46-
from module.downloader import DownloadQueue
46+
from module.downloader import download_queue
4747
from module.manager import eps_complete
4848

4949
# 要等到 download queue 空了后再做这个,不然会重复下载
5050
# 等太久了就说明现在挺重的, 就先不 eps 了
51-
await asyncio.wait_for(DownloadQueue().queue.join(), timeout=60)
52-
await eps_complete()
51+
try:
52+
await asyncio.wait_for(
53+
download_queue.join(), timeout=30
54+
)
55+
await eps_complete()
56+
except asyncio.TimeoutError:
57+
logger.warning(
58+
"[RSSService] 当前任务过多,跳过Eps Complete处理"
59+
)
5360

5461
logger.debug("[RSSService] RSS刷新完成")
5562
except Exception as e:
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from .download_client import Client, DownloadClient
2-
from .download_queue import DownloadController, DownloadQueue
2+
from .download_queue import download_queue, DownloadQueue
3+
from .download_controller import DownloadController
34

45
__all__ = [
56
"DownloadClient",
67
"DownloadQueue",
78
"DownloadController",
89
"Client",
10+
"download_queue",
911
]

backend/src/module/downloader/client/qbittorrent/qbittorrent.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def initialize(self) -> None:
5353
)
5454

5555
@override
56-
async def auth(self):
56+
async def auth(self)->bool:
5757
try:
5858
resp = await self._client.post(
5959
url=QB_API_URL["login"],
@@ -98,14 +98,16 @@ async def logout(self):
9898
return False
9999

100100
@override
101-
async def check_host(self):
101+
async def check_host(self)->bool:
102102
try:
103+
logger.debug(f"[qbittorrent] Check host: {self.config.host}")
103104
resp = await self._client.get(url=QB_API_URL["version"], timeout=5)
104-
resp.raise_for_status()
105-
if resp.status_code == 200:
105+
if resp.status_code == 200 or resp.status_code == 403:
106+
logger.debug(f"[qbittorrent] Check host success: {self.config.host}")
107+
# 检查
106108
return True
107-
return False
108-
except (httpx.ConnectError, httpx.TimeoutException, httpx.ReadTimeout) as e:
109+
resp.raise_for_status()
110+
except (httpx.RemoteProtocolError,httpx.ConnectError, httpx.TimeoutException, httpx.ReadTimeout) as e:
109111
logger.error(
110112
f"[qbittorrent] Check host error,please check your host {self.config.host}"
111113
)
@@ -239,23 +241,31 @@ async def add(self, torrent_url, save_path, category) -> str | None:
239241
logger.debug(f"[QbDownloader] Got torrent hashes: {torrent_hashes}")
240242
logger.debug(f"[QbDownloader] Using hash: {torrent_link}")
241243
file = {"torrents": torrent_file}
244+
data.pop("urls", None) # 移除urls字段
242245
else:
246+
# 没有拿到的话就拿 hash
247+
torrent_link = get_hash(torrent_url)
248+
if not torrent_link:
249+
logger.error(
250+
f"[QbDownloader] Failed to get torrent hash from {torrent_url}"
251+
)
252+
return None
243253
logger.warning(
244254
f"[QbDownloader] Failed to get torrent content from {torrent_url}"
245255
)
246256
else:
247257
# 如果是 magnet 链接,直接使用
248258
torrent_link = get_hash(torrent_url)
249-
if torrent_link:
250-
# 判断是否为32字符的Base32格式(DMHY等站点使用)
251-
if len(torrent_link) == 32:
252-
# 转换Base32格式为40字符小写十六进制
253-
hex_hash = base32_to_hex(torrent_link)
254-
if hex_hash:
255-
torrent_link = hex_hash
256-
logger.debug(
257-
f"[QbDownloader] 转换Base32 hash为十六进制: {torrent_link}"
258-
)
259+
# if torrent_link:
260+
# # 判断是否为32字符的Base32格式(DMHY等站点使用)
261+
# if len(torrent_link) == 32:
262+
# # 转换Base32格式为40字符小写十六进制
263+
# hex_hash = base32_to_hex(torrent_link)
264+
# if hex_hash:
265+
# torrent_link = hex_hash
266+
# logger.debug(
267+
# f"[QbDownloader] 转换Base32 hash为十六进制: {torrent_link}"
268+
# )
259269
logger.debug(f"[QbDownloader] Using magnet link: {torrent_url}")
260270
try:
261271
resp = await self._client.post(

backend/src/module/downloader/download_client.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from module.utils import gen_save_path
1111

1212
from .client import AuthorizationError, BaseDownloader
13+
from .download_queue import download_queue
1314

1415
logger = logging.getLogger(__name__)
1516

@@ -48,7 +49,6 @@ async def wrapper(self, *args, **kwargs):
4849
if lock_task.done() and not lock_task.cancelled():
4950
self._api_lock.release()
5051
raise asyncio.CancelledError("API call cancelled")
51-
5252
# 成功获取锁
5353
break
5454

@@ -145,19 +145,19 @@ async def login(self):
145145
self.login_success_event.clear() # 重置事件状态
146146
self.is_authenticating = True # 设置正在认证状态
147147
logger.debug("[Downloader Client] attempting authentication")
148-
148+
if not await self.check_host():
149+
logger.error("[Downloader Client] Downloader host check failed")
150+
return
151+
logger.debug("[Downloader Client] Downloader host check passed")
149152
if await self.downloader.auth():
150153
self.login_success_event.set() # 设置认证成功事件
151-
logger.info("[Downloader Client] authentication success")
152-
else:
153-
# 保持 login_success_event 为 clear 状态,表示认证失败
154-
logger.warning("[Downloader Client] authentication failed")
155154

156155
except Exception as e:
157156
logger.error(f"[Downloader Client] authentication error: {e}")
158157
# 保持 login_success_event 为 clear 状态,表示认证失败
159158
finally:
160159
self.is_authenticating = False
160+
self.login_task = None
161161

162162
async def wait_for_login(self) -> bool:
163163
"""等待认证完成,返回是否可以继续API调用"""
@@ -238,14 +238,18 @@ async def add_torrent(self, torrent: Torrent, bangumi) -> bool:
238238
)
239239
except AuthorizationError:
240240
self.start_login()
241+
#TODO: 重试太多了怎么办?
242+
# https://mikanani.me/Home/Episode/4294fd53bcd1bfe2ff3b5796004ee3ccb1ba0d0e 这是个死种
243+
download_queue.add(torrent, bangumi) # 重新放回队列
244+
logger.debug( f"[Downloader] Add torrent failed, re-adding to queue: {torrent.name}")
241245
return False
242246

243247
@api_rate_limit
244248
async def move_torrent(self, hashes: list[str] | str, location: str) -> bool:
245249
if not await self.wait_for_login():
246250
return False # 登录失败时返回False
247-
248251
try:
252+
#TODO: 好像是用 | 连起来就行,但现在好像用不上了
249253
result = await self.downloader.move(hashes=hashes, new_location=location)
250254
if result:
251255
logger.info(f"[Downloader] Move torrents {hashes} to {location}")
@@ -333,7 +337,7 @@ async def get_torrent_files(self, _hash: str) -> list[str] | None:
333337
return []
334338

335339
def start_login(self):
336-
if not self.is_authenticating:
340+
if not self.is_authenticating and self.login_task is None:
337341
self.is_authenticating = True # 设置认证状态
338342
self.login_task = asyncio.create_task(self.login(), name="login")
339343

@@ -371,7 +375,8 @@ def get_waiting_api_count(self) -> int:
371375
async def stop(self):
372376
logger.info("[Download Client] Stopping download client")
373377
self.cancel_all_api_calls() # 先取消所有API调用
374-
await self.downloader.logout()
378+
if self.login_success_event.is_set():
379+
await self.downloader.logout()
375380
if self.login_task:
376381
self.login_task.cancel()
377382

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import asyncio
2+
import logging
3+
4+
from module.database import Database, engine
5+
from module.downloader.download_client import Client as client
6+
from module.utils import event_bus
7+
from module.utils.events import Event, EventBus, EventType
8+
from module.models import Bangumi, Torrent
9+
10+
from .download_queue import download_queue
11+
12+
logger = logging.getLogger(__name__)
13+
class DownloadController:
14+
def __init__(self):
15+
self._event_bus:EventBus = event_bus
16+
17+
# 10秒拿5个
18+
async def download(self):
19+
if client.login_success_event.is_set() is False:
20+
if client.login_task is None:
21+
logger.warning(
22+
"[Download Controller] 正在尝试登录下载客户端,请稍候..."
23+
)
24+
client.login_task = asyncio.create_task(client.login())
25+
return
26+
queue_size = download_queue.qsize()
27+
if queue_size == 0:
28+
return
29+
30+
tasks = []
31+
torrents = []
32+
torrent_bangumi_pairs = []
33+
34+
# 一次取五个torrent
35+
batch_size = min(queue_size, 5)
36+
37+
for i in range(batch_size):
38+
torrent, bangumi = download_queue.get_nowait()
39+
40+
download_queue.task_done()
41+
logging.debug(f"[Download Controller] start download {torrent.name}")
42+
torrents.append(torrent)
43+
torrent_bangumi_pairs.append((torrent, bangumi))
44+
tasks.append(client.add_torrent(torrent, bangumi))
45+
# 更新种子信息
46+
47+
# 执行下载任务
48+
results = await asyncio.gather(*tasks)
49+
50+
# 处理下载结果并发布事件
51+
# 保存到数据库
52+
53+
with Database(engine) as database:
54+
for torrent in torrents:
55+
database.torrent.add(torrent)
56+
for i, result in enumerate(results):
57+
torrent, bangumi = torrent_bangumi_pairs[i]
58+
59+
if result is True: # 下载成功
60+
if torrent.download_uid: # 有下载哈希
61+
# 发布下载开始事件
62+
await self._publish_download_started(torrent, bangumi)
63+
elif isinstance(result, Exception):
64+
logger.error(
65+
f"[Download Controller] 下载失败: {torrent.name} - {result}"
66+
)
67+
68+
async def _publish_download_started(
69+
self, torrent: Torrent, bangumi: Bangumi
70+
) -> None:
71+
"""发布下载开始事件
72+
73+
Args:
74+
torrent: 种子信息
75+
bangumi: 番剧信息
76+
"""
77+
if not self._event_bus:
78+
logger.warning("[Download Controller] EventBus 未设置,无法发布事件")
79+
return
80+
81+
try:
82+
event = Event(
83+
type=EventType.DOWNLOAD_STARTED,
84+
data={"torrent": torrent, "bangumi": bangumi},
85+
)
86+
87+
asyncio.create_task(self._event_bus.publish(event))
88+
logger.debug(f"[Download Controller] 已发布下载开始事件: {torrent.name}")
89+
90+
except Exception as e:
91+
logger.error(f"[Download Controller] 发布下载开始事件失败: {e}")

0 commit comments

Comments
 (0)