# -*- coding: utf-8 -*-
# Copyright (c) 2025 relakkes@gmail.com
#
# This file is part of MediaCrawler project.
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/main.py
# GitHub: https://github.com/NanmiCoder
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
#
# Disclaimer: this code is for learning and research purposes only. Users must
# observe the following principles:
# 1. It must not be used for any commercial purpose.
# 2. Usage must comply with the target platform's terms of service and robots.txt rules.
# 3. Large-scale crawling and any disruption of the platform's operation are prohibited.
# 4. Request frequency should be kept reasonable to avoid placing unnecessary load on the target platform.
# 5. It must not be used for any illegal or improper purpose.
#
# See the LICENSE file in the project root for the full license terms.
# By using this code, you agree to the principles above and to all terms of the LICENSE.
import sys
import io

# Force UTF-8 encoding for stdout/stderr to prevent encoding errors
# when outputting Chinese characters in non-UTF-8 terminals.
if sys.stdout and hasattr(sys.stdout, 'buffer'):
    if sys.stdout.encoding and sys.stdout.encoding.lower() != 'utf-8':
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
if sys.stderr and hasattr(sys.stderr, 'buffer'):
    if sys.stderr.encoding and sys.stderr.encoding.lower() != 'utf-8':
        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
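# Note: a similar effect can be achieved without re-wrapping the streams by
# running Python in UTF-8 mode (python -X utf8, available since Python 3.7) or
# by setting the PYTHONIOENCODING=utf-8 environment variable before launch.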

import asyncio
from typing import Optional, Type

import cmd_arg
import config
from database import db
from base.base_crawler import AbstractCrawler
from media_platform.bilibili import BilibiliCrawler
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.tieba import TieBaCrawler
from media_platform.weibo import WeiboCrawler
from media_platform.xhs import XiaoHongShuCrawler
from media_platform.zhihu import ZhihuCrawler
from tools.async_file_writer import AsyncFileWriter
from var import crawler_type_var


class CrawlerFactory:
    CRAWLERS: dict[str, Type[AbstractCrawler]] = {
        "xhs": XiaoHongShuCrawler,
        "dy": DouYinCrawler,
        "ks": KuaishouCrawler,
        "bili": BilibiliCrawler,
        "wb": WeiboCrawler,
        "tieba": TieBaCrawler,
        "zhihu": ZhihuCrawler,
    }

    @staticmethod
    def create_crawler(platform: str) -> AbstractCrawler:
        crawler_class = CrawlerFactory.CRAWLERS.get(platform)
        if not crawler_class:
            supported = ", ".join(sorted(CrawlerFactory.CRAWLERS))
            raise ValueError(f"Invalid media platform: {platform!r}. Supported: {supported}")
        return crawler_class()
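
# Example (illustrative only): resolving a crawler class by its platform key.
#   CrawlerFactory.create_crawler("xhs")   # returns a XiaoHongShuCrawler instance
#   CrawlerFactory.create_crawler("foo")   # raises ValueError listing supported keys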


crawler: Optional[AbstractCrawler] = None


def _flush_excel_if_needed() -> None:
    # Flush any buffered Excel data to disk once crawling finishes. The store
    # module is imported lazily so it is only required when saving to Excel.
    if config.SAVE_DATA_OPTION != "excel":
        return
    try:
        from store.excel_store_base import ExcelStoreBase

        ExcelStoreBase.flush_all()
        print("[Main] Excel files saved successfully")
    except Exception as e:
        print(f"[Main] Error flushing Excel data: {e}")


async def _generate_wordcloud_if_needed() -> None:
    # Word clouds are built from crawled comments, and only when the data has
    # been saved as JSON/JSONL and the feature is enabled in config.
    if config.SAVE_DATA_OPTION not in ("json", "jsonl") or not config.ENABLE_GET_WORDCLOUD:
        return
    try:
        file_writer = AsyncFileWriter(
            platform=config.PLATFORM,
            crawler_type=crawler_type_var.get(),
        )
        await file_writer.generate_wordcloud_from_comments()
    except Exception as e:
        print(f"[Main] Error generating wordcloud: {e}")


async def main() -> None:
    global crawler

    args = await cmd_arg.parse_cmd()

    # The init_db option short-circuits: initialize the database and exit
    # without crawling.
    if args.init_db:
        await db.init_db(args.init_db)
        print(f"Database {args.init_db} initialized successfully.")
        return

    crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM)
    await crawler.start()

    _flush_excel_if_needed()
    # Generate the wordcloud after crawling is complete (JSON/JSONL save modes only).
    await _generate_wordcloud_if_needed()


async def async_cleanup() -> None:
    global crawler
    if crawler:
        if getattr(crawler, "cdp_manager", None):
            try:
                await crawler.cdp_manager.cleanup(force=True)
            except Exception as e:
                # Errors from an already closed/disconnected browser are expected
                # during shutdown and are silently ignored.
                error_msg = str(e).lower()
                if "closed" not in error_msg and "disconnected" not in error_msg:
                    print(f"[Main] Error cleaning up CDP browser: {e}")
        elif getattr(crawler, "browser_context", None):
            try:
                await crawler.browser_context.close()
            except Exception as e:
                error_msg = str(e).lower()
                if "closed" not in error_msg and "disconnected" not in error_msg:
                    print(f"[Main] Error closing browser context: {e}")
    if config.SAVE_DATA_OPTION in ("db", "sqlite"):
        await db.close()


if __name__ == "__main__":
    from tools.app_runner import run

    def _force_stop() -> None:
        # Best-effort kill of the CDP browser launcher on the first interrupt,
        # so a hung browser cannot block the graceful async_cleanup that follows.
        c = crawler
        if not c:
            return
        cdp_manager = getattr(c, "cdp_manager", None)
        launcher = getattr(cdp_manager, "launcher", None)
        if not launcher:
            return
        try:
            launcher.cleanup()
        except Exception:
            pass

    run(main, async_cleanup, cleanup_timeout_seconds=15.0, on_first_interrupt=_force_stop)
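
# Example invocations (illustrative; the actual CLI is defined in cmd_arg.parse_cmd.
# Only the init_db option is referenced above, and its accepted values are an
# assumption here):
#   python main.py --init_db sqlite   # initialize the database, then exit
#   python main.py                    # run the crawler selected by config.PLATFORM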