diff --git a/pinecone/openapi_support/rest_aiohttp.py b/pinecone/openapi_support/rest_aiohttp.py index 3cab099a..8b84e850 100644 --- a/pinecone/openapi_support/rest_aiohttp.py +++ b/pinecone/openapi_support/rest_aiohttp.py @@ -9,6 +9,8 @@ class AiohttpRestClient(RestClientInterface): def __init__(self, configuration: Configuration) -> None: try: import aiohttp + from aiohttp_retry import RetryClient + from .retry_aiohttp import JitterRetry except ImportError: raise ImportError( "Additional dependencies are required to use Pinecone with asyncio. Include these extra dependencies in your project by installing `pinecone[asyncio]`." @@ -28,8 +30,21 @@ def __init__(self, configuration: Configuration) -> None: else: self._session = aiohttp.ClientSession(connector=conn) + if configuration.retries is not None: + retry_options = configuration.retries + else: + retry_options = JitterRetry( + attempts=5, + start_timeout=0.1, + max_timeout=3.0, + statuses={500, 502, 503, 504}, + methods=None, # retry on all methods + exceptions={aiohttp.ClientError, aiohttp.ServerDisconnectedError}, + ) + self._retry_client = RetryClient(client_session=self._session, retry_options=retry_options) + async def close(self): - await self._session.close() + await self._retry_client.close() async def request( self, @@ -48,7 +63,7 @@ async def request( if "application/x-ndjson" in headers.get("Content-Type", "").lower(): ndjson_data = "\n".join(json.dumps(record) for record in body) - async with self._session.request( + async with self._retry_client.request( method, url, params=query_params, headers=headers, data=ndjson_data ) as resp: content = await resp.read() @@ -57,7 +72,7 @@ async def request( ) else: - async with self._session.request( + async with self._retry_client.request( method, url, params=query_params, headers=headers, json=body ) as resp: content = await resp.read() diff --git a/pinecone/openapi_support/rest_urllib3.py b/pinecone/openapi_support/rest_urllib3.py index f310ca99..3f718347 100644 --- a/pinecone/openapi_support/rest_urllib3.py +++ b/pinecone/openapi_support/rest_urllib3.py @@ -8,7 +8,7 @@ from .rest_utils import raise_exceptions_or_return, RESTResponse, RestClientInterface import urllib3 -from .retries import JitterRetry +from .retry_urllib3 import JitterRetry from .exceptions import PineconeApiException, PineconeApiValueError diff --git a/pinecone/openapi_support/retry_aiohttp.py b/pinecone/openapi_support/retry_aiohttp.py new file mode 100644 index 00000000..2b3019e7 --- /dev/null +++ b/pinecone/openapi_support/retry_aiohttp.py @@ -0,0 +1,44 @@ +import random +from typing import Optional +from aiohttp_retry import RetryOptionsBase, EvaluateResponseCallbackType, ClientResponse +import logging + +logger = logging.getLogger(__name__) + + +class JitterRetry(RetryOptionsBase): + """https://github.com/inyutin/aiohttp_retry/issues/44.""" + + def __init__( + self, + attempts: int = 3, # How many times we should retry + start_timeout: float = 0.1, # Base timeout time, then it exponentially grow + max_timeout: float = 5.0, # Max possible timeout between tries + statuses: Optional[set[int]] = None, # On which statuses we should retry + exceptions: Optional[set[type[Exception]]] = None, # On which exceptions we should retry + methods: Optional[set[str]] = None, # On which HTTP methods we should retry + retry_all_server_errors: bool = True, + evaluate_response_callback: Optional[EvaluateResponseCallbackType] = None, + ) -> None: + super().__init__( + attempts=attempts, + statuses=statuses, + exceptions=exceptions, + methods=methods, + retry_all_server_errors=retry_all_server_errors, + evaluate_response_callback=evaluate_response_callback, + ) + + self._start_timeout: float = start_timeout + self._max_timeout: float = max_timeout + + def get_timeout( + self, + attempt: int, + response: Optional[ClientResponse] = None, # noqa: ARG002 + ) -> float: + logger.debug(f"JitterRetry get_timeout: attempt={attempt}, response={response}") + """Return timeout with exponential backoff.""" + jitter = random.uniform(0, 0.1) + timeout = self._start_timeout * (2 ** (attempt - 1)) + return min(timeout + jitter, self._max_timeout) diff --git a/pinecone/openapi_support/retries.py b/pinecone/openapi_support/retry_urllib3.py similarity index 100% rename from pinecone/openapi_support/retries.py rename to pinecone/openapi_support/retry_urllib3.py diff --git a/poetry.lock b/poetry.lock index 2e4de34b..823ed4af 100644 --- a/poetry.lock +++ b/poetry.lock @@ -109,6 +109,20 @@ yarl = ">=1.17.0,<2.0" [package.extras] speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"] +[[package]] +name = "aiohttp-retry" +version = "2.9.1" +description = "Simple retry client for aiohttp" +optional = true +python-versions = ">=3.7" +files = [ + {file = "aiohttp_retry-2.9.1-py3-none-any.whl", hash = "sha256:66d2759d1921838256a05a3f80ad7e724936f083e35be5abb5e16eed6be6dc54"}, + {file = "aiohttp_retry-2.9.1.tar.gz", hash = "sha256:8eb75e904ed4ee5c2ec242fefe85bf04240f685391c4879d8f541d6028ff01f1"}, +] + +[package.dependencies] +aiohttp = "*" + [[package]] name = "aiosignal" version = "1.3.1" @@ -1970,10 +1984,10 @@ multidict = ">=4.0" propcache = ">=0.2.0" [extras] -asyncio = ["aiohttp"] +asyncio = ["aiohttp", "aiohttp-retry"] grpc = ["googleapis-common-protos", "grpcio", "grpcio", "grpcio", "lz4", "protobuf", "protoc-gen-openapiv2"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "0145fb2ae02a1cdd6fe06b191a6761dcee4f4c67fe057b48d6b501d7b0b504da" +content-hash = "cc8b764abfc3d9ba774410ef118817c736c3c74a2bfa7f9f32a462628d804739" diff --git a/pyproject.toml b/pyproject.toml index 0a239e3a..7b987cbe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ protoc-gen-openapiv2 = {version = "^0.0.1", optional = true } pinecone-plugin-interface = "^0.0.7" python-dateutil = ">=2.5.3" aiohttp = { version = ">=3.9.0", optional = true } +aiohttp-retry = { version = "^2.9.1", optional = true } [tool.poetry.group.types] optional = true @@ -102,10 +103,9 @@ vprof = "^0.38" tuna = "^0.5.11" python-dotenv = "^1.1.0" - [tool.poetry.extras] grpc = ["grpcio", "googleapis-common-protos", "lz4", "protobuf", "protoc-gen-openapiv2"] -asyncio = ["aiohttp"] +asyncio = ["aiohttp", "aiohttp-retry"] [build-system] requires = ["poetry-core"] diff --git a/scripts/test-async-retry.py b/scripts/test-async-retry.py new file mode 100644 index 00000000..ca5f9bb4 --- /dev/null +++ b/scripts/test-async-retry.py @@ -0,0 +1,16 @@ +import dotenv +import asyncio +import logging +from pinecone import PineconeAsyncio + +dotenv.load_dotenv() + +logging.basicConfig(level=logging.DEBUG) + + +async def main(): + async with PineconeAsyncio(host="http://localhost:8000") as pc: + await pc.db.index.list() + + +asyncio.run(main()) diff --git a/tests/unit/openapi_support/test_retries.py b/tests/unit/openapi_support/test_retries.py index 5f31221d..ff624938 100644 --- a/tests/unit/openapi_support/test_retries.py +++ b/tests/unit/openapi_support/test_retries.py @@ -2,7 +2,7 @@ from unittest.mock import patch, MagicMock from urllib3.exceptions import MaxRetryError from urllib3.util.retry import Retry -from pinecone.openapi_support.retries import JitterRetry +from pinecone.openapi_support.retry_urllib3 import JitterRetry def test_jitter_retry_backoff():