diff --git a/doc/placeholders.rst b/doc/placeholders.rst index fa87b1d..103312b 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -1,11 +1,12 @@ Placeholders ============= + Once defined, a workflow is static unless you update it explicitly. But, you can pass input to workflow executions. You can have dynamic values -that you use in the **parameters** fields of the steps in your workflow. For this, +that you use in the **parameters** or **result_selector** fields of the steps in your workflow. For this, the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you -create your workflow. There are 2 mechanisms for passing dynamic values in a workflow. +create your workflow. There are 3 mechanisms for passing dynamic values in a workflow. The first mechanism is a global input to the workflow execution. This input is accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput` @@ -50,6 +51,7 @@ to define the schema for this input, and to access the values in your workflow. workflow.execute(inputs={'myDynamicInput': "WorldHello"}) + The second mechanism is for passing dynamic values from one step to the next step. The output of one step becomes the input of the next step. The SDK provides the :py:meth:`stepfunctions.inputs.StepInput` class for this. @@ -64,10 +66,10 @@ that returns the placeholder output for that step. parameters={ "FunctionName": "MakeApiCall", "Payload": { - "input": "20192312" - } + "input": "20192312" } - ) + } + ) lambda_state_second = LambdaStep( state_id="MySecondLambdaStep", @@ -83,6 +85,27 @@ that returns the placeholder output for that step. +The third mechanism is a placeholder for a step's result. The result of a step can be modified +with the **result_selector** field to replace the step's result. +The SDK provides :py:meth:`stepfunctions.inputs.StepResult` class for this. + +.. code-block:: python + + lambda_result = StepResult( + schema={ + "Id": str, + } + ) + + lambda_state_first = LambdaStep( + state_id="MyFirstLambdaStep", + result_selector={ + "Output": lambda_result["Id"], + "Status": "Success" + } + ) + + .. autoclass:: stepfunctions.inputs.Placeholder .. autoclass:: stepfunctions.inputs.ExecutionInput @@ -90,3 +113,6 @@ that returns the placeholder output for that step. .. autoclass:: stepfunctions.inputs.StepInput :inherited-members: + +.. autoclass:: stepfunctions.inputs.StepResult + :inherited-members: diff --git a/src/stepfunctions/inputs/__init__.py b/src/stepfunctions/inputs/__init__.py index ffa01b0..81519d8 100644 --- a/src/stepfunctions/inputs/__init__.py +++ b/src/stepfunctions/inputs/__init__.py @@ -6,10 +6,10 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import absolute_import -from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput +from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput, StepResult diff --git a/src/stepfunctions/inputs/placeholders.py b/src/stepfunctions/inputs/placeholders.py index 3b7f2b6..8e63325 100644 --- a/src/stepfunctions/inputs/placeholders.py +++ b/src/stepfunctions/inputs/placeholders.py @@ -6,9 +6,9 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import absolute_import @@ -51,11 +51,11 @@ def __init__(self, schema=None, **kwargs): self._set_schema(schema) self._make_immutable() self.json_str_template = "{}" - + self.name = kwargs.get("name") self.type = kwargs.get("type") self.parent = kwargs.get("parent") - + def get(self, name, type): """ @@ -64,11 +64,11 @@ def get(self, name, type): Args: name (str): Name of the placeholder variable. type (type): Type of the placeholder variable. - + Raises: ValueError: If placeholder variable with the same name but different type already exists. ValueError: If placeholder variable does not fit into a previously specified schema for the placeholder collection. - + Returns: Placeholder: Placeholder variable. """ @@ -240,7 +240,7 @@ def _join_path(self, path): def to_jsonpath(self): """ Returns a JSON path representation of the placeholder variable to be used for step parameters. - + Returns: str: JSON path representation of the placeholder variable """ @@ -252,7 +252,7 @@ class ExecutionInput(Placeholder): """ Top-level class for execution input placeholders. """ - + def __init__(self, schema=None, **kwargs): super(ExecutionInput, self).__init__(schema, **kwargs) self.json_str_template = '$$.Execution.Input{}' @@ -260,15 +260,17 @@ def __init__(self, schema=None, **kwargs): def _create_variable(self, name, parent, type=None): """ Creates a placeholder variable for Workflow Input. - A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. + A placeholder variable can only be created if the collection is mutable. + A collection is mutable if no pre-specified schema was defined at construction. """ if self.immutable: - raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection:" + f" {self.schema}") if type: return ExecutionInput(name=name, parent=parent, type=type) else: return ExecutionInput(name=name, parent=parent) - + class StepInput(Placeholder): @@ -279,15 +281,42 @@ class StepInput(Placeholder): def __init__(self, schema=None, **kwargs): super(StepInput, self).__init__(schema, **kwargs) self.json_str_template = '${}' - + def _create_variable(self, name, parent, type=None): """ Creates a placeholder variable for Step Input. - A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. + A placeholder variable can only be created if the collection is mutable. + A collection is mutable if no pre-specified schema was defined at construction.. """ if self.immutable: - raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection:" + f" {self.schema}") if type: return StepInput(name=name, parent=parent, type=type) else: return StepInput(name=name, parent=parent) + + +class StepResult(Placeholder): + + """ + Top-level class for step result placeholders. + """ + + def __init__(self, schema=None, **kwargs): + super(StepResult, self).__init__(schema, **kwargs) + self.json_str_template = '${}' + + def _create_variable(self, name, parent, type=None): + """ + Creates a placeholder variable for Step Result. + A placeholder variable can only be created if the collection is mutable. + A collection is mutable if no pre-specified schema was defined at construction. + """ + if self.immutable: + raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection:" + f" {self.schema}") + if type: + return StepResult(name=name, parent=parent, type=type) + else: + return StepResult(name=name, parent=parent) diff --git a/src/stepfunctions/steps/compute.py b/src/stepfunctions/steps/compute.py index 203ed47..0f827ac 100644 --- a/src/stepfunctions/steps/compute.py +++ b/src/stepfunctions/steps/compute.py @@ -57,6 +57,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -98,6 +99,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -138,6 +140,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -178,6 +181,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ diff --git a/src/stepfunctions/steps/fields.py b/src/stepfunctions/steps/fields.py index 8eb102d..b0d2fae 100644 --- a/src/stepfunctions/steps/fields.py +++ b/src/stepfunctions/steps/fields.py @@ -22,6 +22,7 @@ class Field(Enum): InputPath = 'input_path' OutputPath = 'output_path' Parameters = 'parameters' + ResultSelector = 'result_selector' ResultPath = 'result_path' Next = 'next' Retry = 'retry' diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 986217c..84840a1 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -92,6 +92,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -122,6 +123,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -162,6 +164,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -192,6 +195,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -222,6 +226,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -251,6 +256,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -290,6 +296,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -329,6 +336,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -368,6 +376,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -407,6 +416,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -446,6 +456,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -485,6 +496,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -524,6 +536,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -555,6 +568,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -596,6 +610,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -636,6 +651,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -674,6 +690,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -713,6 +730,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -752,6 +770,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -791,6 +810,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -821,6 +841,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -851,6 +872,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -881,6 +903,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index 8396e69..bb7dfb3 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -71,7 +71,7 @@ def to_dict(self): for k, v in self.fields.items(): if v is not None or k in fields_accepted_as_none: k = to_pascalcase(k) - if k == to_pascalcase(Field.Parameters.value): + if k == to_pascalcase(Field.Parameters.value) or k == to_pascalcase(Field.ResultSelector.value): result[k] = self._replace_placeholders(v) else: result[k] = v @@ -171,6 +171,7 @@ def __init__(self, state_id, state_type, output_schema=None, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -195,6 +196,7 @@ def allowed_fields(self): Field.InputPath, Field.OutputPath, Field.Parameters, + Field.ResultSelector, Field.ResultPath ] @@ -498,6 +500,7 @@ def __init__(self, state_id, retry=None, catch=None, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -516,6 +519,7 @@ def allowed_fields(self): Field.InputPath, Field.OutputPath, Field.Parameters, + Field.ResultSelector, Field.ResultPath, Field.Retry, Field.Catch @@ -558,6 +562,7 @@ def __init__(self, state_id, retry=None, catch=None, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -584,6 +589,7 @@ def allowed_fields(self): Field.InputPath, Field.OutputPath, Field.Parameters, + Field.ResultSelector, Field.ResultPath, Field.Retry, Field.Catch, @@ -618,6 +624,7 @@ def __init__(self, state_id, retry=None, catch=None, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -640,6 +647,7 @@ def allowed_fields(self): Field.InputPath, Field.OutputPath, Field.Parameters, + Field.ResultSelector, Field.ResultPath, Field.TimeoutSeconds, Field.TimeoutSecondsPath, diff --git a/tests/unit/test_compute_steps.py b/tests/unit/test_compute_steps.py index 368010a..69020e1 100644 --- a/tests/unit/test_compute_steps.py +++ b/tests/unit/test_compute_steps.py @@ -16,16 +16,24 @@ import boto3 from unittest.mock import patch + +from stepfunctions.inputs import StepResult from stepfunctions.steps.compute import LambdaStep, GlueStartJobRunStep, BatchSubmitJobStep, EcsRunTaskStep +STEP_RESULT = StepResult() +RESULT_SELECTOR = { + "OutputA": STEP_RESULT['A'] +} + @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_lambda_step_creation(): - step = LambdaStep('Echo') + step = LambdaStep('Echo', result_selector=RESULT_SELECTOR) assert step.to_dict() == { 'Type': 'Task', 'Resource': 'arn:aws:states:::lambda:invoke', + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -51,11 +59,12 @@ def test_lambda_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_glue_start_job_run_step_creation(): - step = GlueStartJobRunStep('Glue Job', wait_for_completion=False) + step = GlueStartJobRunStep('Glue Job', wait_for_completion=False, result_selector=RESULT_SELECTOR) assert step.to_dict() == { 'Type': 'Task', 'Resource': 'arn:aws:states:::glue:startJobRun', + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -75,11 +84,12 @@ def test_glue_start_job_run_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_batch_submit_job_step_creation(): - step = BatchSubmitJobStep('Batch Job', wait_for_completion=False) + step = BatchSubmitJobStep('Batch Job', wait_for_completion=False, result_selector=RESULT_SELECTOR) assert step.to_dict() == { 'Type': 'Task', 'Resource': 'arn:aws:states:::batch:submitJob', + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -101,11 +111,12 @@ def test_batch_submit_job_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_ecs_run_task_step_creation(): - step = EcsRunTaskStep('Ecs Job', wait_for_completion=False) + step = EcsRunTaskStep('Ecs Job', wait_for_completion=False, result_selector=RESULT_SELECTOR) assert step.to_dict() == { 'Type': 'Task', 'Resource': 'arn:aws:states:::ecs:runTask', + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } diff --git a/tests/unit/test_placeholders.py b/tests/unit/test_placeholders.py index 456a7bf..98fa898 100644 --- a/tests/unit/test_placeholders.py +++ b/tests/unit/test_placeholders.py @@ -6,75 +6,83 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import absolute_import import pytest import json -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import ExecutionInput, StepInput, StepResult -def test_placeholder_creation_with_subscript_operator(): - step_input = StepInput() - placeholder_variable = step_input["A"] + +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_with_subscript_operator(placeholder): + placeholder_variable = placeholder["A"] assert placeholder_variable.name == "A" assert placeholder_variable.type is None -def test_placeholder_creation_with_type(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"]["b"].get("C", float) + +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_with_type(placeholder): + placeholder_variable = placeholder["A"]["b"].get("C", float) assert placeholder_variable.name == "C" assert placeholder_variable.type == float -def test_placeholder_creation_with_int_key(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"][0] + +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_with_int_key(placeholder): + placeholder_variable = placeholder["A"][0] assert placeholder_variable.name == 0 assert placeholder_variable.type == None -def test_placeholder_creation_with_invalid_key(): - step_input = StepInput() + +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_with_invalid_key(placeholder): with pytest.raises(ValueError): - step_input["A"][1.3] + placeholder["A"][1.3] with pytest.raises(ValueError): - step_input["A"].get(1.2, str) + placeholder["A"].get(1.2, str) -def test_placeholder_creation_failure_with_type(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"]["b"].get("C", float) + +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_failure_with_type(placeholder): + placeholder_variable = placeholder["A"]["b"].get("C", float) with pytest.raises(ValueError): - workflow_input["A"]["b"].get("C", int) + placeholder["A"]["b"].get("C", int) -def test_placeholder_path(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"]["b"]["C"] + +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_path(placeholder): + placeholder_variable = placeholder["A"]["b"]["C"] expected_path = ["A", "b", "C"] assert placeholder_variable._get_path() == expected_path -def test_placeholder_contains(): - step_input = StepInput() - var_one = step_input["Key01"] - var_two = step_input["Key02"]["Key03"] - var_three = step_input["Key01"]["Key04"] - var_four = step_input["Key05"] - step_input_two = StepInput() - var_five = step_input_two["Key07"] +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_contains(placeholder): + var_one = placeholder["Key01"] + var_two = placeholder["Key02"]["Key03"] + var_three = placeholder["Key01"]["Key04"] + var_four = placeholder["Key05"] - assert step_input.contains(var_three) == True - assert step_input.contains(var_five) == False - assert step_input_two.contains(var_three) == False + placeholder_two = StepInput() + var_five = placeholder_two["Key07"] -def test_placeholder_schema_as_dict(): - workflow_input = ExecutionInput() - workflow_input["A"]["b"].get("C", float) - workflow_input["Message"] - workflow_input["Key01"]["Key02"] - workflow_input["Key03"] - workflow_input["Key03"]["Key04"] + assert placeholder.contains(var_three) == True + assert placeholder.contains(var_five) == False + assert placeholder_two.contains(var_three) == False + + +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_schema_as_dict(placeholder): + placeholder["A"]["b"].get("C", float) + placeholder["Message"] + placeholder["Key01"]["Key02"] + placeholder["Key03"] + placeholder["Key03"]["Key04"] expected_schema = { "A": { @@ -91,14 +99,15 @@ def test_placeholder_schema_as_dict(): } } - assert workflow_input.get_schema_as_dict() == expected_schema + assert placeholder.get_schema_as_dict() == expected_schema -def test_placeholder_schema_as_json(): - step_input = StepInput() - step_input["Response"].get("StatusCode", int) - step_input["Hello"]["World"] - step_input["A"] - step_input["Hello"]["World"].get("Test", str) + +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_schema_as_json(placeholder): + placeholder["Response"].get("StatusCode", int) + placeholder["Hello"]["World"] + placeholder["A"] + placeholder["Hello"]["World"].get("Test", str) expected_schema = { "Response": { @@ -112,28 +121,29 @@ def test_placeholder_schema_as_json(): "A": "str" } - assert step_input.get_schema_as_json() == json.dumps(expected_schema) + assert placeholder.get_schema_as_json() == json.dumps(expected_schema) -def test_placeholder_is_empty(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"]["B"]["C"] + +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_is_empty(placeholder): + placeholder_variable = placeholder["A"]["B"]["C"] assert placeholder_variable._is_empty() == True - workflow_input["A"]["B"]["C"]["D"] + placeholder["A"]["B"]["C"]["D"] assert placeholder_variable._is_empty() == False -def test_placeholder_make_immutable(): - workflow_input = ExecutionInput() - workflow_input["A"]["b"].get("C", float) - workflow_input["Message"] - workflow_input["Key01"]["Key02"] - workflow_input["Key03"] - workflow_input["Key03"]["Key04"] +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_make_immutable(placeholder): + placeholder["A"]["b"].get("C", float) + placeholder["Message"] + placeholder["Key01"]["Key02"] + placeholder["Key03"] + placeholder["Key03"]["Key04"] - assert check_immutable(workflow_input) == False + assert check_immutable(placeholder) == False - workflow_input._make_immutable() - assert check_immutable(workflow_input) == True + placeholder._make_immutable() + assert check_immutable(placeholder) == True def test_placeholder_with_schema(): @@ -154,22 +164,30 @@ def test_placeholder_with_schema(): with pytest.raises(ValueError): workflow_input["A"]["B"]["D"] - + with pytest.raises(ValueError): workflow_input["A"]["B"].get("C", float) + def test_workflow_input_jsonpath(): workflow_input = ExecutionInput() placeholder_variable = workflow_input["A"]["b"].get("C", float) assert placeholder_variable.to_jsonpath() == "$$.Execution.Input['A']['b']['C']" + def test_step_input_jsonpath(): step_input = StepInput() placeholder_variable = step_input["A"]["b"].get(0, float) assert placeholder_variable.to_jsonpath() == "$['A']['b'][0]" -# UTILS +def test_step_result_jsonpath(): + step_result = StepResult() + placeholder_variable = step_result["A"]["b"].get(0, float) + assert placeholder_variable.to_jsonpath() == "$['A']['b'][0]" + + +# UTILS def check_immutable(placeholder): if placeholder.immutable is True: if placeholder._is_empty(): @@ -178,4 +196,4 @@ def check_immutable(placeholder): for k, v in placeholder.store.items(): return check_immutable(v) else: - return False \ No newline at end of file + return False diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index 54d1543..11ea80e 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -6,28 +6,28 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import absolute_import import pytest from stepfunctions.steps import Pass, Succeed, Fail, Wait, Choice, ChoiceRule, Parallel, Map, Task, Retry, Catch, Chain, Graph -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import ExecutionInput, StepInput, StepResult def test_workflow_input_placeholder(): workflow_input = ExecutionInput() test_step = Pass( - state_id='StateOne', + state_id="StateOne", parameters={ - 'ParamA': 'SampleValueA', - 'ParamB': workflow_input, - 'ParamC': workflow_input['Key01'], - 'ParamD': workflow_input['Key02']['Key03'], - 'ParamE': workflow_input['Key01']['Key03'], + "ParamA": "SampleValueA", + "ParamB": workflow_input, + "ParamC": workflow_input["Key01"], + "ParamD": workflow_input["Key02"]["Key03"], + "ParamE": workflow_input["Key01"]["Key03"], } ) @@ -48,14 +48,14 @@ def test_workflow_input_placeholder(): def test_step_input_placeholder(): test_step_01 = Pass( - state_id='StateOne' + state_id="StateOne" ) test_step_02 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamA': test_step_01.output(), - 'ParamB': test_step_01.output()["Response"]["Key02"], + "ParamA": test_step_01.output(), + "ParamB": test_step_01.output()["Response"]["Key02"], "ParamC": "SampleValueC" } ) @@ -72,31 +72,31 @@ def test_step_input_placeholder(): assert test_step_02.to_dict() == expected_repr - + def test_workflow_with_placeholders(): workflow_input = ExecutionInput() test_step_01 = Pass( - state_id='StateOne', + state_id="StateOne", parameters={ - 'ParamA': workflow_input['Key02']['Key03'], - 'ParamD': workflow_input['Key01']['Key03'], + "ParamA": workflow_input["Key02"]["Key03"], + "ParamD": workflow_input["Key01"]["Key03"], } ) test_step_02 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamC': workflow_input["Key05"], + "ParamC": workflow_input["Key05"], "ParamB": "SampleValueB", "ParamE": test_step_01.output()["Response"]["Key04"] } ) test_step_03 = Pass( - state_id='StateThree', + state_id="StateThree", parameters={ - 'ParamG': "SampleValueG", + "ParamG": "SampleValueG", "ParamF": workflow_input["Key06"], "ParamH": "SampleValueH" } @@ -139,31 +139,31 @@ def test_workflow_with_placeholders(): } assert result == expected_workflow_repr - + def test_step_input_order_validation(): workflow_input = ExecutionInput() test_step_01 = Pass( - state_id='StateOne', + state_id="StateOne", parameters={ - 'ParamA': workflow_input['Key02']['Key03'], - 'ParamD': workflow_input['Key01']['Key03'], + "ParamA": workflow_input["Key02"]["Key03"], + "ParamD": workflow_input["Key01"]["Key03"], } ) test_step_02 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamC': workflow_input["Key05"], + "ParamC": workflow_input["Key05"], "ParamB": "SampleValueB", "ParamE": test_step_01.output()["Response"]["Key04"] } ) test_step_03 = Pass( - state_id='StateThree', + state_id="StateThree", parameters={ - 'ParamG': "SampleValueG", + "ParamG": "SampleValueG", "ParamF": workflow_input["Key06"], "ParamH": "SampleValueH" } @@ -176,13 +176,20 @@ def test_step_input_order_validation(): def test_map_state_with_placeholders(): workflow_input = ExecutionInput() + step_result = StepResult() - map_state = Map('MapState01') + map_state = Map( + state_id="MapState01", + result_selector={ + "foo": step_result["foo"], + "bar": step_result["bar1"]["bar2"] + } + ) iterator_state = Pass( - 'TrainIterator', + "TrainIterator", parameters={ - 'ParamA': map_state.output()['X']["Y"], - 'ParamB': workflow_input["Key01"]["Key02"]["Key03"] + "ParamA": map_state.output()["X"]["Y"], + "ParamB": workflow_input["Key01"]["Key02"]["Key03"] }) map_state.attach_iterator(iterator_state) @@ -193,6 +200,10 @@ def test_map_state_with_placeholders(): "States": { "MapState01": { "Type": "Map", + "ResultSelector": { + "foo.$": "$['foo']", + "bar.$": "$['bar1']['bar2']" + }, "End": True, "Iterator": { "StartAt": "TrainIterator", @@ -216,28 +227,35 @@ def test_map_state_with_placeholders(): def test_parallel_state_with_placeholders(): workflow_input = ExecutionInput() + step_result = StepResult() + + parallel_state = Parallel( + state_id="ParallelState01", + result_selector={ + "foo": step_result["foo"], + "bar": step_result["bar1"]["bar2"] + } + ) - parallel_state = Parallel('ParallelState01') - branch_A = Pass( - 'Branch_A', + "Branch_A", parameters={ - 'ParamA': parallel_state.output()['A']["B"], - 'ParamB': workflow_input["Key01"] + "ParamA": parallel_state.output()["A"]["B"], + "ParamB": workflow_input["Key01"] }) branch_B = Pass( - 'Branch_B', + "Branch_B", parameters={ - 'ParamA': "TestValue", - 'ParamB': parallel_state.output()["Response"]["Key"]["State"] + "ParamA": "TestValue", + "ParamB": parallel_state.output()["Response"]["Key"]["State"] }) branch_C = Pass( - 'Branch_C', + "Branch_C", parameters={ - 'ParamA': parallel_state.output()['A']["B"].get("C", float), - 'ParamB': "HelloWorld" + "ParamA": parallel_state.output()["A"]["B"].get("C", float), + "ParamB": "HelloWorld" }) parallel_state.add_branch(branch_A) @@ -252,6 +270,10 @@ def test_parallel_state_with_placeholders(): "States": { "ParallelState01": { "Type": "Parallel", + "ResultSelector": { + "foo.$": "$['foo']", + "bar.$": "$['bar1']['bar2']" + }, "End": True, "Branches": [ { @@ -303,16 +325,16 @@ def test_parallel_state_with_placeholders(): def test_choice_state_with_placeholders(): - first_state = Task('FirstState', resource='arn:aws:lambda:us-east-1:1234567890:function:FirstState') - retry = Chain([Pass('Retry'), Pass('Cleanup'), first_state]) + first_state = Task("FirstState", resource="arn:aws:lambda:us-east-1:1234567890:function:FirstState") + retry = Chain([Pass("Retry"), Pass("Cleanup"), first_state]) - choice_state = Choice('Is Completed?') + choice_state = Choice("Is Completed?") choice_state.add_choice( - ChoiceRule.BooleanEquals(choice_state.output()["Completed"], True), - Succeed('Complete') + ChoiceRule.BooleanEquals(choice_state.output()["Completed"], True), + Succeed("Complete") ) choice_state.add_choice( - ChoiceRule.BooleanEquals(choice_state.output()["Completed"], False), + ChoiceRule.BooleanEquals(choice_state.output()["Completed"], False), retry ) @@ -360,9 +382,9 @@ def test_choice_state_with_placeholders(): assert result == expected_repr def test_schema_validation_for_step_input(): - + test_step_01 = Pass( - state_id='StateOne', + state_id="StateOne", output_schema={ "Response": { "Key01": str @@ -372,18 +394,76 @@ def test_schema_validation_for_step_input(): with pytest.raises(ValueError): test_step_02 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamA': test_step_01.output()["Response"]["Key02"], + "ParamA": test_step_01.output()["Response"]["Key02"], "ParamB": "SampleValueB" } ) - + with pytest.raises(ValueError): test_step_03 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamA': test_step_01.output()["Response"].get("Key01", float), + "ParamA": test_step_01.output()["Response"].get("Key01", float), "ParamB": "SampleValueB" } ) + + +def test_step_result_placeholder(): + step_result = StepResult() + + test_step_01 = Task( + state_id="StateOne", + result_selector={ + "ParamA": step_result["foo"], + "ParamC": "SampleValueC" + } + ) + + expected_repr = { + "Type": "Task", + "ResultSelector": { + "ParamA.$": "$['foo']", + "ParamC": "SampleValueC" + }, + "End": True + } + + assert test_step_01.to_dict() == expected_repr + + +def test_schema_validation_for_step_result(): + step_result = StepResult( + schema={ + "Payload": { + "Key01": str + } + } + ) + + with pytest.raises(ValueError): + test_step_01 = Task( + state_id="StateOne", + result_selector={ + "ParamA": step_result["Payload"]["Key02"], + "ParamB": "SampleValueB" + } + ) + + with pytest.raises(ValueError): + test_step_02 = Task( + state_id="StateOne", + parameters={ + "ParamA": step_result["Payload"].get("Key01", float), + "ParamB": "SampleValueB" + } + ) + + test_step_03 = Task( + state_id="StateOne", + result_selector={ + "ParamA": step_result["Payload"]["Key01"] + } + ) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 9ac5def..022063b 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -16,6 +16,8 @@ import pytest from unittest.mock import patch + +from stepfunctions.inputs import StepResult from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import ( EksCallStep, @@ -35,9 +37,15 @@ from stepfunctions.steps.integration_resources import IntegrationPattern +STEP_RESULT = StepResult() +RESULT_SELECTOR = { + "OutputA": STEP_RESULT['A'] +} + + @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_sns_publish_step_creation(): - step = SnsPublishStep('Publish to SNS', parameters={ + step = SnsPublishStep('Publish to SNS', result_selector=RESULT_SELECTOR, parameters={ 'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic', 'Message': 'message', }) @@ -49,6 +57,7 @@ def test_sns_publish_step_creation(): 'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic', 'Message': 'message', }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -76,7 +85,7 @@ def test_sns_publish_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_sqs_send_message_step_creation(): - step = SqsSendMessageStep('Send to SQS', parameters={ + step = SqsSendMessageStep('Send to SQS', result_selector=RESULT_SELECTOR, parameters={ 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', 'MessageBody': 'Hello' }) @@ -88,6 +97,7 @@ def test_sqs_send_message_step_creation(): 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', 'MessageBody': 'Hello' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -114,7 +124,7 @@ def test_sqs_send_message_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eventbridge_put_events_step_creation(): - step = EventBridgePutEventsStep('Send to EventBridge', parameters={ + step = EventBridgePutEventsStep('Send to EventBridge', result_selector=RESULT_SELECTOR, parameters={ "Entries": [ { "Detail": { @@ -142,6 +152,7 @@ def test_eventbridge_put_events_step_creation(): } ] }, + 'ResultSelector': {'OutputA.$': "$['A']"}, "End": True } @@ -179,7 +190,7 @@ def test_eventbridge_put_events_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_get_item_step_creation(): - step = DynamoDBGetItemStep('Read Message From DynamoDB', parameters={ + step = DynamoDBGetItemStep('Read Message From DynamoDB', result_selector=RESULT_SELECTOR, parameters={ 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', 'Key': { 'MessageId': { @@ -199,13 +210,14 @@ def test_dynamodb_get_item_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_put_item_step_creation(): - step = DynamoDBPutItemStep('Add Message From DynamoDB', parameters={ + step = DynamoDBPutItemStep('Add Message From DynamoDB', result_selector=RESULT_SELECTOR, parameters={ 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', 'Item': { 'MessageId': { @@ -225,13 +237,14 @@ def test_dynamodb_put_item_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_delete_item_step_creation(): - step = DynamoDBDeleteItemStep('Delete Message From DynamoDB', parameters={ + step = DynamoDBDeleteItemStep('Delete Message From DynamoDB', result_selector=RESULT_SELECTOR, parameters={ 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', 'Key': { 'MessageId': { @@ -251,13 +264,14 @@ def test_dynamodb_delete_item_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_update_item_step_creation(): - step = DynamoDBUpdateItemStep('Update Message From DynamoDB', parameters={ + step = DynamoDBUpdateItemStep('Update Message From DynamoDB', result_selector=RESULT_SELECTOR, parameters={ 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', 'Key': { 'RecordId': { @@ -285,13 +299,14 @@ def test_dynamodb_update_item_step_creation(): ':val1': { 'S': '2' } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_create_cluster_step_creation(): - step = EmrCreateClusterStep('Create EMR cluster', parameters={ + step = EmrCreateClusterStep('Create EMR cluster', result_selector=RESULT_SELECTOR, parameters={ 'Name': 'MyWorkflowCluster', 'VisibleToAllUsers': True, 'ReleaseLabel': 'emr-5.28.0', @@ -371,6 +386,7 @@ def test_emr_create_cluster_step_creation(): ] } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -461,7 +477,7 @@ def test_emr_create_cluster_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_terminate_cluster_step_creation(): - step = EmrTerminateClusterStep('Terminate EMR cluster', parameters={ + step = EmrTerminateClusterStep('Terminate EMR cluster', result_selector=RESULT_SELECTOR, parameters={ 'ClusterId': 'MyWorkflowClusterId' }) @@ -471,6 +487,7 @@ def test_emr_terminate_cluster_step_creation(): 'Parameters': { 'ClusterId': 'MyWorkflowClusterId', }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -490,7 +507,7 @@ def test_emr_terminate_cluster_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_add_step_step_creation(): - step = EmrAddStepStep('Add step to EMR cluster', parameters={ + step = EmrAddStepStep('Add step to EMR cluster', result_selector=RESULT_SELECTOR, parameters={ 'ClusterId': 'MyWorkflowClusterId', 'Step': { 'Name': 'The first step', @@ -536,6 +553,7 @@ def test_emr_add_step_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -591,7 +609,7 @@ def test_emr_add_step_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_cancel_step_step_creation(): - step = EmrCancelStepStep('Cancel step from EMR cluster', parameters={ + step = EmrCancelStepStep('Cancel step from EMR cluster', result_selector=RESULT_SELECTOR, parameters={ 'ClusterId': 'MyWorkflowClusterId', 'StepId': 'MyWorkflowStepId' }) @@ -603,16 +621,20 @@ def test_emr_cancel_step_step_creation(): 'ClusterId': 'MyWorkflowClusterId', 'StepId': 'MyWorkflowStepId' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_set_cluster_termination_protection_step_creation(): - step = EmrSetClusterTerminationProtectionStep('Set termination protection for EMR cluster', parameters={ - 'ClusterId': 'MyWorkflowClusterId', - 'TerminationProtected': True - }) + step = EmrSetClusterTerminationProtectionStep( + 'Set termination protection for EMR cluster', + result_selector=RESULT_SELECTOR, + parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'TerminationProtected': True + }) assert step.to_dict() == { 'Type': 'Task', @@ -621,20 +643,24 @@ def test_emr_set_cluster_termination_protection_step_creation(): 'ClusterId': 'MyWorkflowClusterId', 'TerminationProtected': True }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_modify_instance_fleet_by_name_step_creation(): - step = EmrModifyInstanceFleetByNameStep('Modify Instance Fleet by name for EMR cluster', parameters={ - 'ClusterId': 'MyWorkflowClusterId', - 'InstanceFleetName': 'MyCoreFleet', - 'InstanceFleet': { - 'TargetOnDemandCapacity': 8, - 'TargetSpotCapacity': 0 - } - }) + step = EmrModifyInstanceFleetByNameStep( + 'Modify Instance Fleet by name for EMR cluster', + result_selector=RESULT_SELECTOR, + parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'InstanceFleetName': 'MyCoreFleet', + 'InstanceFleet': { + 'TargetOnDemandCapacity': 8, + 'TargetSpotCapacity': 0 + } + }) assert step.to_dict() == { 'Type': 'Task', @@ -647,19 +673,23 @@ def test_emr_modify_instance_fleet_by_name_step_creation(): 'TargetSpotCapacity': 0 } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_modify_instance_group_by_name_step_creation(): - step = EmrModifyInstanceGroupByNameStep('Modify Instance Group by name for EMR cluster', parameters={ - 'ClusterId': 'MyWorkflowClusterId', - 'InstanceGroupName': 'MyCoreGroup', - 'InstanceGroup': { - 'InstanceCount': 8 - } - }) + step = EmrModifyInstanceGroupByNameStep( + 'Modify Instance Group by name for EMR cluster', + result_selector=RESULT_SELECTOR, + parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'InstanceGroupName': 'MyCoreGroup', + 'InstanceGroup': { + 'InstanceCount': 8 + } + }) assert step.to_dict() == { 'Type': 'Task', @@ -671,15 +701,19 @@ def test_emr_modify_instance_group_by_name_step_creation(): 'InstanceCount': 8 } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_databrew_start_job_run_step_creation_sync(): - step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Sync', parameters={ - "Name": "MyWorkflowJobRun" - }) + step = GlueDataBrewStartJobRunStep( + 'Start Glue DataBrew Job Run - Sync', + result_selector=RESULT_SELECTOR, + parameters={ + "Name": "MyWorkflowJobRun" + }) assert step.to_dict() == { 'Type': 'Task', @@ -687,15 +721,20 @@ def test_databrew_start_job_run_step_creation_sync(): 'Parameters': { 'Name': 'MyWorkflowJobRun' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_databrew_start_job_run_step_creation(): - step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run', wait_for_completion=False, parameters={ - "Name": "MyWorkflowJobRun" - }) + step = GlueDataBrewStartJobRunStep( + 'Start Glue DataBrew Job Run', + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + "Name": "MyWorkflowJobRun" + }) assert step.to_dict() == { 'Type': 'Task', @@ -703,22 +742,27 @@ def test_databrew_start_job_run_step_creation(): 'Parameters': { 'Name': 'MyWorkflowJobRun' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_cluster_step_creation(): - step = EksCreateClusterStep("Create Eks cluster", wait_for_completion=False, parameters={ - 'Name': 'MyCluster', - 'ResourcesVpcConfig': { - 'SubnetIds': [ - 'subnet-00000000000000000', - 'subnet-00000000000000001' - ] - }, - 'RoleArn': 'arn:aws:iam::123456789012:role/MyEKSClusterRole' - }) + step = EksCreateClusterStep( + "Create Eks cluster", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'Name': 'MyCluster', + 'ResourcesVpcConfig': { + 'SubnetIds': [ + 'subnet-00000000000000000', + 'subnet-00000000000000001' + ] + }, + 'RoleArn': 'arn:aws:iam::123456789012:role/MyEKSClusterRole' + }) assert step.to_dict() == { 'Type': 'Task', @@ -733,13 +777,14 @@ def test_eks_create_cluster_step_creation(): }, 'RoleArn': 'arn:aws:iam::123456789012:role/MyEKSClusterRole' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_cluster_step_creation_sync(): - step = EksCreateClusterStep("Create Eks cluster sync", parameters={ + step = EksCreateClusterStep("Create Eks cluster sync", result_selector=RESULT_SELECTOR, parameters={ 'Name': 'MyCluster', 'ResourcesVpcConfig': { 'SubnetIds': [ @@ -763,15 +808,20 @@ def test_eks_create_cluster_step_creation_sync(): }, 'RoleArn': 'arn:aws:iam::123456789012:role/MyEKSClusterRole' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_cluster_step_creation(): - step = EksDeleteClusterStep("Delete Eks cluster", wait_for_completion=False, parameters={ - 'Name': 'MyCluster' - }) + step = EksDeleteClusterStep( + "Delete Eks cluster", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'Name': 'MyCluster' + }) assert step.to_dict() == { 'Type': 'Task', @@ -779,15 +829,19 @@ def test_eks_delete_cluster_step_creation(): 'Parameters': { 'Name': 'MyCluster' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_cluster_step_creation_sync(): - step = EksDeleteClusterStep("Delete Eks cluster sync", parameters={ - 'Name': 'MyCluster' - }) + step = EksDeleteClusterStep( + "Delete Eks cluster sync", + result_selector=RESULT_SELECTOR, + parameters={ + 'Name': 'MyCluster' + }) assert step.to_dict() == { 'Type': 'Task', @@ -795,21 +849,26 @@ def test_eks_delete_cluster_step_creation_sync(): 'Parameters': { 'Name': 'MyCluster' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_fargate_profile_step_creation(): - step = EksCreateFargateProfileStep("Create Fargate profile", wait_for_completion=False, parameters={ - 'ClusterName': 'MyCluster', - 'FargateProfileName': 'MyFargateProfile', - 'PodExecutionRoleArn': 'arn:aws:iam::123456789012:role/MyFargatePodExecutionRole', - 'Selectors': [{ - 'Namespace': 'my-namespace', - 'Labels': {'my-label': 'my-value'} - }] - }) + step = EksCreateFargateProfileStep( + "Create Fargate profile", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'ClusterName': 'MyCluster', + 'FargateProfileName': 'MyFargateProfile', + 'PodExecutionRoleArn': 'arn:aws:iam::123456789012:role/MyFargatePodExecutionRole', + 'Selectors': [{ + 'Namespace': 'my-namespace', + 'Labels': {'my-label': 'my-value'} + }] + }) assert step.to_dict() == { 'Type': 'Task', @@ -823,13 +882,14 @@ def test_eks_create_fargate_profile_step_creation(): 'Labels': {'my-label': 'my-value'} }] }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_fargate_profile_step_creation_sync(): - step = EksCreateFargateProfileStep("Create Fargate profile sync", parameters={ + step = EksCreateFargateProfileStep("Create Fargate profile sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'FargateProfileName': 'MyFargateProfile', 'PodExecutionRoleArn': 'arn:aws:iam::123456789012:role/MyFargatePodExecutionRole', @@ -851,16 +911,21 @@ def test_eks_create_fargate_profile_step_creation_sync(): 'Labels': {'my-label': 'my-value'} }] }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_fargate_profile_step_creation(): - step = EksDeleteFargateProfileStep("Delete Fargate profile", wait_for_completion=False, parameters={ - 'ClusterName': 'MyCluster', - 'FargateProfileName': 'MyFargateProfile' - }) + step = EksDeleteFargateProfileStep( + "Delete Fargate profile", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'ClusterName': 'MyCluster', + 'FargateProfileName': 'MyFargateProfile' + }) assert step.to_dict() == { 'Type': 'Task', @@ -869,13 +934,14 @@ def test_eks_delete_fargate_profile_step_creation(): 'ClusterName': 'MyCluster', 'FargateProfileName': 'MyFargateProfile' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_fargate_profile_step_creation_sync(): - step = EksDeleteFargateProfileStep("Delete Fargate profile sync", parameters={ + step = EksDeleteFargateProfileStep("Delete Fargate profile sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'FargateProfileName': 'MyFargateProfile' }) @@ -887,21 +953,26 @@ def test_eks_delete_fargate_profile_step_creation_sync(): 'ClusterName': 'MyCluster', 'FargateProfileName': 'MyFargateProfile' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_node_group_step_creation(): - step = EksCreateNodeGroupStep("Create Node Group", wait_for_completion=False, parameters={ - 'ClusterName': 'MyCluster', - 'NodegroupName': 'MyNodegroup', - 'NodeRole': 'arn:aws:iam::123456789012:role/MyNodeInstanceRole', - 'Subnets': [ - 'subnet-00000000000000000', - 'subnet-00000000000000001' - ] - }) + step = EksCreateNodeGroupStep( + "Create Node Group", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'ClusterName': 'MyCluster', + 'NodegroupName': 'MyNodegroup', + 'NodeRole': 'arn:aws:iam::123456789012:role/MyNodeInstanceRole', + 'Subnets': [ + 'subnet-00000000000000000', + 'subnet-00000000000000001' + ] + }) assert step.to_dict() == { 'Type': 'Task', @@ -915,12 +986,13 @@ def test_eks_create_node_group_step_creation(): 'subnet-00000000000000001' ], }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } def test_eks_create_node_group_step_creation_sync(): - step = EksCreateNodeGroupStep("Create Node Group sync", parameters={ + step = EksCreateNodeGroupStep("Create Node Group sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup', 'NodeRole': 'arn:aws:iam::123456789012:role/MyNodeInstanceRole', @@ -942,15 +1014,20 @@ def test_eks_create_node_group_step_creation_sync(): 'subnet-00000000000000001' ], }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_node_group_step_creation(): - step = EksDeleteNodegroupStep("Delete Node Group", wait_for_completion=False, parameters={ - 'ClusterName': 'MyCluster', - 'NodegroupName': 'MyNodegroup' - }) + step = EksDeleteNodegroupStep( + "Delete Node Group", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'ClusterName': 'MyCluster', + 'NodegroupName': 'MyNodegroup' + }) assert step.to_dict() == { 'Type': 'Task', @@ -959,13 +1036,14 @@ def test_eks_delete_node_group_step_creation(): 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_node_group_step_creation_sync(): - step = EksDeleteNodegroupStep("Delete Node Group sync", parameters={ + step = EksDeleteNodegroupStep("Delete Node Group sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup' }) @@ -977,13 +1055,14 @@ def test_eks_delete_node_group_step_creation_sync(): 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_run_job_step_creation(): - step = EksRunJobStep("Run Job", wait_for_completion=False, parameters={ + step = EksRunJobStep("Run Job", result_selector=RESULT_SELECTOR, wait_for_completion=False, parameters={ 'ClusterName': 'MyCluster', 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', @@ -1048,13 +1127,14 @@ def test_eks_run_job_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_run_job_step_creation_sync(): - step = EksRunJobStep("Run Job sync", parameters={ + step = EksRunJobStep("Run Job sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', @@ -1125,13 +1205,14 @@ def test_eks_run_job_step_creation_sync(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_call_step_creation(): - step = EksCallStep("Call", parameters={ + step = EksCallStep("Call", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', 'Endpoint': 'https://444455556666.yl4.us-east-1.eks.amazonaws.com', @@ -1159,6 +1240,7 @@ def test_eks_call_step_creation(): ] } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } diff --git a/tests/unit/test_steps.py b/tests/unit/test_steps.py index 3d34ee8..f4e2af7 100644 --- a/tests/unit/test_steps.py +++ b/tests/unit/test_steps.py @@ -32,6 +32,7 @@ def test_state_creation(): input_path='$.Input', output_path='$.Output', parameters={'Key': 'Value'}, + result_selector={'foo': 'bar'}, result_path='$.Result' ) @@ -43,6 +44,9 @@ def test_state_creation(): 'Parameters': { 'Key': 'Value' }, + 'ResultSelector': { + 'foo': 'bar' + }, 'ResultPath': '$.Result', 'End': True }