-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbkb_client.py
More file actions
225 lines (184 loc) · 7.5 KB
/
bkb_client.py
File metadata and controls
225 lines (184 loc) · 7.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
"""Thin client for interacting with BiomarkerKB HTTP API."""
from __future__ import annotations
import io
import json
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Optional
import pandas as pd
import requests
# Root URL that the endpoint paths below are appended to.
API_BASE = "https://api.biomarkerkb.org"
# Path that create_list() POSTs a search payload to in order to obtain a list_id.
SEARCH_PATH = "/biomarker/search"
# Path that download_list() POSTs a list id to in order to fetch the list contents.
DOWNLOAD_PATH = "/data/list_download"
class BiomarkerKBError(RuntimeError):
    """Signal that a BiomarkerKB API call failed or returned an unusable payload.

    Subclasses ``RuntimeError`` so existing ``except RuntimeError`` handlers
    still catch it.
    """
@dataclass(frozen=True)
class ListRequest:
    """Immutable bundle describing one temporary server-side list to create.

    The instance is frozen, so its fields cannot be reassigned after
    construction (the ``payload`` dict itself is still a mutable object).
    """

    # Request body that is sent as JSON when the list is created.
    payload: Dict[str, object]
    # Human-readable label for the request, used in error and log messages.
    description: str
def _safe_json(response: requests.Response) -> Dict[str, object]:
    """Decode *response* as JSON, turning decode failures into ``BiomarkerKBError``.

    Args:
        response: HTTP response whose body is expected to be JSON.

    Returns:
        Whatever ``response.json()`` produces.

    Raises:
        BiomarkerKBError: If the body cannot be decoded as JSON. The message
            carries the status code and the first 500 characters of the body.
    """
    try:
        decoded = response.json()
    except ValueError as exc:  # server returned HTML/text instead of JSON
        # Keep only a short preview so a large HTML error page does not flood the message.
        body_preview = response.text[:500]
        message = (
            "Non-JSON payload received from BiomarkerKB search endpoint. "
            "Status code: %s. Body snippet: %r"
        ) % (response.status_code, body_preview)
        raise BiomarkerKBError(message) from exc
    else:
        return decoded
