-
Notifications
You must be signed in to change notification settings - Fork 432
Expand file tree
/
Copy pathrenamer.py
More file actions
215 lines (203 loc) · 8.58 KB
/
renamer.py
File metadata and controls
215 lines (203 loc) · 8.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import asyncio
import logging
import re
from module.conf import settings
from module.database import Database
from module.downloader import DownloadClient
from module.models import EpisodeFile, Notification, SubtitleFile
from module.parser import TitleParser
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Renamer(DownloadClient):
    """Rename the files of downloaded torrents to ``<title> SxxEyy<suffix>``.

    File names are parsed with ``TitleParser``; the torrent operations used
    here (``get_torrent_info``, ``get_torrent_files``, ``rename_torrent_file``,
    ``delete_torrent``, ``set_category``, ``check_files``, ``is_ep``,
    ``_path_to_bangumi``) are not defined in this class and are presumably
    inherited from ``DownloadClient``.
    """

    def __init__(self):
        super().__init__()
        self._parser = TitleParser()
        # Target paths that must not be renamed to. Nothing in this class
        # ever adds an entry, so the check in ``rename_file`` only has an
        # effect if outside code fills this mapping.
        self.check_pool = {}

    @staticmethod
    def print_result(torrent_count, rename_count):
        """Log a summary of a rename pass.

        An info line is written only when at least one file was renamed;
        the debug line is always written.
        """
        if rename_count != 0:
            logger.info(
                f"Finished checking {torrent_count} files' name, renamed {rename_count} files."
            )
        logger.debug(f"Checked {torrent_count} files")

    @staticmethod
    def gen_path(
        file_info: EpisodeFile | SubtitleFile,
        bangumi_name: str,
        method: str,
        offset: int = 0,
    ) -> str:
        """Build the new file name for ``file_info`` according to ``method``.

        Args:
            file_info: Parsed file; ``season``, ``episode``, ``title``,
                ``suffix``, ``media_path`` (and ``language`` for subtitle
                methods) are read from it.
            bangumi_name: Title used by the ``advance`` methods.
            method: One of ``none``, ``pn``, ``advance``, ``normal`` or the
                same names prefixed with ``subtitle_``.
            offset: Value added to the parsed episode number. It is ignored
                (with a warning) when the result would be below 1.

        Returns:
            The new name, or ``file_info.media_path`` unchanged for ``none``,
            the deprecated ``normal`` method, and unknown methods.
        """
        # Pass-through methods never use the episode number, so return before
        # doing any offset work (avoids a misleading offset warning).
        if method in ("none", "subtitle_none"):
            return file_info.media_path
        # ``rename_subtitles`` prefixes the configured method, so the
        # deprecated setting also arrives here as "subtitle_normal".
        if method in ("normal", "subtitle_normal"):
            logger.warning("[Renamer] Normal rename method is deprecated.")
            return file_info.media_path

        if method in ("pn", "subtitle_pn"):
            name = file_info.title
        elif method in ("advance", "subtitle_advance"):
            name = bangumi_name
        else:
            logger.error(f"[Renamer] Unknown rename method: {method}")
            return file_info.media_path

        # Apply offset (offset is stored as the value to ADD).
        adjusted_episode = int(file_info.episode) + offset
        if adjusted_episode < 1:
            # Safety: never produce an episode number below 1.
            adjusted_episode = int(file_info.episode)
            logger.warning(f"[Renamer] Offset {offset} would result in negative episode, ignoring")

        # Zero-pad to at least two digits.
        season = f"{int(file_info.season):02d}"
        episode = f"{adjusted_episode:02d}"
        stem = f"{name} S{season}E{episode}"
        if method.startswith("subtitle_"):
            return f"{stem}.{file_info.language}{file_info.suffix}"
        return f"{stem}{file_info.suffix}"

    async def rename_file(
        self,
        torrent_name: str,
        media_path: str,
        bangumi_name: str,
        method: str,
        season: int,
        _hash: str,
        offset: int = 0,
        **kwargs,
    ) -> Notification | None:
        """Rename the single media file of a torrent.

        When the file cannot be parsed, a warning is logged and the torrent
        is deleted if ``settings.bangumi_manage.remove_bad_torrent`` is set.

        Returns:
            A ``Notification`` carrying the offset-adjusted episode number
            when the file was actually renamed, otherwise ``None``.
        """
        ep = self._parser.torrent_parser(
            torrent_name=torrent_name,
            torrent_path=media_path,
            season=season,
        )
        if not ep:
            logger.warning(f"[Renamer] {media_path} parse failed")
            if settings.bangumi_manage.remove_bad_torrent:
                await self.delete_torrent(hashes=_hash)
            return None

        new_path = self.gen_path(ep, bangumi_name, method=method, offset=offset)
        # Nothing to do when the name is already correct or the target is
        # listed in the check pool.
        if media_path == new_path or new_path in self.check_pool:
            return None

        renamed = await self.rename_torrent_file(
            _hash=_hash, old_path=media_path, new_path=new_path
        )
        if not renamed:
            return None

        # Report the same adjusted episode number that gen_path used.
        adjusted_episode = int(ep.episode) + offset
        if adjusted_episode < 1:
            adjusted_episode = int(ep.episode)
        return Notification(
            official_title=bangumi_name,
            season=ep.season,
            episode=adjusted_episode,
        )

    async def rename_collection(
        self,
        media_list: list[str],
        bangumi_name: str,
        season: int,
        method: str,
        _hash: str,
        offset: int = 0,
        **kwargs,
    ):
        """Rename every episode file of a multi-file torrent.

        Files rejected by ``is_ep`` or that cannot be parsed are skipped. On
        a failed rename a warning is logged; if
        ``settings.bangumi_manage.remove_bad_torrent`` is set the torrent is
        deleted and processing stops.
        """
        for media_path in media_list:
            if not self.is_ep(media_path):
                continue
            ep = self._parser.torrent_parser(
                torrent_path=media_path,
                season=season,
            )
            if not ep:
                continue
            new_path = self.gen_path(ep, bangumi_name, method=method, offset=offset)
            if media_path == new_path:
                continue
            renamed = await self.rename_torrent_file(
                _hash=_hash, old_path=media_path, new_path=new_path
            )
            if renamed:
                continue
            logger.warning(f"[Renamer] {media_path} rename failed")
            # Delete bad torrent.
            if settings.bangumi_manage.remove_bad_torrent:
                await self.delete_torrent(_hash)
                break

    async def rename_subtitles(
        self,
        subtitle_list: list[str],
        torrent_name: str,
        bangumi_name: str,
        season: int,
        method: str,
        _hash,
        offset: int = 0,
        **kwargs,
    ):
        """Rename the subtitle files of a torrent.

        ``method`` is the plain rename method; it is prefixed with
        ``subtitle_`` before being handed to ``gen_path``. Failed renames
        are logged and otherwise ignored.
        """
        subtitle_method = "subtitle_" + method
        for subtitle_path in subtitle_list:
            sub = self._parser.torrent_parser(
                torrent_path=subtitle_path,
                torrent_name=torrent_name,
                season=season,
                file_type="subtitle",
            )
            if not sub:
                continue
            new_path = self.gen_path(sub, bangumi_name, method=subtitle_method, offset=offset)
            if subtitle_path == new_path:
                continue
            renamed = await self.rename_torrent_file(
                _hash=_hash, old_path=subtitle_path, new_path=new_path
            )
            if not renamed:
                logger.warning(f"[Renamer] {subtitle_path} rename failed")

    def _lookup_offset_by_path(self, save_path: str) -> int:
        """Look up the offset for a bangumi by its save_path.

        Best effort: any lookup error is logged at debug level and 0 is
        returned, as it is when no bangumi matches or no offset is stored.
        """
        try:
            with Database() as db:
                bangumi = db.bangumi.match_by_save_path(save_path)
                if bangumi:
                    # Coalesce a missing offset to 0 so callers can always
                    # add the result to an episode number.
                    return bangumi.offset or 0
        except Exception as e:
            logger.debug(f"[Renamer] Could not lookup offset for {save_path}: {e}")
        return 0

    async def rename(self) -> list[Notification]:
        """Run one rename pass over all torrents reported by the client.

        Torrents with exactly one media file go through ``rename_file``;
        torrents with several go through ``rename_collection`` and are put in
        the ``BangumiCollection`` category. Subtitles are renamed in both
        cases.

        Returns:
            The notifications produced by ``rename_file`` (single-file
            torrents only).
        """
        # Get torrent info
        logger.debug("[Renamer] Start rename process.")
        rename_method = settings.bangumi_manage.rename_method
        torrents_info = await self.get_torrent_info()
        renamed_info: list[Notification] = []
        # Fetch all torrent files concurrently
        all_files = await asyncio.gather(
            *[self.get_torrent_files(info["hash"]) for info in torrents_info]
        )
        for info, files in zip(torrents_info, all_files):
            torrent_hash = info["hash"]
            torrent_name = info["name"]
            save_path = info["save_path"]
            media_list, subtitle_list = self.check_files(files)
            bangumi_name, season = self._path_to_bangumi(save_path)
            # Look up offset from database
            offset = self._lookup_offset_by_path(save_path)
            kwargs = {
                "torrent_name": torrent_name,
                "bangumi_name": bangumi_name,
                "method": rename_method,
                "season": season,
                "_hash": torrent_hash,
                "offset": offset,
            }
            media_count = len(media_list)
            if media_count == 1:
                # Rename single media file
                notify_info = await self.rename_file(media_path=media_list[0], **kwargs)
                if notify_info:
                    renamed_info.append(notify_info)
                # Rename subtitle file
                if subtitle_list:
                    await self.rename_subtitles(subtitle_list=subtitle_list, **kwargs)
            elif media_count > 1:
                # Rename collection
                logger.info("[Renamer] Start rename collection")
                await self.rename_collection(media_list=media_list, **kwargs)
                if subtitle_list:
                    await self.rename_subtitles(subtitle_list=subtitle_list, **kwargs)
                await self.set_category(torrent_hash, "BangumiCollection")
            else:
                logger.warning(f"[Renamer] {torrent_name} has no media file")
        logger.debug("[Renamer] Rename process finished.")
        return renamed_info

    async def compare_ep_version(self, torrent_name: str, torrent_hash: str):
        """Delete the torrent unless its name contains a ``v<digit>`` tag.

        The name is kept when it matches ``v``, one digit and one further
        character. This is a coroutine and must be awaited: ``delete_torrent``
        is awaited everywhere else in this class, so calling it without
        ``await`` would create the coroutine and never run the deletion.
        """
        if re.search(r"v\d.", torrent_name):
            return
        await self.delete_torrent(hashes=torrent_hash)