Skip to content

Commit 58c3d40

Browse files
EstrellaXDclaude
andcommitted
fix: migrate database operations to async
Update BangumiDatabase, RSSDatabase, TorrentDatabase, and UserDatabase to use AsyncSession.execute() instead of sync Session.exec() to match the async engine migration. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 9189edf commit 58c3d40

4 files changed

Lines changed: 259 additions & 220 deletions

File tree

Lines changed: 128 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,54 @@
11
import logging
2+
import time
23
from typing import Optional
34

5+
from sqlalchemy.ext.asyncio import AsyncSession
46
from sqlalchemy.sql import func
5-
from sqlmodel import Session, and_, delete, false, or_, select
7+
from sqlmodel import and_, delete, false, or_, select
68

79
from module.models import Bangumi, BangumiUpdate
810

911
logger = logging.getLogger(__name__)
1012

13+
# Module-level TTL cache for search_all results
14+
_bangumi_cache: list[Bangumi] | None = None
15+
_bangumi_cache_time: float = 0
16+
_BANGUMI_CACHE_TTL: float = 60.0 # seconds
17+
18+
19+
def _invalidate_bangumi_cache():
20+
global _bangumi_cache, _bangumi_cache_time
21+
_bangumi_cache = None
22+
_bangumi_cache_time = 0
23+
1124

1225
class BangumiDatabase:
13-
def __init__(self, session: Session):
26+
def __init__(self, session: AsyncSession):
1427
self.session = session
1528

16-
def add(self, data: Bangumi):
29+
async def add(self, data: Bangumi) -> bool:
1730
statement = select(Bangumi).where(Bangumi.title_raw == data.title_raw)
18-
bangumi = self.session.exec(statement).first()
31+
result = await self.session.execute(statement)
32+
bangumi = result.scalar_one_or_none()
1933
if bangumi:
2034
return False
2135
self.session.add(data)
22-
self.session.commit()
36+
await self.session.commit()
37+
_invalidate_bangumi_cache()
2338
logger.debug(f"[Database] Insert {data.official_title} into database.")
2439
return True
2540

26-
def add_all(self, datas: list[Bangumi]):
41+
async def add_all(self, datas: list[Bangumi]):
2742
self.session.add_all(datas)
28-
self.session.commit()
43+
await self.session.commit()
44+
_invalidate_bangumi_cache()
2945
logger.debug(f"[Database] Insert {len(datas)} bangumi into database.")
3046

31-
def update(self, data: Bangumi | BangumiUpdate, _id: int = None) -> bool:
47+
async def update(self, data: Bangumi | BangumiUpdate, _id: int = None) -> bool:
3248
if _id and isinstance(data, BangumiUpdate):
33-
db_data = self.session.get(Bangumi, _id)
49+
db_data = await self.session.get(Bangumi, _id)
3450
elif isinstance(data, Bangumi):
35-
db_data = self.session.get(Bangumi, data.id)
51+
db_data = await self.session.get(Bangumi, data.id)
3652
else:
3753
return False
3854
if not db_data:
@@ -41,133 +57,155 @@ def update(self, data: Bangumi | BangumiUpdate, _id: int = None) -> bool:
4157
for key, value in bangumi_data.items():
4258
setattr(db_data, key, value)
4359
self.session.add(db_data)
44-
self.session.commit()
45-
self.session.refresh(db_data)
60+
await self.session.commit()
61+
_invalidate_bangumi_cache()
4662
logger.debug(f"[Database] Update {data.official_title}")
4763
return True
4864

49-
def update_all(self, datas: list[Bangumi]):
65+
async def update_all(self, datas: list[Bangumi]):
5066
self.session.add_all(datas)
51-
self.session.commit()
67+
await self.session.commit()
68+
_invalidate_bangumi_cache()
5269
logger.debug(f"[Database] Update {len(datas)} bangumi.")
5370

