31 changes: 25 additions & 6 deletions parsons/google/google_bigquery.py
@@ -456,7 +456,19 @@ def copy_from_gcs(
             Other arguments to pass to the underlying load_table_from_uri
             call on the BigQuery client.
         """
-        self._validate_copy_inputs(if_exists=if_exists, data_type=data_type)
+        self._validate_copy_inputs(
+            if_exists=if_exists,
+            data_type=data_type,
+            accepted_data_types=[
+                "csv",
+                "json",
+                "parquet",
+                "datastore_backup",
+                "newline_delimited_json",
+                "avro",
+                "orc",
+            ],
+        )
 
         job_config = self._process_job_config(
             job_config=job_config,
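For context, a minimal usage sketch (not part of this PR) of how a caller could exercise the widened allow-list in copy_from_gcs. The gcs_blob_uri and table_name argument names, the URI, and the destination table are illustrative assumptions rather than values taken from this diff:

```python
# Hypothetical sketch: loading a Parquet export via copy_from_gcs, a
# data_type the old validator (hard-coded to "csv" or "json") rejected.
from parsons import GoogleBigQuery

bq = GoogleBigQuery()  # assumes GOOGLE_APPLICATION_CREDENTIALS is configured
bq.copy_from_gcs(
    gcs_blob_uri="gs://example-bucket/exports/data.parquet",  # hypothetical URI
    table_name="my_dataset.my_table",  # hypothetical destination table
    if_exists="append",
    data_type="parquet",  # now passes _validate_copy_inputs for this method
)
```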
@@ -624,7 +636,11 @@ def copy_large_compressed_file_from_gcs(
             client.
         """
 
-        self._validate_copy_inputs(if_exists=if_exists, data_type=data_type)
+        self._validate_copy_inputs(
+            if_exists=if_exists,
+            data_type=data_type,
+            accepted_data_types=["csv", "newline_delimited_json"],
+        )
 
         job_config = self._process_job_config(
             job_config=job_config,
@@ -1005,7 +1021,9 @@ def _prepare_local_upload_job(
         ):
             data_type = "csv"
 
-        self._validate_copy_inputs(if_exists=if_exists, data_type=data_type)
+        self._validate_copy_inputs(
+            if_exists=if_exists, data_type=data_type, accepted_data_types=["csv"]
+        )
 
         # If our source table is loaded from CSV with no transformations
         # The original source file will be directly loaded to GCS
@@ -1559,14 +1577,15 @@ def _fetch_query_results(self, cursor) -> Table:
         ptable = petl.frompickle(temp_filename)
         return Table(ptable)
 
-    def _validate_copy_inputs(self, if_exists: str, data_type: str):
+    def _validate_copy_inputs(self, if_exists: str, data_type: str, accepted_data_types: list[str]):
         if if_exists not in ["fail", "truncate", "append", "drop"]:
             raise ValueError(
                 f"Unexpected value for if_exists: {if_exists}, must be one of "
                 '"append", "drop", "truncate", or "fail"'
             )
-        if data_type not in ["csv", "json"]:
-            raise ValueError(f"Only supports csv or json files [data_type = {data_type}]")
+
+        if data_type not in accepted_data_types:
+            raise ValueError(f"Only supports {accepted_data_types} files [data_type = {data_type}]")
 
     def _load_table_from_uri(
         self, source_uris, destination, job_config, max_timeout, **load_kwargs
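As a sanity check on the new behavior, here is a self-contained sketch that reproduces the merged validator logic standalone, so the per-caller allow-lists can be exercised without a BigQuery client; the demo values are made up:

```python
# Standalone reproduction of the merged _validate_copy_inputs logic.
def validate_copy_inputs(if_exists: str, data_type: str, accepted_data_types: list[str]):
    if if_exists not in ["fail", "truncate", "append", "drop"]:
        raise ValueError(
            f"Unexpected value for if_exists: {if_exists}, must be one of "
            '"append", "drop", "truncate", or "fail"'
        )

    if data_type not in accepted_data_types:
        raise ValueError(f"Only supports {accepted_data_types} files [data_type = {data_type}]")


# copy_from_gcs now accepts the full list of BigQuery source formats:
validate_copy_inputs(
    "append",
    "avro",
    ["csv", "json", "parquet", "datastore_backup", "newline_delimited_json", "avro", "orc"],
)

# copy_large_compressed_file_from_gcs keeps a narrower list, so "avro" is rejected:
try:
    validate_copy_inputs("append", "avro", ["csv", "newline_delimited_json"])
except ValueError as err:
    print(err)  # Only supports ['csv', 'newline_delimited_json'] files [data_type = avro]
```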