
Check for disk space errors #121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 6 commits into master

3 changes: 3 additions & 0 deletions icrawler/builtin/urllist.py
@@ -10,6 +10,9 @@ def worker_exec(self, queue_timeout=2, **kwargs):
if self.signal.get("reach_max_num"):
self.logger.info("downloaded image reached max num, thread %s" " exit", threading.current_thread().name)
break
if self.signal.get("exceed_storage_space"):
self.logger.info("downloaded image reached max storage space, thread %s" " exit", threading.current_thread().name)
break
try:
url = self.in_queue.get(timeout=queue_timeout)
except queue.Empty:
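Note: the check added here mirrors the existing `reach_max_num` guard, so a flag raised by any downloader thread drains the URL-list worker within about one `queue_timeout`. A minimal sketch of that polling pattern, using a plain dict plus lock as a stand-in for icrawler's shared `Signal` (the names `stop_flags` and `work_queue` are illustrative, not library API):

```python
import queue
import threading

stop_flags = {"exceed_storage_space": False}  # stand-in for icrawler's shared Signal
flag_lock = threading.Lock()
work_queue: queue.Queue = queue.Queue()

def worker(queue_timeout: float = 2.0) -> None:
    while True:
        with flag_lock:
            if stop_flags["exceed_storage_space"]:
                break  # another thread hit ENOSPC; stop pulling URLs
        try:
            url = work_queue.get(timeout=queue_timeout)
        except queue.Empty:
            continue  # wake up periodically so the flag is re-checked
        print("would download", url)
        work_queue.task_done()
```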
16 changes: 5 additions & 11 deletions icrawler/crawler.py
@@ -5,6 +5,7 @@
import time
from importlib import import_module

from . import defaults
from . import storage as storage_package
from .downloader import Downloader
from .feeder import Feeder
@@ -81,11 +82,11 @@ def set_logger(self, log_level=logging.INFO):
def init_signal(self):
"""Init signal

3 signals are added: ``feeder_exited``, ``parser_exited`` and
``reach_max_num``.
4 signals are added: ``feeder_exited``, ``parser_exited``,
``reach_max_num`` and ``exceed_storage_space``.
"""
self.signal = Signal()
self.signal.set(feeder_exited=False, parser_exited=False, reach_max_num=False)
self.signal.set(feeder_exited=False, parser_exited=False, reach_max_num=False, exceed_storage_space=False)

def set_storage(self, storage):
"""Set storage backend for downloader
@@ -133,14 +134,7 @@ def set_session(self, headers=None):
header to init the session)
"""
if headers is None:
headers = {
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
" AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/88.0.4324.104 Safari/537.36"
),
}
headers = defaults.DEFAULT_HEADERS
elif not isinstance(headers, dict):
raise TypeError('"headers" must be a dict object')

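With this change, callers that never pass headers pick up `defaults.DEFAULT_HEADERS`, while an explicit dict still takes precedence. A usage sketch (hedged: `GoogleImageCrawler` and the `storage`/`crawl` arguments follow icrawler's public API as I understand it, and the header values below are placeholders):

```python
from icrawler.builtin import GoogleImageCrawler

crawler = GoogleImageCrawler(storage={"root_dir": "images"})

# No argument: set_session() now falls back to defaults.DEFAULT_HEADERS.
crawler.set_session()

# An explicit dict still overrides the defaults; anything other than a dict raises TypeError.
crawler.set_session(headers={
    "User-Agent": "my-crawler/1.0",
    "Accept-Language": "en-US,en;q=0.9",
})

crawler.crawl(keyword="cat", max_num=20)
```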
12 changes: 12 additions & 0 deletions icrawler/defaults.py
@@ -0,0 +1,12 @@
MAX_RETRIES = 3
BACKOFF_BASE = 1.2

ACCEPT_LANGUAGES = "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
)

DEFAULT_HEADERS = {
"Accept-Language": ACCEPT_LANGUAGES,
"User-Agent": USER_AGENT,
}
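These are plain module-level constants: `DEFAULT_HEADERS` is read inside `set_session()` at call time, so it can be patched before a crawl, whereas `MAX_RETRIES` and `BACKOFF_BASE` are consumed by the `@retry` decorator in `icrawler/utils/session.py` when that module is imported, so changing them afterwards does not alter the retry policy. A small sketch of the difference (patching the shared headers is an illustration, not a recommendation):

```python
from icrawler import defaults

# Effective: DEFAULT_HEADERS is looked up each time set_session() is called
# with headers=None, so crawlers built after this line see the new value.
defaults.DEFAULT_HEADERS["User-Agent"] = "my-research-bot/0.1 (+https://example.com)"

# Not effective: stop_after_attempt(defaults.MAX_RETRIES) was already evaluated
# when icrawler.utils.session was imported, so this assignment changes nothing there.
defaults.MAX_RETRIES = 10
```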
20 changes: 15 additions & 5 deletions icrawler/downloader.py
@@ -1,4 +1,5 @@
import queue
import errno
import time
from io import BytesIO
from threading import current_thread
@@ -114,7 +115,7 @@ def download(self, task, default_ext, timeout=5, max_retry=3, overwrite=False, *
return
self.fetched_num -= 1

while retry > 0 and not self.signal.get("reach_max_num"):
while retry > 0 and not self.signal.get("reach_max_num") and not self.signal.get("exceed_storage_space"):
try:
response = self.session.get(file_url, timeout=timeout)
except Exception as e:
@@ -136,10 +137,19 @@ def download(self, task, default_ext, timeout=5, max_retry=3, overwrite=False, *
with self.lock:
self.fetched_num += 1
filename = self.get_filename(task, default_ext)
self.logger.info("image #%s\t%s", self.fetched_num, file_url)
self.storage.write(filename, response.content)
task["success"] = True
task["filename"] = filename
self.logger.info("image #%s\t%s %s", self.fetched_num, filename, file_url)

task["success"] = False
try:
task["filename"] = filename # may be zero bytes if OSError happened during write()
self.storage.write(filename, response.content)
task["success"] = True
except OSError as o:
# only a full disk (errno.ENOSPC) stops the crawl; other OSErrors (e.g. an over-long filename) are re-raised
if o.errno == errno.ENOSPC:
self.signal.set(exceed_storage_space=True)
else:
raise
break
finally:
retry -= 1
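One way to exercise the new branch without actually filling a disk is a storage stub whose `write()` raises `OSError(errno.ENOSPC, ...)`. A rough, self-contained sketch of the same flag-on-full-disk logic (the `FakeFullDiskStorage` stub and `write_or_flag` helper are illustrative; they are not part of icrawler):

```python
import errno


class FakeFullDiskStorage:
    """Storage stub whose write() always reports a full disk."""

    def write(self, name: str, data: bytes) -> None:
        raise OSError(errno.ENOSPC, "No space left on device", name)


def write_or_flag(storage, signal_state: dict, filename: str, content: bytes) -> bool:
    """Mirror of the new download() logic: flag ENOSPC, re-raise any other OSError."""
    try:
        storage.write(filename, content)
        return True
    except OSError as exc:
        if exc.errno == errno.ENOSPC:
            signal_state["exceed_storage_space"] = True
            return False
        raise


signals = {"exceed_storage_space": False}
assert write_or_flag(FakeFullDiskStorage(), signals, "000001.jpg", b"\xff\xd8") is False
assert signals["exceed_storage_space"] is True
```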
17 changes: 15 additions & 2 deletions icrawler/parser.py
@@ -61,6 +61,11 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs):
"downloaded image reached max num, thread %s " "is ready to exit", current_thread().name
)
break
if self.signal.get("exceed_storage_space"):
self.logger.info(
"no more storage space, thread %s " "is ready to exit", current_thread().name
)
break
# get the page url
try:
url = self.in_queue.get(timeout=queue_timeout)
@@ -90,8 +95,14 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs):
)
else:
self.logger.info(f"parsing result page {url}")
for task in self.parse(response, **kwargs):
while not self.signal.get("reach_max_num"):
task_list = self.parse(response, **kwargs)
if not task_list:
self.logger.debug("self.parse() returned no tasks")
with open("task_list_error.log", 'ab') as f:
f.write(response.content)

for task in task_list:
while not self.signal.get("reach_max_num") and not self.signal.get("exceed_storage_space"):
try:
if isinstance(task, dict):
self.output(task, timeout=1)
Expand All @@ -110,6 +121,8 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs):
break
if self.signal.get("reach_max_num"):
break
if self.signal.get("exceed_storage_space"):
break
self.in_queue.task_done()
break
finally:
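For reference, `self.parse()` is expected to produce an iterable of task dicts (the built-in crawlers use at least a `file_url` key); the new guard only fires when it returns an empty or falsy value, and the raw page body is then appended to `task_list_error.log` for offline inspection. A toy parser under those assumptions (the CSS selector and URL filter are illustrative):

```python
from bs4 import BeautifulSoup

def parse(response, **kwargs):
    """Toy parser: return a list of task dicts, or [] when nothing matches."""
    soup = BeautifulSoup(response.content, "lxml")
    return [
        {"file_url": img["src"]}
        for img in soup.select("img[src]")
        if img["src"].startswith("http")
    ]
```

Note that a parser written as a generator (using `yield`) is always truthy, so the empty-result branch would not trigger for it; returning a list, as above, keeps the check meaningful.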
45 changes: 22 additions & 23 deletions icrawler/utils/proxy_pool.py
@@ -1,10 +1,10 @@
import json
import logging
import queue
import random
import threading
import time

import chanfig
import requests
from bs4 import BeautifulSoup

@@ -44,7 +44,12 @@ def to_dict(self):
dict: A dict with four keys: ``addr``, ``protocol``,
``weight`` and ``last_checked``
"""
return dict(addr=self.addr, protocol=self.protocol, weight=self.weight, last_checked=self.last_checked)
return {
"addr": self.addr,
"protocol": self.protocol,
"weight": self.weight,
"last_checked": self.last_checked,
}


class ProxyPool:
@@ -146,17 +151,15 @@ def save(self, filename):
for proxy in self.proxies[protocol]:
serializable_proxy = self.proxies[protocol][proxy].to_dict()
proxies[protocol].append(serializable_proxy)
with open(filename, "w") as fout:
json.dump(proxies, fout)
chanfig.save(proxies, filename)

def load(self, filename):
"""Load proxies from file"""
with open(filename) as fin:
proxies = json.load(fin)
for protocol in proxies:
for proxy in proxies[protocol]:
proxies = chanfig.load(filename)
for protocol, protocol_proxies in proxies.items():
for proxy in protocol_proxies:
self.proxies[protocol][proxy["addr"]] = Proxy(
proxy["addr"], proxy["protocol"], proxy["weight"], proxy["last_checked"]
proxy["addr"], protocol, proxy.get("weight", 1.0), proxy.get("last_checked")
)
self.addr_list[protocol].append(proxy["addr"])

@@ -215,7 +218,7 @@ def is_valid(self, addr, protocol="http", timeout=5):
raise
except requests.exceptions.Timeout:
return {"valid": False, "msg": "timeout"}
except:
except BaseException: # noqa: B036
return {"valid": False, "msg": "exception"}
else:
if r.status_code == 200:
@@ -278,20 +281,20 @@ def scan(
t = threading.Thread(
name=f"val-{i + 1:0>2d}",
target=self.validate,
kwargs=dict(
proxy_scanner=proxy_scanner,
expected_num=expected_num,
queue_timeout=queue_timeout,
val_timeout=val_timeout,
),
kwargs={
"proxy_scanner": proxy_scanner,
"expected_num": expected_num,
"queue_timeout": queue_timeout,
"val_timeout": val_timeout,
},
)
t.daemon = True
val_threads.append(t)
t.start()
for t in val_threads:
t.join()
self.logger.info("Proxy scanning done!")
except:
except BaseException:
raise
finally:
if out_file is not None:
@@ -466,18 +469,14 @@ def scan_free_proxy_list(self):
def scan_file(self, src_file):
"""Scan candidate proxies from an existing file"""
self.logger.info(f"start scanning file {src_file} for proxy list...")
with open(src_file) as fin:
proxies = json.load(fin)
proxies = chanfig.load(src_file)
for protocol in proxies.keys():
for proxy in proxies[protocol]:
self.proxy_queue.put({"addr": proxy["addr"], "protocol": protocol})

def is_scanning(self):
"""Return whether at least one scanning thread is alive"""
for t in self.scan_threads:
if t.is_alive():
return True
return False
return any(t.is_alive() for t in self.scan_threads)

def scan(self):
"""Start a thread for each registered scan function to scan proxy lists"""
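Swapping the hand-rolled `json.dump`/`json.load` for `chanfig.save`/`chanfig.load` keeps the JSON layout but lets the on-disk format follow the file extension, and the `.get()` defaults make `weight`/`last_checked` optional in older files. A hedged round-trip sketch (the `icrawler.utils` import path and the `pool.proxies` layout are assumed from this diff):

```python
import chanfig

from icrawler.utils import ProxyPool

# Hand-write a minimal proxies file; "weight" and "last_checked" may be omitted
# because load() now falls back to .get() defaults.
chanfig.save({"http": [{"addr": "127.0.0.1:8080"}], "https": []}, "proxies.json")

pool = ProxyPool()
pool.load("proxies.json")
print(len(pool.proxies["http"]))  # expected: 1

pool.save("proxies.json")  # writes the same structure back via chanfig.save
```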
75 changes: 46 additions & 29 deletions icrawler/utils/session.py
@@ -1,42 +1,59 @@
from __future__ import annotations

import logging
from collections.abc import Mapping
from urllib.parse import urlsplit

import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential

from .. import defaults
from .proxy_pool import ProxyPool


class Session(requests.Session):
def __init__(self, proxy_pool):
def __init__(
self, proxy_pool: ProxyPool | None = None, headers: Mapping | None = None, cookies: Mapping | None = None
):
super().__init__()
self.logger = logging.getLogger("cscholars.connection")
self.proxy_pool = proxy_pool
if headers is not None:
self.headers.update(headers)
if cookies is not None:
self.cookies.update(cookies)

def _url_scheme(self, url):
return urlsplit(url).scheme

def get(self, url, **kwargs):
proxy = self.proxy_pool.get_next(protocol=self._url_scheme(url))
if proxy is None:
return super().get(url, **kwargs)
try:
response = super().get(url, proxies=proxy.format(), **kwargs)
except requests.exceptions.ConnectionError:
self.proxy_pool.decrease_weight(proxy)
raise
except:
raise
else:
self.proxy_pool.increase_weight(proxy)
return response

def post(self, url, data=None, json=None, **kwargs):
proxy = self.proxy_pool.get_next(protocol=self._url_scheme(url))
if proxy is None:
return super().get(url, data, json, **kwargs)
try:
response = super().post(url, data, json, proxies=proxy.format(), **kwargs)
except requests.exceptions.ConnectionError:
self.proxy_pool.decrease_weight(proxy)
raise
except:
raise
@retry(
stop=stop_after_attempt(defaults.MAX_RETRIES),
wait=wait_random_exponential(exp_base=defaults.BACKOFF_BASE),
retry=retry_if_exception_type((requests.RequestException, requests.HTTPError, requests.ConnectionError)),
)
def request(self, method, url, *args, **kwargs):
message = f"{method}ing {url}"
if args and kwargs:
message += f" with {args} and {kwargs}"
elif args:
message += f" with {args}"
elif kwargs:
message += f" with {kwargs}"
self.logger.debug(message)

if self.proxy_pool is not None:
proxy = self.proxy_pool.get_next(protocol=self._url_scheme(url))
self.logger.debug(f"Using proxy: {proxy.format()}")
try:
response = super().request(method, url, *args, proxies=proxy.format(), **kwargs)
response.raise_for_status()
self.proxy_pool.increase_weight(proxy)
except (requests.ConnectionError, requests.HTTPError):
self.proxy_pool.decrease_weight(proxy)
raise
else:
self.proxy_pool.increase_weight(proxy)
return response
response = super().request(method, url, *args, **kwargs)

if "set-cookie" in response.headers:
self.cookies.update(response.cookies)
return response
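Because `requests.Session` routes `get`, `post`, and the other verbs through `request()`, the tenacity decorator gives every call the same retry policy (note that `requests.HTTPError` and `requests.ConnectionError` are already subclasses of `requests.RequestException`, so the exception tuple is wider than strictly necessary). A hedged construction sketch without a proxy pool (the `icrawler.utils` import path is assumed):

```python
from icrawler.utils import Session

session = Session(
    proxy_pool=None,  # skip the proxy branch entirely
    headers={"User-Agent": "my-crawler/1.0"},
    cookies={"session_hint": "example"},
)

# Retries with random-exponential backoff for up to defaults.MAX_RETRIES attempts
# whenever a requests.RequestException escapes request().
response = session.get("https://example.com/")
response.raise_for_status()
```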
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -46,9 +46,9 @@ dynamic = [
dependencies = [
"beautifulsoup4",
"bs4",
"chanfig",
"lxml",
"pillow",
"pyyaml",
"requests",
"six",
]