Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
18dc0e4
Cleaned up returns on failure, updated docs
Charlie-Kramer Aug 3, 2024
65e1a63
Cleaned up returns on failure, updated docs
Charlie-Kramer Aug 3, 2024
c7ce13f
Cleaned up returns on failure, updated docs
Charlie-Kramer Aug 3, 2024
14984c6
Cleaned up returns on failure, updated docs
Charlie-Kramer Aug 3, 2024
79e7057
more de-linting
Charlie-Kramer Aug 3, 2024
441c206
fixed issues with PR
Charlie-Kramer Aug 30, 2024
b10d640
fixed issues with PR
Charlie-Kramer Aug 30, 2024
6f0b471
fixed issues with PR
Charlie-Kramer Dec 9, 2024
09880ea
Fixed formatting errors flagged by ruff
Charlie-Kramer Dec 9, 2024
78c34a6
fixed more ruff errors
Charlie-Kramer Dec 9, 2024
54694a5
removing accidentally included file
Charlie-Kramer Dec 9, 2024
892b8c4
added unit tests for missing/existing files
Charlie-Kramer Dec 15, 2024
a308460
Merge branch 'main' into main
Charlie-Kramer Dec 15, 2024
b746ca2
fixed ruff formatting
Charlie-Kramer Dec 15, 2024
b445109
fixed ruff formatting
Charlie-Kramer Dec 15, 2024
2913978
removed all bin files
Charlie-Kramer Dec 23, 2024
1192e87
changed logger error messages to f string
Charlie-Kramer Dec 23, 2024
9313120
removed pyvenv.cfg
Charlie-Kramer Dec 23, 2024
73a28cb
fixed parens
Charlie-Kramer Dec 23, 2024
4da6bf6
moved test_bigquery_copy to databases test folder
Charlie-Kramer Dec 23, 2024
453b3cd
fixed another format error
Charlie-Kramer Dec 23, 2024
8eeb282
more formatting errors
Charlie-Kramer Dec 23, 2024
ca820dd
moved bigquery copy test into test_bigquery.py
Charlie-Kramer Dec 23, 2024
9daa7d0
Merge branch 'main' into main
Charlie-Kramer Dec 23, 2024
95bc646
Merge branch 'main' into main
Charlie-Kramer Dec 23, 2024
f20d7f3
moved google project copying test to test_bigquery
Charlie-Kramer Dec 23, 2024
bfcb37c
formatting
Charlie-Kramer Dec 23, 2024
83fda65
checked formatting
Charlie-Kramer Dec 23, 2024
bee0913
fixed ruff formatting back to --target-version=py38
Charlie-Kramer Dec 23, 2024
80e7789
Merge branch 'main' into main
Charlie-Kramer Dec 23, 2024
bab5273
Merge branch 'fix_tests'
Charlie-Kramer Dec 23, 2024
045feed
attempt to fix ruff-formatter errors
Charlie-Kramer Dec 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,6 @@ bill_com_credentials.*
docs/html
docs/dirhtml
*.sw*

# bin files
bin/
2 changes: 2 additions & 0 deletions docs/google.rst
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ API
:inherited-members:




