From 87db6bd6485d7e4da277b3adfc6c68890a45508d Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Fri, 16 May 2025 13:42:57 -0400 Subject: [PATCH 1/3] Add retries for asyncio --- pinecone/openapi_support/rest_aiohttp.py | 18 +++++++++++++++--- pinecone/openapi_support/rest_urllib3.py | 2 +- pinecone/openapi_support/retries.py | 21 --------------------- poetry.lock | 18 ++++++++++++++++-- pyproject.toml | 4 ++-- tests/unit/openapi_support/test_retries.py | 2 +- 6 files changed, 35 insertions(+), 30 deletions(-) delete mode 100644 pinecone/openapi_support/retries.py diff --git a/pinecone/openapi_support/rest_aiohttp.py b/pinecone/openapi_support/rest_aiohttp.py index 3cab099a..2a32342b 100644 --- a/pinecone/openapi_support/rest_aiohttp.py +++ b/pinecone/openapi_support/rest_aiohttp.py @@ -9,6 +9,8 @@ class AiohttpRestClient(RestClientInterface): def __init__(self, configuration: Configuration) -> None: try: import aiohttp + from aiohttp_retry import RetryClient + from .retry_aiohttp import JitterRetry except ImportError: raise ImportError( "Additional dependencies are required to use Pinecone with asyncio. Include these extra dependencies in your project by installing `pinecone[asyncio]`." @@ -28,8 +30,18 @@ def __init__(self, configuration: Configuration) -> None: else: self._session = aiohttp.ClientSession(connector=conn) + jitter_retry = JitterRetry( + attempts=5, + start_timeout=0.1, + max_timeout=3.0, + statuses={500, 502, 503, 504}, + methods=None, # retry on all methods + exceptions={aiohttp.ClientError, aiohttp.ServerDisconnectedError}, + ) + self._retry_client = RetryClient(client_session=self._session, retry_options=jitter_retry) + async def close(self): - await self._session.close() + await self._retry_client.close() async def request( self, @@ -48,7 +60,7 @@ async def request( if "application/x-ndjson" in headers.get("Content-Type", "").lower(): ndjson_data = "\n".join(json.dumps(record) for record in body) - async with self._session.request( + async with self._retry_client.request( method, url, params=query_params, headers=headers, data=ndjson_data ) as resp: content = await resp.read() @@ -57,7 +69,7 @@ async def request( ) else: - async with self._session.request( + async with self._retry_client.request( method, url, params=query_params, headers=headers, json=body ) as resp: content = await resp.read() diff --git a/pinecone/openapi_support/rest_urllib3.py b/pinecone/openapi_support/rest_urllib3.py index f310ca99..3f718347 100644 --- a/pinecone/openapi_support/rest_urllib3.py +++ b/pinecone/openapi_support/rest_urllib3.py @@ -8,7 +8,7 @@ from .rest_utils import raise_exceptions_or_return, RESTResponse, RestClientInterface import urllib3 -from .retries import JitterRetry +from .retry_urllib3 import JitterRetry from .exceptions import PineconeApiException, PineconeApiValueError diff --git a/pinecone/openapi_support/retries.py b/pinecone/openapi_support/retries.py deleted file mode 100644 index 2b91a31d..00000000 --- a/pinecone/openapi_support/retries.py +++ /dev/null @@ -1,21 +0,0 @@ -import random -from urllib3.util.retry import Retry -import logging - -logger = logging.getLogger(__name__) - - -class JitterRetry(Retry): - """ - Retry with exponential back‑off with jitter. - - The Retry class is being extended as built-in support for jitter was added only in urllib3 2.0.0. - Jitter logic is following the official implementation with a constant jitter factor: https://github.com/urllib3/urllib3/blob/main/src/urllib3/util/retry.py - """ - - def get_backoff_time(self) -> float: - backoff_value = super().get_backoff_time() - jitter = random.random() * 0.25 - backoff_value += jitter - logger.debug(f"Calculating retry backoff: {backoff_value} (jitter: {jitter})") - return backoff_value diff --git a/poetry.lock b/poetry.lock index 2e4de34b..823ed4af 100644 --- a/poetry.lock +++ b/poetry.lock @@ -109,6 +109,20 @@ yarl = ">=1.17.0,<2.0" [package.extras] speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"] +[[package]] +name = "aiohttp-retry" +version = "2.9.1" +description = "Simple retry client for aiohttp" +optional = true +python-versions = ">=3.7" +files = [ + {file = "aiohttp_retry-2.9.1-py3-none-any.whl", hash = "sha256:66d2759d1921838256a05a3f80ad7e724936f083e35be5abb5e16eed6be6dc54"}, + {file = "aiohttp_retry-2.9.1.tar.gz", hash = "sha256:8eb75e904ed4ee5c2ec242fefe85bf04240f685391c4879d8f541d6028ff01f1"}, +] + +[package.dependencies] +aiohttp = "*" + [[package]] name = "aiosignal" version = "1.3.1" @@ -1970,10 +1984,10 @@ multidict = ">=4.0" propcache = ">=0.2.0" [extras] -asyncio = ["aiohttp"] +asyncio = ["aiohttp", "aiohttp-retry"] grpc = ["googleapis-common-protos", "grpcio", "grpcio", "grpcio", "lz4", "protobuf", "protoc-gen-openapiv2"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "0145fb2ae02a1cdd6fe06b191a6761dcee4f4c67fe057b48d6b501d7b0b504da" +content-hash = "cc8b764abfc3d9ba774410ef118817c736c3c74a2bfa7f9f32a462628d804739" diff --git a/pyproject.toml b/pyproject.toml index 0a239e3a..7b987cbe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ protoc-gen-openapiv2 = {version = "^0.0.1", optional = true } pinecone-plugin-interface = "^0.0.7" python-dateutil = ">=2.5.3" aiohttp = { version = ">=3.9.0", optional = true } +aiohttp-retry = { version = "^2.9.1", optional = true } [tool.poetry.group.types] optional = true @@ -102,10 +103,9 @@ vprof = "^0.38" tuna = "^0.5.11" python-dotenv = "^1.1.0" - [tool.poetry.extras] grpc = ["grpcio", "googleapis-common-protos", "lz4", "protobuf", "protoc-gen-openapiv2"] -asyncio = ["aiohttp"] +asyncio = ["aiohttp", "aiohttp-retry"] [build-system] requires = ["poetry-core"] diff --git a/tests/unit/openapi_support/test_retries.py b/tests/unit/openapi_support/test_retries.py index 5f31221d..ff624938 100644 --- a/tests/unit/openapi_support/test_retries.py +++ b/tests/unit/openapi_support/test_retries.py @@ -2,7 +2,7 @@ from unittest.mock import patch, MagicMock from urllib3.exceptions import MaxRetryError from urllib3.util.retry import Retry -from pinecone.openapi_support.retries import JitterRetry +from pinecone.openapi_support.retry_urllib3 import JitterRetry def test_jitter_retry_backoff(): From 67fc321618fa5508861680607ff6ea968cb69e2c Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Fri, 16 May 2025 13:47:11 -0400 Subject: [PATCH 2/3] Add missing files --- pinecone/openapi_support/rest_aiohttp.py | 21 ++++++----- pinecone/openapi_support/retry_aiohttp.py | 46 +++++++++++++++++++++++ pinecone/openapi_support/retry_urllib3.py | 21 +++++++++++ scripts/test-async-retry.py | 16 ++++++++ 4 files changed, 95 insertions(+), 9 deletions(-) create mode 100644 pinecone/openapi_support/retry_aiohttp.py create mode 100644 pinecone/openapi_support/retry_urllib3.py create mode 100644 scripts/test-async-retry.py diff --git a/pinecone/openapi_support/rest_aiohttp.py b/pinecone/openapi_support/rest_aiohttp.py index 2a32342b..8b84e850 100644 --- a/pinecone/openapi_support/rest_aiohttp.py +++ b/pinecone/openapi_support/rest_aiohttp.py @@ -30,15 +30,18 @@ def __init__(self, configuration: Configuration) -> None: else: self._session = aiohttp.ClientSession(connector=conn) - jitter_retry = JitterRetry( - attempts=5, - start_timeout=0.1, - max_timeout=3.0, - statuses={500, 502, 503, 504}, - methods=None, # retry on all methods - exceptions={aiohttp.ClientError, aiohttp.ServerDisconnectedError}, - ) - self._retry_client = RetryClient(client_session=self._session, retry_options=jitter_retry) + if configuration.retries is not None: + retry_options = configuration.retries + else: + retry_options = JitterRetry( + attempts=5, + start_timeout=0.1, + max_timeout=3.0, + statuses={500, 502, 503, 504}, + methods=None, # retry on all methods + exceptions={aiohttp.ClientError, aiohttp.ServerDisconnectedError}, + ) + self._retry_client = RetryClient(client_session=self._session, retry_options=retry_options) async def close(self): await self._retry_client.close() diff --git a/pinecone/openapi_support/retry_aiohttp.py b/pinecone/openapi_support/retry_aiohttp.py new file mode 100644 index 00000000..e42bb87d --- /dev/null +++ b/pinecone/openapi_support/retry_aiohttp.py @@ -0,0 +1,46 @@ +import random +from typing import Optional +from aiohttp_retry import RetryOptionsBase, EvaluateResponseCallbackType, ClientResponse +import logging + +logger = logging.getLogger(__name__) + + +class JitterRetry(RetryOptionsBase): + """https://github.com/inyutin/aiohttp_retry/issues/44.""" + + def __init__( + self, + attempts: Optional[int] = 3, # How many times we should retry + start_timeout: Optional[float] = 0.1, # Base timeout time, then it exponentially grow + max_timeout: Optional[float] = 5.0, # Max possible timeout between tries + statuses: Optional[set[int]] = None, # On which statuses we should retry + exceptions: Optional[set[type[Exception]]] = None, # On which exceptions we should retry + methods: Optional[set[str]] = None, # On which HTTP methods we should retry + random_interval_size: Optional[float] = 2.0, # size of interval for random component + retry_all_server_errors: bool = True, + evaluate_response_callback: Optional[EvaluateResponseCallbackType] = None, + ) -> None: + super().__init__( + attempts=attempts, + statuses=statuses, + exceptions=exceptions, + methods=methods, + retry_all_server_errors=retry_all_server_errors, + evaluate_response_callback=evaluate_response_callback, + ) + + self._start_timeout: float = start_timeout + self._max_timeout: float = max_timeout + self._random_interval_size = random_interval_size + + def get_timeout( + self, + attempt: int, + response: Optional[ClientResponse] = None, # noqa: ARG002 + ) -> float: + logger.debug(f"JitterRetry get_timeout: attempt={attempt}, response={response}") + """Return timeout with exponential backoff.""" + jitter = random.uniform(0, 0.1) + timeout = self._start_timeout * (2 ** (attempt - 1)) + return min(timeout + jitter, self._max_timeout) diff --git a/pinecone/openapi_support/retry_urllib3.py b/pinecone/openapi_support/retry_urllib3.py new file mode 100644 index 00000000..2b91a31d --- /dev/null +++ b/pinecone/openapi_support/retry_urllib3.py @@ -0,0 +1,21 @@ +import random +from urllib3.util.retry import Retry +import logging + +logger = logging.getLogger(__name__) + + +class JitterRetry(Retry): + """ + Retry with exponential back‑off with jitter. + + The Retry class is being extended as built-in support for jitter was added only in urllib3 2.0.0. + Jitter logic is following the official implementation with a constant jitter factor: https://github.com/urllib3/urllib3/blob/main/src/urllib3/util/retry.py + """ + + def get_backoff_time(self) -> float: + backoff_value = super().get_backoff_time() + jitter = random.random() * 0.25 + backoff_value += jitter + logger.debug(f"Calculating retry backoff: {backoff_value} (jitter: {jitter})") + return backoff_value diff --git a/scripts/test-async-retry.py b/scripts/test-async-retry.py new file mode 100644 index 00000000..ca5f9bb4 --- /dev/null +++ b/scripts/test-async-retry.py @@ -0,0 +1,16 @@ +import dotenv +import asyncio +import logging +from pinecone import PineconeAsyncio + +dotenv.load_dotenv() + +logging.basicConfig(level=logging.DEBUG) + + +async def main(): + async with PineconeAsyncio(host="http://localhost:8000") as pc: + await pc.db.index.list() + + +asyncio.run(main()) From a9bd8870372a7e9eb5e178203288e41db6598cc1 Mon Sep 17 00:00:00 2001 From: Jen Hamon Date: Fri, 16 May 2025 13:53:55 -0400 Subject: [PATCH 3/3] Fix mypy errors --- pinecone/openapi_support/retry_aiohttp.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pinecone/openapi_support/retry_aiohttp.py b/pinecone/openapi_support/retry_aiohttp.py index e42bb87d..2b3019e7 100644 --- a/pinecone/openapi_support/retry_aiohttp.py +++ b/pinecone/openapi_support/retry_aiohttp.py @@ -11,13 +11,12 @@ class JitterRetry(RetryOptionsBase): def __init__( self, - attempts: Optional[int] = 3, # How many times we should retry - start_timeout: Optional[float] = 0.1, # Base timeout time, then it exponentially grow - max_timeout: Optional[float] = 5.0, # Max possible timeout between tries + attempts: int = 3, # How many times we should retry + start_timeout: float = 0.1, # Base timeout time, then it exponentially grow + max_timeout: float = 5.0, # Max possible timeout between tries statuses: Optional[set[int]] = None, # On which statuses we should retry exceptions: Optional[set[type[Exception]]] = None, # On which exceptions we should retry methods: Optional[set[str]] = None, # On which HTTP methods we should retry - random_interval_size: Optional[float] = 2.0, # size of interval for random component retry_all_server_errors: bool = True, evaluate_response_callback: Optional[EvaluateResponseCallbackType] = None, ) -> None: @@ -32,7 +31,6 @@ def __init__( self._start_timeout: float = start_timeout self._max_timeout: float = max_timeout - self._random_interval_size = random_interval_size def get_timeout( self,