# -*- coding: utf-8 -*-
"""CLI download entry using SpiderRuntimeThread + event_q."""
import argparse
import asyncio
import os
import queue
import sys
from dataclasses import dataclass
from uuid import uuid4
from loguru import logger
from ComicSpider.runtime import SpiderRuntimeThread
from utils import conf, select
from utils.config.qc import cgs_cfg
from utils.protocol import (
    SpiderDownloadJob,
    JobAcceptedEvent,
    LogEvent,
    ErrorEvent,
    JobFinishedEvent,
    BarProgressEvent,
    ProcessStateEvent,
    TasksObjEvent,
)
from utils.website.registry import resolve_provider_descriptor_by_site
from utils.website.runtime_context import PreviewSiteConfig
from utils.website.site_runtime import ThreadSiteRuntime
from variables import Spider, SPIDERS


is_debugging = os.getenv("CGS_DEBUG") == "1"


class PreviewRuntime:
    """Async context manager wrapping ThreadSiteRuntime for preview-phase calls
    (search, episode listing, and page listing) against one site."""

    def __init__(self, site_index: int):
        self.site_index = site_index
        self.provider_descriptor = resolve_provider_descriptor_by_site(site_index)
        self.site_config = PreviewSiteConfig.create(
            self.provider_descriptor.provider_name,
            cookies_by_site=conf.cookies,
            domains=getattr(conf, "domains", None),
            custom_map=conf.custom_map,
            proxies=conf.proxies,
            doh_url=cgs_cfg.doh.get_url(),
        )
        self.thread_site_runtime = ThreadSiteRuntime(
            self.provider_descriptor,
            site_config=self.site_config,
            conf_state=conf,
        )

    async def __aenter__(self):
        # Return value is unused here; the call primes the async preview client
        # held by ThreadSiteRuntime.
        self.thread_site_runtime.get_async_preview_client()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.thread_site_runtime.aclose()

    async def search(self, keyword: str, page: int = 1):
        return await self.thread_site_runtime.preview_search(keyword, page=page)

    async def fetch_episodes(self, book):
        return await self.thread_site_runtime.preview_fetch_episodes(book)

    async def fetch_pages(self, episode):
        return await self.thread_site_runtime.preview_fetch_pages(episode)

@dataclass
class DownloadQuantityRecord:
    expected_pages: int | None = None
    registered_tasks_count: int | None = None
    processed_events: int = 0

    @property
    def is_aligned(self) -> bool:
        return (
            self.expected_pages is not None
            and self.expected_pages == self.registered_tasks_count
            and self.expected_pages == self.processed_events
        )

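# Cross-checks three counts per task id: pages expected from the preview
# payload, tasks registered by the runtime, and page events actually processed.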
class DownloadQuantityProbe:
    def __init__(self, payload):
        self.records: dict[str, DownloadQuantityRecord] = {}
        self._seed_from_payload(payload)

    @staticmethod
    def _iter_payload_items(payload):
        if payload is None:
            return
        if isinstance(payload, list):
            for item in payload:
                if hasattr(item, "to_tasks_obj"):
                    yield item
            return
        if hasattr(payload, "to_tasks_obj"):
            yield payload

    def _seed_from_payload(self, payload):
        for item in self._iter_payload_items(payload):
            tasks_obj = item.to_tasks_obj()
            record = self.records.setdefault(tasks_obj.taskid, DownloadQuantityRecord())
            record.expected_pages = tasks_obj.tasks_count

    def observe(self, event: TasksObjEvent):
        task = getattr(event, "task_obj", None)
        taskid = getattr(task, "taskid", None)
        if not taskid:
            return
        record = self.records.setdefault(taskid, DownloadQuantityRecord())
        if event.is_new:
            # A "new" event registers the task; subsequent events count processed pages.
            record.registered_tasks_count = getattr(task, "tasks_count", None)
            if record.expected_pages is None:
                record.expected_pages = record.registered_tasks_count
            return
        record.processed_events += 1

    def summarize(self) -> tuple[list[str], list[str]]:
        aligned = []
        drifted = []
        for taskid, record in sorted(self.records.items()):
            summary = (
                f"{taskid}: expected={record.expected_pages}, "
                f"registered={record.registered_tasks_count}, processed={record.processed_events}"
            )
            if record.is_aligned:
                aligned.append(summary)
            else:
                drifted.append(summary)
        return aligned, drifted

def _build_parser():
    parser = argparse.ArgumentParser(
        description=f"CGS CLI runtime downloader. Website indexes: {SPIDERS}",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("-w", "--website", type=int, default=1, help="website index")
    parser.add_argument("-k", "--keyword", required=True, help="keyword (work title)")
    parser.add_argument("-i", "--indexes", required=True, help="book selection indexes")
    parser.add_argument("-i2", "--indexes2", default=None, help="episode selection indexes; required for non-specials sites")
    parser.add_argument("-l", "--log_level", default="DEBUG", help="log level")
    parser.add_argument("-tw", "--time_wait", default=None, help="kept for compatibility; currently unused")
    parser.add_argument("-tp", "--turn_page", action="store_true", help="kept for compatibility; currently unused")
    parser.add_argument("-dt", "--daily_test", action="store_true", help="kept for compatibility; currently unused")
    return parser


def _validate_args(parser, args):
    if args.website not in Spider.specials() and not args.indexes2:
        parser.error(
            "the following argument is required when website is not in Spider.specials(): -i2/--indexes2"
        )
    if args.website in Spider.specials() and args.indexes2:
        parser.error("the argument -i2/--indexes2 is not allowed when website is in Spider.specials()")

def _render_books(books_map: dict):
    for idx, book in sorted(books_map.items()):
        title = getattr(book, "name", "") or getattr(book, "title", "") or "-"
        logger.info(f"[book:{idx}] {title}")


def _render_episodes(episodes_map: dict):
    for idx, ep in sorted(episodes_map.items()):
        title = getattr(ep, "name", "") or getattr(ep, "title", "") or "-"
        logger.info(f"[ep:{idx}] {title}")


async def _search_books(site_index: int, keyword: str) -> dict:
    async with PreviewRuntime(site_index) as preview:
        books = await preview.search(keyword, page=1)
        books_map = {}
        for idx, book in enumerate(books, start=1):
            if getattr(book, "idx", None) is None:
                book.idx = idx
            books_map[int(book.idx)] = book
        return books_map


async def _fetch_episode_choices(site_index: int, books: list) -> dict:
    episode_choices = {}
    async with PreviewRuntime(site_index) as preview:
        next_idx = 1
        for book in books:
            episodes = await preview.fetch_episodes(book)
            for ep in episodes or []:
                episode_choices[next_idx] = ep
                next_idx += 1
    return episode_choices


async def _fetch_selected_pages(site_index: int, episodes: list):
    async with PreviewRuntime(site_index) as preview:
        for ep in episodes:
            if getattr(ep, "page_urls", None):
                continue
            page_urls = await preview.fetch_pages(ep)
            if not isinstance(page_urls, list):
                raise TypeError(f"preview_fetch_pages must return list, got {type(page_urls).__name__}")
            ep.pages = len(page_urls)
            ep.page_urls = list(page_urls)

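# Payload shape: specials sites take whole books directly; other sites regroup
# the selected episodes under their source books before submission.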
def _build_download_payload(site_index: int, selected_books: list, selected_eps: list | None):
    if site_index in Spider.specials():
        return selected_books[0] if len(selected_books) == 1 else selected_books
    books_by_key = {}
    for ep in selected_eps or []:
        book = getattr(ep, "from_book", None)
        if book is None:
            continue
        key = id(book)
        if key not in books_by_key:
            # Reset the book's episode list so it carries only the selected episodes.
            book.episodes = []
            books_by_key[key] = book
        books_by_key[key].episodes.append(ep)
    payload = list(books_by_key.values())
    if not payload:
        raise ValueError("no episodes selected for download")
    return payload[0] if len(payload) == 1 else payload

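# Submission flow: start the runtime thread, submit a single SpiderDownloadJob,
# then consume runtime.event_q until a JobFinishedEvent for that job arrives;
# events carrying a different job_id are skipped.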
def _submit_and_wait(site_index: int, payload) -> bool:
    runtime = SpiderRuntimeThread()
    runtime.daemon = True
    runtime.start()
    runtime.wait_ready(timeout=30)
    quantity_probe = DownloadQuantityProbe(payload)
    job = SpiderDownloadJob(
        job_id=uuid4().hex,
        spider_name=SPIDERS[site_index],
        site_index=site_index,
        payload=payload,
        options={},
    )
    logger.info(f"[submit] spider={job.spider_name} job={job.job_id}")
    runtime.submit_job(job)
    last_percent = None
    success = False
    try:
        while True:
            try:
                event = runtime.event_q.get(timeout=0.2)
            except queue.Empty:
                continue
            event_job_id = getattr(event, "job_id", None)
            if event_job_id and event_job_id != job.job_id:
                continue
            if isinstance(event, JobAcceptedEvent):
                logger.info(f"[accepted] {event.job_id}")
            elif isinstance(event, LogEvent):
                logger.info(str(event.message))
            elif isinstance(event, ProcessStateEvent):
                logger.debug(f"[stage] {event.process}")
            elif isinstance(event, BarProgressEvent):
                if event.percent != last_percent:
                    last_percent = event.percent
                    logger.info(f"[progress] {event.percent}%")
            elif isinstance(event, TasksObjEvent):
                quantity_probe.observe(event)
                task = event.task_obj
                if event.is_new:
                    title = getattr(task, "display_title", None) or getattr(task, "taskid", "")
                    logger.info(f"[task] {title}")
            elif isinstance(event, ErrorEvent):
                logger.error(event.error)
            elif isinstance(event, JobFinishedEvent):
                success = bool(event.success)
                logger.info(f"[finished] success={success}")
                return success
    finally:
        aligned, drifted = quantity_probe.summarize()
        for summary in aligned:
            logger.info(f"[quantity] aligned {summary}")
        for summary in drifted:
            logger.warning(f"[quantity] drift {summary}")
        runtime.shutdown()
        runtime.join(timeout=5)

def main():
    parser = _build_parser()
    args = parser.parse_args()
    logger.remove()
    logger.add(sys.stderr, level=args.log_level.upper())
    _validate_args(parser, args)
    if args.turn_page:
        logger.warning("--turn_page is no longer supported in runtime CLI; ignoring")
    if args.daily_test or is_debugging:
        logger.info("runtime CLI uses the same event_q flow in daily/debug mode")
    try:
        books_map = asyncio.run(_search_books(args.website, args.keyword))
        if not books_map:
            logger.error("search returned no books")
            return 1
        _render_books(books_map)
        selected_books = select(args.indexes, books_map)
        if not selected_books:
            logger.error("selected book indexes resolved to empty set")
            return 1
        selected_eps = None
        if args.website not in Spider.specials():
            episode_choices = asyncio.run(_fetch_episode_choices(args.website, selected_books))
            if not episode_choices:
                logger.error("episode fetch returned no episodes")
                return 1
            _render_episodes(episode_choices)
            selected_eps = select(args.indexes2, episode_choices)
            if not selected_eps:
                logger.error("selected episode indexes resolved to empty set")
                return 1
            asyncio.run(_fetch_selected_pages(args.website, selected_eps))
        payload = _build_download_payload(args.website, selected_books, selected_eps)
        return 0 if _submit_and_wait(args.website, payload) else 1
    except Exception as exc:
        logger.exception(exc)
        return 1


if __name__ == "__main__":
    sys.exit(main())