Skip to content

Add support for Amazon States Language "ResultSelector" in Task, Map … #102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions doc/placeholders.rst
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
Placeholders
=============


Once defined, a workflow is static unless you update it explicitly. But, you can pass
input to workflow executions. You can have dynamic values
that you use in the **parameters** fields of the steps in your workflow. For this,
that you use in the **parameters** or **result_selector** fields of the steps in your workflow. For this,
the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you
create your workflow. There are 2 mechanisms for passing dynamic values in a workflow.
create your workflow. There are 3 mechanisms for passing dynamic values in a workflow.

.. autoclass:: stepfunctions.inputs.Placeholder

.. autoclass:: stepfunctions.inputs.ExecutionInput
:inherited-members:

.. autoclass:: stepfunctions.inputs.StepInput
:inherited-members:

.. autoclass:: stepfunctions.inputs.StepResult
:inherited-members:

- `Execution Input <#execution-input>`__

- `Step Input <#step-input>`__

- `Step Result <#step-result>`__

Execution Input
---------------
The first mechanism is a global input to the workflow execution. This input is
accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput`
to define the schema for this input, and to access the values in your workflow.
Expand Down Expand Up @@ -50,6 +70,9 @@ to define the schema for this input, and to access the values in your workflow.

workflow.execute(inputs={'myDynamicInput': "WorldHello"})


Step Input
----------
The second mechanism is for passing dynamic values from one step to the next
step. The output of one step becomes the input of the next step.
The SDK provides the :py:meth:`stepfunctions.inputs.StepInput` class for this.
Expand All @@ -64,10 +87,10 @@ that returns the placeholder output for that step.
parameters={
"FunctionName": "MakeApiCall",
"Payload": {
"input": "20192312"
}
"input": "20192312"
}
)
}
)

lambda_state_second = LambdaStep(
state_id="MySecondLambdaStep",
Expand All @@ -82,11 +105,23 @@ that returns the placeholder output for that step.
definition = Chain([lambda_state_first, lambda_state_second])


Step Result
-----------
The third mechanism is a placeholder for a step's result. The result of a step can be modified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still wondering if it's really necessary to introduce StepResult when StepInput could also be used to achieve the same thing. I left that comment earlier but it didn't get an answer. Literally the only difference is the class name. Thoughts @shivlaks?

We can always add classes later, but removing it afterwards breaks back-compat.

lambda_result = StepInput(
    schema={
        "Id": str,
    }
)

lambda_state_first = LambdaStep(
    state_id="MyFirstLambdaStep",
    result_selector={
        "Output": lambda_result["Id"],
        "Status": "Success"
    }
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question - I missed that comment earlier

While it does make sense to use the same object, I think the name StepInput could bring confusion since, in this case, we are not using the step's input, but the step's result (or output)

To be backwards compatible, we can't rename StepInput to a more general term. Alternatively, we could make StepResult inherit from StepInput instead of Placeholder and avoid code duplication.

with the **result_selector** field to replace the step's result.

.. autoclass:: stepfunctions.inputs.Placeholder
.. code-block:: python

.. autoclass:: stepfunctions.inputs.ExecutionInput
:inherited-members:
lambda_result = StepResult(
schema={
"Id": str,
}
)

.. autoclass:: stepfunctions.inputs.StepInput
:inherited-members:
lambda_state_first = LambdaStep(
state_id="MyFirstLambdaStep",
result_selector={
"Output": lambda_result["Id"],
"Status": "Success"
}
)
8 changes: 4 additions & 4 deletions src/stepfunctions/inputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import absolute_import

from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput
from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput, StepResult
59 changes: 44 additions & 15 deletions src/stepfunctions/inputs/placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import absolute_import

Expand Down Expand Up @@ -51,11 +51,11 @@ def __init__(self, schema=None, **kwargs):
self._set_schema(schema)
self._make_immutable()
self.json_str_template = "{}"

self.name = kwargs.get("name")
self.type = kwargs.get("type")
self.parent = kwargs.get("parent")


def get(self, name, type):
"""
Expand All @@ -64,11 +64,11 @@ def get(self, name, type):
Args:
name (str): Name of the placeholder variable.
type (type): Type of the placeholder variable.

Raises:
ValueError: If placeholder variable with the same name but different type already exists.
ValueError: If placeholder variable does not fit into a previously specified schema for the placeholder collection.

Returns:
Placeholder: Placeholder variable.
"""
Expand Down Expand Up @@ -240,7 +240,7 @@ def _join_path(self, path):
def to_jsonpath(self):
"""
Returns a JSON path representation of the placeholder variable to be used for step parameters.

Returns:
str: JSON path representation of the placeholder variable
"""
Expand All @@ -252,23 +252,25 @@ class ExecutionInput(Placeholder):
"""
Top-level class for execution input placeholders.
"""

def __init__(self, schema=None, **kwargs):
super(ExecutionInput, self).__init__(schema, **kwargs)
self.json_str_template = '$$.Execution.Input{}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Workflow Input.
A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema.
A placeholder variable can only be created if the collection is mutable.
A collection is mutable if no pre-specified schema was defined at construction.
"""
if self.immutable:
raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.")
raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection:"
f" {self.schema}")
if type:
return ExecutionInput(name=name, parent=parent, type=type)
else:
return ExecutionInput(name=name, parent=parent)


class StepInput(Placeholder):

Expand All @@ -279,15 +281,42 @@ class StepInput(Placeholder):
def __init__(self, schema=None, **kwargs):
super(StepInput, self).__init__(schema, **kwargs)
self.json_str_template = '${}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Step Input.
A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema.
A placeholder variable can only be created if the collection is mutable.
A collection is mutable if no pre-specified schema was defined at construction..
"""
if self.immutable:
raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.")
raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection:"
f" {self.schema}")
if type:
return StepInput(name=name, parent=parent, type=type)
else:
return StepInput(name=name, parent=parent)


class StepResult(Placeholder):

"""
Top-level class for step result placeholders.
"""

def __init__(self, schema=None, **kwargs):
super(StepResult, self).__init__(schema, **kwargs)
self.json_str_template = '${}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Step Result.
A placeholder variable can only be created if the collection is mutable.
A collection is mutable if no pre-specified schema was defined at construction.
"""
if self.immutable:
raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection:"
f" {self.schema}")
if type:
return StepResult(name=name, parent=parent, type=type)
else:
return StepResult(name=name, parent=parent)
4 changes: 4 additions & 0 deletions src/stepfunctions/steps/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_selector (dict, optional): The value of this field becomes the effective result of the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
Expand Down Expand Up @@ -98,6 +99,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_selector (dict, optional): The value of this field becomes the effective result of the state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maintaining these docstrings for every inheriting class is pretty annoying. I wonder if there anything we can do to make that easier.

result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
Expand Down Expand Up @@ -138,6 +140,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_selector (dict, optional): The value of this field becomes the effective result of the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
Expand Down Expand Up @@ -178,6 +181,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_selector (dict, optional): The value of this field becomes the effective result of the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
Expand Down
1 change: 1 addition & 0 deletions src/stepfunctions/steps/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Field(Enum):
InputPath = 'input_path'
OutputPath = 'output_path'
Parameters = 'parameters'
ResultSelector = 'result_selector'
ResultPath = 'result_path'
Next = 'next'
Retry = 'retry'
Expand Down
Loading