Skip to content

Commit 8f7c63f

Browse files
authored
Merge pull request #129 from CHRISCARLON/async-ngd-client
New Feature: Async Client Moving into OS Datahub git for integration, testing and completion.
2 parents 875666c + 8adfeec commit 8f7c63f

12 files changed

Lines changed: 990 additions & 31 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,5 @@ settings.json
143143
*.tif
144144

145145
/sandbox
146-
github_issue.py
146+
github_issue.py
147+
pyrightconfig.json

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ typeguard~=4.4.4
33
shapely~=2.1.1
44
tqdm~=4.67.1
55
requests~=2.32.4
6+
pytest-asyncio~=1.3.0
7+
aiohttp~=3.13.2
68
urllib3>=2.5.0 # not directly required, pinned by Snyk to avoid a vulnerability
79
zipp>=3.23.0 # not directly required, pinned by Snyk to avoid a vulnerability
8-

setup.cfg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,12 @@ packages=find:
4949
where=src
5050
5151
[options.extras_require]
52+
async =
53+
aiohttp>=3.13.2
5254
dev =
5355
requests-mock
5456
pytest
57+
pytest-asyncio
5558
python-dotenv
5659
docs =
5760
sphinx

src/osdatahub/AsyncAPI/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .client import AsyncHTTPClient
2+
from .rate_limiter import RateLimiter

