Skip to content

Commit 8503792

Browse files
committed
[H] Backport mirroring hotfixes (#7130) (#7122, #7123, PR #7124) (#7109, PR #7116)
2 parents 27d1b17 + bb53ea2 commit 8503792

File tree

18 files changed: +454 / -266 lines changed

18 files changed: +454 / -266 lines changed

deployments/prod/environment.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1720,5 +1720,9 @@ def env() -> Mapping[str, Optional[str]]:
17201720

17211721
'AZUL_ENABLE_VERBATIM_RELATIONS': '0',
17221722

1723-
'AZUL_MIRROR_BUCKET': 'humancellatlas'
1723+
'AZUL_ENABLE_MIRRORING': '1',
1724+
1725+
'AZUL_MIRROR_BUCKET': 'humancellatlas',
1726+
1727+
'AZUL_MIRRORING_CONCURRENCY': '128'
17241728
}

lambdas/indexer/.chalice/config.json.template.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
'environment_variables': config.lambda_env,
2828
'lambda_timeout': config.api_gateway_lambda_timeout,
2929
'lambda_memory_size': 128,
30-
**chalice.vpc_lambda_config(app_name),
3130
'stages': {
3231
config.deployment_stage: {
3332
**chalice.private_api_stage_config(app_name),
@@ -37,36 +36,45 @@
3736
'reserved_concurrency': config.contribution_concurrency(retry=False),
3837
'lambda_memory_size': 256,
3938
'lambda_timeout': config.contribution_lambda_timeout(retry=False),
39+
**chalice.vpc_lambda_config(app_name)
4040
},
4141
indexer.contribute_retry.name: {
4242
'reserved_concurrency': config.contribution_concurrency(retry=True),
4343
'lambda_memory_size': 4096, # FIXME https://github.com/DataBiosphere/azul/issues/2902
44-
'lambda_timeout': config.contribution_lambda_timeout(retry=True)
44+
'lambda_timeout': config.contribution_lambda_timeout(retry=True),
45+
**chalice.vpc_lambda_config(app_name)
4546
},
4647
indexer.aggregate.name: {
4748
'reserved_concurrency': config.aggregation_concurrency(retry=False),
4849
'lambda_memory_size': 256,
49-
'lambda_timeout': config.aggregation_lambda_timeout(retry=False)
50+
'lambda_timeout': config.aggregation_lambda_timeout(retry=False),
51+
**chalice.vpc_lambda_config(app_name)
5052
},
5153
indexer.aggregate_retry.name: {
5254
'reserved_concurrency': config.aggregation_concurrency(retry=True),
5355
'lambda_memory_size': 6500,
54-
'lambda_timeout': config.aggregation_lambda_timeout(retry=True)
56+
'lambda_timeout': config.aggregation_lambda_timeout(retry=True),
57+
**chalice.vpc_lambda_config(app_name)
5558
},
59+
indexer.forward_alb_logs.name: chalice.vpc_lambda_config(app_name),
60+
indexer.forward_s3_logs.name: chalice.vpc_lambda_config(app_name),
5661
**(
5762
{
5863
indexer.mirror.name: {
5964
'reserved_concurrency': config.mirroring_concurrency,
6065
'lambda_memory_size': 512,
6166
'lambda_timeout': config.mirror_lambda_timeout
67+
# No VPC for this function so as to avoid paying for
68+
# NAT Gateway traffic
6269
},
6370
}
6471
if config.enable_mirroring else
6572
{}
6673
),
6774
indexer.update_health_cache.name: {
6875
'lambda_memory_size': 128,
69-
'lambda_timeout': config.health_cache_lambda_timeout
76+
'lambda_timeout': config.health_cache_lambda_timeout,
77+
**chalice.vpc_lambda_config(app_name)
7078
}
7179
}
7280
}

