13
13
from google .api_core .gapic_v1 import client_info as grpc_client_info
14
14
from google .cloud import bigquery , bigquery_storage
15
15
from google .oauth2 import service_account
16
+ from google .oauth2 .service_account import Credentials
16
17
17
18
import dask_bigquery
18
19
19
20
20
21
@contextmanager
21
- def bigquery_clients (project_id , cred_fpath ):
22
+ def bigquery_clients (project_id , credentials ):
22
23
"""This context manager is a temporary solution until there is an
23
24
upstream solution to handle this.
24
25
See googleapis/google-cloud-python#9457
@@ -31,11 +32,6 @@ def bigquery_clients(project_id, cred_fpath):
31
32
user_agent = f"dask-bigquery/{ dask_bigquery .__version__ } "
32
33
)
33
34
34
- if cred_fpath :
35
- credentials = service_account .Credentials .from_service_account_file (cred_fpath )
36
- else :
37
- credentials = cred_fpath # if no path set to None to try read default
38
-
39
35
with bigquery .Client (
40
36
project_id , credentials = credentials , client_info = bq_client_info
41
37
) as bq_client :
@@ -61,7 +57,7 @@ def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
61
57
def bigquery_read (
62
58
make_create_read_session_request : callable ,
63
59
project_id : str ,
64
- cred_fpath : str ,
60
+ credentials : Credentials ,
65
61
read_kwargs : dict ,
66
62
stream_name : str ,
67
63
) -> pd .DataFrame :
@@ -80,7 +76,7 @@ def bigquery_read(
80
76
NOTE: Please set if reading from Storage API without any `row_restriction`.
81
77
https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
82
78
"""
83
- with bigquery_clients (project_id , cred_fpath ) as (_ , bqs_client ):
79
+ with bigquery_clients (project_id , credentials ) as (_ , bqs_client ):
84
80
session = bqs_client .create_read_session (make_create_read_session_request ())
85
81
schema = pyarrow .ipc .read_schema (
86
82
pyarrow .py_buffer (session .arrow_schema .serialized_schema )
@@ -125,7 +121,13 @@ def read_gbq(
125
121
Dask DataFrame
126
122
"""
127
123
read_kwargs = read_kwargs or {}
128
- with bigquery_clients (project_id , cred_fpath ) as (bq_client , bqs_client ):
124
+
125
+ if cred_fpath :
126
+ credentials = service_account .Credentials .from_service_account_file (cred_fpath )
127
+ else :
128
+ credentials = cred_fpath # if no path set to None to try read default
129
+
130
+ with bigquery_clients (project_id , credentials ) as (bq_client , bqs_client ):
129
131
table_ref = bq_client .get_table (f"{ dataset_id } .{ table_id } " )
130
132
if table_ref .table_type == "VIEW" :
131
133
raise TypeError ("Table type VIEW not supported" )
@@ -170,7 +172,7 @@ def make_create_read_session_request(row_filter=""):
170
172
bigquery_read ,
171
173
make_create_read_session_request ,
172
174
project_id ,
173
- cred_fpath ,
175
+ credentials ,
174
176
read_kwargs ,
175
177
),
176
178
label = label ,
0 commit comments