Skip to content

Commit 54b400b

Browse files
committed
estuary-cdk: retry connection timeouts
It's a good idea to retry connection timeouts a few times in case of intermittent errors from flaky APIs. This is safe to do in the request() method since it consumes the entire response body before returning, unlike request_lines() and request_stream() which yield chunks that may have already been processed before a timeout occurs.
1 parent 184c31b commit 54b400b

File tree

1 file changed

+27
-12
lines changed

1 file changed

+27
-12
lines changed

estuary-cdk/estuary_cdk/http.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from dataclasses import dataclass
22
from logging import Logger
3+
from aiohttp.client_exceptions import ConnectionTimeoutError
34
from estuary_cdk.incremental_json_processor import Remainder
45
from pydantic import BaseModel
56
from typing import AsyncGenerator, Any, TypeVar, Union, Callable
@@ -78,20 +79,34 @@ async def request(
7879
) -> bytes:
7980
"""Request a url and return its body as bytes"""
8081

81-
chunks: list[bytes] = []
82-
_, body_generator = await self._request_stream(
83-
log, url, method, params, json, form, _with_token, headers
84-
)
82+
max_attempts = 3
83+
attempt = 1
8584

86-
async for chunk in body_generator():
87-
chunks.append(chunk)
85+
while True:
86+
try:
87+
chunks: list[bytes] = []
88+
_, body_generator = await self._request_stream(
89+
log, url, method, params, json, form, _with_token, headers
90+
)
8891

89-
if len(chunks) == 0:
90-
return b""
91-
elif len(chunks) == 1:
92-
return chunks[0]
93-
else:
94-
return b"".join(chunks)
92+
async for chunk in body_generator():
93+
chunks.append(chunk)
94+
95+
if len(chunks) == 0:
96+
return b""
97+
elif len(chunks) == 1:
98+
return chunks[0]
99+
else:
100+
return b"".join(chunks)
101+
except ConnectionTimeoutError as e:
102+
if attempt <= max_attempts:
103+
log.warning(
104+
f"Connection timeout error (will retry)",
105+
{"url": url, "method": method, "attempt": attempt, "error": str(e)}
106+
)
107+
attempt += 1
108+
else:
109+
raise
95110

96111
async def request_lines(
97112
self,

0 commit comments

Comments
 (0)