scripts/mirror.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
def mirror_catalog(catalog: CatalogName, source_globs: set[str], wait: bool):
2929
azul = AzulClient()
3030
plugin = azul.repository_plugin(catalog)
31-
assert azul.is_queue_empty(config.mirror_queue.name), R(
32-
'A mirroring operation is already in progress. The current operation '
33-
'must finish before another can begin.')
3431
fail_queue = config.mirror_queue.to_fail.name
3532
assert azul.is_queue_empty(fail_queue), R(
3633
'Cannot begin mirroring because a previous operation failed: '

src/azul/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1807,7 +1807,7 @@ def enable_mirroring(self) -> bool:
18071807
return self._boolean(self.environ['AZUL_ENABLE_MIRRORING'])
18081808

18091809
@property
1810-
def external_mirror_bucket(self) -> str | None:
1810+
def mirror_bucket(self) -> str | None:
18111811
return self.environ.get('AZUL_MIRROR_BUCKET')
18121812

18131813

src/azul/deployment.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
)
9393
from mypy_boto3_s3 import (
9494
S3Client,
95+
S3ServiceResource,
9596
)
9697
from mypy_boto3_secretsmanager import (
9798
SecretsManagerClient,
@@ -200,6 +201,10 @@ def region_name(self):
200201
def s3(self) -> 'S3Client':
201202
return self.client('s3', azul_logging=True)
202203

204+
@property
205+
def s3_resource(self) -> 'S3ServiceResource':
206+
return self.resource('s3', azul_logging=True)
207+
203208
@property
204209
def securityhub(self) -> 'SecurityHubClient':
205210
return self.client('securityhub')
@@ -734,7 +739,7 @@ def monitoring_topic_name(self):
734739

735740
@property
736741
def sqs_resource(self) -> 'SQSServiceResource':
737-
return self.resource('sqs')
742+
return self.resource('sqs', azul_logging=config.is_in_lambda)
738743

739744
@_cache
740745
def sqs_queue(self, queue_name: str) -> 'Queue':

src/azul/health.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ def queues(self):
214214
"""
215215
Returns information about the SQS queues used by the indexer.
216216
"""
217-
sqs = aws.resource('sqs', azul_logging=True)
217+
sqs = aws.sqs_resource
218218
response: MutableJSON = {'up': True}
219219
for queue in config.all_queue_names:
220220
try:

src/azul/indexer/lambda_iam_policy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@
127127
],
128128
'Resource': [
129129
f'arn:aws:s3:::{resource}'
130-
for bucket in alist(aws.mirror_bucket, config.external_mirror_bucket)
130+
for bucket in alist(aws.mirror_bucket, config.mirror_bucket)
131131
for resource in [bucket, f'{bucket}/*']
132132
]
133133
}

src/azul/indexer/mirror_controller.py

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
log = logging.getLogger(__name__)
5858

5959

60-
@attrs.frozen(auto_attribs=True, kw_only=True)
60+
@attrs.frozen(kw_only=True)
6161
class MirrorController(ActionController[MirrorAction]):
6262
schema_url_func: SchemaUrlFunc
6363

@@ -107,31 +107,39 @@ def mirror_source(self, catalog: CatalogName, source_json: JSON):
107107
plugin = self.repository_plugin(catalog)
108108
source = plugin.source_ref_cls.from_json(source_json)
109109
source = plugin.partition_source_for_mirroring(catalog, source)
110+
prefix = source.spec.prefix
111+
log.info('Queueing %d partitions of source %r in catalog %r',
112+
prefix.num_partitions, str(source.spec), catalog)
110113

111-
def message(prefix: str) -> SQSMessage:
112-
log.info('Mirroring files in partition %r of source %r from catalog %r',
113-
prefix, str(source.spec), catalog)
114-
return self.mirror_partition_message(catalog, source, prefix)
114+
def message(partition: str) -> SQSMessage:
115+
log.debug('Queueing partition %r', partition)
116+
return self.mirror_partition_message(catalog, source, partition)
115117

116-
messages = map(message, source.spec.prefix.partition_prefixes())
118+
messages = map(message, prefix.partition_prefixes())
117119
self.client.queue_mirror_messages(messages)
118120

119-
def mirror_partition(self, catalog: CatalogName, source_json: JSON, prefix: str):
121+
def mirror_partition(self,
122+
catalog: CatalogName,
123+
source_json: JSON,
124+
prefix: str
125+
):
120126
plugin = self.repository_plugin(catalog)
121127
source = plugin.source_ref_cls.from_json(source_json)
122128
already_mirrored = self.service.list_info_objects(catalog, prefix)
129+
files = plugin.list_files(source, prefix)
123130

