Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion estuary-cdk/estuary_cdk/capture/base_capture_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async def _encrypt_config(
}

encrypted_config = await self.request(
log, ENCRYPTION_URL, "POST", json=body, _with_token=False
log, ENCRYPTION_URL, "POST", json=body, with_token=False
)

return json.loads(encrypted_config.decode("utf-8"))
Expand Down
31 changes: 14 additions & 17 deletions estuary-cdk/estuary_cdk/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from google.auth.credentials import TokenState as GoogleTokenState
from google.auth.transport.requests import Request as GoogleAuthRequest
from google.oauth2.service_account import Credentials as GoogleServiceAccountCredentials
from multidict import CIMultiDict
from multidict import CIMultiDictProxy
from pydantic import BaseModel

from . import Mixin
Expand All @@ -37,7 +37,7 @@

T = TypeVar("T")

Headers = CIMultiDict[str]
Headers = CIMultiDictProxy[str]

BodyGeneratorFunction = Callable[[], AsyncGenerator[bytes, None]]
HeadersAndBodyGenerator = tuple[Headers, BodyGeneratorFunction]
Expand Down Expand Up @@ -108,7 +108,7 @@ async def request(
params: dict[str, Any] | None = None,
json: dict[str, Any] | None = None,
form: dict[str, Any] | None = None,
_with_token: bool = True, # Unstable internal API.
with_token: bool = True,
headers: dict[str, Any] | None = None,
should_retry: ShouldRetryProtocol | None = None,
) -> bytes:
Expand All @@ -122,7 +122,7 @@ async def request(
params,
json,
form,
_with_token,
with_token,
headers,
should_retry,
)
Expand All @@ -145,6 +145,7 @@ async def request_lines(
params: dict[str, Any] | None = None,
json: dict[str, Any] | None = None,
form: dict[str, Any] | None = None,
with_token: bool = True,
delim: bytes = b"\n",
headers: dict[str, Any] | None = None,
should_retry: ShouldRetryProtocol | None = None,
Expand All @@ -158,7 +159,7 @@ async def request_lines(
params,
json,
form,
True,
with_token,
headers,
should_retry,
)
Expand All @@ -184,14 +185,14 @@ async def request_stream(
params: dict[str, Any] | None = None,
json: dict[str, Any] | None = None,
form: dict[str, Any] | None = None,
_with_token: bool = True, # Unstable internal API.
with_token: bool = True,
headers: dict[str, Any] | None = None,
should_retry: ShouldRetryProtocol | None = None,
) -> tuple[Headers, BodyGeneratorFunction]:
"""Request a url and and return the raw response as a stream of bytes"""

return await self._request_stream(
log, url, method, params, json, form, _with_token, headers, should_retry
log, url, method, params, json, form, with_token, headers, should_retry
)

@abc.abstractmethod
Expand All @@ -203,15 +204,11 @@ async def _request_stream(
params: dict[str, Any] | None,
json: dict[str, Any] | None,
form: dict[str, Any] | None,
_with_token: bool,
with_token: bool,
headers: dict[str, Any] | None = None,
should_retry: ShouldRetryProtocol | None = None,
) -> HeadersAndBodyGenerator: ...

# TODO(johnny): This is an unstable API.
# It may need to accept request headers, or surface response headers,
# or we may refactor TokenSource, etc.


@dataclass
class TokenSource:
Expand Down Expand Up @@ -383,7 +380,7 @@ async def _fetch_oauth2_token(
method="POST",
headers=headers,
form=form,
_with_token=False,
with_token=False,
)
return self.AccessTokenResponse.model_validate_json(response)

Expand Down Expand Up @@ -457,10 +454,10 @@ async def _establish_connection_and_get_response(
params: dict[str, Any] | None,
json: dict[str, Any] | None,
form: dict[str, Any] | None,
_with_token: bool,
with_token: bool,
headers: dict[str, Any],
):
if _with_token and self.token_source is not None:
if with_token and self.token_source is not None:
token_type, token = await self.token_source.fetch_token(log, self)
header_value = (
f"{token_type} {token}"
Expand Down Expand Up @@ -526,7 +523,7 @@ async def _request_stream(
params: dict[str, Any] | None,
json: dict[str, Any] | None,
form: dict[str, Any] | None,
_with_token: bool,
with_token: bool,
headers: dict[str, Any] | None = None,
should_retry: ShouldRetryProtocol | None = None,
) -> HeadersAndBodyGenerator:
Expand All @@ -550,7 +547,7 @@ async def _request_stream(
params,
json,
form,
_with_token,
with_token,
headers,
),
)
Expand Down
2 changes: 1 addition & 1 deletion source-apple-app-store/source_apple_app_store/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ async def stream_tsv_data(
_, body_generator = await self.http.request_stream(
self.log,
download_url,
_with_token=False, # Apple's API returns signed/authenticated URLs; using Authorization header causes HTTP 400 errors
with_token=False, # Apple's API returns signed/authenticated URLs; using Authorization header causes HTTP 400 errors
)

async def uncompressed_body() -> AsyncGenerator[bytes, None]:
Expand Down
Loading