*************
Cloud Storage
*************
Expand Down
99 changes: 98 additions & 1 deletion parsons/google/google_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,8 @@ def copy(
for field in tbl.get_columns_type_stats():
if "dict" in field["type"]:
new_petl = tbl.table.addfield(
field["name"] + "_replace", lambda row: json.dumps(row[field["name"]])
field["name"] + "_replace",
lambda row: json.dumps(row[field["name"]]),
)
new_tbl = Table(new_petl)
new_tbl.remove_column(field["name"])
Expand Down Expand Up @@ -1451,6 +1452,102 @@ def extract(

logger.info(f"Finished exporting query result to {gs_destination}.")

def copy_between_projects(
    self,
    source_project,
    source_dataset,
    source_table,
    destination_project,
    destination_dataset,
    destination_table,
    if_dataset_not_exists="fail",
    if_table_exists="fail",
    location="US",
):
    """
    Copy a table from one project to another. Fails if the source or target project
    does not exist.
    If the target dataset does not exist, the flag if_dataset_not_exists controls behavior.
    It defaults to 'fail'; set it to 'create' if it's ok to create it.
    If the target table exists, the flag if_table_exists controls behavior.
    It defaults to 'fail'; set it to 'overwrite' if it's ok to overwrite an existing table.

    `Args`:
        source_project: str
            Name of source project
        source_dataset: str
            Name of source dataset
        source_table: str
            Name of source table
        destination_project: str
            Name of destination project
        destination_dataset: str
            Name of destination dataset
        destination_table: str
            Name of destination table
        if_dataset_not_exists: str
            Action if dataset doesn't exist {'fail','create'}
        if_table_exists: str
            Action if table exists {'fail', 'overwrite'}
        location: str
            Location of the copy job (defaults to 'US' for backward compatibility)

    `Returns:`
        None
    """

    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound

    destination_table_id = (
        destination_project + "." + destination_dataset + "." + destination_table
    )
    source_table_id = source_project + "." + source_dataset + "." + source_table
    dataset_id = destination_project + "." + destination_dataset

    # Check that the destination dataset exists; create it only when the
    # caller explicitly opted in via if_dataset_not_exists='create'.
    try:
        self.client.get_dataset(dataset_id)  # Make an API request.
    except NotFound:
        if if_dataset_not_exists == "create":
            dataset = bigquery.Dataset(dataset_id)
            self.client.create_dataset(dataset, timeout=30)
        else:
            logger.error("BigQuery copy failed")
            logger.error(
                f"Dataset {destination_dataset} does not exist and if_dataset_not_exists set to {if_dataset_not_exists}"
            )
            # BUG FIX: previously execution fell through here and attempted
            # the copy anyway; abort instead, as documented.
            return

    job_config = bigquery.CopyJobConfig()

    # Check whether the destination table already exists and decide how to
    # proceed. The copy job itself is issued once, below, for both paths.
    try:
        self.client.get_table(destination_table_id)
        if if_table_exists != "overwrite":
            # Table exists and the caller did not allow overwriting: abort.
            logger.error(
                f"BigQuery copy failed, Table {destination_table} exists and if_table_exists set to {if_table_exists}"
            )
            return
        # Overwrite allowed: truncate the destination before writing.
        job_config.write_disposition = "WRITE_TRUNCATE"
    except NotFound:
        # Destination table doesn't exist; the copy job will create it.
        pass

    job = self.client.copy_table(
        source_table_id,
        destination_table_id,
        location=location,
        job_config=job_config,
    )
    result = job.result()  # Wait for the copy job to complete.
    logger.info(result)


class BigQueryTable(BaseTable):
"""BigQuery table object."""
Expand Down
114 changes: 114 additions & 0 deletions test/test_databases/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
from parsons import Table
from parsons.google.google_cloud_storage import GoogleCloudStorage

from unittest import TestCase
from unittest.mock import Mock
import logging
from testfixtures import log_capture


class BigQuery(GoogleBigQuery):
@mock.patch("parsons.google.google_bigquery.load_google_application_credentials")
Expand Down Expand Up @@ -603,3 +608,112 @@ def default_table(self):
{"num": 2, "ltr": "b", "boolcol": True},
]
)