54-
def update_rss(self, title_raw, rss_set: str):
55-
# Update rss and added
71+
async def update_rss(self, title_raw: str, rss_set: str):
5672
statement = select(Bangumi).where(Bangumi.title_raw == title_raw)
57-
bangumi = self.session.exec(statement).first()
58-
bangumi.rss_link = rss_set
59-
bangumi.added = False
60-
self.session.add(bangumi)
61-
self.session.commit()
62-
self.session.refresh(bangumi)
63-
logger.debug(f"[Database] Update {title_raw} rss_link to {rss_set}.")
64-
65-
def update_poster(self, title_raw, poster_link: str):
73+
result = await self.session.execute(statement)
74+
bangumi = result.scalar_one_or_none()
75+
if bangumi:
76+
bangumi.rss_link = rss_set
77+
bangumi.added = False
78+
self.session.add(bangumi)
79+
await self.session.commit()
80+
_invalidate_bangumi_cache()
81+
logger.debug(f"[Database] Update {title_raw} rss_link to {rss_set}.")
82+
83+
async def update_poster(self, title_raw: str, poster_link: str):
6684
statement = select(Bangumi).where(Bangumi.title_raw == title_raw)
67-
bangumi = self.session.exec(statement).first()
68-
bangumi.poster_link = poster_link
69-
self.session.add(bangumi)
70-
self.session.commit()
71-
self.session.refresh(bangumi)
72-
logger.debug(f"[Database] Update {title_raw} poster_link to {poster_link}.")
73-
74-
def delete_one(self, _id: int):
85+
result = await self.session.execute(statement)
86+
bangumi = result.scalar_one_or_none()
87+
if bangumi:
88+
bangumi.poster_link = poster_link
89+
self.session.add(bangumi)
90+
await self.session.commit()
91+
_invalidate_bangumi_cache()
92+
logger.debug(f"[Database] Update {title_raw} poster_link to {poster_link}.")
93+
94+
async def delete_one(self, _id: int):
7595
statement = select(Bangumi).where(Bangumi.id == _id)
76-
bangumi = self.session.exec(statement).first()
77-
self.session.delete(bangumi)
78-
self.session.commit()
79-
logger.debug(f"[Database] Delete bangumi id: {_id}.")
96+
result = await self.session.execute(statement)
97+
bangumi = result.scalar_one_or_none()
98+
if bangumi:
99+
await self.session.delete(bangumi)
100+
await self.session.commit()
101+
_invalidate_bangumi_cache()
102+
logger.debug(f"[Database] Delete bangumi id: {_id}.")
80103

81-
def delete_all(self):
104+
async def delete_all(self):
82105
statement = delete(Bangumi)
83-
self.session.exec(statement)
84-
self.session.commit()
85-
86-
def search_all(self) -> list[Bangumi]:
106+
await self.session.execute(statement)
107+
await self.session.commit()
108+
_invalidate_bangumi_cache()
109+
110+
async def search_all(self) -> list[Bangumi]:
111+
global _bangumi_cache, _bangumi_cache_time
112+
now = time.time()
113+
if _bangumi_cache is not None and (now - _bangumi_cache_time) < _BANGUMI_CACHE_TTL:
114+
return _bangumi_cache
87115
statement = select(Bangumi)
88-
return self.session.exec(statement).all()
116+
result = await self.session.execute(statement)
117+
_bangumi_cache = list(result.scalars().all())
118+
_bangumi_cache_time = now
119+
return _bangumi_cache
89120

90-
def search_id(self, _id: int) -> Optional[Bangumi]:
121+
async def search_id(self, _id: int) -> Optional[Bangumi]:
91122
statement = select(Bangumi).where(Bangumi.id == _id)
92-
bangumi = self.session.exec(statement).first()
123+
result = await self.session.execute(statement)
124+
bangumi = result.scalar_one_or_none()
93125
if bangumi is None:
94126
logger.warning(f"[Database] Cannot find bangumi id: {_id}.")
95127
return None
96128
else:
97129
logger.debug(f"[Database] Find bangumi id: {_id}.")
98-
return self.session.exec(statement).first()
130+
return bangumi
99131

100-
def match_poster(self, bangumi_name: str) -> str:
101-
# Use like to match
132+
async def match_poster(self, bangumi_name: str) -> str:
102133
statement = select(Bangumi).where(
103134
func.instr(bangumi_name, Bangumi.official_title) > 0
104135
)
105-
data = self.session.exec(statement).first()
136+
result = await self.session.execute(statement)
137+
data = result.scalar_one_or_none()
106138
if data:
107139
return data.poster_link
108140
else:
109141
return ""
110142

