Skip to content

Commit 3a483ae

Browse files
committed
estuary-cdk: add support for GoogleServiceAccount credentials
Google service account credentials will be needed for the upcoming `source-google-play` connector. There are also other Google connectors (like `source-google-analytics-data-api-native` and probably others) that would could add support for service accounts. I considered not relying on the `google-auth` package and manually exchange the service account JSON for an access token. But after digging into how the `google-auth` package handles that, it's a significant amount of code that could be pretty fragile if Google decides to tweak that process in any way.
1 parent 3119c3e commit 3a483ae

File tree

4 files changed

+119
-2
lines changed

4 files changed

+119
-2
lines changed

estuary-cdk/estuary_cdk/flow.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,3 +303,20 @@ class _OAuth2Credentials(RotatingOAuth2Credentials):
303303
def _you_must_build_oauth2_credentials_for_a_provider(self): ...
304304

305305
return _OAuth2Credentials
306+
307+
308+
class GoogleServiceAccountSpec(BaseModel):
309+
scopes: list[str]
310+
311+
312+
class GoogleServiceAccount(BaseModel):
313+
credentials_title: Literal["Google Service Account"] = Field(
314+
default="Google Service Account",
315+
json_schema_extra={"type": "string", "order": 0}
316+
)
317+
service_account: str = Field(
318+
title="Google Service Account",
319+
description="Service account JSON key",
320+
json_schema_extra={"secret": True, "multiline": True, "order": 1},
321+
)
322+

estuary-cdk/estuary_cdk/http.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,14 @@
66
import aiohttp
77
import asyncio
88
import base64
9+
import json
910
import time
1011

12+
13+
from google.auth.credentials import TokenState as GoogleTokenState
14+
from google.auth.transport.requests import Request as GoogleAuthRequest
15+
from google.oauth2.service_account import Credentials as GoogleServiceAccountCredentials
16+
1117
from . import Mixin
1218
from .flow import (
1319
AccessToken,
@@ -21,6 +27,8 @@
2127
RotatingOAuth2Credentials,
2228
OAuth2Spec,
2329
OAuth2RotatingTokenSpec,
30+
GoogleServiceAccount,
31+
GoogleServiceAccountSpec,
2432
)
2533

2634
DEFAULT_AUTHORIZATION_HEADER = "Authorization"
@@ -205,9 +213,11 @@ class AccessTokenResponse(BaseModel):
205213
| LongLivedClientCredentialsOAuth2Credentials
206214
| AccessToken
207215
| BasicAuth
216+
| GoogleServiceAccount
208217
)
209218
authorization_header: str = DEFAULT_AUTHORIZATION_HEADER
210-
_access_token: AccessTokenResponse | None = None
219+
google_spec: GoogleServiceAccountSpec | None = None
220+
_access_token: AccessTokenResponse | GoogleServiceAccountCredentials | None = None
211221
_fetched_at: int = 0
212222

213223
async def fetch_token(self, log: Logger, session: HTTPSession) -> tuple[str, str]:
@@ -230,6 +240,25 @@ async def fetch_token(self, log: Logger, session: HTTPSession) -> tuple[str, str
230240
f"{self.credentials.username}:{self.credentials.password}".encode()
231241
).decode(),
232242
)
243+
elif isinstance(self.credentials, GoogleServiceAccount):
244+
assert isinstance(self.google_spec, GoogleServiceAccountSpec)
245+
if self._access_token is None:
246+
self._access_token = GoogleServiceAccountCredentials.from_service_account_info(
247+
json.loads(self.credentials.service_account),
248+
scopes=self.google_spec.scopes,
249+
)
250+
251+
assert isinstance(self._access_token, GoogleServiceAccountCredentials)
252+
253+
match self._access_token.token_state:
254+
case GoogleTokenState.FRESH:
255+
pass
256+
case GoogleTokenState.STALE | GoogleTokenState.INVALID:
257+
self._access_token.refresh(GoogleAuthRequest())
258+
case _:
259+
raise RuntimeError(f"Unknown GoogleTokenState: {self._access_token.token_state}")
260+
261+
return ("Bearer", self._access_token.token)
233262

234263
assert (
235264
isinstance(self.credentials, BaseOAuth2Credentials)
@@ -240,6 +269,7 @@ async def fetch_token(self, log: Logger, session: HTTPSession) -> tuple[str, str
240269
current_time = time.time()
241270

242271
if self._access_token is not None:
272+
assert isinstance(self._access_token, self.AccessTokenResponse)
243273
horizon = self._fetched_at + self._access_token.expires_in * 0.75
244274

245275
if current_time < horizon:

estuary-cdk/poetry.lock

Lines changed: 70 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

estuary-cdk/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pycron = "^3.1.2"
1919
stream-unzip = "0.0.99"
2020

2121
airbyte-cdk = "<=6.56.0"
22+
google-auth = ">=2.40.3"
2223
pendulum = "<=2.0.0 | >=3.0.0"
2324
nltk = "^3.9.1"
2425

0 commit comments

Comments
 (0)