class TestGoogleBigQueryCopyBetweenProjects(TestCase):
    """Unit tests for GoogleBigQuery.copy_between_projects.

    NOTE(review): ``self.bq`` is a ``Mock(spec=GoogleBigQuery)``, so calling
    ``copy_between_projects`` on it exercises the mock, not the real
    implementation — these tests verify call wiring and log formatting only.
    Consider patching ``GoogleBigQuery.client`` instead so the real method
    body runs.
    """

    def setUp(self):
        # Mock the GoogleBigQuery class so no real API calls are made.
        self.bq = Mock(spec=GoogleBigQuery)

        # Inputs to the copy method.
        # BUG FIX: these were previously assigned with trailing commas,
        # which made each value a 1-tuple (e.g. ("project1",)) rather than
        # the intended string.
        self.source_project = "project1"
        self.source_dataset = "dataset1"
        self.source_table = "table1"
        self.destination_project = "project2"
        self.destination_dataset = "dataset2"
        self.destination_table = "table2"
        self.if_dataset_not_exists = "fail"
        self.if_table_exists = "fail"

    def tearDown(self):
        # No per-test resources to release.
        pass

    def test_copy_called_once_with(self):
        """The copy method is invoked exactly once with the expected kwargs."""
        self.bq.copy_between_projects(
            source_project=self.source_project,
            source_dataset=self.destination_dataset,
            source_table=self.source_table,
            destination_project=self.destination_project,
            destination_dataset=self.destination_dataset,
            destination_table=self.destination_table,
            if_dataset_not_exists=self.if_dataset_not_exists,
            if_table_exists=self.if_table_exists,
        )
        self.bq.copy_between_projects.assert_called_once_with(
            source_project=self.source_project,
            source_dataset=self.destination_dataset,
            source_table=self.source_table,
            destination_project=self.destination_project,
            destination_dataset=self.destination_dataset,
            destination_table=self.destination_table,
            if_dataset_not_exists=self.if_dataset_not_exists,
            if_table_exists=self.if_table_exists,
        )

    @log_capture()
    def test_logger_fail_on_dataset_does_not_exist(self, capture):
        """The dataset-missing error message is formatted as expected.

        NOTE(review): the error is emitted manually below because the mocked
        ``copy_between_projects`` produces no log output itself.
        """
        logger = logging.getLogger()
        logger.error(
            "Dataset {0} does not exist and if_dataset_not_exists set to {1}".format(
                self.destination_dataset, self.if_dataset_not_exists
            )
        )

        # Call the (mocked) method; it does not log anything itself.
        self.bq.copy_between_projects(
            source_project=self.source_project,
            source_dataset=self.destination_dataset,
            source_table=self.source_table,
            destination_project=self.destination_project,
            destination_dataset=self.destination_dataset,
            destination_table=self.destination_table,
            if_dataset_not_exists=self.if_dataset_not_exists,
            if_table_exists=self.if_table_exists,
        )

        # Check that the captured log message matches the expected format.
        capture.check(
            (
                "root",
                "ERROR",
                "Dataset {0} does not exist and if_dataset_not_exists set to {1}".format(
                    self.destination_dataset, self.if_dataset_not_exists
                ),
            )
        )

    @log_capture()
    def test_logger_fail_on_table_exists(self, capture):
        """The table-exists error message is formatted as expected.

        NOTE(review): as above, the error is emitted manually because the
        mocked method produces no log output itself.
        """
        logger = logging.getLogger()

        # Emit the table-copy error message.
        logger.error(
            "BigQuery copy failed, Table {0} exists and if_table_exists set to {1}".format(
                self.destination_table, self.if_table_exists
            )
        )

        # Call the (mocked) method; it does not log anything itself.
        self.bq.copy_between_projects(
            source_project=self.source_project,
            source_dataset=self.destination_dataset,
            source_table=self.source_table,
            destination_project=self.destination_project,
            destination_dataset=self.destination_dataset,
            destination_table=self.destination_table,
            if_dataset_not_exists=self.if_dataset_not_exists,
            if_table_exists=self.if_table_exists,
        )

        # Check that the captured log message matches the expected format.
        capture.check(
            (
                "root",
                "ERROR",
                "BigQuery copy failed, Table {0} exists and if_table_exists set to {1}".format(
                    self.destination_table, self.if_table_exists
                ),
            )
        )
Loading