|
| 1 | +import json |
| 2 | +from typing import TypedDict |
| 3 | + |
1 | 4 | from aws_lambda_typing import context as lambda_context |
2 | 5 | from aws_lambda_typing import events as lambda_events |
3 | 6 |
|
| 7 | +from augmentation.models import TTCAugmenterConfig |
| 8 | +from augmentation.models.application import TTCAugmenterOutput |
| 9 | +from augmentation.services.eicr_augmenter import EICRAugmenter |
| 10 | +from shared_models import TTCAugmenterInput |
| 11 | + |
| 12 | + |
| 13 | +class HandlerResponse(TypedDict): |
| 14 | + """Response from the AWS Lambda handler.""" |
| 15 | + |
| 16 | + results: list[dict[str, object]] |
| 17 | + batchItemFailures: list[dict[str, str]] |
| 18 | + |
| 19 | + |
| 20 | +def handler(event: lambda_events.SQSEvent, context: lambda_context.Context) -> HandlerResponse: |
| 21 | + """AWS Lambda handler for augmenting eICRs with nonstandard codes. |
| 22 | +
|
| 23 | + :param event: The SQS event containing messages with eICRs to augment. |
| 24 | + :param context: The AWS Lambda context object. |
| 25 | + :return: A dictionary containing the results of the augmentation and any batch item failures. |
| 26 | + """ |
| 27 | + results: list[dict[str, object]] = [] |
| 28 | + batch_item_failures: list[dict[str, str]] = [] |
| 29 | + |
| 30 | + for record in event["Records"]: |
| 31 | + message_id = record["messageId"] |
| 32 | + |
| 33 | + try: |
| 34 | + payload = json.loads(record["body"]) |
| 35 | + augmenter_input = TTCAugmenterInput.model_validate( |
| 36 | + { |
| 37 | + "eicr_id": payload["eicr_id"], |
| 38 | + "nonstandard_codes": payload["nonstandard_codes"], |
| 39 | + } |
| 40 | + ) |
| 41 | + |
| 42 | + eicr = payload["eicr"] |
| 43 | + |
| 44 | + # TODO: will need to determine config based on application code when there are multiple applications using the augmentation service. For now, since TTC is the only application, we can directly initialize the config as a TTC config. |
| 45 | + config = ( |
| 46 | + TTCAugmenterConfig.model_validate(payload["config"]) |
| 47 | + if "config" in payload |
| 48 | + else TTCAugmenterConfig() |
| 49 | + ) |
| 50 | + |
| 51 | + # TODO: in the future, when there are multiple applications using the augmentation service, we will need to determine which augmenter to use based on the application code in the config. For now, since TTC is the only application, we can directly initialize the EICRAugmenter. |
| 52 | + augmenter = EICRAugmenter( |
| 53 | + document=eicr, |
| 54 | + nonstandard_codes=augmenter_input.nonstandard_codes, |
| 55 | + config=config, |
| 56 | + ) |
| 57 | + |
| 58 | + metadata = augmenter.augment() |
| 59 | + |
| 60 | + # TODO: the output of the augmenter will likely need to be modified when there are multiple applications and augmenters, but for now we can directly create a TTC augmenter output. |
| 61 | + output = TTCAugmenterOutput( |
| 62 | + eicr_id=augmenter_input.eicr_id, |
| 63 | + augmented_eicr=augmenter.augmented_xml, |
| 64 | + metadata=metadata, |
| 65 | + ) |
| 66 | + |
| 67 | + results.append( |
| 68 | + { |
| 69 | + "messageId": message_id, |
| 70 | + "status": "success", |
| 71 | + "result": output.model_dump(), |
| 72 | + } |
| 73 | + ) |
| 74 | + except Exception as exc: |
| 75 | + batch_item_failures.append({"itemIdentifier": message_id}) |
| 76 | + results.append( |
| 77 | + { |
| 78 | + "messageId": message_id, |
| 79 | + "status": "error", |
| 80 | + "error": str(exc), |
| 81 | + } |
| 82 | + ) |
4 | 83 |
|
5 | | -def handler(event: lambda_events.SQSEvent, context: lambda_context.Context) -> dict: |
6 | | - """Augmentation lambda entry point.""" |
7 | | - return {"message": "DIBBS Augmentation!", "event": event} |
| 84 | + return { |
| 85 | + "results": results, |
| 86 | + "batchItemFailures": batch_item_failures, |
| 87 | + } |
0 commit comments