124131
def messages() -> Iterable[SQSMessage]:
125-
for file in plugin.list_files(source, prefix):
132+
for file in files:
126133
info_key = self.service.info_object_key(file)
127134
if info_key in already_mirrored:
128-
log.info('Not mirroring file %r because info object already exists at %r',
129-
file.uuid, info_key)
135+
log.debug('Not queueing file %r because info object already exists', file)
130136
else:
131-
log.info('Mirroring file %r', file.uuid)
137+
log.debug('Queueing file %r', file)
132138
yield self.mirror_file_message(catalog, source, file)
133139

134-
self.client.queue_mirror_messages(messages())
140+
message_count = self.client.queue_mirror_messages(messages())
141+
log.info('Queued %d/%d files in partition %r of source %r in catalog %r',
142+
message_count, len(files), prefix, str(source), catalog)
135143

136144
def mirror_file(self,
137145
catalog: CatalogName,
@@ -145,26 +153,36 @@ def mirror_file(self,
145153
and not config.deployment.is_unit_test
146154
and catalog not in config.integration_test_catalogs)
147155
if file_is_large and not deployment_is_stable:
148-
log.info('Not mirroring file %r (%d bytes) to save cost',
149-
file.uuid, file.size)
156+
log.info('Not mirroring file to save cost: %r', file)
150157
else:
151158
# Ensure we test with multiple parts on lower deployments
152159
part_size = FilePart.default_size if deployment_is_stable else FilePart.min_size
153160
if file.size <= part_size:
154-
log.info('Mirroring file %r via standard upload', file.uuid)
161+
log.info('Mirroring file via standard upload: %r', file)
155162
self.service.mirror_file(catalog, file)
156-
log.info('Successfully mirrored file %r via standard upload', file.uuid)
163+
log.info('Successfully mirrored file via standard upload: %r', file)
157164
else:
158-
log.info('Mirroring file %r via multi-part upload', file.uuid)
165+
log.info('Mirroring file via multi-part upload: %r', file)
159166
_, digest_type = file.digest()
160167
hasher = get_resumable_hasher(digest_type)
161168
upload_id = self.service.begin_mirroring_file(catalog, file)
162169
first_part = FilePart.first(file, part_size)
163-
etag = self.service.mirror_file_part(catalog, file, first_part, upload_id, hasher)
170+
log.info('Uploading part #%d of file %r', first_part.index, file)
171+
etag = self.service.mirror_file_part(catalog,
172+
file,
173+
first_part,
174+
upload_id,
175+
hasher)
164176
next_part = first_part.next(file)
165177
assert next_part is not None
166-
messages = [self.mirror_part_message(catalog, file, next_part, upload_id, [etag], hasher)]
167-
self.client.queue_mirror_messages(messages)
178+
log.info('Queueing part #%d of file %r', next_part.index, file)
179+
message = self.mirror_part_message(catalog,
180+
file,
181+
next_part,
182+
upload_id,
183+
[etag],
184+
hasher)
185+
self.client.queue_mirror_messages([message])
168186

169187
def mirror_file_part(self,
170188
catalog: CatalogName,
@@ -177,17 +195,19 @@ def mirror_file_part(self,
177195
file = self.load_file(catalog, file_json)
178196
part = FilePart.from_json(part_json)
179197
hasher = hasher_from_str(hasher_data)
198+
log.info('Uploading part #%d of file %r', part.index, file)
180199
etag = self.service.mirror_file_part(catalog, file, part, upload_id, hasher)
181200
etags = [*etags, etag]
182201
next_part = part.next(file)
183202
if next_part is None:
184-
log.info('File %r fully uploaded in %d parts', file.uuid, len(etags))
203+
log.info('File fully uploaded in %d parts: %r', len(etags), file)
185204
message = self.finalize_file_message(catalog,
186205
file,
187206
upload_id,
188207
etags,
189208
hasher)
190209
else:
210+
log.info('Queueing part #%d of file %r', next_part.index, file)
191211
message = self.mirror_part_message(catalog,
192212
file,
193213
next_part,
@@ -211,7 +231,7 @@ def finalize_file(self,
211231
upload_id=upload_id,
212232
etags=etags,
213233
hasher=hasher)
214-
log.info('Successfully mirrored file %r via multi-part upload', file.uuid)
234+
log.info('Successfully mirrored file via multi-part upload: %r', file)
215235

216236
def load_file(self, catalog: CatalogName, file: JSON) -> File:
217237
return self.client.metadata_plugin(catalog).file_class.from_json(file)

0 commit comments

Comments
 (0)