Skip to content

Add retry configuration for asyncio #492

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions pinecone/openapi_support/rest_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ class AiohttpRestClient(RestClientInterface):
def __init__(self, configuration: Configuration) -> None:
try:
import aiohttp
from aiohttp_retry import RetryClient
from .retry_aiohttp import JitterRetry
except ImportError:
raise ImportError(
"Additional dependencies are required to use Pinecone with asyncio. Include these extra dependencies in your project by installing `pinecone[asyncio]`."
Expand All @@ -28,8 +30,21 @@ def __init__(self, configuration: Configuration) -> None:
else:
self._session = aiohttp.ClientSession(connector=conn)

if configuration.retries is not None:
retry_options = configuration.retries
else:
retry_options = JitterRetry(
attempts=5,
start_timeout=0.1,
max_timeout=3.0,
statuses={500, 502, 503, 504},
methods=None, # retry on all methods
exceptions={aiohttp.ClientError, aiohttp.ServerDisconnectedError},
)
self._retry_client = RetryClient(client_session=self._session, retry_options=retry_options)

async def close(self):
await self._session.close()
await self._retry_client.close()

async def request(
self,
Expand All @@ -48,7 +63,7 @@ async def request(
if "application/x-ndjson" in headers.get("Content-Type", "").lower():
ndjson_data = "\n".join(json.dumps(record) for record in body)

async with self._session.request(
async with self._retry_client.request(
method, url, params=query_params, headers=headers, data=ndjson_data
) as resp:
content = await resp.read()
Expand All @@ -57,7 +72,7 @@ async def request(
)

else:
async with self._session.request(
async with self._retry_client.request(
method, url, params=query_params, headers=headers, json=body
) as resp:
content = await resp.read()
Expand Down
2 changes: 1 addition & 1 deletion pinecone/openapi_support/rest_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .rest_utils import raise_exceptions_or_return, RESTResponse, RestClientInterface

import urllib3
from .retries import JitterRetry
from .retry_urllib3 import JitterRetry
from .exceptions import PineconeApiException, PineconeApiValueError


Expand Down
44 changes: 44 additions & 0 deletions pinecone/openapi_support/retry_aiohttp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import random
from typing import Optional
from aiohttp_retry import RetryOptionsBase, EvaluateResponseCallbackType, ClientResponse
import logging

logger = logging.getLogger(__name__)


class JitterRetry(RetryOptionsBase):
"""https://github.com/inyutin/aiohttp_retry/issues/44."""

def __init__(
self,
attempts: int = 3, # How many times we should retry
start_timeout: float = 0.1, # Base timeout time, then it exponentially grow
max_timeout: float = 5.0, # Max possible timeout between tries
statuses: Optional[set[int]] = None, # On which statuses we should retry
exceptions: Optional[set[type[Exception]]] = None, # On which exceptions we should retry
methods: Optional[set[str]] = None, # On which HTTP methods we should retry
retry_all_server_errors: bool = True,
evaluate_response_callback: Optional[EvaluateResponseCallbackType] = None,
) -> None:
super().__init__(
attempts=attempts,
statuses=statuses,
exceptions=exceptions,
methods=methods,
retry_all_server_errors=retry_all_server_errors,
evaluate_response_callback=evaluate_response_callback,
)

self._start_timeout: float = start_timeout
self._max_timeout: float = max_timeout

def get_timeout(
self,
attempt: int,
response: Optional[ClientResponse] = None, # noqa: ARG002
) -> float:
logger.debug(f"JitterRetry get_timeout: attempt={attempt}, response={response}")
"""Return timeout with exponential backoff."""
jitter = random.uniform(0, 0.1)
timeout = self._start_timeout * (2 ** (attempt - 1))
return min(timeout + jitter, self._max_timeout)
18 changes: 16 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protoc-gen-openapiv2 = {version = "^0.0.1", optional = true }
pinecone-plugin-interface = "^0.0.7"
python-dateutil = ">=2.5.3"
aiohttp = { version = ">=3.9.0", optional = true }
aiohttp-retry = { version = "^2.9.1", optional = true }

[tool.poetry.group.types]
optional = true
Expand Down Expand Up @@ -102,10 +103,9 @@ vprof = "^0.38"
tuna = "^0.5.11"
python-dotenv = "^1.1.0"


[tool.poetry.extras]
grpc = ["grpcio", "googleapis-common-protos", "lz4", "protobuf", "protoc-gen-openapiv2"]
asyncio = ["aiohttp"]
asyncio = ["aiohttp", "aiohttp-retry"]

[build-system]
requires = ["poetry-core"]
Expand Down
16 changes: 16 additions & 0 deletions scripts/test-async-retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import dotenv
import asyncio
import logging
from pinecone import PineconeAsyncio

dotenv.load_dotenv()

logging.basicConfig(level=logging.DEBUG)


async def main():
async with PineconeAsyncio(host="http://localhost:8000") as pc:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there also a scripts/test-server.py file?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already in. It got committed in a previous diff for urllib3 retries.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, was checking main not the branch 🤦‍♂️

await pc.db.index.list()


asyncio.run(main())
2 changes: 1 addition & 1 deletion tests/unit/openapi_support/test_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from unittest.mock import patch, MagicMock
from urllib3.exceptions import MaxRetryError
from urllib3.util.retry import Retry
from pinecone.openapi_support.retries import JitterRetry
from pinecone.openapi_support.retry_urllib3 import JitterRetry


def test_jitter_retry_backoff():
Expand Down