Skip to content

Commit 6b81795

Browse files
authored
Align S3 prefixes with APHL eCR pipeline (#419)
## Summary

- Update 5 S3 prefix defaults to match APHL AIMS eCR pipeline conventions (`ValidationResponseV2/`, `TextToCodeSubmissionV2/`, `TTCAugmentationMetadataV2/`, `TTCMetadataV2/`, `AugmentationMetadataV2/`)
- Add source-bucket routing to TTC Lambda: extracts bucket name from S3 event with fallback to `S3_BUCKET` env var, enabling the training pipeline to share the same Lambda

Closes #374

## Test plan

- [x] All TTC Lambda tests pass (7/7)
- [x] All Augmentation Lambda tests pass (7/7)
- [x] Grep confirms no remaining references to old prefix strings
- [ ] Verify Terraform validates in CI
- [ ] Confirm prefix names with APHL

## Follow-up

- #418: Refactor augmentation Lambda to read from S3 instead of SQS message body
1 parent 9d7a9ff commit 6b81795

File tree

4 files changed

+44
-32
lines changed

4 files changed

+44
-32
lines changed

packages/augmentation-lambda/src/augmentation_lambda/lambda_function.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# Environment variables
1717
S3_BUCKET = os.getenv("S3_BUCKET", "dibbs-text-to-code")
1818
AUGMENTED_EICR_PREFIX = os.getenv("AUGMENTED_EICR_PREFIX", "AugmentationEICRV2/")
19-
AUGMENTATION_METADATA_PREFIX = os.getenv("AUGMENTATION_METADATA_PREFIX", "AugmentationMetadata/")
19+
AUGMENTATION_METADATA_PREFIX = os.getenv("AUGMENTATION_METADATA_PREFIX", "AugmentationMetadataV2/")
2020

2121
# Cache S3 client to reuse across Lambda invocations
2222
_cached_s3_client: BaseClient | None = None

packages/text-to-code-lambda/src/text_to_code_lambda/lambda_function.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
# Environment variables
2828
S3_BUCKET = os.getenv("S3_BUCKET", "dibbs-text-to-code")
2929
EICR_INPUT_PREFIX = os.getenv("EICR_INPUT_PREFIX", "eCRMessageV2/")
30-
SCHEMATRON_ERROR_PREFIX = os.getenv("SCHEMATRON_ERROR_PREFIX", "schematronErrors/")
31-
TTC_INPUT_PREFIX = os.getenv("TTC_INPUT_PREFIX", "TextToCodeValidateSubmissionV2/")
32-
TTC_OUTPUT_PREFIX = os.getenv("TTC_OUTPUT_PREFIX", "TTCOutput/")
33-
TTC_METADATA_PREFIX = os.getenv("TTC_METADATA_PREFIX", "TTCMetadata/")
30+
SCHEMATRON_ERROR_PREFIX = os.getenv("SCHEMATRON_ERROR_PREFIX", "ValidationResponseV2/")
31+
TTC_INPUT_PREFIX = os.getenv("TTC_INPUT_PREFIX", "TextToCodeSubmissionV2/")
32+
TTC_OUTPUT_PREFIX = os.getenv("TTC_OUTPUT_PREFIX", "TTCAugmentationMetadataV2/")
33+
TTC_METADATA_PREFIX = os.getenv("TTC_METADATA_PREFIX", "TTCMetadataV2/")
3434
AWS_REGION = os.getenv("AWS_REGION")
3535
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")
3636
OPENSEARCH_ENDPOINT_URL = os.getenv("OPENSEARCH_ENDPOINT_URL")
@@ -120,7 +120,8 @@ def process_record(record: SQSRecord, s3_client: BaseClient, opensearch_client:
120120
# Parse the EventBridge S3 event from the SQS message body
121121
eventbridge_data = lambda_handler.get_eventbridge_data_from_s3_event(s3_event)
122122
object_key = eventbridge_data["object_key"]
123-
logger.info(f"Processing S3 Object: s3://{S3_BUCKET}/{object_key}")
123+
bucket_name = eventbridge_data.get("bucket_name") or S3_BUCKET
124+
logger.info(f"Processing S3 Object: s3://{bucket_name}/{object_key}")
124125

125126
# Extract persistence_id from the RR object key
126127
persistence_id = lambda_handler.get_persistence_id(object_key, TTC_INPUT_PREFIX)
@@ -129,7 +130,7 @@ def process_record(record: SQSRecord, s3_client: BaseClient, opensearch_client:
129130
with logger.append_context_keys(
130131
persistence_id=persistence_id,
131132
):
132-
_process_record_pipeline(persistence_id, s3_client, opensearch_client)
133+
_process_record_pipeline(persistence_id, s3_client, opensearch_client, bucket_name)
133134

134135

135136
def _initialize_ttc_outputs(persistence_id: str) -> tuple[dict, dict]:
@@ -152,17 +153,20 @@ def _initialize_ttc_outputs(persistence_id: str) -> tuple[dict, dict]:
152153
return ttc_output, ttc_metadata_output
153154

154155

155-
def _load_schematron_data_fields(persistence_id: str, s3_client: BaseClient) -> list:
156+
def _load_schematron_data_fields(
157+
persistence_id: str, s3_client: BaseClient, bucket_name: str
158+
) -> list:
156159
"""Load Schematron errors from S3 and extract relevant fields.
157160
158161
:param persistence_id: The persistence ID extracted from the S3 object key
159162
:param s3_client: The S3 client to use for fetching files.
163+
:param bucket_name: The S3 bucket name to read from.
160164
:return: The relevant Schematron data fields for TTC processing.
161165
"""
162166
object_key = f"{SCHEMATRON_ERROR_PREFIX}{persistence_id}"
163-
logger.info("Loading Schematron errors", s3_key=f"s3://{S3_BUCKET}/{object_key}")
167+
logger.info("Loading Schematron errors", s3_key=f"s3://{bucket_name}/{object_key}")
164168
schematron_errors = lambda_handler.get_file_content_from_s3(
165-
bucket_name=S3_BUCKET,
169+
bucket_name=bucket_name,
166170
object_key=object_key,
167171
s3_client=s3_client,
168172
)
@@ -172,17 +176,18 @@ def _load_schematron_data_fields(persistence_id: str, s3_client: BaseClient) ->
172176
return schematron_processor.get_data_fields_from_schematron_error(schematron_errors)
173177

174178

175-
def _load_original_eicr(persistence_id: str, s3_client: BaseClient) -> str:
179+
def _load_original_eicr(persistence_id: str, s3_client: BaseClient, bucket_name: str) -> str:
176180
"""Load the original eICR from S3.
177181
178182
:param persistence_id: The persistence ID extracted from the S3 object key
179183
:param s3_client: The S3 client to use for fetching files.
184+
:param bucket_name: The S3 bucket name to read from.
180185
:return: The original eICR content.
181186
"""
182187
object_key = f"{EICR_INPUT_PREFIX}{persistence_id}"
183-
logger.info(f"Retrieving eICR from s3://{S3_BUCKET}/{object_key}")
188+
logger.info(f"Retrieving eICR from s3://{bucket_name}/{object_key}")
184189
original_eicr_content = lambda_handler.get_file_content_from_s3(
185-
bucket_name=S3_BUCKET, object_key=object_key, s3_client=s3_client
190+
bucket_name=bucket_name, object_key=object_key, s3_client=s3_client
186191
)
187192
logger.info(f"Retrieved eICR content for persistence_id {persistence_id}")
188193
return original_eicr_content
@@ -283,20 +288,25 @@ def _process_schematron_errors(
283288

284289

285290
def _save_ttc_outputs(
286-
persistence_id: str, ttc_output: dict, ttc_metadata_output: dict, s3_client: BaseClient
291+
persistence_id: str,
292+
ttc_output: dict,
293+
ttc_metadata_output: dict,
294+
s3_client: BaseClient,
295+
bucket_name: str,
287296
) -> None:
288297
"""Save TTC output and metadata output to S3.
289298
290299
:param persistence_id: The persistence ID extracted from the S3 object key
291300
:param ttc_output: The TTC output dictionary.
292301
:param ttc_metadata_output: The TTC metadata output dictionary.
293302
:param s3_client: The S3 client to use for uploading files.
303+
:param bucket_name: The S3 bucket name to write to.
294304
"""
295305
# Save the TTC output to S3 for the Augmentation Lambda to consume
296306
logger.info(f"Saving TTC output to S3 for persistence_id {persistence_id}")
297307
lambda_handler.put_file(
298308
file_obj=io.BytesIO(json.dumps(ttc_output, default=str).encode("utf-8")),
299-
bucket_name=S3_BUCKET,
309+
bucket_name=bucket_name,
300310
object_key=f"{TTC_OUTPUT_PREFIX}{persistence_id}",
301311
s3_client=s3_client,
302312
)
@@ -305,7 +315,7 @@ def _save_ttc_outputs(
305315
logger.info(f"Saving TTC metadata output to S3 for persistence_id {persistence_id}")
306316
lambda_handler.put_file(
307317
file_obj=io.BytesIO(json.dumps(ttc_metadata_output, default=str).encode("utf-8")),
308-
bucket_name=S3_BUCKET,
318+
bucket_name=bucket_name,
309319
object_key=f"{TTC_METADATA_PREFIX}{persistence_id}",
310320
s3_client=s3_client,
311321
)
@@ -315,6 +325,7 @@ def _process_record_pipeline(
315325
persistence_id: str,
316326
s3_client: BaseClient,
317327
opensearch_client: OpenSearch,
328+
bucket_name: str,
318329
) -> dict:
319330
"""The main pipeline for processing each record.
320331
@@ -333,11 +344,12 @@ def _process_record_pipeline(
333344
:param persistence_id: The persistence ID extracted from the S3 object key
334345
:param s3_client: The S3 client to use for S3 operations.
335346
:param opensearch_client: The OpenSearch client.
347+
:param bucket_name: The S3 bucket name extracted from the event, or the default.
336348
"""
337349
ttc_output, ttc_metadata_output = _initialize_ttc_outputs(persistence_id)
338350

339351
logger.info("Starting TTC processing")
340-
schematron_data_fields = _load_schematron_data_fields(persistence_id, s3_client)
352+
schematron_data_fields = _load_schematron_data_fields(persistence_id, s3_client, bucket_name)
341353

342354
if not schematron_data_fields:
343355
logger.warning(
@@ -348,13 +360,13 @@ def _process_record_pipeline(
348360
logger.info(f"Saving TTC metadata output to S3 for persistence_id {persistence_id}")
349361
lambda_handler.put_file(
350362
file_obj=io.BytesIO(json.dumps(ttc_metadata_output, default=str).encode("utf-8")),
351-
bucket_name=S3_BUCKET,
363+
bucket_name=bucket_name,
352364
object_key=f"{TTC_METADATA_PREFIX}{persistence_id}",
353365
s3_client=s3_client,
354366
)
355367
return ttc_output
356368

357-
original_eicr_content = _load_original_eicr(persistence_id, s3_client)
369+
original_eicr_content = _load_original_eicr(persistence_id, s3_client, bucket_name)
358370
_populate_eicr_metadata(original_eicr_content, ttc_output, ttc_metadata_output)
359371
_process_schematron_errors(
360372
original_eicr_content,
@@ -363,6 +375,6 @@ def _process_record_pipeline(
363375
ttc_output,
364376
ttc_metadata_output,
365377
)
366-
_save_ttc_outputs(persistence_id, ttc_output, ttc_metadata_output, s3_client)
378+
_save_ttc_outputs(persistence_id, ttc_output, ttc_metadata_output, s3_client, bucket_name)
367379

368380
return {"statusCode": 200, "message": "TTC processed successfully!"}

packages/text-to-code-lambda/tests/conftest.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313

1414
S3_BUCKET = "dibbs-text-to-code"
1515
EICR_INPUT_PREFIX = "eCRMessageV2/"
16-
SCHEMATRON_ERROR_PREFIX = "schematronErrors/"
17-
TTC_INPUT_PREFIX = "TextToCodeValidateSubmissionV2/"
18-
TTC_OUTPUT_PREFIX = "TTCOutput/"
19-
TTC_METADATA_PREFIX = "TTCMetadata/"
16+
SCHEMATRON_ERROR_PREFIX = "ValidationResponseV2/"
17+
TTC_INPUT_PREFIX = "TextToCodeSubmissionV2/"
18+
TTC_OUTPUT_PREFIX = "TTCAugmentationMetadataV2/"
19+
TTC_METADATA_PREFIX = "TTCMetadataV2/"
2020
AWS_REGION = "us-east-1"
2121
AWS_ACCESS_KEY_ID = "test_access_key_id"
2222
AWS_SECRET_ACCESS_KEY = "test_secret_access_key" # noqa: S105

terraform/_variables.tf

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,26 +113,26 @@ variable "eicr_input_prefix" {
113113

114114
variable "schematron_error_prefix" {
115115
type = string
116-
default = "schematronErrors/"
117-
description = "S3 prefix for schematron error files"
116+
default = "ValidationResponseV2/"
117+
description = "S3 prefix for schematron validation response files"
118118
}
119119

120120
variable "ttc_input_prefix" {
121121
type = string
122-
default = "TextToCodeValidateSubmissionV2/"
122+
default = "TextToCodeSubmissionV2/"
123123
description = "S3 prefix for TTC input submission files"
124124
}
125125

126126
variable "ttc_output_prefix" {
127127
type = string
128-
default = "TTCOutput/"
129-
description = "S3 prefix for TTC output files"
128+
default = "TTCAugmentationMetadataV2/"
129+
description = "S3 prefix for TTC augmentation metadata output files"
130130
}
131131

132132
variable "ttc_metadata_prefix" {
133133
type = string
134-
default = "TTCMetadata/"
135-
description = "S3 prefix for TTC metadata files"
134+
default = "TTCMetadataV2/"
135+
description = "S3 prefix for TTC analysis metadata files"
136136
}
137137

138138
variable "augmented_eicr_prefix" {
@@ -143,7 +143,7 @@ variable "augmented_eicr_prefix" {
143143

144144
variable "augmentation_metadata_prefix" {
145145
type = string
146-
default = "AugmentationMetadata/"
146+
default = "AugmentationMetadataV2/"
147147
description = "S3 prefix for augmentation metadata files"
148148
}
149149

0 commit comments

Comments (0)