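"""Factory functions that build the Google Cloud clients used by the dbt
BigQuery adapter: BigQuery, Cloud Storage, Dataproc, and the Vertex AI
notebook service."""
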
from typing import Optional

from google.api_core.client_info import ClientInfo
from google.api_core.client_options import ClientOptions
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import aiplatform_v1
from google.cloud.bigquery import Client as BigQueryClient, DEFAULT_RETRY as BQ_DEFAULT_RETRY
from google.cloud.dataproc_v1 import BatchControllerClient, JobControllerClient
from google.cloud.storage import Client as StorageClient
from google.cloud.storage.retry import DEFAULT_RETRY as GCS_DEFAULT_RETRY
from google.oauth2.credentials import Credentials as GoogleCredentials

from dbt.adapters.events.logging import AdapterLogger

import dbt.adapters.bigquery.__version__ as dbt_version
from dbt.adapters.bigquery.credentials import (
    BigQueryCredentials,
    create_google_credentials,
    set_default_credentials,
)


_logger = AdapterLogger("BigQuery")


def create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
    try:
        return _create_bigquery_client(credentials)
    except DefaultCredentialsError:
        # no default credentials were found; ask the user to log in, set up
        # application default credentials, and retry once
        _logger.info("Please log into GCP to continue")
        set_default_credentials()
        return _create_bigquery_client(credentials)


# GCS's DEFAULT_RETRY is a google.api_core Retry instance, so it can be used
# as a decorator to retry transient errors raised while creating the client
@GCS_DEFAULT_RETRY
def create_gcs_client(credentials: BigQueryCredentials) -> StorageClient:
    return StorageClient(
        project=credentials.execution_project,
        credentials=create_google_credentials(credentials),
    )


# dataproc does not appear to have a default retry like BQ and GCS
def create_dataproc_job_controller_client(credentials: BigQueryCredentials) -> JobControllerClient:
    return JobControllerClient(
        credentials=create_google_credentials(credentials),
        client_options=ClientOptions(api_endpoint=_dataproc_endpoint(credentials)),
    )


# dataproc does not appear to have a default retry like BQ and GCS
def create_dataproc_batch_controller_client(
    credentials: BigQueryCredentials,
) -> BatchControllerClient:
    return BatchControllerClient(
        credentials=create_google_credentials(credentials),
        client_options=ClientOptions(api_endpoint=_dataproc_endpoint(credentials)),
    )
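

# Illustrative sketch, an assumption rather than upstream code: the Dataproc
# factories above ship without a default retry, but a caller could opt in by
# wrapping one with google.api_core.retry.Retry, whose instances double as
# decorators (the same mechanism behind the @GCS_DEFAULT_RETRY and
# @BQ_DEFAULT_RETRY decorators in this module).
from google.api_core.retry import Retry as _ExampleRetry

# hypothetical opt-in wrapper; retries transient errors during client creation
_example_batch_client_factory = _ExampleRetry()(create_dataproc_batch_controller_client)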


# BigQuery's DEFAULT_RETRY is likewise a Retry instance used as a decorator,
# retrying transient errors raised while creating the client
@BQ_DEFAULT_RETRY
def _create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
    return BigQueryClient(
        credentials.execution_project,
        create_google_credentials(credentials),
        location=getattr(credentials, "location", None),
        client_info=ClientInfo(user_agent=f"dbt-bigquery-{dbt_version.version}"),
        client_options=ClientOptions(quota_project_id=credentials.quota_project),
    )


def _dataproc_endpoint(credentials: BigQueryCredentials) -> str:
    # regional endpoint, e.g. "us-central1-dataproc.googleapis.com:443"
    # when dataproc_region is "us-central1"
    return f"{credentials.dataproc_region}-dataproc.googleapis.com:443"


def create_notebook_client(
    credentials: GoogleCredentials, region: Optional[str]
) -> aiplatform_v1.NotebookServiceClient:
    api_endpoint = f"{region}-aiplatform.googleapis.com"
    notebook_client = aiplatform_v1.NotebookServiceClient(
        credentials=credentials,
        client_options=ClientOptions(api_endpoint=api_endpoint),
    )
    return notebook_client
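

# Illustrative usage sketch, an assumption rather than upstream code: unlike
# the other factories, create_notebook_client takes already-materialized
# Google credentials plus a Vertex AI region, so a caller would typically
# convert the adapter's credentials first. The region below is only an
# example value.
def _example_notebook_client(
    credentials: BigQueryCredentials, region: str = "us-central1"
) -> aiplatform_v1.NotebookServiceClient:
    # create_google_credentials turns dbt's BigQueryCredentials into the
    # google-auth credentials object that create_notebook_client expects
    return create_notebook_client(create_google_credentials(credentials), region)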