from google.api_core import client_info as rest_client_info
from google.api_core.gapic_v1 import client_info as grpc_client_info
from google.cloud import bigquery, bigquery_storage
+from google.oauth2 import service_account

import dask_bigquery


@contextmanager
-def bigquery_clients(project_id):
+def bigquery_clients(project_id, cred_fpath):
    """This context manager is a temporary solution until there is an
    upstream solution to handle this.
    See googleapis/google-cloud-python#9457
@@ -30,7 +31,14 @@ def bigquery_clients(project_id):
        user_agent=f"dask-bigquery/{dask_bigquery.__version__}"
    )

-    with bigquery.Client(project_id, client_info=bq_client_info) as bq_client:
+    if cred_fpath:
+        credentials = service_account.Credentials.from_service_account_file(cred_fpath)
+    else:
+        credentials = cred_fpath  # no path given; leave as None to try default credentials
+
+    with bigquery.Client(
+        project_id, credentials=credentials, client_info=bq_client_info
+    ) as bq_client:
        bq_storage_client = bigquery_storage.BigQueryReadClient(
            credentials=bq_client._credentials,
            client_info=bqstorage_client_info,
@@ -53,6 +61,7 @@ def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
+    cred_fpath: str,
    read_kwargs: dict,
    stream_name: str,
) -> pd.DataFrame:
@@ -71,7 +80,7 @@ def bigquery_read(
        NOTE: Please set if reading from Storage API without any `row_restriction`.
        https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
    """
-    with bigquery_clients(project_id) as (_, bqs_client):
+    with bigquery_clients(project_id, cred_fpath) as (_, bqs_client):
        session = bqs_client.create_read_session(make_create_read_session_request())
        schema = pyarrow.ipc.read_schema(
            pyarrow.py_buffer(session.arrow_schema.serialized_schema)
@@ -89,6 +98,8 @@ def read_gbq(
    dataset_id: str,
    table_id: str,
    row_filter="",
+    *,
+    cred_fpath: str = None,
    read_kwargs: dict = None,
):
    """Read table as dask dataframe using BigQuery Storage API via Arrow format.
@@ -104,6 +115,8 @@ def read_gbq(
        BigQuery table within dataset
    row_filter: str
        SQL text filtering statement to pass to `row_restriction`
+    cred_fpath: str
+        Path to the service account key JSON file.
    read_kwargs: dict
        kwargs to pass to read_rows()
@@ -112,7 +125,7 @@ def read_gbq(
        Dask DataFrame
    """
    read_kwargs = read_kwargs or {}
-    with bigquery_clients(project_id) as (bq_client, bqs_client):
+    with bigquery_clients(project_id, cred_fpath) as (bq_client, bqs_client):
        table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
        if table_ref.table_type == "VIEW":
            raise TypeError("Table type VIEW not supported")
@@ -157,6 +170,7 @@ def make_create_read_session_request(row_filter=""):
                bigquery_read,
                make_create_read_session_request,
                project_id,
+                cred_fpath,
                read_kwargs,
            ),
            label=label,