@@ -33,10 +33,10 @@ def get_all_step_functions(self):
         step_functions.extend(result["stateMachines"])
         return step_functions
 
-    def run_state_machine(self, machine_arn, message):
+    def run_state_machine(self, machine_arn, message, execution_name=None):
         self._logger.info("running state machine with arn {}".format(machine_arn))
         return self._states_client.start_execution(
-            stateMachineArn=machine_arn, input=json.dumps(message, default=self.json_serial)
+            stateMachineArn=machine_arn, input=json.dumps(message, default=self.json_serial), name=execution_name
         )
 
     def describe_state_execution(self, execution_arn):
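Note on the new optional parameter: boto3 validates start_execution arguments, so calling this StatesInterface helper without execution_name now passes name=None and fails boto3's parameter validation with a ParamValidationError. A minimal sketch of a guarded variant, assuming the same class and boto3 states client (the conditional-kwargs approach is an illustration, not part of this PR):

import json

def run_state_machine(self, machine_arn, message, execution_name=None):
    # Sketch: build kwargs conditionally so the no-name call path keeps working.
    self._logger.info("running state machine with arn {}".format(machine_arn))
    kwargs = {
        "stateMachineArn": machine_arn,
        "input": json.dumps(message, default=self.json_serial),
    }
    if execution_name is not None:
        # Execution names must be unique per state machine for 90 days;
        # reusing one raises ExecutionAlreadyExists.
        kwargs["name"] = execution_name
    return self._states_client.start_execution(**kwargs)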
sdlf-pipeline/src/pipeline.yaml (7 additions & 3 deletions)
@@ -61,14 +61,14 @@ Conditions:
 Resources:
   rDeadLetterQueueRoutingStep:
     Type: AWS::SQS::Queue
+    Condition: HasSourceEvents
     DeletionPolicy: Delete
     UpdateReplacePolicy: Delete
     Properties:
       QueueName: !Sub sdlf-${pDeploymentInstance}-dlq.fifo
       FifoQueue: True
-      MessageRetentionPeriod: 1209600
       ContentBasedDeduplication: True
       VisibilityTimeout: 60
+      MessageRetentionPeriod: 1209600
       KmsMasterKeyId: !If [FetchFromDatasetSsm, !Sub "{{resolve:ssm:/sdlf/dataset/rKMSInfraKey/${pDatasetDeploymentInstance}}}", !Ref pInfraKmsKey]
 
   rDeadLetterQueueRoutingStepSsm:
@@ -220,4 +220,8 @@ Resources:
 Outputs:
   oPipelineReference:
     Description: CodePipeline reference this stack has been deployed with
-    Value: !Ref pPipelineReference
+    Value: !Ref pPipelineReference
+
+  oDlqUrl:
+    Description: Dead-letter queue URL
+    Value: !GetAtt rDeadLetterQueueRoutingStep.QueueUrl
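For reference, the new oDlqUrl output is what the stage template reads later in this PR via !GetAtt rPipelineInterface.Outputs.oDlqUrl. A quick sketch of fetching it outside CloudFormation, assuming boto3 and a placeholder stack name:

import boto3

cfn = boto3.client("cloudformation")
# "sdlf-pipeline-example" is a placeholder, not a stack name from this PR.
stack = cfn.describe_stacks(StackName="sdlf-pipeline-example")["Stacks"][0]
dlq_url = next(o["OutputValue"] for o in stack["Outputs"] if o["OutputKey"] == "oDlqUrl")
print(dlq_url)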
sdlf-stage-glue/src/lambda/error/src/lambda_function.py (0 additions & 24 deletions)

This file was deleted.

@@ -3,6 +3,11 @@
 from datalake_library.commons import init_logger
 from datalake_library.sdlf import PipelineExecutionHistoryAPI
 
+
+class MapPartialFailureException(Exception):
+    pass
+
+
 logger = init_logger(__name__)
 deployment_instance = os.environ["DEPLOYMENT_INSTANCE"]
 peh_table_instance = os.environ["DATASET_DEPLOYMENT_INSTANCE"]
@@ -28,23 +33,30 @@ def lambda_handler(event, context):
             peh_table_instance=peh_table_instance,
             manifests_table_instance=manifests_table_instance,
         )
-        peh_id = event[0]["Items"][0]["transform"]["peh_id"]
+        peh_id = event["peh_id"]
         pipeline_execution.retrieve_pipeline_execution(peh_id)
 
-        partial_failure = False
-        # for records in event:
-        #     for record in records:
-        #         if "processed" not in record or not record["processed"]:
-        #             partial_failure = True
+        any_failure = False
+        for record in event["map_output"]:
+            if not record:
+                any_failure = True
+                break
 
-        if not partial_failure:
+        if not any_failure:
             pipeline_execution.update_pipeline_execution(
                 status=f"{deployment_instance} {component} Processing", component=component
             )
             pipeline_execution.end_pipeline_execution_success()
         else:
-            raise Exception("Failure: Processing failed for one or more record")
+            raise MapPartialFailureException("Failure: Processing failed for one or more record")
 
+    except MapPartialFailureException:
+        # this exception is caught in the state machine
+        pipeline_execution.end_pipeline_execution_failed(
+            component=component,
+            issue_comment=f"{deployment_instance} {component} Processing failed for one or more record",
+        )
+        raise
     except Exception as e:
         logger.error("Fatal error", exc_info=True)
         pipeline_execution.end_pipeline_execution_failed(
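The inline comment says the new exception is caught in the state machine. For Lambda tasks, ASL matches on the Python exception class name, so the Catch in stage-glue.asl.json (not part of this diff) presumably looks something like the fragment below, written as a Python dict with a hypothetical state name:

# Assumption: not taken from this PR; the real ASL file is not shown here.
post_metadata_catch = {
    "Catch": [
        {
            "ErrorEquals": ["MapPartialFailureException"],  # Lambda error name = exception class
            "Next": "SendToDLQ",  # hypothetical failure-handling state
        }
    ]
}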
sdlf-stage-glue/src/lambda/routing/src/lambda_function.py (2 additions & 2 deletions)
@@ -49,7 +49,7 @@ def get_source_records(event):
logger.info("Stage trigger: event-schedule")
min_items_to_process = 1
max_items_to_process = 100
logger.info(f"Pipeline is {deployment_instance}")
logger.info(f"Pipeline stage is {deployment_instance}")
logger.info(
f"Pipeline stage configuration: min_items_to_process {min_items_to_process}, max_items_to_process {max_items_to_process}"
)
@@ -108,7 +108,7 @@ def lambda_handler(event, context):
logger.info(f"Starting State Machine Execution (processing {len(records)} source events)")
state_config = StateMachineConfiguration(instance=deployment_instance)
StatesInterface().run_state_machine(
state_config.stage_state_machine, json.dumps(records, default=serializer)
state_config.stage_state_machine, json.dumps(records, default=serializer), execution_name=peh_id
)
pipeline_execution.update_pipeline_execution(
status=f"{deployment_instance} Transform Processing", component="Transform"
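Passing peh_id as the execution name makes executions searchable by pipeline-execution id and adds idempotency, since Step Functions rejects a reused name with ExecutionAlreadyExists for 90 days. Names are also capped at 80 characters and reject whitespace, brackets and wildcard characters; a peh_id that is a plain UUID is valid as-is, but a defensive sanitiser could look like this (illustrative helper, not in this PR):

import re

def safe_execution_name(peh_id: str) -> str:
    # Keep only characters Step Functions accepts and enforce the 80-char cap.
    return re.sub(r"[^A-Za-z0-9\-_]", "-", peh_id)[:80]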
sdlf-stage-glue/src/stageglue.yaml (12 additions & 63 deletions)
@@ -265,7 +265,7 @@ Resources:
             Action:
               - states:StartExecution
             Resource:
-              - !Ref rStateMachine
+              - !Sub arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:sdlf-${pDeploymentInstance}-sm # not a ref to avoid circular dependency
           - Effect: Allow
             Action:
               - sqs:DeleteMessage
@@ -300,43 +300,6 @@ Resources:
               Service: lambda.amazonaws.com
             Action: sts:AssumeRole
 
-  # Error Handling Lambda Role
-  rRoleLambdaExecutionErrorStep:
-    Type: AWS::IAM::Role
-    Properties:
-      Path: !Sub /sdlf-${pDeploymentInstance}/
-      # PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pDataset}/TeamPermissionsBoundary}}"
-      ManagedPolicyArns:
-        - !Ref rLambdaCommonPolicy
-        - !If
-          - RunInVpc
-          - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
-          - !Ref "AWS::NoValue"
-      AssumeRolePolicyDocument:
-        Version: "2012-10-17"
-        Statement:
-          - Effect: Allow
-            Principal:
-              Service: lambda.amazonaws.com
-            Action: sts:AssumeRole
-      Policies:
-        - PolicyName: !Sub sdlf-${pDeploymentInstance}-error
-          PolicyDocument:
-            Version: "2012-10-17"
-            Statement:
-              - Effect: Allow
-                Action:
-                  - sqs:DeleteMessage
-                  - sqs:GetQueueAttributes
-                  - sqs:GetQueueUrl
-                  - sqs:ListQueues
-                  - sqs:ListDeadLetterSourceQueues
-                  - sqs:ListQueueTags
-                  - sqs:ReceiveMessage
-                  - sqs:SendMessage
-                Resource:
-                  - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pDeploymentInstance}-dlq.fifo
-
   ######## LAMBDA FUNCTIONS #########
   rLambdaRoutingStep:
     Type: AWS::Serverless::Function
@@ -386,21 +349,6 @@ Resources:
       Timeout: 300
       Role: !GetAtt rRoleLambdaExecutionMetadataStep.Arn
 
-  rLambdaErrorStep:
-    Type: AWS::Serverless::Function
-    Metadata:
-      cfn_nag:
-        rules_to_suppress:
-          - id: W58
-            reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
-    Properties:
-      CodeUri: ./lambda/error/src
-      FunctionName: !Sub sdlf-${pDeploymentInstance}-error
-      Description: Fallback lambda to handle messages which failed processing
-      MemorySize: 192
-      Timeout: 300
-      Role: !GetAtt rRoleLambdaExecutionErrorStep.Arn
-
   ######## CLOUDWATCH #########
   rRoutingLambdaSsm:
     Type: AWS::SSM::Parameter
@@ -437,15 +385,6 @@ Resources:
       RetentionInDays: !Ref pCloudWatchLogsRetentionInDays
       KmsKeyId: !If [FetchFromDatasetSsm, !Sub "{{resolve:ssm:/sdlf/dataset/rKMSInfraKey/${pDatasetDeploymentInstance}}}", !Ref pInfraKmsKey]
 
-  rLambdaErrorStepLogGroup:
-    Type: AWS::Logs::LogGroup
-    DeletionPolicy: Delete
-    UpdateReplacePolicy: Delete
-    Properties:
-      LogGroupName: !Sub /aws/lambda/${rLambdaErrorStep}
-      RetentionInDays: !Ref pCloudWatchLogsRetentionInDays
-      KmsKeyId: !If [FetchFromDatasetSsm, !Sub "{{resolve:ssm:/sdlf/dataset/rKMSInfraKey/${pDatasetDeploymentInstance}}}", !Ref pInfraKmsKey]
-
   ######## STATE MACHINE #########
   rStatesExecutionRole:
     Type: AWS::IAM::Role
@@ -489,6 +428,15 @@ Resources:
                 - glue:StartCrawler
                 - glue:GetCrawler
               Resource: !Sub arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:crawler/sdlf-${pDatasetDeploymentInstance}-*
+            - Effect: Allow
+              Action:
+                - sqs:SendMessage
+              Resource: !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pDeploymentInstance}-dlq.fifo
+            - Effect: Allow
+              Action:
+                - kms:GenerateDataKey*
+              Resource:
+                - !If [FetchFromDatasetSsm, !Sub "{{resolve:ssm:/sdlf/dataset/rKMSInfraKey/${pDatasetDeploymentInstance}}}", !Ref pInfraKmsKey]
             - Effect: Allow
               Action:
                 - xray:PutTraceSegments # W11 exception
@@ -521,7 +469,6 @@ Resources:
       DefinitionUri: ./state-machine/stage-glue.asl.json
       DefinitionSubstitutions:
         lPostMetadata: !GetAtt rLambdaPostMetadataStep.Arn
-        lError: !GetAtt rLambdaErrorStep.Arn
         lMaxItemsPerBatch: !Ref pMaxItemsPerBatch
         lTransform: !Ref pGlueJobName
         lWorkerType: !Ref pGlueWorkerType
@@ -530,6 +477,8 @@ Resources:
 #        lArguments:
         lCrawlerName: !If [FetchFromDatasetSsm, !Sub "{{resolve:ssm:/sdlf/dataset/rAnalyticsGlueCrawler/${pDatasetDeploymentInstance}}}", !Ref pGlueCrawler]
         lWaitTime: 75
+        lDlqUrl: !GetAtt rPipelineInterface.Outputs.oDlqUrl
+        lDeploymentInstance: !Ref pDeploymentInstance
       Role: !GetAtt rStatesExecutionRole.Arn
       Tracing:
         Enabled: !If [EnableTracing, true, false]
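Taken together, the new sqs:SendMessage and kms:GenerateDataKey* statements plus the lDlqUrl and lDeploymentInstance substitutions suggest the state machine now writes failed batches to the KMS-encrypted FIFO DLQ itself, replacing the deleted error Lambda. A guess at the corresponding ASL task, written as a Python dict (the real stage-glue.asl.json is not in this diff, and the MessageGroupId choice is an assumption; the queue's ContentBasedDeduplication makes an explicit deduplication id unnecessary):

send_to_dlq = {
    "Type": "Task",
    "Resource": "arn:aws:states:::sqs:sendMessage",  # Step Functions SQS integration
    "Parameters": {
        "QueueUrl": "${lDlqUrl}",             # substituted from the nested stack output
        "MessageGroupId": "${lDeploymentInstance}",  # FIFO queues require a group id
        "MessageBody.$": "$",                 # forward the failed state as the message
    },
    "End": True,
}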