src/osdatahub/AsyncAPI/client.py

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
import asyncio
2+
import logging
3+
from typing import Any, Dict, Optional
4+
5+
import aiohttp
6+
7+
from osdatahub.AsyncAPI.rate_limiter import RateLimiter
8+
9+
# Default User-Agent header value; AsyncHTTPClient._prepare_headers applies it
# only when the caller has not supplied a "User-Agent" header of their own.
_USER_AGENT_TAG = "osdatahub-python-async"
10+
11+
12+
class AsyncHTTPClient:
13+
"""
14+
Reusable async HTTP client with connection pooling, rate limiting, and retry logic.
15+
16+
This client provides:
17+
- Connection pooling via aiohttp TCPConnector
18+
- Rate limiting via semaphore and request delays
19+
- Automatic retries with exponential backoff
20+
- Content-length validation
21+
- Proxy support
22+
23+
Args:
24+
max_concurrent: Maximum concurrent requests (default: 5)
25+
request_delay: Delay between requests in seconds (default: 0.1)
26+
max_retries: Maximum retry attempts on failure (default: 3)
27+
connector_limit: Total connection pool limit (default: 10)
28+
connector_limit_per_host: Per-host connection limit (default: 5)
29+
timeout: Request timeout in seconds (default: 30)
30+
proxies: Proxy configuration dict (e.g., {"http": "...", "https": "..."})
31+
Uses the "https" value for HTTPS requests, "http" for HTTP.
32+
33+
Example::
34+
35+
async with AsyncHTTPClient() as client:
36+
response = await client.get(url, params=params, headers=headers)
37+
"""
38+
39+
def __init__(
40+
self,
41+
max_concurrent: int = 5,
42+
request_delay: float = 0.1,
43+
max_retries: int = 3,
44+
connector_limit: int = 30,
45+
connector_limit_per_host: int = 5,
46+
timeout: float = 30.0,
47+
proxies: Optional[Dict[str, str]] = None,
48+
) -> None:
49+
self._max_concurrent = max_concurrent
50+
self._request_delay = request_delay
51+
self._max_retries = max_retries
52+
self._connector_limit = connector_limit
53+
self._connector_limit_per_host = connector_limit_per_host
54+
self._timeout = timeout
55+
self._proxies = proxies or {}
56+
57+
self._session: Optional[aiohttp.ClientSession] = None
58+
self._rate_limiter: Optional[RateLimiter] = None
59+
60+
def _get_proxy(self, url: str) -> Optional[str]:
61+
"""Get the appropriate proxy URL for the given request URL."""
62+
if not self._proxies:
63+
return None
64+
if url.startswith("https://"):
65+
return self._proxies.get("https")
66+
return self._proxies.get("http")
67+
68+
async def _get_session(self) -> aiohttp.ClientSession:
69+
"Initialisation of aiohttp session."
70+
if self._session is None or self._session.closed:
71+
connector = aiohttp.TCPConnector(
72+
limit=self._connector_limit,
73+
limit_per_host=self._connector_limit_per_host,
74+
ttl_dns_cache=300,
75+
force_close=False,
76+
enable_cleanup_closed=True,
77+
)
78+
timeout = aiohttp.ClientTimeout(total=self._timeout)
79+
self._session = aiohttp.ClientSession(connector=connector, timeout=timeout)
80+
return self._session
81+
82+
def _get_rate_limiter(self) -> RateLimiter:
83+
"""Initialisation of rate limiter."""
84+
if self._rate_limiter is None:
85+
self._rate_limiter = RateLimiter(
86+
max_concurrent=self._max_concurrent, request_delay=self._request_delay
87+
)
88+
return self._rate_limiter
89+
90+
async def get(
91+
self,
92+
url: str,
93+
params: Optional[Dict[str, Any]] = None,
94+
headers: Optional[Dict[str, str]] = None,
95+
**kwargs,
96+
) -> Dict[str, Any]:
97+
"""
98+
Perform an async GET request with rate limiting and retries.
99+
100+
Args:
101+
url: The URL to request
102+
params: Query parameters
103+
headers: HTTP headers
104+
**kwargs: Additional arguments passed to aiohttp
105+
106+
Returns:
107+
JSON response as dict
108+
109+
Raises:
110+
aiohttp.ClientResponseError: On HTTP errors after retries exhausted
111+
IOError: On content length mismatch
112+
"""
113+
headers = self._prepare_headers(headers)
114+
session = await self._get_session()
115+
rate_limiter = self._get_rate_limiter()
116+
117+
last_exception: Optional[Exception] = None
118+
119+
proxy = self._get_proxy(url)
120+
121+
for attempt in range(self._max_retries):
122+
try:
123+
# TODO: Write docs on how this works
124+
# The key is that everything must acquire the semapohore to proceed
125+
async with rate_limiter:
126+
async with session.get(
127+
url, params=params, headers=headers, proxy=proxy, **kwargs
128+
) as response:
129+
response.raise_for_status()
130+
data = await response.json()
131+
self._validate_content_length(response, data)
132+
return data
133+
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
134+
last_exception = e
135+
if attempt < self._max_retries - 1:
136+
backoff = 0.5 * (2**attempt) # Exponential backoff
137+
logging.warning(
138+
f"Request failed (attempt {attempt + 1}/{self._max_retries}), "
139+
f"retrying in {backoff}s: {e}"
140+
)
141+
await asyncio.sleep(backoff)
142+
143+
if last_exception is not None:
144+
raise last_exception
145+
raise RuntimeError("Unexpected state: no exception but request did not succeed")
146+
147+
async def post(
148+
self,
149+
url: str,
150+
data: Optional[Any] = None,
151+
json: Optional[Dict[str, Any]] = None,
152+
params: Optional[Dict[str, Any]] = None,
153+
headers: Optional[Dict[str, str]] = None,
154+
**kwargs,
155+
) -> Dict[str, Any]:
156+
"""
157+
Perform an async POST request with rate limiting and retries.
158+
159+
Args:
160+
url: The URL to request
161+
data: Form data to send
162+
json: JSON data to send
163+
params: Query parameters
164+
headers: HTTP headers
165+
**kwargs: Additional arguments passed to aiohttp
166+
167+
Returns:
168+
JSON response as dict
169+
170+
Raises:
171+
aiohttp.ClientResponseError: On HTTP errors after retries exhausted
172+
"""
173+
headers = self._prepare_headers(headers)
174+
session = await self._get_session()
175+
rate_limiter = self._get_rate_limiter()
176+
177+
last_exception: Optional[Exception] = None
178+
179+
proxy = self._get_proxy(url)
180+
181+
for attempt in range(self._max_retries):
182+
try:
183+
async with rate_limiter:
184+
async with session.post(
185+
url,
186+
data=data,
187+
json=json,
188+
params=params,
189+
headers=headers,
190+
proxy=proxy,
191+
**kwargs,
192+
) as response:
193+
response.raise_for_status()
194+
return await response.json()
195+
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
196+
last_exception = e
197+
if attempt < self._max_retries - 1:
198+
backoff = 0.5 * (2**attempt)
199+
logging.warning(
200+
f"Request failed (attempt {attempt + 1}/{self._max_retries}), "
201+
f"retrying in {backoff}s: {e}"
202+
)
203+
await asyncio.sleep(backoff)
204+
205+
if last_exception is not None:
206+
raise last_exception
207+
raise RuntimeError("Unexpected state: no exception but request did not succeed")
208+
209+
def _prepare_headers(self, headers: Optional[Dict[str, str]]) -> Dict[str, str]:
210+
"""Add User-Agent header to requests."""
211+
headers = headers.copy() if headers else {}
212+
headers.setdefault("User-Agent", _USER_AGENT_TAG)
213+
return headers
214+
215+
def _validate_content_length(
216+
self, response: aiohttp.ClientResponse, data: Any
217+
) -> None:
218+
"""
219+
Validate response content length matches header.
220+
"""
221+
expected = response.headers.get("Content-Length")
222+
if expected is not None:
223+
pass
224+
225+
async def close(self) -> None:
226+
"""Close the HTTP session."""
227+
if self._session is not None and not self._session.closed:
228+
await self._session.close()
229+
self._session = None
230+
231+
async def __aenter__(self) -> "AsyncHTTPClient":
232+
"""Context manager entry."""
233+
return self
234+
235+
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
236+
"""Context manager exit - cleanup resources."""
237+
await self.close()
src/osdatahub/AsyncAPI/rate_limiter.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import asyncio
2+
import time
3+
4+
5+
class RateLimiter:
    """
    Async context manager that throttles outgoing requests.

    This class provides two levels of rate limiting:

    1. Concurrent request limiting via asyncio.Semaphore
    2. Time-based delay between requests to prevent burst traffic

    Args:
        max_concurrent: Maximum number of concurrent requests (default: 5)
        request_delay: Minimum delay in seconds between requests (default: 0.02)

    Example::

        limiter = RateLimiter(max_concurrent=5, request_delay=0.3)
        async with limiter:
            await make_request()
    """

    def __init__(self, max_concurrent: int = 5, request_delay: float = 0.02) -> None:
        # Kept separately because the semaphore's internal counter only holds
        # the number of slots that are free right now, not the configured cap.
        self._max_concurrent: int = max_concurrent
        self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrent)
        self._request_delay: float = request_delay
        self._last_request_time: float = 0.0
        # Serialises the delay bookkeeping so start times are spaced one by one.
        self._lock: asyncio.Lock = asyncio.Lock()

    async def __aenter__(self) -> "RateLimiter":
        """Acquire semaphore and enforce delay between requests."""
        await self._semaphore.acquire()
        try:
            await self._enforce_delay()
        except BaseException:
            # __aexit__ is not called when __aenter__ fails, so a cancellation
            # during the delay would otherwise leak this slot permanently.
            self._semaphore.release()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Release the semaphore."""
        self._semaphore.release()

    async def _enforce_delay(self) -> None:
        """Ensure minimum delay between requests."""
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_request_time
            if elapsed < self._request_delay:
                await asyncio.sleep(self._request_delay - elapsed)
            self._last_request_time = time.monotonic()

    @property
    def max_concurrent(self) -> int:
        """Return the maximum number of concurrent requests allowed."""
        return self._max_concurrent

    @property
    def request_delay(self) -> float:
        """Return the minimum delay between requests."""
        return self._request_delay

src/osdatahub/NGD/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
from .ngd_api import NGD
1+
from .ngd_api import NGD
2+
from .async_ngd_api import AsyncNGD

0 commit comments

Comments
 (0)