-
Notifications
You must be signed in to change notification settings - Fork 705
Expand file tree
/
Copy path_abstract_http_crawler.py
More file actions
315 lines (252 loc) · 12.9 KB
/
_abstract_http_crawler.py
File metadata and controls
315 lines (252 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
from __future__ import annotations

import asyncio
import logging
from abc import ABC
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Generic
from urllib.parse import urljoin

from more_itertools import partition
from pydantic import ValidationError
from typing_extensions import NotRequired, TypeVar

from crawlee._request import Request, RequestOptions, RequestState
from crawlee._utils.docs import docs_group
from crawlee._utils.time import SharedTimeout
from crawlee._utils.urls import to_absolute_url_iterator
from crawlee.crawlers._basic import BasicCrawler, BasicCrawlerOptions, ContextPipeline
from crawlee.errors import SessionError
from crawlee.statistics import StatisticsState

from ._http_crawling_context import HttpCrawlingContext, ParsedHttpCrawlingContext, TParseResult, TSelectResult

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator, Awaitable, Callable, Iterator

    from typing_extensions import Unpack

    from crawlee import RequestTransformAction
    from crawlee._types import BasicCrawlingContext, EnqueueLinksKwargs, ExtractLinksFunction

    from ._abstract_http_parser import AbstractHttpParser
# Type of the crawling context made available to the user function; bounded by `ParsedHttpCrawlingContext`.
TCrawlingContext = TypeVar('TCrawlingContext', bound=ParsedHttpCrawlingContext)
# Type of the statistics state; bounded by `StatisticsState`, which is also the default when left unspecified.
TStatisticsState = TypeVar('TStatisticsState', bound=StatisticsState, default=StatisticsState)
class HttpCrawlerOptions(
    BasicCrawlerOptions[TCrawlingContext, TStatisticsState],
    Generic[TCrawlingContext, TStatisticsState],
):
    """Keyword arguments accepted by the `AbstractHttpCrawler` constructor.

    It extends `BasicCrawlerOptions` with the HTTP-specific options and exists so that subclasses can type
    the `__init__` arguments they forward to the base class.
    """

    navigation_timeout: NotRequired[timedelta | None]
    """Timeout for the HTTP request."""
@docs_group('Crawlers')
class AbstractHttpCrawler(
    BasicCrawler[TCrawlingContext, StatisticsState],
    ABC,
    Generic[TCrawlingContext, TParseResult, TSelectResult],
):
    """A web crawler for performing HTTP requests.

    The `AbstractHttpCrawler` builds on top of the `BasicCrawler`, inheriting all its features. Additionally,
    it implements HTTP communication using HTTP clients. The class allows integration with any HTTP client
    that implements the `HttpClient` interface, provided as an input parameter to the constructor.

    `AbstractHttpCrawler` is a generic class intended to be used with a specific parser for parsing HTTP responses
    and the expected type of `TCrawlingContext` available to the user function. Examples of specific versions include
    `BeautifulSoupCrawler`, `ParselCrawler`, and `HttpCrawler`.

    HTTP client-based crawlers are ideal for websites that do not require JavaScript execution. For websites that
    require client-side JavaScript execution, consider using a browser-based crawler like the `PlaywrightCrawler`.
    """

    def __init__(
        self,
        *,
        parser: AbstractHttpParser[TParseResult, TSelectResult],
        navigation_timeout: timedelta | None = None,
        **kwargs: Unpack[BasicCrawlerOptions[TCrawlingContext, StatisticsState]],
    ) -> None:
        """Initialize a new instance.

        Args:
            parser: Parser used to parse HTTP responses, find links in them and detect blocked pages.
            navigation_timeout: Timeout applied to the pre-navigation hooks and the HTTP request of one crawling
                context. When `None` (or any other falsy value, e.g. a zero `timedelta`) is given, one minute
                is used.
            kwargs: Additional keyword arguments forwarded to the `BasicCrawler` constructor. They must contain
                `_context_pipeline`.

        Raises:
            ValueError: If `_context_pipeline` is not present in `kwargs`.
        """
        self._parser = parser
        self._navigation_timeout = navigation_timeout or timedelta(minutes=1)
        self._pre_navigation_hooks: list[Callable[[BasicCrawlingContext], Awaitable[None]]] = []
        # One `SharedTimeout` per in-flight crawling context, keyed by `id(context)`.
        self._shared_navigation_timeouts: dict[int, SharedTimeout] = {}

        if '_context_pipeline' not in kwargs:
            raise ValueError(
                'Please pass in a `_context_pipeline`. You should use the '
                'AbstractHttpCrawler._create_static_content_crawler_pipeline() method to initialize it.'
            )

        kwargs.setdefault('_logger', logging.getLogger(self.__class__.__name__))
        super().__init__(**kwargs)

    @classmethod
    def create_parsed_http_crawler_class(
        cls,
        static_parser: AbstractHttpParser[TParseResult, TSelectResult],
    ) -> type[AbstractHttpCrawler[ParsedHttpCrawlingContext[TParseResult], TParseResult, TSelectResult]]:
        """Create a specific version of `AbstractHttpCrawler` class.

        This is a convenience factory method for creating a specific `AbstractHttpCrawler` subclass.
        While `AbstractHttpCrawler` allows its crawling context type to be chosen independently of the parser's
        result type, this method covers the case where the context is `ParsedHttpCrawlingContext[TParseResult]`.

        Args:
            static_parser: Parser used by the created class unless another `parser` is passed to its constructor.

        Returns:
            A subclass of `AbstractHttpCrawler` whose constructor builds the static content pipeline itself.
        """

        class _ParsedHttpCrawler(AbstractHttpCrawler):
            def __init__(
                self,
                parser: AbstractHttpParser[TParseResult, TSelectResult] = static_parser,
                **kwargs: Unpack[BasicCrawlerOptions[ParsedHttpCrawlingContext[TParseResult]]],
            ) -> None:
                # The base constructor refuses to run without a pipeline, so it is injected here.
                kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline()
                super().__init__(
                    parser=parser,
                    **kwargs,
                )

        return _ParsedHttpCrawler

    def _create_static_content_crawler_pipeline(self) -> ContextPipeline[ParsedHttpCrawlingContext[TParseResult]]:
        """Create static content crawler context pipeline with expected pipeline steps.

        The steps are composed in this order: pre-navigation hooks, HTTP request, status code check,
        response parsing, and the content-based blocked-request check.
        """
        return (
            ContextPipeline()
            .compose(self._execute_pre_navigation_hooks)
            .compose(self._make_http_request)
            .compose(self._handle_status_code_response)
            .compose(self._parse_http_response)
            .compose(self._handle_blocked_request_by_content)
        )

    async def _execute_pre_navigation_hooks(
        self, context: BasicCrawlingContext
    ) -> AsyncGenerator[BasicCrawlingContext, None]:
        """Run the registered pre-navigation hooks and register the navigation timeout of the context.

        Args:
            context: The current crawling context.

        Yields:
            The unchanged crawling context, once all hooks have finished.
        """
        context_id = id(context)
        # `_make_http_request` looks the timeout up again under the same `id(context)` key.
        self._shared_navigation_timeouts[context_id] = SharedTimeout(self._navigation_timeout)

        try:
            for hook in self._pre_navigation_hooks:
                async with self._shared_navigation_timeouts[context_id]:
                    await hook(context)

            yield context
        finally:
            # Always drop the entry so the mapping does not grow with finished (or failed) contexts.
            self._shared_navigation_timeouts.pop(context_id, None)

    async def _parse_http_response(
        self, context: HttpCrawlingContext
    ) -> AsyncGenerator[ParsedHttpCrawlingContext[TParseResult], None]:
        """Parse HTTP response and create context enhanced by the parsing result and enqueue links function.

        Args:
            context: The current crawling context, that includes HTTP response.

        Yields:
            The original crawling context enhanced by the parsing result and enqueue links function.
        """
        parsed_content = await self._parser.parse(context.http_response)
        extract_links = self._create_extract_links_function(context, parsed_content)
        yield ParsedHttpCrawlingContext.from_http_crawling_context(
            context=context,
            parsed_content=parsed_content,
            enqueue_links=self._create_enqueue_links_function(context, extract_links),
            extract_links=extract_links,
        )

    def _create_extract_links_function(
        self, context: HttpCrawlingContext, parsed_content: TParseResult
    ) -> ExtractLinksFunction:
        """Create a callback function for extracting links from parsed content.

        Args:
            context: The current crawling context.
            parsed_content: The parsed http response.

        Returns:
            An async function that extracts links from the parsed content and returns them as `Request` objects.
        """

        async def extract_links(
            *,
            selector: str = 'a',
            label: str | None = None,
            user_data: dict[str, Any] | None = None,
            transform_request_function: Callable[[RequestOptions], RequestOptions | RequestTransformAction]
            | None = None,
            **kwargs: Unpack[EnqueueLinksKwargs],
        ) -> list[Request]:
            requests: list[Request] = []
            base_user_data = user_data or {}

            robots_txt_file = await self._get_robots_txt_file_for_url(context.request.url)

            # `setdefault` stores the default in `kwargs` (which is forwarded to the filter below) and returns
            # the effective value in one step.
            strategy = kwargs.setdefault('strategy', 'same-hostname')

            links_iterator: Iterator[str] = iter(self._parser.find_links(parsed_content, selector=selector))

            # Get base URL from <base> tag if present. Its `href` may itself be relative (e.g. `/docs/`), so it
            # is resolved against the document URL; an absolute `href` passes through `urljoin` unchanged.
            document_url = context.request.loaded_url or context.request.url
            extracted_base_urls = list(self._parser.find_links(parsed_content, selector='base[href]'))
            base_url: str = (
                urljoin(document_url, str(extracted_base_urls[0])) if extracted_base_urls else document_url
            )
            links_iterator = to_absolute_url_iterator(base_url, links_iterator, logger=context.log)

            if robots_txt_file:
                # `partition` returns the items failing the predicate first, i.e. the disallowed URLs.
                skipped, links_iterator = partition(robots_txt_file.is_allowed, links_iterator)
            else:
                skipped = iter([])

            for url in self._enqueue_links_filter_iterator(links_iterator, context.request.url, **kwargs):
                # Each request gets its own shallow copy of the user data.
                request_options = RequestOptions(
                    url=url, user_data={**base_user_data}, label=label, enqueue_strategy=strategy
                )

                if transform_request_function:
                    transform_request_options = transform_request_function(request_options)
                    if transform_request_options == 'skip':
                        continue
                    if transform_request_options != 'unchanged':
                        request_options = transform_request_options

                try:
                    request = Request.from_url(**request_options)
                except ValidationError as exc:
                    context.log.debug(
                        f'Skipping URL "{url}" due to invalid format: {exc}. '
                        'This may be caused by a malformed URL or unsupported URL scheme. '
                        'Please ensure the URL is correct and retry.'
                    )
                    continue

                requests.append(request)

            # `skipped` is consumed only now, after the allowed links have been iterated.
            skipped_tasks = [
                asyncio.create_task(self._handle_skipped_request(skipped_url, 'robots_txt'))
                for skipped_url in skipped
            ]
            await asyncio.gather(*skipped_tasks)

            return requests

        return extract_links

    async def _make_http_request(self, context: BasicCrawlingContext) -> AsyncGenerator[HttpCrawlingContext, None]:
        """Make http request and create context enhanced by HTTP response.

        Args:
            context: The current crawling context.

        Yields:
            The original crawling context enhanced by HTTP response.
        """
        # The timeout object was registered for this context by `_execute_pre_navigation_hooks`.
        async with self._shared_navigation_timeouts[id(context)] as remaining_timeout:
            result = await self._http_client.crawl(
                request=context.request,
                session=context.session,
                proxy_info=context.proxy_info,
                statistics=self._statistics,
                timeout=remaining_timeout,
            )

        # The timeout scope is left before yielding, so it covers only the HTTP request itself.
        context.request.state = RequestState.AFTER_NAV
        yield HttpCrawlingContext.from_basic_crawling_context(context=context, http_response=result.http_response)

    async def _handle_status_code_response(
        self, context: HttpCrawlingContext
    ) -> AsyncGenerator[HttpCrawlingContext, None]:
        """Validate the HTTP status code and raise appropriate exceptions if needed.

        Args:
            context: The current crawling context containing the HTTP response.

        Raises:
            SessionError: If the status code indicates the session is blocked.
            HttpStatusCodeError: If the status code represents a server error or is explicitly configured as an error.
            HttpClientStatusCodeError: If the status code represents a client error.

        Yields:
            The original crawling context if no errors are detected.
        """
        status_code = context.http_response.status_code
        # The session-blocked check is performed only when retrying on blocked requests is enabled.
        if self._retry_on_blocked:
            self._raise_for_session_blocked_status_code(
                context.session,
                status_code,
                request_url=context.request.url,
                retry_after_header=context.http_response.headers.get('retry-after'),
            )
        self._raise_for_error_status_code(status_code)
        yield context

    async def _handle_blocked_request_by_content(
        self, context: ParsedHttpCrawlingContext[TParseResult]
    ) -> AsyncGenerator[ParsedHttpCrawlingContext[TParseResult], None]:
        """Try to detect if the request is blocked based on the parsed response content.

        Args:
            context: The current crawling context.

        Raises:
            SessionError: If the request is considered blocked.

        Yields:
            The original crawling context if no blocking is detected.
        """
        if self._retry_on_blocked and (blocked_info := self._parser.is_blocked(context.parsed_content)):
            raise SessionError(blocked_info.reason)
        yield context

    def pre_navigation_hook(self, hook: Callable[[BasicCrawlingContext], Awaitable[None]]) -> None:
        """Register a hook to be called before each navigation.

        Hooks are awaited one after another, in registration order, before the HTTP request is made.

        Args:
            hook: A coroutine function to be called before each navigation.
        """
        self._pre_navigation_hooks.append(hook)