From 2b9dc9091a11f1c43f74d845892fc1cfa8d77498 Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Wed, 7 Jan 2026 11:39:49 -0500 Subject: [PATCH 1/3] estuary-cdk: make `_with_token` part of the public API The `_with_token` parameter is used to control whether or not to include a token from the `TokenSource` in HTTP requests made through an `HTTPSession`. For most requests, we do want to include the token in an Authorization header. However, there are legitimate situations where we don't want to include that token, like during OAuth token exchange or when using a pre-signed URL. During the original development of the CDK, `_with_token` was marked as an unstable internal API. However, the CDK has developed since then; we've added many of the features mentioned in Johnny's original TODO about why this was an unstable API. While there are still other improvements we want to make to the CDK, I think `HTTPSession` is mature enough to consider `with_token` a part of the public API. --- .../capture/base_capture_connector.py | 2 +- estuary-cdk/estuary_cdk/http.py | 24 ++++++++----------- .../source_apple_app_store/client.py | 2 +- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/estuary-cdk/estuary_cdk/capture/base_capture_connector.py b/estuary-cdk/estuary_cdk/capture/base_capture_connector.py index a1a8817679..9050c1c24f 100644 --- a/estuary-cdk/estuary_cdk/capture/base_capture_connector.py +++ b/estuary-cdk/estuary_cdk/capture/base_capture_connector.py @@ -184,7 +184,7 @@ async def _encrypt_config( } encrypted_config = await self.request( - log, ENCRYPTION_URL, "POST", json=body, _with_token=False + log, ENCRYPTION_URL, "POST", json=body, with_token=False ) return json.loads(encrypted_config.decode("utf-8")) diff --git a/estuary-cdk/estuary_cdk/http.py b/estuary-cdk/estuary_cdk/http.py index 1c862c5716..28b49bf3f5 100644 --- a/estuary-cdk/estuary_cdk/http.py +++ b/estuary-cdk/estuary_cdk/http.py @@ -108,7 +108,7 @@ async def request( params: dict[str, Any] | None 
= None, json: dict[str, Any] | None = None, form: dict[str, Any] | None = None, - _with_token: bool = True, # Unstable internal API. + with_token: bool = True, headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, ) -> bytes: @@ -122,7 +122,7 @@ async def request( params, json, form, - _with_token, + with_token, headers, should_retry, ) @@ -184,14 +184,14 @@ async def request_stream( params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, form: dict[str, Any] | None = None, - _with_token: bool = True, # Unstable internal API. + with_token: bool = True, headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, ) -> tuple[Headers, BodyGeneratorFunction]: """Request a url and and return the raw response as a stream of bytes""" return await self._request_stream( - log, url, method, params, json, form, _with_token, headers, should_retry + log, url, method, params, json, form, with_token, headers, should_retry ) @abc.abstractmethod @@ -203,15 +203,11 @@ async def _request_stream( params: dict[str, Any] | None, json: dict[str, Any] | None, form: dict[str, Any] | None, - _with_token: bool, + with_token: bool, headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, ) -> HeadersAndBodyGenerator: ... - # TODO(johnny): This is an unstable API. - # It may need to accept request headers, or surface response headers, - # or we may refactor TokenSource, etc. 
- @dataclass class TokenSource: @@ -383,7 +379,7 @@ async def _fetch_oauth2_token( method="POST", headers=headers, form=form, - _with_token=False, + with_token=False, ) return self.AccessTokenResponse.model_validate_json(response) @@ -457,10 +453,10 @@ async def _establish_connection_and_get_response( params: dict[str, Any] | None, json: dict[str, Any] | None, form: dict[str, Any] | None, - _with_token: bool, + with_token: bool, headers: dict[str, Any], ): - if _with_token and self.token_source is not None: + if with_token and self.token_source is not None: token_type, token = await self.token_source.fetch_token(log, self) header_value = ( f"{token_type} {token}" @@ -526,7 +522,7 @@ async def _request_stream( params: dict[str, Any] | None, json: dict[str, Any] | None, form: dict[str, Any] | None, - _with_token: bool, + with_token: bool, headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, ) -> HeadersAndBodyGenerator: @@ -550,7 +546,7 @@ async def _request_stream( params, json, form, - _with_token, + with_token, headers, ), ) diff --git a/source-apple-app-store/source_apple_app_store/client.py b/source-apple-app-store/source_apple_app_store/client.py index c320f30d06..d930948a2f 100644 --- a/source-apple-app-store/source_apple_app_store/client.py +++ b/source-apple-app-store/source_apple_app_store/client.py @@ -300,7 +300,7 @@ async def stream_tsv_data( _, body_generator = await self.http.request_stream( self.log, download_url, - _with_token=False, # Apple's API returns signed/authenticated URLs; using Authorization header causes HTTP 400 errors + with_token=False, # Apple's API returns signed/authenticated URLs; using Authorization header causes HTTP 400 errors ) async def uncompressed_body() -> AsyncGenerator[bytes, None]: From 821732289e53d8982a6178c03b96fa217f437f1b Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Wed, 7 Jan 2026 11:42:16 -0500 Subject: [PATCH 2/3] estuary-cdk: support omitting authentication tokens in 
`http.request_lines` All other public `HTTPSession` methods already support the `with_token` parameter, so `request_lines` should too for consistency. --- estuary-cdk/estuary_cdk/http.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/estuary-cdk/estuary_cdk/http.py b/estuary-cdk/estuary_cdk/http.py index 28b49bf3f5..081e9d72f2 100644 --- a/estuary-cdk/estuary_cdk/http.py +++ b/estuary-cdk/estuary_cdk/http.py @@ -145,6 +145,7 @@ async def request_lines( params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, form: dict[str, Any] | None = None, + with_token: bool = True, delim: bytes = b"\n", headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, @@ -158,7 +159,7 @@ async def request_lines( params, json, form, - True, + with_token, headers, should_retry, ) From 328b663853bb30097aa6df7f8d71d446203c10ea Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Wed, 7 Jan 2026 11:44:56 -0500 Subject: [PATCH 3/3] estuary-cdk: fix Headers type to match aiohttp response headers --- estuary-cdk/estuary_cdk/http.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/estuary-cdk/estuary_cdk/http.py b/estuary-cdk/estuary_cdk/http.py index 081e9d72f2..206f72192d 100644 --- a/estuary-cdk/estuary_cdk/http.py +++ b/estuary-cdk/estuary_cdk/http.py @@ -11,7 +11,7 @@ from google.auth.credentials import TokenState as GoogleTokenState from google.auth.transport.requests import Request as GoogleAuthRequest from google.oauth2.service_account import Credentials as GoogleServiceAccountCredentials -from multidict import CIMultiDict +from multidict import CIMultiDictProxy from pydantic import BaseModel from . import Mixin @@ -37,7 +37,7 @@ T = TypeVar("T") -Headers = CIMultiDict[str] +Headers = CIMultiDictProxy[str] BodyGeneratorFunction = Callable[[], AsyncGenerator[bytes, None]] HeadersAndBodyGenerator = tuple[Headers, BodyGeneratorFunction]