
Commit 5758a7d

[sdlf-stage-A/B] simplify and update with datalakeLibrary
1 parent 94017dc commit 5758a7d

File tree

15 files changed: +73 -541 lines changed

Lines changed: 4 additions & 5 deletions

@@ -1,8 +1,7 @@
 import json
 
+from datalake_library import DataLakeClient
 from datalake_library.commons import init_logger
-from datalake_library.configuration.resource_configs import SQSConfiguration
-from datalake_library.interfaces.sqs_interface import SQSInterface
 
 logger = init_logger(__name__)
 
@@ -11,11 +10,11 @@ def lambda_handler(event, context):
     try:
         if isinstance(event, str):
             event = json.loads(event)
-        sqs_config = SQSConfiguration(event["team"], event["pipeline"], event["pipeline_stage"])
-        sqs_interface = SQSInterface(sqs_config.get_stage_dlq_name)
+
+        client = DataLakeClient(team=event["team"], pipeline=event["pipeline"], stage=event["pipeline_stage"])
 
         logger.info("Execution Failed. Sending original payload to DLQ")
-        sqs_interface.send_message_to_fifo_queue(json.dumps(event), "failed")
+        client.sqs.send_message_to_fifo_queue(json.dumps(event), "failed", client.sqs.stage_dlq_url)
     except Exception as e:
         logger.error("Fatal error", exc_info=True)
         raise e
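
For comparison, the whole error-handler pattern after this commit fits in one short helper. The sketch below only uses calls visible in the diff above (DataLakeClient, client.sqs.send_message_to_fifo_queue, client.sqs.stage_dlq_url); the helper name is hypothetical, and the event is assumed to carry team/pipeline/pipeline_stage as it does in the routing payload.

import json

from datalake_library import DataLakeClient
from datalake_library.commons import init_logger

logger = init_logger(__name__)


def send_failed_payload_to_dlq(event):
    # Step Functions may hand the failed payload over as a JSON string
    if isinstance(event, str):
        event = json.loads(event)

    # One client scoped to team/pipeline/stage replaces SQSConfiguration + SQSInterface
    client = DataLakeClient(team=event["team"], pipeline=event["pipeline"], stage=event["pipeline_stage"])

    logger.info("Execution Failed. Sending original payload to DLQ")
    # "failed" is the FIFO message group id; the stage DLQ URL now comes from the client
    client.sqs.send_message_to_fifo_queue(json.dumps(event), "failed", client.sqs.stage_dlq_url)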

sdlf-stageA/lambda/stage-a-postupdate-metadata/src/lambda_function.py

Lines changed: 0 additions & 73 deletions
This file was deleted.

sdlf-stageA/lambda/stage-a-preupdate-metadata/src/lambda_function.py

Lines changed: 0 additions & 76 deletions
This file was deleted.
Lines changed: 19 additions & 38 deletions

@@ -1,28 +1,22 @@
 import json
 from pathlib import PurePath
 
-from datalake_library import octagon
+from datalake_library import DataLakeClient
 from datalake_library.commons import init_logger
-from datalake_library.configuration.resource_configs import (
-    KMSConfiguration,
-    S3Configuration,
-)
-from datalake_library.interfaces.s3_interface import S3Interface
-from datalake_library.octagon import peh
 
 logger = init_logger(__name__)
 
 
 def transform_object(bucket, key, team, dataset):
-    s3_interface = S3Interface()
+    # Initialize data lake client with team/dataset/stage parameters
+    client = DataLakeClient(team=team, dataset=dataset, stage="a")
+
     # IMPORTANT: Stage bucket where transformed data must be uploaded
-    stage_bucket = S3Configuration().stage_bucket
+    stage_bucket = client.s3.stage_bucket
     # Download S3 object locally to /tmp directory
-    # The s3_helper.download_object method
-    # returns the local path where the file was saved
-    local_path = s3_interface.download_object(bucket, key)
+    local_path = client.s3.download_object(bucket, key)
 
-    # Apply business business logic:
+    # Apply business logic:
     # Below example is opening a JSON file and
     # extracting fields, then saving the file
     # locally and re-uploading to Stage bucket
@@ -34,7 +28,6 @@ def parse(json_data):
                 if type(d[k]) in [dict, list]:
                     o.pop(k)
             l.append(o)
-
         return l
 
     # Reading file locally
@@ -52,8 +45,9 @@ def parse(json_data):
     # IMPORTANT: Build the output s3_path without the s3://stage-bucket/
     s3_path = f"pre-stage/{team}/{dataset}/{PurePath(output_path).name}"
     # IMPORTANT: Notice "stage_bucket" not "bucket"
-    kms_key = KMSConfiguration(team).get_kms_arn
-    s3_interface.upload_object(output_path, stage_bucket, s3_path, kms_key=kms_key)
+    # you can select kms_key = client.kms.data_kms_key => to use the datalake domain data key
+    # or use the particular team kms_key = client.kms.team_data_kms_key
+    client.s3.upload_object(output_path, stage_bucket, s3_path, kms_key=client.kms.team_data_kms_key)
     # IMPORTANT S3 path(s) must be stored in a list
     processed_keys = [s3_path]
 
@@ -79,30 +73,17 @@ def lambda_handler(event, context):
     """
     try:
         logger.info("Fetching event data from previous step")
-        bucket = event["body"]["bucket"]
-        key = event["body"]["key"]
-        team = event["body"]["team"]
-        stage = event["body"]["pipeline_stage"]
-        dataset = event["body"]["dataset"]
-
-        logger.info("Initializing Octagon client")
-        component = context.function_name.split("-")[-2].title()
-        octagon_client = (
-            octagon.OctagonClient().with_run_lambda(True).with_configuration_instance(event["body"]["env"]).build()
-        )
-        peh.PipelineExecutionHistoryAPI(octagon_client).retrieve_pipeline_execution(event["body"]["peh_id"])
-
-        # Call custom transform created by user and process the file
+        bucket = event["bucket"]
+        key = event["key"]
+        team = event["team"]
+        dataset = event["dataset"]
+
         logger.info("Calling user custom processing code")
-        event["body"]["processedKeys"] = transform_object(bucket, key, team, dataset)
-        octagon_client.update_pipeline_execution(
-            status="{} {} Processing".format(stage, component), component=component
-        )
+        event["processedKeys"] = transform_object(bucket, key, team, dataset)
+        logger.info("Successfully processed object")
+
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
-        octagon_client.end_pipeline_execution_failed(
-            component=component,
-            issue_comment="{} {} Error: {}".format(stage, component, repr(e)),
-        )
        raise e
+
    return event
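
The new comments in the upload hunk describe two encryption options. A minimal sketch of that choice, assuming the client.kms attributes behave as those comments state; the team/dataset names and local path below are purely illustrative.

from datalake_library import DataLakeClient

team, dataset = "engineering", "legislators"  # illustrative values only
client = DataLakeClient(team=team, dataset=dataset, stage="a")

# Option 1 (per the diff comment): the data lake domain data key
# kms_key = client.kms.data_kms_key
# Option 2 (what the diff uses): the team-specific data key
kms_key = client.kms.team_data_kms_key

# Upload the transformed file to the pre-stage prefix of the stage bucket,
# encrypting it with the selected key
client.s3.upload_object(
    "/tmp/legislators_output.json",  # local path produced by the transform (illustrative)
    client.s3.stage_bucket,
    f"pre-stage/{team}/{dataset}/legislators_output.json",
    kms_key=kms_key,
)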

sdlf-stageA/lambda/stage-a-redrive/src/lambda_function.py

Lines changed: 6 additions & 8 deletions

@@ -1,25 +1,23 @@
 import os
 
+from datalake_library import DataLakeClient
 from datalake_library.commons import init_logger
-from datalake_library.configuration.resource_configs import SQSConfiguration
-from datalake_library.interfaces.sqs_interface import SQSInterface
 
 logger = init_logger(__name__)
 
 
 def lambda_handler(event, context):
     try:
-        sqs_config = SQSConfiguration(os.environ["TEAM"], os.environ["PIPELINE"], os.environ["STAGE"])
-        dlq_interface = SQSInterface(sqs_config.get_stage_dlq_name)
-        messages = dlq_interface.receive_messages(1)
+        client = DataLakeClient(team=os.environ["TEAM"], pipeline=os.environ["PIPELINE"], stage=os.environ["STAGE"])
+
+        messages = client.sqs.receive_messages(1, client.sqs.stage_dlq_url)
         if not messages:
-            logger.info("No messages found in {}".format(sqs_config.get_stage_dlq_name))
+            logger.info("No messages found in DLQ")
             return
 
         logger.info("Received {} messages".format(len(messages)))
-        queue_interface = SQSInterface(sqs_config.get_stage_queue_name)
         for message in messages:
-            queue_interface.send_message_to_fifo_queue(message["Body"], "redrive")
+            client.sqs.send_message_to_fifo_queue(message["Body"], "redrive", client.sqs.stage_queue_url)
             logger.info("Redrive message succeeded")
     except Exception as e:
         logger.error("Fatal error", exc_info=True)

sdlf-stageA/lambda/stage-a-routing/src/lambda_function.py

Lines changed: 3 additions & 6 deletions

@@ -1,9 +1,8 @@
 import json
 import os
 
+from datalake_library import DataLakeClient
 from datalake_library.commons import init_logger
-from datalake_library.configuration.resource_configs import StateMachineConfiguration
-from datalake_library.interfaces.states_interface import StatesInterface
 
 logger = init_logger(__name__)
 
@@ -35,10 +34,8 @@ def lambda_handler(event, context):
             "env": env,
         }
 
-        state_config = StateMachineConfiguration(team, pipeline, pipeline_stage)
-        StatesInterface().run_state_machine(
-            state_config.get_stage_state_machine_arn, json.dumps(event_with_pipeline_details)
-        )
+        client = DataLakeClient(team=team, pipeline=pipeline, stage=pipeline_stage)
+        client.states.run_state_machine(client.states.state_machine_arn, event_with_pipeline_details)
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        raise e
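
One behavioural detail worth noting: the old StatesInterface call serialized the payload with json.dumps, while the new client.states.run_state_machine receives the dict directly, so serialization presumably happens inside the library now. A minimal sketch of starting the stage state machine with the new client; the payload values below are illustrative, and only team/pipeline/pipeline_stage/env are confirmed by this diff.

from datalake_library import DataLakeClient

event_with_pipeline_details = {
    "team": "engineering",       # illustrative values
    "pipeline": "main",
    "pipeline_stage": "StageA",
    "env": "dev",
}

client = DataLakeClient(team="engineering", pipeline="main", stage="StageA")

# The client already knows this team/pipeline/stage's state machine ARN and,
# per the diff, accepts the payload as a plain dict rather than a JSON string
client.states.run_state_machine(client.states.state_machine_arn, event_with_pipeline_details)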

sdlf-stageA/state-machine/stage-a.asl.json

Lines changed: 2 additions & 48 deletions

@@ -6,61 +6,15 @@
             "Type": "Parallel",
             "Branches": [
                 {
-                    "StartAt": "Pre-update Catalog",
+                    "StartAt": "Execute Light Transformation",
                     "States": {
-                        "Pre-update Catalog": {
-                            "Type": "Task",
-                            "Resource": "arn:aws:states:::lambda:invoke",
-                            "OutputPath": "$.Payload",
-                            "Parameters": {
-                                "Payload.$": "$",
-                                "FunctionName": "${lStep1}:$LATEST"
-                            },
-                            "Retry": [
-                                {
-                                    "ErrorEquals": [
-                                        "Lambda.ServiceException",
-                                        "Lambda.AWSLambdaException",
-                                        "Lambda.SdkClientException",
-                                        "Lambda.TooManyRequestsException"
-                                    ],
-                                    "IntervalSeconds": 2,
-                                    "MaxAttempts": 6,
-                                    "BackoffRate": 2
-                                }
-                            ],
-                            "Next": "Execute Light Transformation"
-                        },
                         "Execute Light Transformation": {
                             "Type": "Task",
                             "Resource": "arn:aws:states:::lambda:invoke",
                             "OutputPath": "$.Payload",
                             "Parameters": {
                                 "Payload.$": "$",
-                                "FunctionName.$": "$.body.lambda.lambda_arn"
-                            },
-                            "Retry": [
-                                {
-                                    "ErrorEquals": [
-                                        "Lambda.ServiceException",
-                                        "Lambda.AWSLambdaException",
-                                        "Lambda.SdkClientException",
-                                        "Lambda.TooManyRequestsException"
-                                    ],
-                                    "IntervalSeconds": 2,
-                                    "MaxAttempts": 6,
-                                    "BackoffRate": 2
-                                }
-                            ],
-                            "Next": "Post-update Catalog"
-                        },
-                        "Post-update Catalog": {
-                            "Type": "Task",
-                            "Resource": "arn:aws:states:::lambda:invoke",
-                            "ResultPath": null,
-                            "Parameters": {
-                                "Payload.$": "$",
-                                "FunctionName": "${lStep3}:$LATEST"
+                                "FunctionName": "${lStep2}:$LATEST"
                             },
                             "Retry": [
                                 {

0 commit comments
