Skip to content

Commit 732a488

Browse files
Support for http retry middleware (#70)
1 parent 292b6ae commit 732a488

5 files changed

Lines changed: 480 additions & 62 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ description = "Foundation for llm integration with Dremio"
99
readme = "README.md"
1010
requires-python = ">=3.11"
1111
dependencies = [
12-
"aiohttp>=3.11.12",
12+
"aiohttp>=3.12.15",
1313
"beeai-framework>=0.1.8",
1414
"black>=25.1.0",
1515
"click>=8.1.8",

src/dremioai/api/transport.py

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,91 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16+
import logging
17+
import asyncio
1618

1719
from aiohttp import ClientSession, ClientResponse, ClientResponseError
20+
from typing import (
21+
AnyStr,
22+
Callable,
23+
Optional,
24+
Dict,
25+
TypeAlias,
26+
Union,
27+
TextIO,
28+
Awaitable,
29+
Any,
30+
)
1831
from pathlib import Path
19-
from typing import AnyStr, Callable, Optional, Dict, TypeAlias, Union, TextIO
2032
from dremioai.log import logger
2133
from json import loads
2234
from pydantic import BaseModel, ValidationError
35+
from http import HTTPStatus
2336

2437
from dremioai.config import settings
2538
from dremioai.api.oauth2 import get_oauth2_tokens
2639

2740
DeserializationStrategy: TypeAlias = Union[Callable, BaseModel]
2841

2942

43+
class RetryConfig:
44+
def __init__(self):
45+
if settings.instance() and settings.instance().dremio:
46+
self.config = settings.instance().dremio.http_retry
47+
else:
48+
self.config = settings.HttpRetry()
49+
50+
@property
51+
def max_retries(self) -> int:
52+
"""Expose max_retries from config for convenience"""
53+
return self.config.max_retries
54+
55+
def get_config_delay(self, attempt_number: int = 0) -> float:
56+
return self.config.initial_delay * (
57+
self.config.backoff_multiplier**attempt_number
58+
)
59+
60+
def get_delay(
61+
self,
62+
response: ClientResponse,
63+
attempt_number: int,
64+
) -> float:
65+
retry_after = response.headers.get("Retry-After")
66+
delay = self.get_config_delay(attempt_number=attempt_number)
67+
if retry_after is not None:
68+
try:
69+
delay = min(delay, int(retry_after))
70+
except (ValueError, TypeError) as e:
71+
logger().debug(
72+
f"Invalid Retry-After header, using exponential backoff - {e}"
73+
)
74+
75+
return min(delay, self.config.max_delay)
76+
77+
78+
async def retry_middleware(
79+
req, handler: Callable[[any], Awaitable[ClientResponse]]
80+
) -> ClientResponse:
81+
"""
82+
Middleware that automatically retries requests on 429 (rate limit) errors.
83+
Uses exponential backoff with configurable parameters from settings.
84+
"""
85+
retry_config = RetryConfig()
86+
for attempt in range(retry_config.max_retries + 1):
87+
response = await handler(req)
88+
if response.status != HTTPStatus.TOO_MANY_REQUESTS:
89+
break
90+
91+
delay = retry_config.get_delay(response, attempt)
92+
logger(f"{__name__}.retry").warning(
93+
f"Rate limited (429) on {req.method} {req.url.path}. "
94+
f"Retry {attempt + 1}/{retry_config.max_retries} after {delay:.2f}s"
95+
)
96+
await asyncio.sleep(delay)
97+
98+
return response
99+
100+
30101
class AsyncHttpClient:
31102
def __init__(self, uri: AnyStr, token: AnyStr):
32103
self.uri = uri
@@ -83,6 +154,18 @@ async def handle_response(
83154
)
84155
await self.download(response, file)
85156

157+
def log_request(
158+
self, method: str, endpoint: str, params: Optional[Dict[AnyStr, Any]] = None
159+
):
160+
if logger().isEnabledFor(logging.DEBUG):
161+
sanitized_headers = {
162+
k: (v if k != "Authorization" else "Bearer <redacted>")
163+
for k, v in self.headers.items()
164+
}
165+
logger().debug(
166+
f"{method} {self.uri}{endpoint}', headers={sanitized_headers}, params={params}"
167+
)
168+
86169
async def get(
87170
self,
88171
endpoint: AnyStr,
@@ -92,10 +175,8 @@ async def get(
92175
file: Optional[TextIO] = None,
93176
top_level_list: bool = False,
94177
):
95-
async with ClientSession() as session:
96-
logger().info(
97-
f"{self.uri}{endpoint}', headers={self.headers}, params={params}"
98-
)
178+
async with ClientSession(middlewares=(retry_middleware,)) as session:
179+
self.log_request("GET", endpoint, params)
99180
async with session.get(
100181
f"{self.uri}{endpoint}",
101182
headers=self.headers,
@@ -115,7 +196,8 @@ async def post(
115196
file: Optional[TextIO] = None,
116197
top_level_list: bool = False,
117198
):
118-
async with ClientSession() as session:
199+
async with ClientSession(middlewares=(retry_middleware,)) as session:
200+
self.log_request("POST", endpoint)
119201
async with session.post(
120202
f"{self.uri}{endpoint}", headers=self.headers, json=body, ssl=False
121203
) as response:

src/dremioai/config/settings.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,25 @@ class Metrics(BaseModel):
143143
model_config = ConfigDict(validate_assignment=True)
144144

145145

146+
class HttpRetry(BaseModel):
147+
"""Configuration for HTTP retry behavior with exponential backoff"""
148+
149+
max_retries: Optional[int] = Field(
150+
default=3,
151+
description="Maximum number of retry attempts for rate-limited requests",
152+
)
153+
initial_delay: Optional[float] = Field(
154+
default=1.0, description="Initial delay in seconds before first retry"
155+
)
156+
max_delay: Optional[float] = Field(
157+
default=60.0, description="Maximum delay in seconds between retries"
158+
)
159+
backoff_multiplier: Optional[float] = Field(
160+
default=2.0, description="Multiplier for exponential backoff"
161+
)
162+
model_config = ConfigDict(validate_assignment=True)
163+
164+
146165
class Dremio(BaseModel):
147166
uri: Annotated[
148167
Union[str, HttpUrl, DremioCloudUri], AfterValidator(_resolve_dremio_uri)
@@ -160,6 +179,8 @@ class Dremio(BaseModel):
160179
wlm: Optional[Wlm] = None
161180
# Metrics server configuration
162181
metrics: Optional[Metrics] = None
182+
# HTTP retry configuration
183+
http_retry: Optional[HttpRetry] = Field(default_factory=HttpRetry)
163184
model_config = ConfigDict(validate_assignment=True)
164185

165186
@field_serializer("raw_pat")

0 commit comments

Comments
 (0)