Skip to content

Commit 99c8764

Browse files
EstrellaXDclaude
andcommitted
perf: optimize renamer with batch database queries and reduced blocking
- Add batch offset lookup to reduce N database connections to 1-3 per cycle - Add search_by_qb_hashes() and search_ids() for batch queries - Throttle pending rename cache cleanup to once per minute max - Use exponential backoff for rename verification (0.1s->0.2s->0.4s) - Skip verification for subtitle renames to reduce latency Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 789c02f commit 99c8764

5 files changed

Lines changed: 177 additions & 41 deletions

File tree

backend/src/module/database/bangumi.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,14 @@ def search_id(self, _id: int) -> Optional[Bangumi]:
365365
logger.debug(f"[Database] Find bangumi id: {_id}.")
366366
return bangumi
367367

368+
def search_ids(self, ids: list[int]) -> list[Bangumi]:
369+
"""Batch lookup multiple bangumi by their IDs."""
370+
if not ids:
371+
return []
372+
statement = select(Bangumi).where(Bangumi.id.in_(ids))
373+
result = self.session.execute(statement)
374+
return list(result.scalars().all())
375+
368376
def match_poster(self, bangumi_name: str) -> str:
369377
statement = select(Bangumi).where(
370378
func.instr(bangumi_name, Bangumi.official_title) > 0

backend/src/module/database/torrent.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,15 @@ def update_one_user(self, data: Torrent):
3636
logger.debug(f"Update {data.name} in database.")
3737

3838
def search(self, _id: int) -> Torrent | None:
39-
result = self.session.execute(
40-
select(Torrent).where(Torrent.id == _id)
41-
)
39+
result = self.session.execute(select(Torrent).where(Torrent.id == _id))
4240
return result.scalar_one_or_none()
4341

4442
def search_all(self) -> list[Torrent]:
4543
result = self.session.execute(select(Torrent))
4644
return list(result.scalars().all())
4745

4846
def search_rss(self, rss_id: int) -> list[Torrent]:
49-
result = self.session.execute(
50-
select(Torrent).where(Torrent.rss_id == rss_id)
51-
)
47+
result = self.session.execute(select(Torrent).where(Torrent.rss_id == rss_id))
5248
return list(result.scalars().all())
5349

5450
def check_new(self, torrents_list: list[Torrent]) -> list[Torrent]:
@@ -62,16 +58,21 @@ def check_new(self, torrents_list: list[Torrent]) -> list[Torrent]:
6258

6359
def search_by_qb_hash(self, qb_hash: str) -> Torrent | None:
6460
"""Find torrent by qBittorrent hash."""
61+
result = self.session.execute(select(Torrent).where(Torrent.qb_hash == qb_hash))
62+
return result.scalar_one_or_none()
63+
64+
def search_by_qb_hashes(self, qb_hashes: list[str]) -> list[Torrent]:
65+
"""Find torrents by multiple qBittorrent hashes (batch query)."""
66+
if not qb_hashes:
67+
return []
6568
result = self.session.execute(
66-
select(Torrent).where(Torrent.qb_hash == qb_hash)
69+
select(Torrent).where(Torrent.qb_hash.in_(qb_hashes))
6770
)
68-
return result.scalar_one_or_none()
71+
return list(result.scalars().all())
6972

7073
def search_by_url(self, url: str) -> Torrent | None:
7174
"""Find torrent by URL."""
72-
result = self.session.execute(
73-
select(Torrent).where(Torrent.url == url)
74-
)
75+
result = self.session.execute(select(Torrent).where(Torrent.url == url))
7576
return result.scalar_one_or_none()
7677

7778
def update_qb_hash(self, torrent_id: int, qb_hash: str) -> bool:

backend/src/module/downloader/client/qb_downloader.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,9 @@ async def torrents_resume(self, hashes: str):
193193
data={"hashes": hashes},
194194
)
195195

