-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathdatajob_stack.py
101 lines (86 loc) · 2.96 KB
/
datajob_stack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import boto3
import sagemaker
from aws_cdk import core
from sagemaker import image_uris
from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.sagemaker import get_default_sagemaker_role
from datajob.sagemaker.sagemaker_job import EndpointConfigStep
from datajob.sagemaker.sagemaker_job import EndpointStep
from datajob.sagemaker.sagemaker_job import ModelStep
from datajob.sagemaker.sagemaker_job import TrainingStep
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow
# ML pipeline for the abalone dataset, declared as a datajob stack:
#   1. a Glue python-shell job that prepares the train/validation/test files,
#   2. a SageMaker training step using the built-in XGBoost image,
#   3. model, endpoint-config and endpoint creation steps,
# all chained into a single Step Functions workflow.
app = core.App()

with DataJobStack(scope=app, id="datajob-ml-pipeline-abalone") as djs:
    # Role whose ARN is handed to the SageMaker estimator below.
    sagemaker_default_role = get_default_sagemaker_role(datajob_stack=djs)

    # Every artifact lives under the data bucket exposed by the stack context.
    data_bucket_uri = f"s3://{djs.context.data_bucket_name}"
    train_path = f"{data_bucket_uri}/train/abalone.train"
    validation_path = f"{data_bucket_uri}/validation/abalone.validation"
    test_path = f"{data_bucket_uri}/test/abalone.test"

    # Glue job that receives the three dataset locations as job arguments.
    prepare_dataset_step = GlueJob(
        datajob_stack=djs,
        name="prepare-dataset",
        job_path="jobs/prepare_dataset.py",
        job_type="pythonshell",
        max_capacity=1,
        arguments={
            "--train": train_path,
            "--validation": validation_path,
            "--test": test_path,
        },
    )

    # The SageMaker Python SDK v2 (required here for `image_uris` and
    # `TrainingInput`) renamed the `train_*` estimator keywords; the old
    # spellings are deprecated aliases, so the current names are used.
    xgb = sagemaker.estimator.Estimator(
        image_uris.retrieve("xgboost", djs.env.region, "1.2-1"),
        role=sagemaker_default_role.role_arn,
        instance_count=1,
        instance_type="ml.m4.4xlarge",
        volume_size=5,
        output_path=f"{data_bucket_uri}/single-xgboost",
    )
    # NOTE(review): XGBoost documents "reg:linear" as a deprecated alias of
    # "reg:squarederror"; kept unchanged so the training configuration is
    # not altered here -- confirm before switching.
    xgb.set_hyperparameters(
        objective="reg:linear",
        num_round=50,
        max_depth=5,
        eta=0.2,
        gamma=4,
        min_child_weight=6,
        subsample=0.7,
    )

    # Training consumes the train and validation channels; the test file is
    # only passed to the Glue job above.
    training_step = TrainingStep(
        datajob_stack=djs,
        name="train-model",
        estimator=xgb,
        data={
            "train": sagemaker.TrainingInput(
                train_path, content_type="text/libsvm"
            ),
            "validation": sagemaker.TrainingInput(
                validation_path, content_type="text/libsvm"
            ),
        },
    )

    # Model definition derived from the training step's expected output.
    model_step = ModelStep(
        datajob_stack=djs,
        name="create-sagemaker-model",
        model=training_step.sfn_task.get_expected_model(),
    )

    endpoint_config_step = EndpointConfigStep(
        datajob_stack=djs,
        name="create-sagemaker-endpoint-config",
        model_name=model_step.model_name,
        initial_instance_count=1,
        instance_type="ml.m5.large",
    )

    endpoint_step = EndpointStep(
        datajob_stack=djs,
        name="create-endpoint",
        endpoint_config_name=endpoint_config_step.endpoint_config_name,
    )

    # Steps are chained with `>>` inside the workflow context; presumably
    # this declares their execution order (see datajob's workflow docs).
    with StepfunctionsWorkflow(djs, "workflow") as sfn_workflow:
        (
            prepare_dataset_step
            >> training_step
            >> model_step
            >> endpoint_config_step
            >> endpoint_step
        )

# Synthesize only after the stack context has been exited.
app.synth()