
Commit 5b89fc2

Parquet to BigQuery import for GCP-backed AnVIL snapshots (#6355)
1 parent c7c8e43 commit 5b89fc2

25 files changed: +646 -523 lines


deployments/anvilbox/environment.py

Lines changed: 1 addition & 1 deletion
@@ -3,8 +3,8 @@
 )
 import json
 from typing import (
-    Optional,
     Literal,
+    Optional,
 )
 
 is_sandbox = True

deployments/anvildev.browser/environment.py

Lines changed: 0 additions & 1 deletion
@@ -4,7 +4,6 @@
 import json
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/anvildev.gitlab/environment.py

Lines changed: 0 additions & 1 deletion
@@ -3,7 +3,6 @@
 )
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/anvildev/environment.py

Lines changed: 1 addition & 1 deletion
@@ -3,8 +3,8 @@
 )
 import json
 from typing import (
-    Optional,
     Literal,
+    Optional,
 )
 
 

deployments/anvilprod.browser/environment.py

Lines changed: 0 additions & 1 deletion
@@ -4,7 +4,6 @@
 import json
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/anvilprod.gitlab/environment.py

Lines changed: 0 additions & 1 deletion
@@ -3,7 +3,6 @@
 )
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/anvilprod.shared/environment.py

Lines changed: 0 additions & 1 deletion
@@ -3,7 +3,6 @@
 )
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/anvilprod/environment.py

Lines changed: 249 additions & 249 deletions
Large diffs are not rendered by default.

deployments/dev.gitlab/environment.py

Lines changed: 0 additions & 1 deletion
@@ -3,7 +3,6 @@
 )
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/dev.shared/environment.py

Lines changed: 0 additions & 1 deletion
@@ -4,7 +4,6 @@
 import json
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/dev/environment.py

Lines changed: 1 addition & 1 deletion
@@ -3,8 +3,8 @@
 )
 import json
 from typing import (
-    Optional,
     Literal,
+    Optional,
 )
 
 

deployments/hammerbox/environment.py

Lines changed: 251 additions & 250 deletions
Large diffs are not rendered by default.

deployments/prod.gitlab/environment.py

Lines changed: 0 additions & 1 deletion
@@ -3,7 +3,6 @@
 )
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/prod/environment.py

Lines changed: 1 addition & 1 deletion
@@ -5,8 +5,8 @@
 )
 import json
 from typing import (
-    Optional,
     Literal,
+    Optional,
 )
 
 

deployments/sandbox/environment.py

Lines changed: 1 addition & 1 deletion
@@ -3,8 +3,8 @@
 )
 import json
 from typing import (
-    Optional,
     Literal,
+    Optional,
 )
 
 is_sandbox = True

deployments/tempdev.browser/environment.py

Lines changed: 0 additions & 1 deletion
@@ -4,7 +4,6 @@
 import json
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/tempdev.gitlab/environment.py

Lines changed: 0 additions & 1 deletion
@@ -3,7 +3,6 @@
 )
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/tempdev.shared/environment.py

Lines changed: 0 additions & 1 deletion
@@ -4,7 +4,6 @@
 import json
 from typing import (
     Optional,
-    Literal,
 )
 
 

deployments/tempdev/environment.py

Lines changed: 1 addition & 2 deletions
@@ -3,9 +3,8 @@
 )
 import json
 from typing import (
-    Optional,
-    Literal,
     Literal,
+    Optional,
 )
 
 

environment.py

Lines changed: 0 additions & 1 deletion
@@ -5,7 +5,6 @@
 import os
 from typing import (
     Optional,
-    Literal,
 )
 
 

scripts/post_deploy_tdr.py

Lines changed: 4 additions & 3 deletions
@@ -93,9 +93,10 @@ def verify_source(self,
                           ) -> None:
         source = self.tdr.lookup_source(source_spec)
         log.info('TDR client is authorized for API access to %s.', source_spec)
-        require(source.project == source_spec.subdomain,
-                'Actual Google project of TDR source differs from configured one',
-                source.project, source_spec.subdomain)
+        if source_spec.subdomain != config.google_project():
+            require(source.project == source_spec.subdomain,
+                    'Actual Google project of TDR source differs from configured one',
+                    source.project, source_spec.subdomain)
         # Uppercase is standard for multi-regions in the documentation but TDR
         # returns 'us' in lowercase
         require(source.location.lower() == config.tdr_source_location.lower(),
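
Note: the project check above becomes conditional because snapshots imported through the new Parquet path (see `import_tables` below) are materialized in the deployment's own Google project, i.e. `config.google_project()`; for those, the Google project recorded by TDR is not expected to match the spec's subdomain.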

scripts/reindex.py

Lines changed: 24 additions & 0 deletions
@@ -26,9 +26,15 @@
 from azul.logging import (
     configure_script_logging,
 )
+from azul.plugins.repository import (
+    tdr_anvil,
+)
 from azul.plugins.repository.tdr import (
     TDRPlugin,
 )
+from azul.terra import (
+    TDRSourceSpec,
+)
 
 log = logging.getLogger(__name__)
 
@@ -105,6 +111,11 @@
                     default=False,
                     action='store_true',
                     help='Purge the queues before taking any action on the indices.')
+parser.add_argument('--import',
+                    default=False,
+                    action='store_true',
+                    dest='import_',
+                    help='Import sources into BigQuery data from TDR')
 parser.add_argument('--nowait', '--no-wait',
                     dest='wait',
                     default=True,
@@ -159,6 +170,19 @@ def main(argv: list[str]):
         parser.error('Cannot specify sources when performing a local reindex')
         assert False
 
+    if args.import_:
+        for catalog, sources in sources_by_catalog.items():
+            if config.is_tdr_enabled(catalog) and config.is_anvil_enabled(catalog) and sources:
+                plugin = azul.repository_plugin(catalog)
+                assert isinstance(plugin, tdr_anvil.Plugin)
+                for source in sources:
+                    spec = TDRSourceSpec.parse(source)
+                    if spec.type == TDRSourceSpec.Type.parquet:
+                        source = plugin.resolve_source(source)
+                        plugin.import_tables(source)
+            else:
+                log.info('Skipping table import for catalog %r', catalog)
+
     if args.deindex:
         require(not any((args.index, args.delete, args.create)),
                 '--deindex is incompatible with --index, --create, and --delete.')
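
A note on `dest='import_'` in the new flag above: `import` is a Python keyword, so argparse must store the value under a different attribute name; `args.import` would be a syntax error. A minimal, self-contained illustration (not part of this commit):

    import argparse

    parser = argparse.ArgumentParser()
    # Without dest=, argparse would derive the attribute name 'import' from
    # the flag, which cannot be accessed as `args.import` because `import`
    # is a Python keyword.
    parser.add_argument('--import',
                        default=False,
                        action='store_true',
                        dest='import_')
    args = parser.parse_args(['--import'])
    assert args.import_ is True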

src/azul/plugins/repository/tdr_anvil/__init__.py

Lines changed: 35 additions & 0 deletions
@@ -17,13 +17,17 @@
 )
 
 import attrs
+from furl import (
+    furl,
+)
 from more_itertools import (
     one,
 )
 
 from azul import (
     cached_property,
     config,
+    reject,
     require,
     uuids,
 )
@@ -740,3 +744,34 @@ def _columns(self, entity_type: EntityType) -> set[str]:
         entity_columns = {column['name'] for column in table['columns']}
         entity_columns.add('datarepo_row_id')
         return entity_columns
+
+    def import_tables(self, source: TDRSourceRef):
+        """
+        Import tables for an AnVIL snapshot into BigQuery via TDR's Parquet
+        export API. Only tables defined in the AnVIL schema will be imported.
+        Currently, only GS-backed snapshots are supported.
+        """
+        require(source.spec.subdomain == config.google_project(), source)
+
+        dataset_name = source.spec.name
+        self.tdr.create_dataset(dataset_name)
+
+        urls_by_table = self.tdr.export_parquet_urls(source.id)
+        reject(urls_by_table is None,
+               'No Parquet access information is available for snapshot %r.', source.spec)
+
+        for table in anvil_schema['tables']:
+            table_name = table['name']
+            urls = urls_by_table[table_name]
+            for url in urls:
+                require(url.origin == 'https://storage.googleapis.com',
+                        'Unsupported storage location for snapshot %r: %r',
+                        source.spec, url)
+                url.load(furl(scheme='gs',
+                              netloc=url.path.segments[0],
+                              path=url.path.segments[1:]))
+            self.tdr.create_table(dataset_name=dataset_name,
+                                  table_name=table_name,
+                                  import_urls=urls,
+                                  overwrite=False,
+                                  clustering_fields=table['primaryKey'])
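
The inner loop above rewrites the `https://storage.googleapis.com/...` URLs returned by TDR's Parquet export into equivalent `gs://` URLs, the form that BigQuery's load API accepts. A minimal standalone sketch of that rewrite with furl, using a made-up bucket and object name:

    from furl import furl

    url = furl('https://storage.googleapis.com/my-bucket/snapshot/file.parquet')
    assert url.origin == 'https://storage.googleapis.com'
    # The first path segment is the bucket name; the rest is the object key
    url.load(furl(scheme='gs',
                  netloc=url.path.segments[0],
                  path=url.path.segments[1:]))
    assert str(url) == 'gs://my-bucket/snapshot/file.parquet'

Note that `url.load` replaces the URL in place, so any query parameters on the original URL do not survive the rewrite.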

src/azul/terra.py

Lines changed: 74 additions & 2 deletions
@@ -40,9 +40,15 @@
     bigquery,
 )
 from google.cloud.bigquery import (
+    Dataset,
+    DatasetReference,
+    LoadJobConfig,
+    ParquetOptions,
     QueryJob,
     QueryJobConfig,
     QueryPriority,
+    SourceFormat,
+    WriteDisposition,
 )
 from more_itertools import (
     one,
@@ -150,7 +156,6 @@ def parse(cls, spec: str) -> 'TDRSourceSpec':
         service, type, domain, subdomain, name = rest.split(':')
         assert service == 'tdr', service
         type = cls.Type(type)
-        reject(type == cls.Type.parquet, 'Parquet sources are not yet supported')
         domain = cls.Domain(domain)
         reject(domain == cls.Domain.azure, 'Azure sources are not yet supported')
         self = cls(prefix=prefix,
@@ -257,7 +262,7 @@ def oauth2_scopes(self) -> Sequence[str]:
         return [
             *super().oauth2_scopes(),
             'https://www.googleapis.com/auth/devstorage.read_only',
-            'https://www.googleapis.com/auth/bigquery.readonly'
+            'https://www.googleapis.com/auth/bigquery'
         ]
 
 
@@ -655,6 +660,73 @@ def get_duos(self, source: SourceRef) -> Optional[MutableJSON]:
         else:
             return self._check_response(url, response)
 
+    def create_dataset(self, dataset_name: str):
+        """
+        Create a BigQuery dataset in the project and region configured for the
+        current deployment.
+
+        :param dataset_name: Unqualified name of the dataset to create.
+                             `google.cloud.exceptions.Conflict` will be raised
+                             if a dataset with the same name already exists.
+        """
+        bigquery = self._bigquery(self.credentials.project_id)
+        ref = DatasetReference(bigquery.project, dataset_name)
+        dataset = Dataset(ref)
+        dataset.location = config.tdr_source_location
+        log.info('Creating BigQuery dataset %r in region %r',
+                 dataset.dataset_id, dataset.location)
+        bigquery.create_dataset(dataset)
+
+    def create_table(self,
+                     dataset_name: str,
+                     table_name: str,
+                     import_urls: Sequence[furl],
+                     *,
+                     overwrite: bool,
+                     clustering_fields: Optional[Sequence[str]] = None):
+        """
+        Create a BigQuery table in the project and region configured for the
+        current deployment.
+
+        :param dataset_name: Unqualified name of the dataset to contain the new
+                             table
+
+        :param table_name: Unqualified name of the new table
+
+        :param import_urls: URLs of Parquet file(s) to populate the table. These
+                            must be `gs://` URLs and the GCS bucket's region
+                            must be compatible with the target dataset's. See
+                            https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet#limitations
+
+        :param overwrite: Overwrite existing table with the same ID as the table
+                          we're trying to create (true) or raise an exception if
+                          such a table exists (false)
+
+        :param clustering_fields: Fields defining clustering for the table. See
+                                  https://cloud.google.com/bigquery/docs/clustered-tables
+        """
+        for url in import_urls:
+            require(url.scheme == 'gs', url)
+        table_id = f'{dataset_name}.{table_name}'
+        bigquery = self._bigquery(self.credentials.project_id)
+        write_disposition = (
+            WriteDisposition.WRITE_TRUNCATE if overwrite else WriteDisposition.WRITE_EMPTY
+        )
+        job_config = LoadJobConfig(
+            write_disposition=write_disposition,
+            clustering_fields=clustering_fields,
+            source_format=SourceFormat.PARQUET,
+            # Avoids convoluted data types for array fields
+            parquet_options=ParquetOptions.from_api_repr(dict(enable_list_inference=True))
+        )
+        log.info('Creating BigQuery table %r',
+                 f'{bigquery.project}.{dataset_name}.{table_name}')
+        load_job = bigquery.load_table_from_uri(source_uris=list(map(str, import_urls)),
+                                                destination=table_id,
+                                                job_config=job_config)
+        load_job.result()
+        log.info('Table created successfully')
+
     def export_parquet_urls(self,
                             snapshot_id: str
                             ) -> Optional[dict[str, list[mutable_furl]]]:
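
For reference, the load performed by `create_table` corresponds to a plain BigQuery Parquet load job. A standalone sketch against the google-cloud-bigquery client, with placeholder project, dataset, table, and URL:

    from google.cloud import bigquery
    from google.cloud.bigquery import (
        LoadJobConfig,
        ParquetOptions,
        SourceFormat,
        WriteDisposition,
    )

    client = bigquery.Client(project='my-project')  # placeholder project
    job_config = LoadJobConfig(
        write_disposition=WriteDisposition.WRITE_EMPTY,  # fail if table exists
        source_format=SourceFormat.PARQUET,
        # List inference loads Parquet LIST columns as plain REPEATED fields
        # instead of nested `list.element` records
        parquet_options=ParquetOptions.from_api_repr({'enable_list_inference': True}),
    )
    job = client.load_table_from_uri(['gs://my-bucket/snapshot/file.parquet'],
                                     destination='my_dataset.my_table',  # placeholder table ID
                                     job_config=job_config)
    job.result()  # block until the load job completes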

terraform/authentication.tf.json.template.py

Lines changed: 3 additions & 0 deletions
@@ -60,6 +60,9 @@
             "title": f"azul_{config.deployment_stage}",
             "permissions": [
                 "bigquery.jobs.create",
+                "bigquery.datasets.create",
+                "bigquery.tables.create",
+                "bigquery.tables.updateData",
                 *[
                     f'bigquery.{resource}.{action}'
                     for resource in ('capacityCommitments', 'reservations')
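
These additions grant the deployment's service account the BigQuery permissions the import path appears to need on top of what it already had: `datasets.create` for `create_dataset`, and `tables.create` plus `tables.updateData` for the load job issued by `create_table`.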
