Commit 0e17d80

estuary-cdk: serialize token fetches
Concurrent tasks can race to refresh an expiring token, which can result in 401 responses depending on the source system's behavior. Each task independently calls the OAuth2 token endpoint to obtain a new token, with no coordination between them. Some providers revoke the previous token upon issuing a new one, so a task that received the first new token can find it already revoked by the time it makes an API request, and the resulting `401` response crashes the connector.

Fix by adding an asyncio.Lock to TokenSource.fetch_token(). The lock serializes access to the check-and-refresh logic, which now lives in the new _fetch_token method. In the common case (cached token still valid), no await occurs while the lock is held, so there is no actual contention.

This _does_ serialize access for connectors that have non-expiring tokens too, but those code paths contain no await points, so they were already effectively atomic and the lock adds no meaningful overhead.
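
The race and the fix described above can be reproduced in isolation. The following is a minimal sketch, not the CDK's actual implementation: `FakeProvider`, `SketchTokenSource`, and `demo` are hypothetical names introduced for illustration, and the provider is assumed to revoke the previous token every time it issues a new one.

```python
import asyncio
import time
from dataclasses import dataclass, field


class FakeProvider:
    """Hypothetical stand-in for an OAuth2 token endpoint that revokes the
    previous token whenever it issues a new one."""

    def __init__(self) -> None:
        self.issued = 0
        self.valid_token = ""  # only the most recently issued token is valid

    async def issue(self) -> str:
        await asyncio.sleep(0.01)  # simulated network round trip
        self.issued += 1
        self.valid_token = f"token-{self.issued}"
        return self.valid_token


@dataclass
class SketchTokenSource:
    """Illustrative token cache; not the real TokenSource."""

    provider: FakeProvider
    ttl: float = 60.0
    _token: str = ""
    _fetched_at: float = 0.0
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def fetch_token(self) -> str:
        # Only one task at a time may run the check-and-refresh logic.
        async with self._lock:
            return await self._fetch_token()

    async def _fetch_token(self) -> str:
        # Cached and still fresh: return without reaching an await point.
        if self._token and time.monotonic() - self._fetched_at < self.ttl:
            return self._token
        # Otherwise refresh. Without the lock, every concurrent caller that
        # saw a stale cache ends up here and requests its own token.
        self._token = await self.provider.issue()
        self._fetched_at = time.monotonic()
        return self._token


async def demo(locked: bool, tasks: int = 5) -> tuple[int, int]:
    """Return (tokens issued, callers left holding a revoked token)."""
    provider = FakeProvider()
    source = SketchTokenSource(provider)
    fetch = source.fetch_token if locked else source._fetch_token
    tokens = await asyncio.gather(*(fetch() for _ in range(tasks)))
    revoked = sum(t != provider.valid_token for t in tokens)
    return provider.issued, revoked


if __name__ == "__main__":
    print("unlocked:", asyncio.run(demo(locked=False)))
    print("locked:  ", asyncio.run(demo(locked=True)))
```

Under these assumptions, the unlocked path issues one token per task and leaves all but one caller holding a revoked token. The locked path issues a single token that every caller shares, because tasks waiting on the lock find a fresh cached token once they acquire it.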
1 parent e6a4737 commit 0e17d80

1 file changed, +11 -1 lines changed

estuary-cdk/estuary_cdk/http.py

Lines changed: 11 additions & 1 deletion
@@ -3,7 +3,7 @@
 import base64
 import json
 import time
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from logging import Logger
 from typing import Any, AsyncGenerator, Awaitable, Callable, Protocol, TypeVar
 
@@ -277,8 +277,18 @@ class AccessTokenResponse(BaseModel):
     google_spec: GoogleServiceAccountSpec | None = None
     _access_token: AccessTokenResponse | GoogleServiceAccountCredentials | None = None
     _fetched_at: int = 0
+    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
 
     async def fetch_token(self, log: Logger, session: HTTPSession) -> tuple[str, str]:
+        # Serialize token fetches so that concurrent tasks don't race to refresh
+        # an expiring token. Without this, multiple tasks can each request a new
+        # token from the OAuth2 provider. Some providers revoke the previous token
+        # when a new one is issued, causing in-flight requests that used the old
+        # token to fail.
+        async with self._lock:
+            return await self._fetch_token(log, session)
+
+    async def _fetch_token(self, log: Logger, session: HTTPSession) -> tuple[str, str]:
         if isinstance(
             self.credentials,
             (
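
A note on the `field(default_factory=asyncio.Lock)` line in the diff: assuming the enclosing class is a dataclass (as the `field` import suggests), a factory gives each instance its own lock, whereas a plain default would be evaluated once and shared by every instance. The sketch below illustrates the difference; `SharedLock` and `PerInstanceLock` are hypothetical names, not classes from the CDK.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class SharedLock:
    # A plain default is evaluated once, at class-definition time, so every
    # instance ends up referencing the same Lock object.
    lock: asyncio.Lock = asyncio.Lock()


@dataclass
class PerInstanceLock:
    # default_factory is called on each instantiation, so every instance
    # gets a fresh Lock of its own.
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)


if __name__ == "__main__":
    print(SharedLock().lock is SharedLock().lock)            # same object
    print(PerInstanceLock().lock is PerInstanceLock().lock)  # distinct objects
```

With a shared lock, unrelated token sources would serialize against each other; the per-instance form confines serialization to callers of the same source.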