111-
def match_list(self, torrent_list: list, rss_link: str) -> list:
112-
match_datas = self.search_all()
143+
async def match_list(self, torrent_list: list, rss_link: str) -> list:
144+
match_datas = await self.search_all()
113145
if not match_datas:
114146
return torrent_list
115-
# Match title
116-
i = 0
117-
while i < len(torrent_list):
118-
torrent = torrent_list[i]
119-
for match_data in match_datas:
120-
if match_data.title_raw in torrent.name:
121-
if rss_link not in match_data.rss_link:
147+
# Build index for faster lookup
148+
title_index = {m.title_raw: m for m in match_datas}
149+
unmatched = []
150+
rss_updated = set()
151+
for torrent in torrent_list:
152+
matched = False
153+
for title_raw, match_data in title_index.items():
154+
if title_raw in torrent.name:
155+
if rss_link not in match_data.rss_link and title_raw not in rss_updated:
122156
match_data.rss_link += f",{rss_link}"
123-
self.update_rss(match_data.title_raw, match_data.rss_link)
124-
# if not match_data.poster_link:
125-
# self.update_poster(match_data.title_raw, torrent.poster_link)
126-
torrent_list.pop(i)
157+
match_data.added = False
158+
rss_updated.add(title_raw)
159+
matched = True
127160
break
128-
else:
129-
i += 1
130-
return torrent_list
131-
132-
def match_torrent(self, torrent_name: str) -> Optional[Bangumi]:
161+
if not matched:
162+
unmatched.append(torrent)
163+
# Batch commit all rss_link updates
164+
if rss_updated:
165+
await self.session.commit()
166+
_invalidate_bangumi_cache()
167+
logger.debug(f"[Database] Batch updated rss_link for {len(rss_updated)} bangumi.")
168+
return unmatched
169+
170+
async def match_torrent(self, torrent_name: str) -> Optional[Bangumi]:
133171
statement = select(Bangumi).where(
134172
and_(
135173
func.instr(torrent_name, Bangumi.title_raw) > 0,
136-
# use `false()` to avoid E712 checking
137-
# see: https://docs.astral.sh/ruff/rules/true-false-comparison/
138174
Bangumi.deleted == false(),
139175
)
140176
)
141-
return self.session.exec(statement).first()
177+
result = await self.session.execute(statement)
178+
return result.scalar_one_or_none()
142179

143-
def not_complete(self) -> list[Bangumi]:
144-
# Find eps_complete = False
145-
# use `false()` to avoid E712 checking
146-
# see: https://docs.astral.sh/ruff/rules/true-false-comparison/
180+
async def not_complete(self) -> list[Bangumi]:
147181
condition = select(Bangumi).where(
148182
and_(Bangumi.eps_collect == false(), Bangumi.deleted == false())
149183
)
150-
datas = self.session.exec(condition).all()
151-
return datas
184+
result = await self.session.execute(condition)
185+
return list(result.scalars().all())
152186

153-
def not_added(self) -> list[Bangumi]:
187+
async def not_added(self) -> list[Bangumi]:
154188
conditions = select(Bangumi).where(
155189
or_(
156-
Bangumi.added == 0, Bangumi.rule_name is None, Bangumi.save_path is None
190+
Bangumi.added == 0,
191+
Bangumi.rule_name is None,
192+
Bangumi.save_path is None,
157193
)
158194
)
159-
datas = self.session.exec(conditions).all()
160-
return datas
195+
result = await self.session.execute(conditions)
196+
return list(result.scalars().all())
161197

162-
def disable_rule(self, _id: int):
198+
async def disable_rule(self, _id: int):
163199
statement = select(Bangumi).where(Bangumi.id == _id)
164-
bangumi = self.session.exec(statement).first()
165-
bangumi.deleted = True
166-
self.session.add(bangumi)
167-
self.session.commit()
168-
self.session.refresh(bangumi)
169-
logger.debug(f"[Database] Disable rule {bangumi.title_raw}.")
170-
171-
def search_rss(self, rss_link: str) -> list[Bangumi]:
200+
result = await self.session.execute(statement)
201+
bangumi = result.scalar_one_or_none()
202+
if bangumi:
203+
bangumi.deleted = True
204+
self.session.add(bangumi)
205+
await self.session.commit()
206+
logger.debug(f"[Database] Disable rule {bangumi.title_raw}.")
207+
208+
async def search_rss(self, rss_link: str) -> list[Bangumi]:
172209
statement = select(Bangumi).where(func.instr(rss_link, Bangumi.rss_link) > 0)
173-
return self.session.exec(statement).all()
210+
result = await self.session.execute(statement)
211+
return list(result.scalars().all())

0 commit comments

Comments
 (0)