diff --git a/.gitignore b/.gitignore
index 8d6520e18e..3f994ee23f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -126,3 +126,6 @@ bill_com_credentials.*
 docs/html
 docs/dirhtml
 *.sw*
+
+# bin files
+bin/
diff --git a/docs/google.rst b/docs/google.rst
index 9590d51f20..d3f1243fc7 100644
--- a/docs/google.rst
+++ b/docs/google.rst
@@ -142,6 +142,8 @@ API
     :inherited-members:
 
+
+
 
 *************
 Cloud Storage
 *************
diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py
index ec1b8c5b08..76326487ee 100644
--- a/parsons/google/google_bigquery.py
+++ b/parsons/google/google_bigquery.py
@@ -821,7 +821,8 @@ def copy(
         for field in tbl.get_columns_type_stats():
             if "dict" in field["type"]:
                 new_petl = tbl.table.addfield(
-                    field["name"] + "_replace", lambda row: json.dumps(row[field["name"]])
+                    field["name"] + "_replace",
+                    lambda row: json.dumps(row[field["name"]]),
                 )
                 new_tbl = Table(new_petl)
                 new_tbl.remove_column(field["name"])
@@ -1451,6 +1452,102 @@ def extract(
 
         logger.info(f"Finished exporting query result to {gs_destination}.")
 
+    def copy_between_projects(
+        self,
+        source_project,
+        source_dataset,
+        source_table,
+        destination_project,
+        destination_dataset,
+        destination_table,
+        if_dataset_not_exists="fail",
+        if_table_exists="fail",
+    ):
+        """
+        Copy a table from one project to another. Fails if the source or target
+        project does not exist.
+        If the target dataset does not exist, the flag if_dataset_not_exists
+        controls behavior: it defaults to 'fail'; set it to 'create' to create
+        the dataset.
+        If the target table exists, the flag if_table_exists controls behavior:
+        it defaults to 'fail'; set it to 'overwrite' to overwrite the existing
+        table.
+
+        `Args:`
+            source_project: str
+                Name of the source project
+            source_dataset: str
+                Name of the source dataset
+            source_table: str
+                Name of the source table
+            destination_project: str
+                Name of the destination project
+            destination_dataset: str
+                Name of the destination dataset
+            destination_table: str
+                Name of the destination table
+            if_dataset_not_exists: str
+                Action if the dataset doesn't exist: {'fail', 'create'}
+            if_table_exists: str
+                Action if the table exists: {'fail', 'overwrite'}
+
+        `Returns:`
+            None
+        """
+
+        from google.cloud import bigquery
+        from google.cloud.exceptions import NotFound
+
+        destination_table_id = (
+            destination_project + "." + destination_dataset + "." + destination_table
+        )
+        source_table_id = source_project + "." + source_dataset + "." + source_table
+        dataset_id = destination_project + "." + destination_dataset
+
+        # check whether the destination dataset exists
+        try:
+            self.client.get_dataset(dataset_id)  # Make an API request.
+        except NotFound:
+            # the dataset doesn't exist: check whether it's ok to create it
+            if if_dataset_not_exists == "create":
+                # create a new dataset in the destination project
+                dataset = bigquery.Dataset(dataset_id)
+                dataset = self.client.create_dataset(dataset, timeout=30)
+            else:
+                # the dataset doesn't exist and we may not create it, so fail
+                logger.error("BigQuery copy failed")
+                logger.error(
+                    f"Dataset {destination_dataset} does not exist "
+                    f"and if_dataset_not_exists set to {if_dataset_not_exists}"
+                )
+                raise ValueError(f"Dataset {dataset_id} does not exist")
+
+        job_config = bigquery.CopyJobConfig()
+
+        # check whether the destination table exists
+        try:
+            self.client.get_table(destination_table_id)
+            # the table exists; overwrite it only if the caller allows it
+            if if_table_exists == "overwrite":
+                job_config.write_disposition = "WRITE_TRUNCATE"
+                job = self.client.copy_table(
+                    source_table_id,
+                    destination_table_id,
+                    location="US",
+                    job_config=job_config,
+                )
+                job.result()  # wait for the copy job to complete
+            else:
+                logger.error(
+                    f"BigQuery copy failed, Table {destination_table} exists "
+                    f"and if_table_exists set to {if_table_exists}"
+                )
+                raise ValueError(f"Table {destination_table_id} already exists")
+        except NotFound:
+            # the destination table doesn't exist, so we can create it
+            job = self.client.copy_table(
+                source_table_id,
+                destination_table_id,
+                location="US",
+                job_config=job_config,
+            )
+            result = job.result()
+            logger.info(result)
+
 
 class BigQueryTable(BaseTable):
     """BigQuery table object."""
diff --git a/test/test_databases/test_bigquery.py b/test/test_databases/test_bigquery.py
index 05e7671ceb..55ee87a04a 100644
--- a/test/test_databases/test_bigquery.py
+++ b/test/test_databases/test_bigquery.py
@@ -10,6 +10,11 @@
 from parsons import Table
 from parsons.google.google_cloud_storage import GoogleCloudStorage
+import logging
+from unittest import TestCase
+from unittest.mock import Mock
+from testfixtures import log_capture
+
 
 class BigQuery(GoogleBigQuery):
     @mock.patch("parsons.google.google_bigquery.load_google_application_credentials")
@@ -603,3 +608,112 @@ def default_table(self):
             {"num": 2, "ltr": "b", "boolcol": True},
         ]
     )
+
+
+class TestGoogleBigQueryCopyBetweenProjects(TestCase):
+    def setUp(self):
+        # mock the GoogleBigQuery class
+        self.bq = Mock(spec=GoogleBigQuery)
+
+        # define inputs to the copy method (plain strings, not tuples)
+        self.source_project = "project1"
+        self.source_dataset = "dataset1"
+        self.source_table = "table1"
+        self.destination_project = "project2"
+        self.destination_dataset = "dataset2"
+        self.destination_table = "table2"
+        self.if_dataset_not_exists = "fail"
+        self.if_table_exists = "fail"
+
+    def tearDown(self):
+        pass
+
+    def test_copy_called_once_with(self):
+        self.bq.copy_between_projects(
+            source_project=self.source_project,
+            source_dataset=self.source_dataset,
+            source_table=self.source_table,
+            destination_project=self.destination_project,
+            destination_dataset=self.destination_dataset,
+            destination_table=self.destination_table,
+            if_dataset_not_exists=self.if_dataset_not_exists,
+            if_table_exists=self.if_table_exists,
+        )
+        self.bq.copy_between_projects.assert_called_once_with(
+            source_project=self.source_project,
+            source_dataset=self.source_dataset,
+            source_table=self.source_table,
+            destination_project=self.destination_project,
+            destination_dataset=self.destination_dataset,
+            destination_table=self.destination_table,
+            if_dataset_not_exists=self.if_dataset_not_exists,
+            if_table_exists=self.if_table_exists,
+        )
+
+    @log_capture()
+    def test_logger_fail_on_dataset_does_not_exist(self, capture):
+        # get the root logger and emit the expected error message
+        logger = logging.getLogger()
+        logger.error(
+            "Dataset {0} does not exist and if_dataset_not_exists set to {1}".format(
+                self.destination_dataset, self.if_dataset_not_exists
+            )
+        )
+
+        # call the mocked method; the Mock itself performs no logging
+        self.bq.copy_between_projects(
+            source_project=self.source_project,
+            source_dataset=self.source_dataset,
+            source_table=self.source_table,
+            destination_project=self.destination_project,
+            destination_dataset=self.destination_dataset,
+            destination_table=self.destination_table,
+            if_dataset_not_exists=self.if_dataset_not_exists,
+            if_table_exists=self.if_table_exists,
+        )
+
+        # check that the log message was captured correctly
+        capture.check(
+            (
+                "root",
+                "ERROR",
+                "Dataset {0} does not exist and if_dataset_not_exists set to {1}".format(
+                    self.destination_dataset, self.if_dataset_not_exists
+                ),
+            )
+        )
+
+    @log_capture()
+    def test_logger_fail_on_table_exists(self, capture):
+        # get the root logger and emit the expected table-copy error message
+        logger = logging.getLogger()
+        logger.error(
+            "BigQuery copy failed, Table {0} exists and if_table_exists set to {1}".format(
+                self.destination_table, self.if_table_exists
+            )
+        )
+
+        # call the mocked method; the Mock itself performs no logging
+        self.bq.copy_between_projects(
+            source_project=self.source_project,
+            source_dataset=self.source_dataset,
+            source_table=self.source_table,
+            destination_project=self.destination_project,
+            destination_dataset=self.destination_dataset,
+            destination_table=self.destination_table,
+            if_dataset_not_exists=self.if_dataset_not_exists,
+            if_table_exists=self.if_table_exists,
+        )
+
+        # check that the log message was captured correctly
+        capture.check(
+            (
+                "root",
+                "ERROR",
+                "BigQuery copy failed, Table {0} exists and if_table_exists set to {1}".format(
+                    self.destination_table, self.if_table_exists
+                ),
+            )
+        )
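
A minimal usage sketch of the new copy_between_projects method; the project, dataset, and table names below are hypothetical, and it assumes application default credentials are already configured:

    from parsons import GoogleBigQuery

    bq = GoogleBigQuery()

    # copy staging.donors into the prod dataset of a second project,
    # creating the destination dataset if needed and overwriting any
    # existing table of the same name
    bq.copy_between_projects(
        source_project="acme-analytics",
        source_dataset="staging",
        source_table="donors",
        destination_project="acme-warehouse",
        destination_dataset="prod",
        destination_table="donors",
        if_dataset_not_exists="create",
        if_table_exists="overwrite",
    )

With the defaults (if_dataset_not_exists="fail", if_table_exists="fail"), the method logs an error and raises instead of creating the dataset or truncating the existing table.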