-
Notifications
You must be signed in to change notification settings - Fork 90
/
Copy pathcompute.py
200 lines (160 loc) · 14.1 KB
/
compute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import absolute_import
from enum import Enum
from stepfunctions.steps.states import Task
from stepfunctions.steps.fields import Field
from stepfunctions.steps.integration_resources import IntegrationPattern, get_service_integration_arn
# Service identifiers passed as the first argument to get_service_integration_arn
# by the step classes in this module (e.g. the "lambda" in
# arn:aws:states:::lambda:invoke).
LAMBDA_SERVICE_NAME = "lambda"
GLUE_SERVICE_NAME = "glue"
ECS_SERVICE_NAME = "ecs"
BATCH_SERVICE_NAME = "batch"
class LambdaApi(Enum):
    """AWS Lambda API actions that LambdaStep passes to get_service_integration_arn."""

    # Appears as the action part of the resource ARN, e.g. arn:aws:states:::lambda:invoke
    Invoke = "invoke"
class GlueApi(Enum):
    """AWS Glue API actions that GlueStartJobRunStep passes to get_service_integration_arn."""

    # Appears as the action part of the resource ARN, e.g. arn:aws:states:::glue:startJobRun
    StartJobRun = "startJobRun"
class EcsApi(Enum):
    """Amazon ECS API actions that EcsRunTaskStep passes to get_service_integration_arn."""

    # Appears as the action part of the resource ARN, e.g. arn:aws:states:::ecs:runTask
    RunTask = "runTask"
class BatchApi(Enum):
    """AWS Batch API actions that BatchSubmitJobStep passes to get_service_integration_arn."""

    # Appears as the action part of the resource ARN, e.g. arn:aws:states:::batch:submitJob
    SubmitJob = "submitJob"
class LambdaStep(Task):
    """
    Creates a Task state to invoke an AWS Lambda function. See `Invoke Lambda with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html>`_ for more details.
    """

    def __init__(self, state_id, wait_for_callback=False, **kwargs):
        """
        The resource ARN of the task is selected by this class from `wait_for_callback`; it replaces any `Field.Resource` entry supplied through `kwargs`.

        Args:
            state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
            wait_for_callback (bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
            timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
            timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
            heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            comment (str, optional): Human-readable comment or description. (default: None)
            input_path (str or Placeholder, optional): Path applied to the state's raw input to select some or all of it; that selection is used by the state. (default: '$')
            parameters (dict, optional): The value of this field becomes the effective input for the state.
            result_path (str, optional): Path specifying the raw input's combination with or replacement by the state's result. (default: '$')
            output_path (str or Placeholder, optional): Path applied to the state's output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
        """
        # Positional arguments for the ARN builder.
        #   without callback: arn:aws:states:::lambda:invoke
        #   with callback:    arn:aws:states:::lambda:invoke.waitForTaskToken
        arn_args = [LAMBDA_SERVICE_NAME, LambdaApi.Invoke]
        if wait_for_callback:
            arn_args.append(IntegrationPattern.WaitForTaskToken)

        kwargs[Field.Resource.value] = get_service_integration_arn(*arn_args)
        super(LambdaStep, self).__init__(state_id, **kwargs)
class GlueStartJobRunStep(Task):
    """
    Creates a Task state to run an AWS Glue job. See `Manage AWS Glue Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html>`_ for more details.
    """

    def __init__(self, state_id, wait_for_completion=True, **kwargs):
        """
        The resource ARN of the task is selected by this class from `wait_for_completion`; it replaces any `Field.Resource` entry supplied through `kwargs`.

        Args:
            state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
            wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the glue job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the glue job and proceed to the next step. (default: True)
            timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
            timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
            heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            comment (str, optional): Human-readable comment or description. (default: None)
            input_path (str or Placeholder, optional): Path applied to the state's raw input to select some or all of it; that selection is used by the state. (default: '$')
            parameters (dict, optional): The value of this field becomes the effective input for the state.
            result_path (str, optional): Path specifying the raw input's combination with or replacement by the state's result. (default: '$')
            output_path (str or Placeholder, optional): Path applied to the state's output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
        """
        # Positional arguments for the ARN builder.
        #   submit and continue: arn:aws:states:::glue:startJobRun
        #   wait for completion: arn:aws:states:::glue:startJobRun.sync
        arn_args = [GLUE_SERVICE_NAME, GlueApi.StartJobRun]
        if wait_for_completion:
            arn_args.append(IntegrationPattern.WaitForCompletion)

        kwargs[Field.Resource.value] = get_service_integration_arn(*arn_args)
        super(GlueStartJobRunStep, self).__init__(state_id, **kwargs)
class BatchSubmitJobStep(Task):
    """
    Creates a Task State to start an AWS Batch job. See `Manage AWS Batch with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-batch.html>`_ for more details.
    """

    def __init__(self, state_id, wait_for_completion=True, **kwargs):
        """
        The resource ARN of the task is selected by this class from `wait_for_completion`; it replaces any `Field.Resource` entry supplied through `kwargs`.

        Args:
            state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
            wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the batch job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the batch job and proceed to the next step. (default: True)
            timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
            timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
            heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            comment (str, optional): Human-readable comment or description. (default: None)
            input_path (str or Placeholder, optional): Path applied to the state's raw input to select some or all of it; that selection is used by the state. (default: '$')
            parameters (dict, optional): The value of this field becomes the effective input for the state.
            result_path (str, optional): Path specifying the raw input's combination with or replacement by the state's result. (default: '$')
            output_path (str or Placeholder, optional): Path applied to the state's output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
        """
        # Positional arguments for the ARN builder.
        #   submit and continue: arn:aws:states:::batch:submitJob
        #   wait for completion: arn:aws:states:::batch:submitJob.sync
        arn_args = [BATCH_SERVICE_NAME, BatchApi.SubmitJob]
        if wait_for_completion:
            arn_args.append(IntegrationPattern.WaitForCompletion)

        kwargs[Field.Resource.value] = get_service_integration_arn(*arn_args)
        super(BatchSubmitJobStep, self).__init__(state_id, **kwargs)
class EcsRunTaskStep(Task):
    """
    Creates a Task State to run Amazon ECS or Fargate Tasks. See `Manage Amazon ECS or Fargate Tasks with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html>`_ for more details.
    """

    def __init__(self, state_id, wait_for_completion=True, **kwargs):
        """
        The resource ARN of the task is selected by this class from `wait_for_completion`; it replaces any `Field.Resource` entry supplied through `kwargs`.

        Args:
            state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
            wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the ecs job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the ecs job and proceed to the next step. (default: True)
            timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
            timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
            heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            comment (str, optional): Human-readable comment or description. (default: None)
            input_path (str or Placeholder, optional): Path applied to the state's raw input to select some or all of it; that selection is used by the state. (default: '$')
            parameters (dict, optional): The value of this field becomes the effective input for the state.
            result_path (str, optional): Path specifying the raw input's combination with or replacement by the state's result. (default: '$')
            output_path (str or Placeholder, optional): Path applied to the state's output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
        """
        # Positional arguments for the ARN builder.
        #   submit and continue: arn:aws:states:::ecs:runTask
        #   wait for completion: arn:aws:states:::ecs:runTask.sync
        arn_args = [ECS_SERVICE_NAME, EcsApi.RunTask]
        if wait_for_completion:
            arn_args.append(IntegrationPattern.WaitForCompletion)

        kwargs[Field.Resource.value] = get_service_integration_arn(*arn_args)
        super(EcsRunTaskStep, self).__init__(state_id, **kwargs)