def create_list(list_request: ListRequest, timeout: int = 60) -> Optional[str]:
    """Create a BiomarkerKB list via the search endpoint and return its identifier.

    Args:
        list_request: Payload to POST plus a label used in error messages.
        timeout: Timeout value forwarded to ``requests.post``.

    Returns:
        The ``list_id`` field of the response, converted to ``str``.

    Raises:
        BiomarkerKBError: If the HTTP request fails or returns an error status,
            the body is not JSON, the JSON is not an object, or the object has
            no non-empty ``list_id``.
    """
    url = f"{API_BASE}{SEARCH_PATH}"
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    try:
        response = requests.post(url, json=list_request.payload, headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as exc:
        raise BiomarkerKBError(
            f"Search request failed for {list_request.description!r}: {exc}"
        ) from exc

    data = _safe_json(response)
    # A syntactically valid body may still be a list, string or null. Calling
    # .get() on those raises AttributeError, which would slip past callers that
    # only handle BiomarkerKBError, so reject them explicitly.
    if not isinstance(data, dict):
        raise BiomarkerKBError(
            f"BiomarkerKB search response for {list_request.description!r} "
            f"was not a JSON object (got {type(data).__name__})."
        )

    list_id = data.get("list_id")
    if not list_id:
        raise BiomarkerKBError(
            f"BiomarkerKB search response for {list_request.description!r} did not contain a 'list_id'."
        )
    return str(list_id)
def _parse_csv(data: str) -> pd.DataFrame:
"""Convert CSV text into a DataFrame while catching parser edge cases."""
buffer = io.StringIO(data)
try:
return pd.read_csv(buffer)
except pd.errors.EmptyDataError:
# The API explicitly told us there are no rows.
return pd.DataFrame()
except (pd.errors.ParserError, UnicodeDecodeError) as exc:
raise BiomarkerKBError(
f"CSV parsing failed with error: {exc}."
) from exc
def _download_payload(list_id: str, fmt: str) -> Dict[str, object]:
    """Build the download request body for *list_id* in the given format."""
    return {
        "id": list_id,
        "download_type": "biomarker_list",
        "format": fmt,
        "compressed": False,
    }


def _download_json_fallback(
    url: str, list_id: str, *, expect_label: str, timeout: int
) -> pd.DataFrame:
    """Re-request the list as JSON and convert a top-level array into a DataFrame."""
    # The request asks for JSON, so advertise JSON in Accept instead of
    # reusing the CSV request's "text/csv" header.
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    try:
        json_resp = requests.post(
            url, json=_download_payload(list_id, "json"), headers=headers, timeout=timeout
        )
        json_resp.raise_for_status()
    except requests.exceptions.RequestException as exc:
        raise BiomarkerKBError(
            f"CSV parsing failed and JSON fallback also errored for {expect_label!r}: {exc}"
        ) from exc

    try:
        parsed = json.loads(json_resp.text)
    except json.JSONDecodeError as exc:
        snippet = json_resp.text[:500]
        raise BiomarkerKBError(
            "BiomarkerKB JSON fallback payload was malformed. "
            "Body snippet: %r" % snippet
        ) from exc

    # Only a top-level array is converted to a frame.
    if isinstance(parsed, list):
        return pd.DataFrame(parsed)
    raise BiomarkerKBError(
        "Unexpected JSON structure received from BiomarkerKB download endpoint."
    )


def download_list(list_id: str, *, expect_label: str, timeout: int = 300) -> pd.DataFrame:
    """Download a previously created list and materialise it as a DataFrame.

    The list is requested as uncompressed CSV first. If that CSV cannot be
    parsed, the same list is requested again as JSON and converted instead.

    Args:
        list_id: Identifier of the list to download.
        expect_label: Human-readable label used in error messages.
        timeout: Timeout value forwarded to each ``requests.post`` call.

    Returns:
        The downloaded rows. An empty ``DataFrame`` is returned when the CSV
        body is empty or has at most one line (header only).

    Raises:
        BiomarkerKBError: If the CSV request fails, or if CSV parsing fails and
            the JSON fallback request fails, is not valid JSON, or is not a
            top-level array.
    """
    url = f"{API_BASE}{DOWNLOAD_PATH}"
    headers = {"Content-Type": "application/json", "Accept": "text/csv"}
    try:
        response = requests.post(
            url, json=_download_payload(list_id, "csv"), headers=headers, timeout=timeout
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as exc:
        raise BiomarkerKBError(
            f"Data download failed for {expect_label!r}: {exc}"
        ) from exc

    # Nothing, or a lone header line, means there are no data rows.
    if not response.text or len(response.text.splitlines()) <= 1:
        return pd.DataFrame()

    try:
        return _parse_csv(response.text)
    except BiomarkerKBError:
        # When CSV parsing fails, fall back to JSON download and convert manually.
        # Called inside the handler so the CSV error stays attached as context.
        return _download_json_fallback(
            url, list_id, expect_label=expect_label, timeout=timeout
        )
def ensure_complete_results(df: pd.DataFrame, *, page_hint: Optional[int] = None) -> None:
    """Print a warning when *df* has at least as many rows as the requested page size.

    Args:
        df: Downloaded result frame.
        page_hint: Page size that was requested. ``None`` disables the check.

    Nothing is printed when *page_hint* is ``None`` or *df* is empty.
    """
    might_be_truncated = (
        page_hint is not None and not df.empty and len(df) >= page_hint
    )
    if might_be_truncated:
        print(
            "⚠️ Retrieved %s rows, which matches or exceeds the configured page size (%s). "
            "Results may be truncated. Consider adjusting the request parameters."
            % (len(df), page_hint)
        )
def _default_log(message: str) -> None:
print(message)
def download_with_size_escalation(
    *,
    payload_factory: Callable[[Optional[int]], Dict[str, object]],
    description: str,
    expect_label: str,
    initial_size: Optional[int],
    max_attempts: int = 4,
    logger: Callable[[str], None] = _default_log,
) -> Optional[pd.DataFrame]:
    """Create a list and download it, doubling the page size while results look truncated.

    Args:
        payload_factory: Called with the current page size (or ``None``) to
            build the search payload for each attempt.
        description: Label for the search, used in log and error messages.
        expect_label: Label forwarded to ``download_list`` for its errors.
        initial_size: Page size for the first attempt. ``None`` means a single
            attempt with no escalation.
        max_attempts: Number of full-page results tolerated before giving up
            and returning the latest download.
        logger: Callable that receives progress and warning messages.

    Returns:
        The most recent download, or ``None`` if list creation or download
        raised ``BiomarkerKBError`` (the failure is reported through *logger*).
    """
    size = initial_size
    escalations = 0
    last_count: Optional[int] = None

    while True:
        logger(
            "🔬 Creating a search list for %s (size=%s)..." % (description, size or "auto")
        )
        list_request = ListRequest(payload=payload_factory(size), description=description)

        try:
            list_id = create_list(list_request)
        except BiomarkerKBError as exc:
            logger(f" ❌ API request failed for {description!r}: {exc}")
            return None

        logger(f"📂 Downloading data for List ID: {list_id}...")
        try:
            frame = download_list(list_id, expect_label=expect_label)
        except BiomarkerKBError as exc:
            logger(f" ❌ Data download or parsing failed for {description!r}: {exc}")
            return None

        ensure_complete_results(frame, page_hint=size)
        n_rows = len(frame)
        logger(f" ✅ Retrieved {n_rows} rows for {description!r}.")

        # No page size to compare against, nothing returned, or fewer rows than
        # requested: the download is taken as complete.
        if size is None or n_rows == 0 or n_rows < size:
            return frame

        # A larger page that yields the same count means more size will not help.
        if n_rows == last_count:
            logger(
                " ⚠️ Received the same row count on consecutive attempts; assuming the dataset is complete."
            )
            return frame

        escalations += 1
        if escalations >= max_attempts:
            logger(
                " ⚠️ Reached the maximum number of size escalation attempts. "
                "Proceeding with the most recent download."
            )
            return frame

        last_count = n_rows
        # size is known to be non-None here because of the early return above.
        size *= 2
        logger(
            " ⚠️ Row count matches the requested page size. Retrying with a larger size (%s)."
            % (size or "auto")
        )
def chunk(iterable: Iterable[str], size: int) -> Iterable[list[str]]:
    """Yield successive lists of at most *size* items from *iterable*.

    Every batch has exactly *size* items except possibly the last, which holds
    the remainder. An empty *iterable* yields nothing.

    Args:
        iterable: Items to group. It is consumed lazily, one batch at a time.
        size: Maximum number of items per batch. Must be at least 1.

    Returns:
        A lazy iterable of lists.

    Raises:
        ValueError: If *size* is less than 1. The check happens at call time,
            not on first iteration.
    """
    # With size <= 0 the "batch is full" test below can never be true, so the
    # whole input would silently come back as a single oversized batch.
    if size < 1:
        raise ValueError(f"chunk size must be at least 1, got {size!r}")

    def _batches() -> Iterable[list[str]]:
        batch: list[str] = []
        for item in iterable:
            batch.append(item)
            if len(batch) == size:
                yield batch
                batch = []
        if batch:
            # Trailing partial batch.
            yield batch

    return _batches()