@@ -2,12 +2,19 @@
 import os
 import sys
 from datetime import datetime
-from time import time
+from time import sleep
 from typing import Dict, Optional
 from urllib.parse import urljoin
 
 import requests
-from requests.exceptions import ChunkedEncodingError
+from requests.exceptions import (
+    ChunkedEncodingError,
+    ConnectionError,
+    ContentDecodingError,
+    JSONDecodeError,
+    ReadTimeout,
+)
+from urllib3.exceptions import DecodeError
 
 logging.basicConfig(stream=sys.stdout, level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -72,63 +79,128 @@ def __init__( |
         else:
             self.end_date = now_datetime.strftime("%Y-%m-%d")
 
-    def request(self, url, method, params=None):
+    def request(self, url, method, params=None, parse_json: bool = False):
76 | 83 | """Send an API request to open Engage.""" |
77 | 84 |
|
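+        # Ask for uncompressed JSON up front; truncated gzip bodies otherwise
+        # surface as ContentDecodingError / DecodeError mid-download.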
+        headers = {"Accept-Encoding": "identity", "Accept": "application/json"}
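+        # Transport-level failures that usually mean a flaky connection rather
+        # than a bad request, so the call is worth retrying.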
+        retryable = (
+            ChunkedEncodingError,
+            ContentDecodingError,
+            DecodeError,
+            ReadTimeout,
+            ConnectionError,
+        )
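+        # Rate limiting (429) and transient server errors are retried with backoff.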
+        transient_status = {429, 500, 502, 503, 504}
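+        # Initial retry delay; doubled after each attempt and capped at 60 s below.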
+        backoff = 0.1
+
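+        # Both HTTP calls below use timeout=(5, 30), requests' split of
+        # (connect timeout, read timeout) in seconds.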
         for attempt in range(self.max_retries):
             try:
                 if method.casefold() == "get":
-                    return requests.get(url, params=params, timeout=10)
+                    response = requests.get(
+                        url, params=params, headers=headers, timeout=(5, 30)
+                    )
                 elif method.casefold() == "post":
-                    return requests.post(url, json=params, timeout=10)
+                    response = requests.post(
+                        url, json=params, headers=headers, timeout=(5, 30)
+                    )
                 else:
-                    raise ConnectionError(f"Unknown method for query: {method}")
+                    # ValueError, not ConnectionError: the requests import above
+                    # shadows the builtin, and ConnectionError sits in the
+                    # retryable tuple, so raising it here would be retried.
+                    raise ValueError(f"Unknown method for query: {method}")
-            except ChunkedEncodingError as e:
-                logger.warning(f"ChunkedEncodingError occurred for {url}: {e}")
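+                # Back off and retry transient statuses; on the final attempt,
+                # surface the HTTPError instead of sleeping again.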
+                if response.status_code in transient_status:
+                    logger.warning(
+                        f"{response.status_code} for {url} (attempt {attempt + 1}/{self.max_retries}); retrying in {backoff:.1f}s"
+                    )
+                    if attempt + 1 == self.max_retries:
+                        response.raise_for_status()
+                    sleep(backoff)
+                    backoff = min(60.0, backoff * 2)
+                    continue
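+                # Remaining 4xx responses are treated as permanent failures.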
+                elif 400 <= response.status_code < 500:
+                    response.raise_for_status()
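+                # Callers that want the raw Response object skip JSON parsing.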
+                if not parse_json:
+                    return response
+
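+                # A 200 whose body is not valid JSON (truncated, or an HTML
+                # error page) is retried on the same backoff schedule.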
+                try:
+                    return response.json()
+                except JSONDecodeError:
+                    logger.warning(
+                        f"JSONDecodeError for {response.url} "
+                        f"(attempt {attempt + 1}/{self.max_retries}); retrying in {backoff:.1f}s"
+                    )
+                    if attempt + 1 == self.max_retries:
+                        raise
+                    sleep(backoff)
+                    backoff = min(60.0, backoff * 2)
+                    continue
+
+            except retryable as e:
+                logger.warning(
+                    f"{e.__class__.__name__} for {url} (attempt {attempt + 1}/{self.max_retries}); "
+                    f"retrying in {backoff:.1f}s"
+                )
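+                # Re-raise once the retry budget is exhausted; otherwise back
+                # off and try again.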
                 if attempt + 1 == self.max_retries:
-                    raise e
-                time.sleep(3)
+                    raise
+                sleep(backoff)
+                backoff = min(60.0, backoff * 2)
 
     def query(self, query, method="get", params=None):
         """Perform a direct query."""
 
-        r = self.request(urljoin(self.base, query), method, params=params)
-        r.raise_for_status()
-        return r.json()
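+        # Status checks, retries, and JSON decoding all happen in request().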
+        return self.request(
+            urljoin(self.base, query), method, params=params, parse_json=True
+        )
 
-    def query_generator(self, query, method: str = "get", params: Dict = {}):
+    def query_generator(
+        self, query, method: str = "get", params: Optional[Dict] = None
+    ):
100 | 155 | """Query for a list of items, with paging. Returns a generator.""" |
101 | 156 |
|
-        try:
-            total = self.number_of_preprints()
-        except Exception:
-            total = float("inf")  # fallback if that call fails
-
-        page = 0
-        while True:
-            params.update(
-                {
-                    "limit": self.page_size,
-                    "skip": page * self.page_size,
-                    "searchDateFrom": self.start_date,
-                    "searchDateTo": self.end_date,
-                }
-            )
-            if page * self.page_size > total:
-                break
-            r = self.request(urljoin(self.base, query), method, params=params)
-            if r.status_code == 400:
-                raise ValueError(r.json()["message"])
-            r.raise_for_status()
-            r = r.json()
-            r = r["itemHits"]
-
-            # If we have no more results, bail out
-            if len(r) == 0:
-                return
-
-            yield from r
-            page += 1
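+        # The configured ISO ("%Y-%m-%d") dates bound the year windows below.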
+        start_datetime = datetime.fromisoformat(self.start_date)
+        end_datetime = datetime.fromisoformat(self.end_date)
+
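+        # Split [start_date, end_date] into per-calendar-year windows; each
+        # window is then paged through independently.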
+        def year_windows():
+            year = start_datetime.year
+            while year <= end_datetime.year:
+                year_start = datetime(year, 1, 1)
+                year_end = datetime(year, 12, 31)
+                win_start = max(start_datetime, year_start)
+                win_end = min(end_datetime, year_end)
+                yield win_start.strftime("%Y-%m-%d"), win_end.strftime("%Y-%m-%d")
+                year += 1
+
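+        # Copy (or create) the dict so pagination keys never leak back into
+        # the caller's dictionary, and no mutable default is shared.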
+        params = (params or {}).copy()
+
+        for year_from, year_to in year_windows():
+            logger.info(f"Starting to scrape data from {year_from} to {year_to}")
+            page = 0
+            while True:
+                params.update(
+                    {
+                        "limit": self.page_size,
+                        "skip": page * self.page_size,
+                        "searchDateFrom": year_from,
+                        "searchDateTo": year_to,
+                    }
+                )
+                try:
+                    data = self.request(
+                        urljoin(self.base, query),
+                        method,
+                        params=params,
+                        parse_json=True,
+                    )
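+                # request() re-raises HTTPError after exhausting retries (or
+                # immediately for non-transient 4xx); end this window rather
+                # than aborting the whole scrape.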
+                except requests.HTTPError as e:
+                    status = getattr(e.response, "status_code", None)
+                    logger.warning(
+                        f"Stopping year window {year_from}..{year_to} at skip={page * self.page_size} "
+                        f"due to HTTPError {status}"
+                    )
+                    break
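+                # Records come back under "itemHits"; an empty page means
+                # this window is exhausted.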
+                items = data.get("itemHits", [])
+                if not items:
+                    break
+                yield from items
+                page += 1
 
     def all_preprints(self):
         """Return a generator to all the chemRxiv articles."""