Skip to content

Commit 2c696ae

Browse files
committed
feat: Change ECS Task to use IntegrationPattern for input
1 parent fbab338 commit 2c696ae

File tree

2 files changed

+34
-35
lines changed

2 files changed

+34
-35
lines changed

Diff for: src/stepfunctions/steps/compute.py

+31-30
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
from enum import Enum
1616
from stepfunctions.steps.states import Task
1717
from stepfunctions.steps.fields import Field
18-
from stepfunctions.steps.integration_resources import IntegrationPattern, get_service_integration_arn
18+
from stepfunctions.steps.integration_resources import IntegrationPattern, get_service_integration_arn, \
19+
is_integration_pattern_valid
1920

2021
LAMBDA_SERVICE_NAME = "lambda"
2122
GLUE_SERVICE_NAME = "glue"
@@ -161,17 +162,25 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
161162

162163

163164
class EcsRunTaskStep(Task):
164-
165165
"""
166166
Creates a Task State to run Amazon ECS or Fargate Tasks. See `Manage Amazon ECS or Fargate Tasks with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html>`_ for more details.
167167
"""
168168

169-
def __init__(self, state_id, wait_for_completion=True, wait_for_callback=False, **kwargs):
169+
supported_integration_patterns = [
170+
IntegrationPattern.WaitForCompletion,
171+
IntegrationPattern.WaitForTaskToken,
172+
IntegrationPattern.CallAndContinue
173+
]
174+
175+
def __init__(self, state_id, wait_for_completion=True, integration_pattern=None, **kwargs):
170176
"""
171177
Args:
172178
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
173179
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the ecs job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the ecs job and proceed to the next step. (default: True)
174-
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
180+
integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. This is mutually exclusive from wait_for_completion Supported integration patterns (default: None):
181+
* WaitForCompletion: Wait for the state machine execution to complete before going to the next state. (See `Run A Job <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync>`_ for more details.)
182+
* WaitForTaskToken: Wait for the state machine execution to return a task token before progressing to the next state (See `Wait for a Callback with the Task Token <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token>`_ for more details.)
183+
* CallAndContinue: Call StartExecution and progress to the next state (See `Request Response <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default>`_ for more details.)
175184
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
176185
timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
177186
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
@@ -182,31 +191,23 @@ def __init__(self, state_id, wait_for_completion=True, wait_for_callback=False,
182191
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
183192
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
184193
"""
185-
if wait_for_completion and wait_for_callback:
186-
raise ValueError("Only one of wait_for_completion and wait_for_callback can be true")
187-
188-
if wait_for_callback:
189-
"""
190-
Example resource arn: arn:aws:states:::ecs:runTask.waitForTaskToken
191-
"""
192-
193-
kwargs[Field.Resource.value] = get_service_integration_arn(ECS_SERVICE_NAME,
194-
EcsApi.RunTask,
195-
IntegrationPattern.WaitForTaskToken)
196-
elif wait_for_completion:
197-
"""
198-
Example resource arn: arn:aws:states:::ecs:runTask.sync
199-
"""
200-
201-
kwargs[Field.Resource.value] = get_service_integration_arn(ECS_SERVICE_NAME,
202-
EcsApi.RunTask,
203-
IntegrationPattern.WaitForCompletion)
204-
else:
205-
"""
206-
Example resource arn: arn:aws:states:::ecs:runTask
207-
"""
208-
209-
kwargs[Field.Resource.value] = get_service_integration_arn(ECS_SERVICE_NAME,
210-
EcsApi.RunTask)
194+
if wait_for_completion and integration_pattern:
195+
raise ValueError(
196+
"Only one of wait_for_completion and integration_pattern set. "
197+
"Set wait_for_completion to False if you wish to use integration_pattern."
198+
)
199+
200+
# The old implementation type still has to be supported until a new
201+
# major is realeased.
202+
if wait_for_completion:
203+
integration_pattern = IntegrationPattern.WaitForCompletion
204+
if not wait_for_completion and not integration_pattern:
205+
integration_pattern = IntegrationPattern.CallAndContinue
206+
207+
is_integration_pattern_valid(integration_pattern,
208+
self.supported_integration_patterns)
209+
kwargs[Field.Resource.value] = get_service_integration_arn(ECS_SERVICE_NAME,
210+
EcsApi.RunTask,
211+
integration_pattern)
211212

212213
super(EcsRunTaskStep, self).__init__(state_id, **kwargs)

Diff for: tests/unit/test_compute_steps.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from unittest.mock import patch
1919
from stepfunctions.steps.compute import LambdaStep, GlueStartJobRunStep, BatchSubmitJobStep, EcsRunTaskStep
20+
from stepfunctions.steps.integration_resources import IntegrationPattern
2021

2122

2223
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
@@ -102,17 +103,15 @@ def test_batch_submit_job_step_creation():
102103
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
103104
def test_ecs_run_task_step_creation():
104105
step = EcsRunTaskStep('Ecs Job', wait_for_completion=False)
105-
106106
assert step.to_dict() == {
107107
'Type': 'Task',
108108
'Resource': 'arn:aws:states:::ecs:runTask',
109109
'End': True
110110
}
111111

112112
step = EcsRunTaskStep('Ecs Job',
113-
wait_for_callback=True,
113+
integration_pattern=IntegrationPattern.WaitForTaskToken,
114114
wait_for_completion=False)
115-
116115
assert step.to_dict() == {
117116
'Type': 'Task',
118117
'Resource': 'arn:aws:states:::ecs:runTask.waitForTaskToken',
@@ -122,7 +121,6 @@ def test_ecs_run_task_step_creation():
122121
step = EcsRunTaskStep('Ecs Job', parameters={
123122
'TaskDefinition': 'Task'
124123
})
125-
126124
assert step.to_dict() == {
127125
'Type': 'Task',
128126
'Resource': 'arn:aws:states:::ecs:runTask.sync',
@@ -135,4 +133,4 @@ def test_ecs_run_task_step_creation():
135133
with pytest.raises(ValueError):
136134
step = EcsRunTaskStep('Ecs Job',
137135
wait_for_completion=True,
138-
wait_for_callback=True)
136+
integration_pattern=IntegrationPattern.WaitForTaskToken)

0 commit comments

Comments
 (0)