# Copyright (c) OpenMMLab. All rights reserved.
"""Web search utils."""
import argparse
import asyncio
import json
import os
import time
import types

import pytoml
import requests
from bs4 import BeautifulSoup as BS
from duckduckgo_search import DDGS
from loguru import logger
from readability import Document

from ..primitive import FileOperation
from .helper import check_str_useful

# import_pyppeteer = False
# try:
#     from pyppeteer import launch
#     import_pyppeteer = True
# except Exception as e:
#     # Fix ldd ~/.local/share/pyppeteer/local-chromium/1181205/chrome-linux/chrome | grep not
#     # apt install libgbm-dev
#     # See https://techoverflow.net/2020/09/29/how-to-fix-pyppeteer-pyppeteer-errors-browsererror-browser-closed-unexpectedly/
#     logger.warning(
#         'For better URL parsing, try `pip install pyppeteer` and see https://github.com/pyppeteer/pyppeteer/issues/442'
#     )


# async def fetch_chroumium_content(url):
#     browser = await launch(headless=True,
#                            args=[
#                                '--no-sandbox', '--disable-dev-shm-usage',
#                                '--disable-gpu',
#                                '--disable-software-rasterizer',
#                                '--disable-setuid-sandbox'
#                            ])
#     page = await browser.newPage()
#     await page.goto(url)
#     time.sleep(1)
#     content = await page.evaluate('document.body.innerText', force_expr=True)
#     await browser.close()
#     return content


class Article:
    """Simple container for a piece of fetched web content."""

    def __init__(self, content: str = '', source='', brief=''):
        self.content = content
        self.source = source
        if len(brief) < 1:
            self.brief = content
        else:
            self.brief = brief

    def __str__(self):
        return self.content

    def __len__(self):
        return len(self.content)

    def cut(self, start_index, end_index):
        # truncate the stored text; `__str__` and `__len__` operate on
        # `content`, so that is what gets cut here
        self.content = self.content[start_index:end_index]


class WebSearch:
    """Provides functionality to perform web search operations.

    Attributes:
        search_config: Parsed `[web_search]` section of the config file.
        retry (int): Number of times to retry a request before giving up.
        language (str): Preferred language for search results ('zh' or 'en').

    Methods:
        load_key(): Retrieves the Serper API key from the config.
        ddgs(query: str, max_article: int): Runs a DuckDuckGo search and
            returns up to max_article parsed results.
        google(query: str, max_article: int): Runs a Google search via the
            Serper API and returns up to max_article parsed results.
        save_search_result(query: str, articles: list): Saves the search
            result into a text file.
        get(query: str, max_article=1): Runs the configured engine, saves the
            result and returns an (articles, error) tuple.
    """

    def __init__(self,
                 config_path: str,
                 retry: int = 1,
                 language: str = 'zh') -> None:
        """Initialize the WebSearch object with the given config path, retry
        count and preferred result language."""
        self.search_config = None
        with open(config_path, encoding='utf8') as f:
            config = pytoml.load(f)
            self.search_config = types.SimpleNamespace(**config['web_search'])
        self.retry = retry
        self.language = language
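
    # The `[web_search]` section of the TOML config is expected to provide the
    # attributes accessed below: `engine`, `serper_x_api_key`,
    # `domain_partial_order` and `save_dir`. A minimal sketch (key names taken
    # from this file, values are placeholders):
    #
    #   [web_search]
    #   engine = "serper"                 # or "ddgs"
    #   serper_x_api_key = "YOUR_SERPER_KEY"
    #   domain_partial_order = ["zhihu.com", "stackoverflow.com"]
    #   save_dir = "logs/web_search_result"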

    def load_key(self):
        """Return the Serper API key from the config, or '' if unset."""
        try:
            return self.search_config.serper_x_api_key
        except Exception:
            return ''

    def fetch_url(self, query: str, target_link: str, brief: str = ''):
        """Download `target_link` and return an Article, or None on failure."""
        if not target_link.startswith('http'):
            return None

        logger.info(f'extract: {target_link}')
        try:
            content = ''
            if target_link.lower().endswith(
                    '.pdf') or target_link.lower().endswith('.docx'):
                # download the file and parse it with FileOperation
                logger.info(f'download and parse: {target_link}')
                response = requests.get(target_link,
                                        stream=True,
                                        allow_redirects=True)
                save_dir = self.search_config.save_dir
                basename = os.path.basename(target_link)
                save_path = os.path.join(save_dir, basename)
                with open(save_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                file_opr = FileOperation()
                content, error = file_opr.read(filepath=save_path)
                if error is not None:
                    # callers expect an Article or None, so skip this link
                    return None
                return Article(content=content,
                               source=target_link,
                               brief=brief)

            # HTML page: extract the main body with readability, then strip
            # the tags with BeautifulSoup
            response = requests.get(target_link, timeout=30)
            doc = Document(response.text)
            content_html = doc.summary()
            title = doc.short_title()
            soup = BS(content_html, 'html.parser')
            if len(soup.text) < 4 * len(query):
                # too short to be useful for this query
                return None
            content = '{} {}'.format(title, soup.text)
            content = content.replace('\n\n', '\n')
            content = content.replace('\n\n', '\n')
            content = content.replace('  ', ' ')

            if not check_str_useful(content=content):
                return None

            # logger.info('retry with chromium {}'.format(target_link))
            # nest_asyncio.apply()
            # content = asyncio.get_event_loop().run_until_complete(
            #     fetch_chroumium_content(url=target_link))
            # if not check_str_useful(content=content):
            #     return None

            return Article(content=content, source=target_link, brief=brief)
        except Exception as e:
            logger.error('fetch_url {}'.format(str(e)))
            return None
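
    # Both engines below filter raw hits by `domain_partial_order`: only links
    # whose URL contains one of the configured domain substrings are kept, and
    # higher-priority domains are fetched first (ddgs keeps at most one hit
    # per domain).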

    def ddgs(self, query: str, max_article: int):
        """Run DDGS search based on query."""
        results = DDGS().text(query, max_results=20)
        filter_results = []
        for domain in self.search_config.domain_partial_order:
            for result in results:
                if domain in result['href']:
                    filter_results.append(result)
                    break

        logger.debug('filter results: {}'.format(filter_results))
        articles = []
        for result in filter_results:
            a = self.fetch_url(query=query,
                               target_link=result['href'],
                               brief=result['body'])
            if a is not None and len(a) > 0:
                articles.append(a)
            if len(articles) > max_article:
                break
        return articles
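
    # `google()` does not scrape Google directly: it calls the Serper.dev
    # search API (https://google.serper.dev/search) and authenticates with the
    # `serper_x_api_key` from the config via the `X-API-KEY` header.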

    def google(self, query: str, max_article: int):
        """Execute a Google search via Serper based on the provided query.

        Parses the response and extracts the relevant URLs based on the
        priority defined in the configuration file. Performs a GET request on
        these URLs and extracts the title and content of the page. The content
        is cleaned and added to the articles list. Returns a list of articles.
        """
        url = 'https://google.serper.dev/search'
        if 'zh' in self.language:
            lang = 'zh-cn'
        else:
            lang = 'en'
        payload = json.dumps({'q': f'{query}', 'hl': lang})
        headers = {
            'X-API-KEY': self.search_config.serper_x_api_key,
            'Content-Type': 'application/json'
        }
        response = requests.request('POST',
                                    url,
                                    headers=headers,
                                    data=payload,
                                    timeout=5)
        jsonobj = json.loads(response.text)
        logger.debug(jsonobj)

        # gather links grouped by configured domain, keeping priority order
        keys = self.search_config.domain_partial_order
        urls = {}
        normal_urls = []
        for organic in jsonobj['organic']:
            link = ''
            logger.debug(organic)
            if 'link' in organic:
                link = organic['link']
            else:
                link = organic['sitelinks'][0]['link']
            for key in keys:
                if key in link:
                    if key not in urls:
                        urls[key] = [link]
                    else:
                        urls[key].append(link)
                    break
            else:
                normal_urls.append(link)
        logger.debug(f'gather urls: {urls}')

        links = []
        for key in keys:
            if key in urls:
                links += urls[key]
        target_links = links[0:max_article]
        logger.debug(f'target_links:{target_links}')

        articles = []
        for target_link in target_links:
            a = self.fetch_url(query=query, target_link=target_link)
            if a is not None:
                articles.append(a)
        return articles

    def save_search_result(self, query: str, articles: list):
        """Writes the search results (articles) for the provided query into a
        text file.

        If the directory does not exist, it creates one. In case of an error,
        logs a warning message.
        """
        try:
            save_dir = self.search_config.save_dir
            if save_dir is None:
                return
            if not os.path.exists(save_dir):
                os.makedirs(save_dir)
            filepath = os.path.join(save_dir, query)

            text = ''
            if len(articles) > 0:
                texts = [str(a) for a in articles]
                text = '\n\n'.join(texts)
            with open(filepath, 'w', encoding='utf8') as f:
                f.write(text)
        except Exception as e:
            logger.warning(f'error while saving search result {str(e)}')

    def logging_search_query(self, query: str):
        """Log the search query to a txt file."""
        save_dir = self.search_config.save_dir
        if save_dir is None:
            return
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        filepath = os.path.join(save_dir, 'search_query.txt')
        with open(filepath, 'a') as f:
            f.write(query)
            f.write('\n')
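
    # `get()` is the entry point the rest of the project is expected to call:
    # it logs the query, dispatches to ddgs() or google() depending on the
    # configured `engine`, and persists the fetched articles via
    # save_search_result().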

    def get(self, query: str, max_article=1):
        """Run a search with the configured engine and return the articles.

        On success returns (articles, None); if an exception occurs the error
        is logged and an (empty list, Exception) pair is returned.
        """
        query = query.strip()
        # only the first 32 characters of the query are used
        query = query[0:32]

        try:
            self.logging_search_query(query=query)

            articles = []
            engine = self.search_config.engine.lower()
            if engine == 'ddgs':
                articles = self.ddgs(query=query, max_article=max_article)
            elif engine == 'serper':
                articles = self.google(query=query, max_article=max_article)

            self.save_search_result(query=query, articles=articles)
            return articles, None
        except Exception as e:
            logger.error(('web_search exception', query, str(e)))
            return [], Exception('search fail, please check TOKEN')


def parse_args():
    """Parses command-line arguments for web search."""
    parser = argparse.ArgumentParser(description='Web search.')
    parser.add_argument('--keywords',
                        type=str,
                        help='Keywords for search and parse.')
    parser.add_argument(
        '--config_path',
        default='config.ini',
        help='Feature store configuration path. Default value is config.ini')
    args = parser.parse_args()
    return args


def fetch_web_content(target_link: str):
    """Fetches and parses the content of the target URL.

    Extracts the main content and title from the HTML of the page. Returns the
    title and content as a single string.
    """
    response = requests.get(target_link, timeout=60)
    doc = Document(response.text)
    content_html = doc.summary()
    title = doc.short_title()
    soup = BS(content_html, 'html.parser')
    ret = '{} {}'.format(title, soup.text)
    return ret
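
# Note: this module uses relative imports (`from ..primitive import ...`), so
# the manual test below has to be run as a module from the package root, e.g.
# `python -m <package>.web_search --config_path config.ini`, not as a
# standalone script.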

if __name__ == '__main__':
    # https://developer.aliyun.com/article/679591 failed
    # print(fetch_web_content('https://www.volcengine.com/theme/4222537-R-7-1'))
    parser = parse_args()
    s = WebSearch(config_path=parser.config_path)

    print(
        s.fetch_url(
            query='',
            target_link=
            'http://www.lswz.gov.cn/html/xhtml/ztcss/zt-jljstj/images/clgszpj.pdf'
        ))
    print(
        s.fetch_url(query='',
                    target_link='https://zhuanlan.zhihu.com/p/699164101'))
    # zh query: 'change the LMDeploy log level'
    print(s.get('LMDeploy 修改日志级别'))
    print(
        fetch_web_content(
            'https://mmdeploy.readthedocs.io/zh-cn/latest/get_started.html'))