-
Notifications
You must be signed in to change notification settings - Fork 432
Expand file tree
/
Copy pathbangumi.py
More file actions
174 lines (152 loc) · 6.43 KB
/
bangumi.py
File metadata and controls
174 lines (152 loc) · 6.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import logging
from typing import Optional
from sqlalchemy.sql import func
from sqlmodel import Session, and_, delete, false, or_, select
from module.models import Bangumi, BangumiUpdate
logger = logging.getLogger(__name__)
class BangumiDatabase:
    """Data-access helper for ``Bangumi`` rows on top of a SQLModel session.

    Every mutating method commits on the session given to the constructor.
    """

    def __init__(self, session: Session):
        """Keep the session that all queries and commits go through."""
        self.session = session

    def _find_by_title_raw(self, title_raw) -> Optional[Bangumi]:
        """Return the first row whose ``title_raw`` equals the argument."""
        statement = select(Bangumi).where(Bangumi.title_raw == title_raw)
        return self.session.exec(statement).first()

    def _find_by_id(self, _id: int) -> Optional[Bangumi]:
        """Return the first row whose ``id`` equals the argument."""
        statement = select(Bangumi).where(Bangumi.id == _id)
        return self.session.exec(statement).first()

    def add(self, data: Bangumi) -> bool:
        """Insert ``data`` unless a row with the same ``title_raw`` exists.

        Returns:
            True when the row was inserted, False when a duplicate was found.
        """
        if self._find_by_title_raw(data.title_raw):
            return False
        self.session.add(data)
        self.session.commit()
        logger.debug(f"[Database] Insert {data.official_title} into database.")
        return True

    def add_all(self, datas: list[Bangumi]):
        """Insert every item of ``datas`` and commit once."""
        self.session.add_all(datas)
        self.session.commit()
        logger.debug(f"[Database] Insert {len(datas)} bangumi into database.")

    def update(
        self, data: Bangumi | BangumiUpdate, _id: Optional[int] = None
    ) -> bool:
        """Copy the explicitly-set fields of ``data`` onto the stored row.

        Args:
            data: Either a ``Bangumi`` (the row is looked up by ``data.id``)
                or a ``BangumiUpdate`` (the row is looked up by ``_id``).
            _id: Primary key of the target row; only used together with a
                ``BangumiUpdate``.

        Returns:
            True when a row was updated; False when the arguments do not
            identify a row or the row does not exist.
        """
        if _id and isinstance(data, BangumiUpdate):
            db_data = self.session.get(Bangumi, _id)
        elif isinstance(data, Bangumi):
            db_data = self.session.get(Bangumi, data.id)
        else:
            return False
        if not db_data:
            return False
        # exclude_unset: only fields the caller actually provided are written.
        for key, value in data.dict(exclude_unset=True).items():
            setattr(db_data, key, value)
        self.session.add(db_data)
        self.session.commit()
        self.session.refresh(db_data)
        logger.debug(f"[Database] Update {data.official_title}")
        return True

    def update_all(self, datas: list[Bangumi]):
        """Add every item of ``datas`` to the session and commit once."""
        self.session.add_all(datas)
        self.session.commit()
        logger.debug(f"[Database] Update {len(datas)} bangumi.")

    def update_rss(self, title_raw, rss_set: str):
        """Set ``rss_link`` to ``rss_set`` and reset ``added`` to False.

        Logs a warning and does nothing when no row matches ``title_raw``.
        """
        bangumi = self._find_by_title_raw(title_raw)
        if bangumi is None:
            logger.warning(f"[Database] Cannot find bangumi title_raw: {title_raw}.")
            return
        bangumi.rss_link = rss_set
        bangumi.added = False
        self.session.add(bangumi)
        self.session.commit()
        self.session.refresh(bangumi)
        logger.debug(f"[Database] Update {title_raw} rss_link to {rss_set}.")

    def update_poster(self, title_raw, poster_link: str):
        """Set ``poster_link`` on the row matching ``title_raw``.

        Logs a warning and does nothing when no row matches ``title_raw``.
        """
        bangumi = self._find_by_title_raw(title_raw)
        if bangumi is None:
            logger.warning(f"[Database] Cannot find bangumi title_raw: {title_raw}.")
            return
        bangumi.poster_link = poster_link
        self.session.add(bangumi)
        self.session.commit()
        self.session.refresh(bangumi)
        logger.debug(f"[Database] Update {title_raw} poster_link to {poster_link}.")

    def delete_one(self, _id: int):
        """Delete the row with primary key ``_id``.

        Logs a warning and does nothing when the row does not exist.
        """
        bangumi = self._find_by_id(_id)
        if bangumi is None:
            # session.delete(None) would raise; a missing row is not fatal.
            logger.warning(f"[Database] Cannot find bangumi id: {_id}.")
            return
        self.session.delete(bangumi)
        self.session.commit()
        logger.debug(f"[Database] Delete bangumi id: {_id}.")

    def delete_all(self):
        """Delete every ``Bangumi`` row and commit."""
        self.session.exec(delete(Bangumi))
        self.session.commit()

    def search_all(self) -> list[Bangumi]:
        """Return every ``Bangumi`` row."""
        return self.session.exec(select(Bangumi)).all()

    def search_id(self, _id: int) -> Optional[Bangumi]:
        """Return the row with primary key ``_id``, or None when absent."""
        bangumi = self._find_by_id(_id)
        if bangumi is None:
            logger.warning(f"[Database] Cannot find bangumi id: {_id}.")
            return None
        logger.debug(f"[Database] Find bangumi id: {_id}.")
        return bangumi

    def match_poster(self, bangumi_name: str) -> str:
        """Return the poster link of the first row whose title is in the name.

        A row matches when its ``official_title`` occurs inside
        ``bangumi_name``. Returns ``""`` when no row matches.
        """
        # instr(haystack, needle) > 0: official_title is a substring of the name.
        statement = select(Bangumi).where(
            func.instr(bangumi_name, Bangumi.official_title) > 0
        )
        data = self.session.exec(statement).first()
        if data:
            return data.poster_link
        return ""

    def match_list(self, torrent_list: list, rss_link: str) -> list:
        """Drop torrents that belong to an already-known bangumi.

        A torrent is considered known when some row's ``title_raw`` occurs in
        ``torrent.name``; the first such row wins. If that row's ``rss_link``
        does not yet contain ``rss_link``, the link is appended (comma
        separated) and persisted through ``update_rss``.

        Args:
            torrent_list: Objects exposing a ``name`` attribute. The list is
                modified in place.
            rss_link: Link of the feed the torrents came from.

        Returns:
            The same ``torrent_list`` object, holding only unmatched torrents.
        """
        match_datas = self.search_all()
        if not match_datas:
            return torrent_list
        unmatched = []
        for torrent in torrent_list:
            match_data = next(
                (item for item in match_datas if item.title_raw in torrent.name),
                None,
            )
            if match_data is None:
                unmatched.append(torrent)
                continue
            if rss_link not in match_data.rss_link:
                match_data.rss_link += f",{rss_link}"
                self.update_rss(match_data.title_raw, match_data.rss_link)
        # Slice assignment keeps the caller's list object and mutates it in
        # place, without the quadratic cost of repeated pop(i).
        torrent_list[:] = unmatched
        return torrent_list

    def match_torrent(self, torrent_name: str, rss_link: str) -> Optional[Bangumi]:
        """Return the first non-deleted row matching both name and feed.

        A row matches when its ``title_raw`` occurs in ``torrent_name`` and
        ``rss_link`` occurs in the row's ``rss_link``.
        """
        statement = select(Bangumi).where(
            and_(
                func.instr(torrent_name, Bangumi.title_raw) > 0,
                func.instr(Bangumi.rss_link, rss_link) > 0,
                # use `false()` to avoid E712 checking
                # see: https://docs.astral.sh/ruff/rules/true-false-comparison/
                Bangumi.deleted == false(),
            )
        )
        return self.session.exec(statement).first()

    def not_complete(self) -> list[Bangumi]:
        """Return non-deleted rows whose ``eps_collect`` flag is false."""
        # use `false()` to avoid E712 checking
        # see: https://docs.astral.sh/ruff/rules/true-false-comparison/
        condition = select(Bangumi).where(
            and_(Bangumi.eps_collect == false(), Bangumi.deleted == false())
        )
        return self.session.exec(condition).all()

    def not_added(self) -> list[Bangumi]:
        """Return rows that are not added or lack a rule name / save path."""
        # `.is_(None)` builds an SQL `IS NULL`; a Python `is None` on the
        # column attribute would be evaluated eagerly to a plain False and
        # never reach the query.
        conditions = select(Bangumi).where(
            or_(
                Bangumi.added == 0,
                Bangumi.rule_name.is_(None),
                Bangumi.save_path.is_(None),
            )
        )
        return self.session.exec(conditions).all()

    def disable_rule(self, _id: int):
        """Mark the row with primary key ``_id`` as deleted.

        Logs a warning and does nothing when the row does not exist.
        """
        bangumi = self._find_by_id(_id)
        if bangumi is None:
            logger.warning(f"[Database] Cannot find bangumi id: {_id}.")
            return
        bangumi.deleted = True
        self.session.add(bangumi)
        self.session.commit()
        self.session.refresh(bangumi)
        logger.debug(f"[Database] Disable rule {bangumi.title_raw}.")

    def search_rss(self, rss_link: str) -> list[Bangumi]:
        """Return rows whose stored ``rss_link`` occurs inside ``rss_link``."""
        # NOTE(review): the argument order is the reverse of match_torrent
        # (stored value searched for inside the argument) — confirm this is
        # intended for rows whose rss_link holds several comma-joined links.
        statement = select(Bangumi).where(func.instr(rss_link, Bangumi.rss_link) > 0)
        return self.session.exec(statement).all()