diff --git a/estuary-cdk/estuary_cdk/capture/base_capture_connector.py b/estuary-cdk/estuary_cdk/capture/base_capture_connector.py index a1a8817679..9050c1c24f 100644 --- a/estuary-cdk/estuary_cdk/capture/base_capture_connector.py +++ b/estuary-cdk/estuary_cdk/capture/base_capture_connector.py @@ -184,7 +184,7 @@ async def _encrypt_config( } encrypted_config = await self.request( - log, ENCRYPTION_URL, "POST", json=body, _with_token=False + log, ENCRYPTION_URL, "POST", json=body, with_token=False ) return json.loads(encrypted_config.decode("utf-8")) diff --git a/estuary-cdk/estuary_cdk/http.py b/estuary-cdk/estuary_cdk/http.py index 1c862c5716..206f72192d 100644 --- a/estuary-cdk/estuary_cdk/http.py +++ b/estuary-cdk/estuary_cdk/http.py @@ -11,7 +11,7 @@ from google.auth.credentials import TokenState as GoogleTokenState from google.auth.transport.requests import Request as GoogleAuthRequest from google.oauth2.service_account import Credentials as GoogleServiceAccountCredentials -from multidict import CIMultiDict +from multidict import CIMultiDictProxy from pydantic import BaseModel from . import Mixin @@ -37,7 +37,7 @@ T = TypeVar("T") -Headers = CIMultiDict[str] +Headers = CIMultiDictProxy[str] BodyGeneratorFunction = Callable[[], AsyncGenerator[bytes, None]] HeadersAndBodyGenerator = tuple[Headers, BodyGeneratorFunction] @@ -108,7 +108,7 @@ async def request( params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, form: dict[str, Any] | None = None, - _with_token: bool = True, # Unstable internal API. + with_token: bool = True, headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, ) -> bytes: @@ -122,7 +122,7 @@ async def request( params, json, form, - _with_token, + with_token, headers, should_retry, ) @@ -145,6 +145,7 @@ async def request_lines( params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, form: dict[str, Any] | None = None, + with_token: bool = True, delim: bytes = b"\n", headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, @@ -158,7 +159,7 @@ async def request_lines( params, json, form, - True, + with_token, headers, should_retry, ) @@ -184,14 +185,14 @@ async def request_stream( params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, form: dict[str, Any] | None = None, - _with_token: bool = True, # Unstable internal API. + with_token: bool = True, headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, ) -> tuple[Headers, BodyGeneratorFunction]: """Request a url and and return the raw response as a stream of bytes""" return await self._request_stream( - log, url, method, params, json, form, _with_token, headers, should_retry + log, url, method, params, json, form, with_token, headers, should_retry ) @abc.abstractmethod @@ -203,15 +204,11 @@ async def _request_stream( params: dict[str, Any] | None, json: dict[str, Any] | None, form: dict[str, Any] | None, - _with_token: bool, + with_token: bool, headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, ) -> HeadersAndBodyGenerator: ... - # TODO(johnny): This is an unstable API. - # It may need to accept request headers, or surface response headers, - # or we may refactor TokenSource, etc. - @dataclass class TokenSource: @@ -383,7 +380,7 @@ async def _fetch_oauth2_token( method="POST", headers=headers, form=form, - _with_token=False, + with_token=False, ) return self.AccessTokenResponse.model_validate_json(response) @@ -457,10 +454,10 @@ async def _establish_connection_and_get_response( params: dict[str, Any] | None, json: dict[str, Any] | None, form: dict[str, Any] | None, - _with_token: bool, + with_token: bool, headers: dict[str, Any], ): - if _with_token and self.token_source is not None: + if with_token and self.token_source is not None: token_type, token = await self.token_source.fetch_token(log, self) header_value = ( f"{token_type} {token}" @@ -526,7 +523,7 @@ async def _request_stream( params: dict[str, Any] | None, json: dict[str, Any] | None, form: dict[str, Any] | None, - _with_token: bool, + with_token: bool, headers: dict[str, Any] | None = None, should_retry: ShouldRetryProtocol | None = None, ) -> HeadersAndBodyGenerator: @@ -550,7 +547,7 @@ async def _request_stream( params, json, form, - _with_token, + with_token, headers, ), ) diff --git a/source-apple-app-store/source_apple_app_store/client.py b/source-apple-app-store/source_apple_app_store/client.py index c320f30d06..d930948a2f 100644 --- a/source-apple-app-store/source_apple_app_store/client.py +++ b/source-apple-app-store/source_apple_app_store/client.py @@ -300,7 +300,7 @@ async def stream_tsv_data( _, body_generator = await self.http.request_stream( self.log, download_url, - _with_token=False, # Apple's API returns signed/authenticated URLs; using Authorization header causes HTTP 400 errors + with_token=False, # Apple's API returns signed/authenticated URLs; using Authorization header causes HTTP 400 errors ) async def uncompressed_body() -> AsyncGenerator[bytes, None]: