This repository was archived by the owner on Nov 15, 2019. It is now read-only.

Commit fb6b919

Author: Michael Baumann (committed)
When loading data into the DSS, the best results are obtained when all of the cloud URLs loaded by reference are accessible, since that provides the most complete set of file metadata for those files. There are times, however, mainly during development and testing, when that is not the case. Currently, as we load the GTEx data, we have access to the NIH files in Google but not in AWS. The most important file information is already included in the loader input data. This change enables the loader to run and do a generally adequate job even when not all of the cloud URLs are accessible.

The specific changes are:

- Make use of the data file size from the input data. File size was already a required field in the input schema, but it was not being used. The input file size is now compared to the actual file size of each cloud URL that is accessible, and a discrepancy results in an error loading the bundle. If no cloud URL is accessible for a given data file, the input file size is used.
- Improved warnings when cloud URLs are not accessible, using the Python warnings library.
- Updated and enhanced unit tests for the above.
1 parent 38c26f6 commit fb6b919
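For illustration, here is a minimal sketch of a single file entry in the loader input data, using the field names from the test fixtures in this commit; the values are hypothetical, and 'size' is the already-required field that the loader now actually uses:

    file_info = {
        'name': 'sample.bam',                                   # data file name (hypothetical)
        'id': 'dg.4503/887388d7-a974-4259-86af-f5305172363d',   # GUID/alias associated with the file
        'created': '2018-09-20T00:00:00Z',                      # RFC3339 timestamp used as the file version
        'urls': [{'url': 's3://example-bucket/sample.bam'},     # cloud URLs to be loaded by reference
                 {'url': 'gs://example-bucket/sample.bam'}],
        'size': 4096,                                           # file size in bytes from the input data
    }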

File tree

4 files changed: +134 −49 lines

  loader/base_loader.py
  loader/standard_loader.py
  scripts/cgp_data_loader.py
  tests/test_standard_loader.py

loader/base_loader.py

Lines changed: 91 additions & 27 deletions
@@ -22,8 +22,9 @@
 import uuid
 from io import open
 from tempfile import mkdtemp
-from typing import Any, Dict, Optional
+from typing import Any, Dict
 from urllib.parse import urlparse
+from warnings import warn
 
 import boto3
 import botocore
@@ -43,11 +44,27 @@
 
 CREATOR_ID = 20
 
+class CloudUrlAccessWarning(Warning):
+    """Warning when a cloud URL could not be accessed for any reason"""
+
+class CloudUrlAccessForbidden(CloudUrlAccessWarning):
+    """Warning when a cloud URL could not be accessed due to authorization issues"""
+
+class CloudUrlNotFound(CloudUrlAccessWarning):
+    """Warning when a cloud URL was not found"""
 
 class FileURLError(Exception):
     """Thrown when a file cannot be accessed by the given URl"""
 
 
+class InconsistentFileSizeValues(Exception):
+    """Thrown when the input file size does not match the actual file size of a file being loaded by reference"""
+
+
+class MissingInputFileSize(Exception):
+    """Thrown when the input file size is not available for a data file being loaded by reference"""
+
+
 class UnexpectedResponseError(Exception):
     """Thrown when DSS gives an unexpected response"""
 
@@ -91,6 +108,7 @@ def upload_cloud_file_by_reference(self,
                                        filename: str,
                                        file_uuid: str,
                                        file_cloud_urls: set,
+                                       size: int,
                                        guid: str,
                                        file_version: str=None) -> tuple:
         """
@@ -111,13 +129,18 @@ def upload_cloud_file_by_reference(self,
         :param file_uuid: An RFC4122-compliant UUID to be used to identify the file
         :param file_cloud_urls: A set of 'gs://' and 's3://' bucket links.
                e.g. {'gs://broad-public-datasets/g.bam', 's3://ucsc-topmed-datasets/a.bam'}
+        :param size: size of the file in bytes, as provided by the input data to be loaded.
+               An attempt will be made to access the `file_cloud_urls` to obtain the
+               basic file metadata, and if successful, the size is verified to be consistent.
         :param guid: An optional additional/alternate data identifier/alias to associate with the file
                e.g. "dg.4503/887388d7-a974-4259-86af-f5305172363d"
         :param file_version: a RFC3339 compliant datetime string
         :return: file_uuid: str, file_version: str, filename: str, already_present: bool
+        :raises MissingInputFileSize: If no input file size is available for file to be loaded by reference
+        :raises InconsistentFileSizeValues: If file sizes are inconsistent for file to be loaded by reference
         """
 
-        def _create_file_reference(file_cloud_urls: set, guid: str) -> dict:
+        def _create_file_reference(file_cloud_urls: set, size: int, guid: str) -> dict:
             """
             Format a file's metadata into a dictionary for uploading as a json to support the approach
             described here:
@@ -127,22 +150,26 @@ def _create_file_reference(file_cloud_urls: set, guid: str) -> dict:
                    e.g. {'gs://broad-public-datasets/g.bam', 's3://ucsc-topmed-datasets/a.bam'}
             :param guid: An optional additional/alternate data identifier/alias to associate with the file
                    e.g. "dg.4503/887388d7-a974-4259-86af-f5305172363d"
-            :param file_version: RFC3339 formatted timestamp.
+            :param size: file size in bytes from input data
             :return: A dictionary of metadata values.
             """
-            s3_metadata = None
-            gs_metadata = None
+
+            input_metadata = dict(size=size)
+            s3_metadata: Dict[str, Any] = dict()
+            gs_metadata: Dict[str, Any] = dict()
             for cloud_url in file_cloud_urls:
                 url = urlparse(cloud_url)
                 bucket = url.netloc
                 key = url.path[1:]
+                if not (bucket and key):
+                    raise FileURLError(f'Invalid URL {cloud_url}')
                 if url.scheme == "s3":
                     s3_metadata = _get_s3_file_metadata(bucket, key)
                 elif url.scheme == "gs":
                     gs_metadata = _get_gs_file_metadata(bucket, key)
                 else:
                     raise FileURLError("Unsupported cloud URL scheme: {cloud_url}")
-            return _consolidate_metadata(file_cloud_urls, s3_metadata, gs_metadata, guid)
+            return _consolidate_metadata(file_cloud_urls, input_metadata, s3_metadata, gs_metadata, guid)
 
         def _get_s3_file_metadata(bucket: str, key: str) -> dict:
             """
@@ -155,11 +182,24 @@ def _get_s3_file_metadata(bucket: str, key: str) -> dict:
             metadata = dict()
             try:
                 response = self.s3_client.head_object(Bucket=bucket, Key=key, RequestPayer="requester")
-                metadata['content-type'] = response['ContentType']
-                metadata['s3_etag'] = response['ETag']
-                metadata['size'] = response['ContentLength']
-            except Exception as e:
-                raise FileURLError(f"Error accessing s3://{bucket}/{key}") from e
+            except botocore.exceptions.ClientError as e:
+                if e.response['Error']['Code'] == str(requests.codes.not_found):
+                    warn(f'Could not find \"s3://{bucket}/{key}\" Error: {e}'
+                         ' The S3 file metadata for this file reference will be missing.',
+                         CloudUrlNotFound)
+                else:
+                    warn(f"Failed to access \"s3://{bucket}/{key}\" Error: {e}"
+                         " The S3 file metadata for this file reference will be missing.",
+                         CloudUrlAccessWarning)
+            else:
+                try:
+                    metadata['size'] = response['ContentLength']
+                    metadata['content-type'] = response['ContentType']
+                    metadata['s3_etag'] = response['ETag']
+                except KeyError as e:
+                    # These standard metadata should always be present.
+                    logging.error(f'Failed to access "s3://{bucket}/{key}" file metadata field. Error: {e}'
+                                  ' The S3 file metadata for this file will be incomplete.')
             return metadata
 
         def _get_gs_file_metadata(bucket: str, key: str) -> dict:
@@ -170,25 +210,30 @@ def _get_gs_file_metadata(bucket: str, key: str) -> dict:
             :param key: GS file to upload. e.g. 'output.txt' or 'data/output.txt'
             :return: A dictionary of metadata values.
             """
-            metadata = dict()
-            try:
-                gs_bucket = self.gs_client.bucket(bucket, self.google_project_id)
-                blob_obj = gs_bucket.get_blob(key)
+            gs_bucket = self.gs_client.bucket(bucket, self.google_project_id)
+            blob_obj = gs_bucket.get_blob(key)
+            if blob_obj is not None:
+                metadata = dict()
+                metadata['size'] = blob_obj.size
                 metadata['content-type'] = blob_obj.content_type
                 metadata['crc32c'] = binascii.hexlify(base64.b64decode(blob_obj.crc32c)).decode("utf-8").lower()
-                metadata['size'] = blob_obj.size
-            except Exception as e:
-                raise FileURLError(f"Error accessing gs://{bucket}/{key}") from e
-            return metadata
+                return metadata
+            else:
+                warn(f'Could not find "gs://{bucket}/{key}"'
+                     ' The GS file metadata for this file reference will be missing.',
+                     CloudUrlNotFound)
+                return dict()
 
         def _consolidate_metadata(file_cloud_urls: set,
-                                  s3_metadata: Optional[Dict[str, Any]],
-                                  gs_metadata: Optional[Dict[str, Any]],
+                                  input_metadata: Dict[str, Any],
+                                  s3_metadata: Dict[str, Any],
+                                  gs_metadata: Dict[str, Any],
                                   guid: str) -> dict:
             """
             Consolidates cloud file metadata to create the JSON used to load by reference
             into the DSS.
 
+            :param input_metadata: Dictionary of file metadata (e.g. size) from the loader input data.
             :param file_cloud_urls: A set of 'gs://' and 's3://' bucket URLs.
                    e.g. {'gs://broad-public-datasets/g.bam', 's3://ucsc-topmed-datasets/a.bam'}
             :param s3_metadata: Dictionary of meta data produced by _get_s3_file_metadata().
@@ -197,19 +242,38 @@ def _consolidate_metadata(file_cloud_urls: set,
                    e.g. "dg.4503/887388d7-a974-4259-86af-f5305172363d"
             :return: A dictionary of cloud file metadata values
             """
-            consolidated_metadata = dict()
-            if s3_metadata:
-                consolidated_metadata.update(s3_metadata)
-            if gs_metadata:
-                consolidated_metadata.update(gs_metadata)
+
+            def _check_file_size_consistency(input_metadata, s3_metadata, gs_metadata):
+                input_size = input_metadata.get('size', None)
+                if input_size is not None:
+                    input_size = int(input_size)
+                else:
+                    raise MissingInputFileSize('No input file size is available for file being loaded by reference.')
+                s3_size = s3_metadata.get('size', None)
+                gs_size = gs_metadata.get('size', None)
+                if s3_size and input_size != s3_size:
+                    raise InconsistentFileSizeValues(
+                        f'Input file size does not match actual S3 file size: '
+                        f'input size: {input_size}, S3 actual size: {s3_size}')
+                if gs_size and input_size != gs_size:
+                    raise InconsistentFileSizeValues(
+                        f'Input file size does not match actual GS file size: '
+                        f'input size: {input_size}, GS actual size: {gs_size}')
+                return input_size
+
+            consolidated_metadata: Dict[str, Any] = dict()
+            consolidated_metadata.update(input_metadata)
+            consolidated_metadata.update(s3_metadata)
+            consolidated_metadata.update(gs_metadata)
+            consolidated_metadata['size'] = _check_file_size_consistency(input_metadata, s3_metadata, gs_metadata)
             consolidated_metadata['url'] = list(file_cloud_urls)
             consolidated_metadata['aliases'] = [str(guid)]
             return consolidated_metadata
 
         if self.dry_run:
             logger.info(f"DRY RUN: upload_cloud_file_by_reference: {filename} {str(file_cloud_urls)} {guid}")
 
-        file_reference = _create_file_reference(file_cloud_urls, guid)
+        file_reference = _create_file_reference(file_cloud_urls, size, guid)
         return self.upload_dict_as_file(file_reference,
                                         filename,
                                         file_uuid,
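To make the consolidation order and the size check concrete, here is a small sketch with hypothetical values (not part of the commit): the input size seeds the record, metadata from whichever cloud URLs were reachable overlays it, and _check_file_size_consistency() rejects any disagreement.

    input_metadata = {'size': 4096}                  # from the loader input data
    s3_metadata = {'size': 4096,                     # from a reachable s3:// URL
                   'content-type': 'application/octet-stream',
                   's3_etag': '"abc123"'}
    gs_metadata = {}                                 # e.g. the gs:// URL was not reachable; only a warning was issued

    consolidated = dict()
    consolidated.update(input_metadata)
    consolidated.update(s3_metadata)
    consolidated.update(gs_metadata)
    # _check_file_size_consistency(input_metadata, s3_metadata, gs_metadata) returns 4096 here;
    # with s3_metadata['size'] == 8192 it would instead raise InconsistentFileSizeValues and the
    # bundle would fail to load, as described in the commit message.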

loader/standard_loader.py

Lines changed: 13 additions & 2 deletions
@@ -22,6 +22,7 @@ class ParsedDataFile(typing.NamedTuple):
     filename: str
     file_uuid: str
     cloud_urls: typing.List[str]  # list of urls
+    size: int
     file_guid: str
    file_version: str  # rfc3339
 
@@ -95,6 +96,14 @@ def _get_cloud_urls(file_info: dict):
             raise ParseError(f"Expected 'url' as key for urls in file_info: \n{file_info}")
         return [url_dict['url'] for url_dict in urls]
 
+    @staticmethod
+    def _get_file_size(file_info: dict):
+        if 'size' not in file_info:
+            raise ParseError(f'Size field not present in file_info: \n{file_info}')
+        if not int(file_info['size']) >= 0:
+            raise ParseError(f'Invalid value for size in file_info: \n{file_info}')
+        return file_info['size']
+
     @classmethod
     def _parse_bundle(cls, bundle: dict) -> ParsedBundle:
         try:
@@ -116,7 +125,8 @@ def _parse_bundle(cls, bundle: dict) -> ParsedBundle:
             file_uuid = cls._get_file_uuid(file_guid)
             file_version = cls._get_file_version(file_info)
             cloud_urls = cls._get_cloud_urls(file_info)
-            parsed_file = ParsedDataFile(filename, file_uuid, cloud_urls, file_guid, file_version)
+            file_size = cls._get_file_size(file_info)
+            parsed_file = ParsedDataFile(filename, file_uuid, cloud_urls, file_size, file_guid, file_version)
             parsed_files.append(parsed_file)
 
         return ParsedBundle(bundle_uuid, metadata_dict, parsed_files)
@@ -139,13 +149,14 @@ def _load_bundle(self, bundle_uuid, metadata_dict, data_files, bundle_num):
                                                   name=metadata_filename, indexed=True))
 
         for data_file in data_files:
-            filename, file_uuid, cloud_urls, file_guid, file_version, = data_file
+            filename, file_uuid, cloud_urls, file_size, file_guid, file_version = data_file
             logger.debug(f'Bundle {bundle_num}: Attempting to upload data file: {filename} '
                          f'with uuid:version {file_uuid}:{file_version}...')
             file_uuid, file_version, filename, already_present = \
                 self.dss_uploader.upload_cloud_file_by_reference(filename,
                                                                  file_uuid,
                                                                  cloud_urls,
+                                                                 file_size,
                                                                  file_guid,
                                                                  file_version=file_version)
             if already_present:
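Because ParsedDataFile is a NamedTuple, inserting size changes the positional field order. A short sketch with hypothetical values (not part of the commit) of how callers now construct and unpack it:

    from loader.standard_loader import ParsedDataFile

    parsed = ParsedDataFile('sample.bam',                                    # filename (hypothetical)
                            '7f54c6d0-0b7e-4a58-b7a3-1f1e6f2b9a77',          # file_uuid (hypothetical)
                            ['s3://example-bucket/sample.bam'],              # cloud_urls
                            4096,                                            # size, the newly inserted field
                            'dg.4503/887388d7-a974-4259-86af-f5305172363d',  # file_guid
                            '2018-09-20T00:00:00Z')                          # file_version

    # Positional unpacking now includes size between cloud_urls and file_guid, as _load_bundle does above:
    filename, file_uuid, cloud_urls, size, file_guid, file_version = parsed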

scripts/cgp_data_loader.py

Lines changed: 7 additions & 0 deletions
@@ -56,6 +56,13 @@ def main(argv=sys.argv[1:]):
     logging.getLogger(__name__)
     suppress_verbose_logging()
 
+    if not sys.warnoptions:
+        import warnings
+        # Log each unique cloud URL access warning once by default.
+        # This can be overridden using the "PYTHONWARNINGS" environment variable.
+        # See: https://docs.python.org/3/library/warnings.html
+        warnings.simplefilter('default', 'CloudUrlAccessWarning', append=True)
+
     bundle_uploader = StandardFormatBundleUploader(dss_uploader, metadata_file_uploader)
     logging.info(f'Uploading {"serially" if options.serial else "concurrently"}')
     return bundle_uploader.load_all_bundles(load_json_from_file(options.input_json), not options.serial)
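As the added comment notes, the default filter, which logs each unique cloud URL access warning once, can be overridden with the PYTHONWARNINGS environment variable; it can also be adjusted in code. A small sketch (not part of the commit) that silences the generic access warnings while turning a missing cloud object into a hard failure:

    import warnings
    from loader.base_loader import CloudUrlAccessWarning, CloudUrlNotFound

    # Filters added later are matched first, so the narrower rule goes last.
    warnings.simplefilter('ignore', CloudUrlAccessWarning)  # drop generic access warnings
    warnings.simplefilter('error', CloudUrlNotFound)        # raise if a referenced cloud object is missing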

tests/test_standard_loader.py

Lines changed: 23 additions & 20 deletions
@@ -2,13 +2,14 @@
 import logging
 import typing
 import uuid
+import warnings
 from pathlib import Path
 
 import boto3
 import iso8601
 
 from loader import base_loader
-from loader.base_loader import FileURLError
+from loader.base_loader import FileURLError, CloudUrlNotFound
 from loader.standard_loader import StandardFormatBundleUploader, ParsedBundle, ParseError, ParsedDataFile
 from scripts.cgp_data_loader import GOOGLE_PROJECT_ID
 from tests import ignore_resource_warnings
@@ -121,7 +122,8 @@ def test_parse_bundle(self):
         minimal_file_info = {'name': 'buried_treasure_map',
                              'created': tz_utc_now(),
                              'urls': [{'url': 's3://desert/island/under/palm'},
-                                      {'url': 'gs://captains/quarters/bottom/drawer'}]}
+                                      {'url': 'gs://captains/quarters/bottom/drawer'}],
+                             'size': 0}
         bundle = {}
         self.assertRaises(ParseError, self.loader._parse_bundle, bundle)
         data_bundle = {}
@@ -192,18 +194,19 @@ def _make_minimal_bundle(self, parsed=True, files=1):
             file.Acl().put(ACL='public-read')
 
             data_objects[file_guid] = ParsedDataFile(filename, file_uuid, cloud_urls,
-                                                     file_guid, file_version)
+                                                     file.content_length, file_guid, file_version)
 
         if parsed:
             return ParsedBundle(bundle_uuid, metadata_dict, list(data_objects.values()))
         else:
             dict_objects = {}
-            for filename, file_uuid, cloud_urls, file_guid, file_version in data_objects.values():
+            for filename, file_uuid, cloud_urls, file_size, file_guid, file_version in data_objects.values():
                 dict_objects[file_guid] = {
                     'name': filename,
                     'created': file_version,
                     'id': file_guid,
-                    'urls': [{'url': url} for url in cloud_urls]
+                    'urls': [{'url': url} for url in cloud_urls],
+                    'size': file_size
                 }
         minimal = {
             'data_bundle': {
@@ -330,22 +333,14 @@ def test_duplicate_file_upload(self):
         """
         _, _, data_files = self._make_minimal_bundle()
         data_file = data_files[0]
-        filename, file_uuid, cloud_urls, file_guid, file_version, = data_file
+        filename, file_uuid, cloud_urls, file_size, file_guid, file_version = data_file
 
         _, _, _, already_present = \
-            self.dss_uploader.upload_cloud_file_by_reference(filename,
-                                                             file_uuid,
-                                                             cloud_urls,
-                                                             file_guid,
-                                                             file_version=file_version)
+            self.dss_uploader.upload_cloud_file_by_reference(filename, file_uuid, cloud_urls, file_size, file_guid, file_version)
         # make sure the file hasn't already been uploaded
         self.assertFalse(already_present)
         _, _, _, already_present = \
-            self.dss_uploader.upload_cloud_file_by_reference(filename,
-                                                             file_uuid,
-                                                             cloud_urls,
-                                                             file_guid,
-                                                             file_version=file_version)
+            self.dss_uploader.upload_cloud_file_by_reference(filename, file_uuid, cloud_urls, file_size, file_guid, file_version)
         # make sure the file HAS already been uploaded
         self.assertTrue(already_present)
 
@@ -355,7 +350,15 @@ def test_bad_URL(self):
         bundle = self._make_minimal_bundle(parsed=True)
         bundle.data_files[0].cloud_urls[0] = 'https://example.com'
         self.assertRaises(FileURLError, self.loader._load_bundle, *bundle, 0)
-        bundle.data_files[0].cloud_urls[0] = 's3://definatelynotavalidbucketorfile'
-        self.assertRaises(FileURLError, self.loader._load_bundle, *bundle, 1)
-        bundle.data_files[0].cloud_urls[0] = 'gs://definatelynotavalidbucketorfile'
-        self.assertRaises(FileURLError, self.loader._load_bundle, *bundle, 2)
+
+        bundle = self._make_minimal_bundle(parsed=True)
+        bundle.data_files[0].cloud_urls[0] = 's3://definatelynotavalidbucketor/file'
+        with self.assertWarnsRegex(CloudUrlNotFound, 'Could not find "s3://definatelynotavalidbucketor/file"'):
+            warnings.simplefilter('always', 'CloudUrlAccessWarning', append=True)
+            self.loader._load_bundle(*bundle, 1)
+
+        bundle = self._make_minimal_bundle(parsed=True)
+        bundle.data_files[0].cloud_urls[0] = 'gs://definatelynotavalidbucketor/file'
+        with self.assertWarnsRegex(CloudUrlNotFound, 'Could not find "gs://definatelynotavalidbucketor/file"'):
+            warnings.simplefilter('always', 'CloudUrlAccessWarning', append=True)
+            self.loader._load_bundle(*bundle, 2)