196-
async def torrents_rename_file(self, torrent_hash, old_path, new_path) -> bool:
196+
async def torrents_rename_file(
197+
self, torrent_hash, old_path, new_path, verify: bool = True
198+
) -> bool:
197199
try:
198200
resp = await self._client.post(
199201
self._url("torrents/renameFile"),
@@ -205,20 +207,31 @@ async def torrents_rename_file(self, torrent_hash, old_path, new_path) -> bool:
205207
if resp.status_code != 200:
206208
return False
207209

210+
if not verify:
211+
return True
212+
208213
# Verify the rename actually happened by checking file list
209214
# qBittorrent can return 200 but delay the actual rename (e.g., while seeding)
210-
await asyncio.sleep(0.5) # Brief delay to allow qBittorrent to process
211-
files = await self.torrents_files(torrent_hash)
212-
for f in files:
213-
if f.get("name") == new_path:
214-
return True
215-
if f.get("name") == old_path:
216-
# File still has old name - rename didn't actually happen
217-
logger.debug(
218-
f"[Downloader] Rename API returned 200 but file unchanged: {old_path}"
219-
)
220-
return False
221-
return True # new_path found or old_path not found
215+
# Use exponential backoff: 0.1s, 0.2s, 0.4s (max 3 attempts)
216+
for attempt in range(3):
217+
delay = 0.1 * (2**attempt)
218+
await asyncio.sleep(delay)
219+
files = await self.torrents_files(torrent_hash)
220+
for f in files:
221+
if f.get("name") == new_path:
222+
return True
223+
if f.get("name") == old_path:
224+
# File still has old name - try again
225+
if attempt < 2:
226+
continue
227+
# Final attempt failed
228+
logger.debug(
229+
f"[Downloader] Rename API returned 200 but file unchanged: {old_path}"
230+
)
231+
return False
232+
# new_path found or old_path not found
233+
return True
234+
return True
222235
except (httpx.ConnectError, httpx.RequestError, httpx.TimeoutException) as e:
223236
logger.warning(f"[Downloader] Failed to rename file {old_path}: {e}")
224237
return False

backend/src/module/downloader/download_client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,11 @@ async def get_torrent_info(
120120
async def get_torrent_files(self, torrent_hash: str):
121121
return await self.client.torrents_files(torrent_hash=torrent_hash)
122122

123-
async def rename_torrent_file(self, _hash, old_path, new_path) -> bool:
123+
async def rename_torrent_file(
124+
self, _hash, old_path, new_path, verify: bool = True
125+
) -> bool:
124126
result = await self.client.torrents_rename_file(
125-
torrent_hash=_hash, old_path=old_path, new_path=new_path
127+
torrent_hash=_hash, old_path=old_path, new_path=new_path, verify=verify
126128
)
127129
if result:
128130
logger.info(f"{old_path} >> {new_path}")

backend/src/module/manager/renamer.py

Lines changed: 127 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,32 @@
1616
# This prevents spamming the same rename when qBittorrent returns 200 but doesn't actually rename
1717
_pending_renames: dict[tuple[str, str, str], float] = {}
1818
_PENDING_RENAME_COOLDOWN = 300 # 5 minutes cooldown before retrying same rename
19+
_CLEANUP_INTERVAL = 60 # Clean up pending cache at most once per minute
20+
_last_cleanup_time: float = 0
1921

2022

2123
class Renamer(DownloadClient):
2224
def __init__(self):
2325
super().__init__()
2426
self._parser = TitleParser()
2527
self.check_pool = {}
28+
self._offset_cache: dict[str, tuple[int, int]] = {}
29+
30+
@staticmethod
31+
def _cleanup_pending_cache():
32+
"""Clean up expired entries from pending renames cache (throttled)."""
33+
global _last_cleanup_time
34+
current_time = time.time()
35+
if current_time - _last_cleanup_time < _CLEANUP_INTERVAL:
36+
return
37+
_last_cleanup_time = current_time
38+
expired_keys = [
39+
k
40+
for k, v in _pending_renames.items()
41+
if current_time - v > _PENDING_RENAME_COOLDOWN * 2
42+
]
43+
for k in expired_keys:
44+
_pending_renames.pop(k, None)
2645

2746
@staticmethod
2847
def print_result(torrent_count, rename_count):
@@ -111,7 +130,10 @@ async def rename_file(
111130
# (qBittorrent can return 200 but delay actual rename while seeding)
112131
pending_key = (_hash, media_path, new_path)
113132
last_attempt = _pending_renames.get(pending_key)
114-
if last_attempt and (time.time() - last_attempt) < _PENDING_RENAME_COOLDOWN:
133+
if (
134+
last_attempt
135+
and (time.time() - last_attempt) < _PENDING_RENAME_COOLDOWN
136+
):
115137
logger.debug(
116138
f"[Renamer] Skipping rename (pending cooldown): {media_path}"
117139
)
@@ -137,14 +159,8 @@ async def rename_file(
137159
# Rename API returned success but file wasn't actually renamed
138160
# Add to pending cache to avoid spamming
139161
_pending_renames[pending_key] = time.time()
140-
# Clean up old entries from cache
141-
current_time = time.time()
142-
expired_keys = [
143-
k for k, v in _pending_renames.items()
144-
if current_time - v > _PENDING_RENAME_COOLDOWN * 2
145-
]
146-
for k in expired_keys:
147-
_pending_renames.pop(k, None)
162+
# Periodic cleanup of expired entries (at most once per minute)
163+
self._cleanup_pending_cache()
148164
else:
149165
logger.warning(f"[Renamer] {media_path} parse failed")
150166
if settings.bangumi_manage.remove_bad_torrent:
@@ -216,8 +232,12 @@ async def rename_subtitles(
216232
season_offset=season_offset,
217233
)
218234
if subtitle_path != new_path:
235+
# Skip verification for subtitles to reduce latency
219236
renamed = await self.rename_torrent_file(
220-
_hash=_hash, old_path=subtitle_path, new_path=new_path
237+
_hash=_hash,
238+
old_path=subtitle_path,
239+
new_path=new_path,
240+
verify=False,
221241
)
222242
if not renamed:
223243
logger.warning(f"[Renamer] {subtitle_path} rename failed")
@@ -249,6 +269,99 @@ def _normalize_path(path: str) -> str:
249269
# Remove trailing slashes
250270
return normalized.rstrip("/")
251271

272+
def _batch_lookup_offsets(
273+
self, torrents_info: list[dict]
274+
) -> dict[str, tuple[int, int]]:
275+
"""Batch lookup offsets for all torrents in a single database session.
276+
277+
Returns a dict mapping torrent_hash to (episode_offset, season_offset).
278+
"""
279+
result: dict[str, tuple[int, int]] = {}
280+
if not torrents_info:
281+
return result
282+
283+
try:
284+
with Database() as db:
285+
# Collect all hashes for batch query
286+
hashes = [info["hash"] for info in torrents_info]
287+
torrent_records = db.torrent.search_by_qb_hashes(hashes)
288+
hash_to_bangumi_id = {
289+
r.qb_hash: r.bangumi_id for r in torrent_records if r.bangumi_id
290+
}
291+
292+
# Collect unique bangumi IDs to fetch
293+
bangumi_ids_to_fetch = set(hash_to_bangumi_id.values())
294+
295+
# Also collect bangumi IDs from tags
296+
tag_bangumi_ids = {}
297+
for info in torrents_info:
298+
tags = info.get("tags", "")
299+
bangumi_id = self._parse_bangumi_id_from_tags(tags)
300+
if bangumi_id:
301+
tag_bangumi_ids[info["hash"]] = bangumi_id
302+
bangumi_ids_to_fetch.add(bangumi_id)
303+
304+
# Batch fetch all bangumi records
305+
bangumi_map = {}
306+
if bangumi_ids_to_fetch:
307+
bangumi_records = db.bangumi.search_ids(list(bangumi_ids_to_fetch))
308+
bangumi_map = {
309+
b.id: b for b in bangumi_records if b and not b.deleted
310+
}
311+
312+
# Now resolve offsets for each torrent
313+
for info in torrents_info:
314+
torrent_hash = info["hash"]
315+
torrent_name = info["name"]
316+
save_path = info["save_path"]
317+
318+
# 1. Try by qb_hash
319+
bangumi_id = hash_to_bangumi_id.get(torrent_hash)
320+
if bangumi_id and bangumi_id in bangumi_map:
321+
b = bangumi_map[bangumi_id]
322+
result[torrent_hash] = (b.episode_offset, b.season_offset)
323+
continue
324+
325+
# 2. Try by tag
326+
bangumi_id = tag_bangumi_ids.get(torrent_hash)
327+
if bangumi_id and bangumi_id in bangumi_map:
328+
b = bangumi_map[bangumi_id]
329+
result[torrent_hash] = (b.episode_offset, b.season_offset)
330+
continue
331+
332+
# 3. Try by torrent name (individual query, but less common path)
333+
bangumi = db.bangumi.match_torrent(torrent_name)
334+
if bangumi:
335+
result[torrent_hash] = (
336+
bangumi.episode_offset,
337+
bangumi.season_offset,
338+
)
339+
continue
340+
341+
# 4. Try by save_path (individual query, fallback)
342+
normalized_save_path = self._normalize_path(save_path)
343+
bangumi = db.bangumi.match_by_save_path(save_path)
344+
if not bangumi:
345+
bangumi = db.bangumi.match_by_save_path(normalized_save_path)
346+
if bangumi:
347+
result[torrent_hash] = (
348+
bangumi.episode_offset,
349+
bangumi.season_offset,
350+
)
351+
continue
352+
353+
# Default: no offset
354+
result[torrent_hash] = (0, 0)
355+
356+
except Exception as e:
357+
logger.debug(f"[Renamer] Batch offset lookup failed: {e}")
358+
# Fall back to individual lookups on error
359+
for info in torrents_info:
360+
if info["hash"] not in result:
361+
result[info["hash"]] = (0, 0)
362+
363+
return result
364+
252365
def _lookup_offsets(
253366
self, torrent_hash: str, torrent_name: str, save_path: str, tags: str = ""
254367
) -> tuple[int, int]:
@@ -331,17 +444,16 @@ async def rename(self) -> list[Notification]:
331444
all_files = await asyncio.gather(
332445
*[self.get_torrent_files(info["hash"]) for info in torrents_info]
333446
)
447+
# Batch lookup all offsets in a single database session
448+
offset_map = self._batch_lookup_offsets(torrents_info)
334449
for info, files in zip(torrents_info, all_files):
335450
torrent_hash = info["hash"]
336451
torrent_name = info["name"]
337452
save_path = info["save_path"]
338-
tags = info.get("tags", "")
339453
media_list, subtitle_list = self.check_files(files)
340454
bangumi_name, season = self._path_to_bangumi(save_path)
341-
# Look up offsets from database (use hash/tags/bangumi_id for accurate matching)
342-
episode_offset, season_offset = self._lookup_offsets(
343-
torrent_hash, torrent_name, save_path, tags
344-
)
455+
# Use pre-fetched offsets
456+
episode_offset, season_offset = offset_map.get(torrent_hash, (0, 0))
345457
kwargs = {
346458
"torrent_name": torrent_name,
347459
"bangumi_name": bangumi_name,

0 commit comments

Comments
 (0)