Add AWS SageMaker Unified Studio Workflow Operator #45726

Draft: agupta01 wants to merge 18 commits into main
Conversation

@agupta01 commented Jan 16, 2025

Description

Adds an operator used for executing Jupyter Notebooks, Querybooks, and Visual ETL jobs within the context of a SageMaker Unified Studio project.

SageMaker Unified Studio (SUS) supports development of Airflow DAGs (called "workflows" within the product) that are run on an MWAA cluster managed by the project. These workflows have the ability to orchestrate the execution of Unified Studio artifacts that can connect to data assets stored in a SUS project.

Implementation-wise, these notebooks are executed on a SageMaker Training Job running a SageMaker Distribution environment within the context of a SUS project.

Components

  • SageMakerNotebookOperator: this operator allows users to execute Unified Studio artifacts within the context of their project.
  • SageMakerNotebookHook: this hook provides a wrapper around the notebook execution.
  • SageMakerNotebookSensor: this sensor waits on status updates from the notebook execution.
  • SageMakerNotebookJobTrigger: this trigger activates when the notebook execution completes.

Usage

Note that this operator introduces a dependency on the SageMaker Studio SDK (https://www.pypi.org/project/sagemaker-studio).

from airflow import DAG
# Operator import path follows the module added by this PR under the Amazon provider.
from airflow.providers.amazon.aws.operators.sagemaker_unified_studio import (
    SageMakerNotebookOperator,
)

with DAG(...) as dag:
    ...
    run_notebook = SageMakerNotebookOperator(
        task_id="initial",
        input_config={"input_path": <notebook_path_in_s3>, "input_params": {}},
        output_config={"output_formats": ["NOTEBOOK"]},
        wait_for_completion=True,
        poll_interval=5,
    )
    ...

Testing

MWAA uses Python 3.11 and a Postgres backend, so we set those values for all tests.

Unit tests

breeze testing core-tests -p 3.11 -b postgres providers/tests/amazon/aws/*/test_sagemaker_unified_studio.py

System tests

Ensure a properly configured SageMaker Unified Studio domain and project, as indicated in the example_sagemaker_unified_studio.py file. Also ensure AWS credentials are populated and up to date. Then populate DOMAIN_ID, PROJECT_ID, ENVIRONMENT_ID, and S3_PATH in files/airflow-breeze-config/variables.env and run:

breeze testing system-tests -p 3.11 -b postgres --forward-credentials --test-timeout 600 providers/tests/system/amazon/aws/example_sagemaker_unified_studio.py
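
For reference, the corresponding entries in files/airflow-breeze-config/variables.env might look like the sketch below; all values are placeholders to be replaced with the IDs from your own SageMaker Unified Studio project.

# Placeholder values; replace with the IDs of your SageMaker Unified Studio setup.
DOMAIN_ID=<your-domain-id>
PROJECT_ID=<your-project-id>
ENVIRONMENT_ID=<your-environment-id>
S3_PATH=<s3-path-used-by-the-project>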


boring-cyborg bot added the area:providers and provider:amazon (AWS/Amazon - related issues) labels Jan 16, 2025

boring-cyborg bot commented Jan 16, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow the ASF Code of Conduct for all communication, including (but not limited to) comments on Pull Requests, the mailing list, and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: [email protected]
Slack: https://s.apache.org/airflow-slack

:param execution_name: The name of the notebook job to be executed, this is same as task_id.
:param input_config: Configuration for the input file.
Example: {'input_path': 'folder/input/notebook.ipynb', 'input_params': {'param1': 'value1'}}
:param output_config: Configuration for the output format. It should include an output_formats parameter to control

Contributor commented:
This sentence seems to just tail off in the middle?

Example: {'input_path': 'folder/input/notebook.ipynb', 'input_params': {'param1': 'value1'}}
:param output_config: Configuration for the output format. It should include an output_formats parameter to control
Example: {'output_formats': ['NOTEBOOK']}
:param compute: compute configuration to use for the notebook execution. This is an required attribute

Contributor suggested change:
- :param compute: compute configuration to use for the notebook execution. This is an required attribute
+ :param compute: compute configuration to use for the notebook execution. This is a required attribute


def _format_start_execution_output_config(self):
output_formats = (
self.output_config.get("output_formats") if self.output_config else ["NOTEBOOK"]

Contributor commented:

This ternary is unnecessary, right? There is a default value provided in the constructor, so output_config can't be empty?
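
If output_config is indeed always a dict (as the default in the constructor implies), a minimal simplification could look like this sketch:

def _format_start_execution_output_config(self):
    # .get() with a fallback covers a missing "output_formats" key, so the
    # conditional over self.output_config is no longer needed.
    output_formats = self.output_config.get("output_formats", ["NOTEBOOK"])
    ...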

:param task_id: A unique, meaningful id for the task.
:param input_config: Configuration for the input file. Input path should be specified as a relative path.
The provided relative path will be automatically resolved to an absolute path within
the context of the user's home directory in the IDE. Input parms should be a dict

Contributor suggested change:
- the context of the user's home directory in the IDE. Input parms should be a dict
+ the context of the user's home directory in the IDE. Input params should be a dict

return self._sagemaker_studio.execution_client.start_execution(request)

def wait_for_execution_completion(self, execution_id, context):

Contributor suggested change: remove the blank line after the function signature.

from airflow.utils.context import Context


class SageMakerNotebookSensor(BaseSensorOperator):

Contributor commented:
Use AwsBaseSensor?
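
For reference, the AwsBaseSensor pattern being suggested looks roughly like the sketch below. It assumes SageMakerNotebookHook is (or becomes) a subclass of the provider's AWS base hook, which is not established in this diff; the hook import path and the "execution_id" template field are placeholders.

from typing import Sequence

from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields

# Hypothetical: only valid if SageMakerNotebookHook derives from the AWS base hook.
from airflow.providers.amazon.aws.hooks.sagemaker_unified_studio import SageMakerNotebookHook


class SageMakerNotebookSensor(AwsBaseSensor[SageMakerNotebookHook]):
    aws_hook_class = SageMakerNotebookHook
    # "execution_id" is a placeholder template field for illustration.
    template_fields: Sequence[str] = aws_template_fields("execution_id")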

Comment on lines +36 to +37
Username: airflowTestUser
Password: airflowSystemTestP@ssword1!

Contributor commented:

I don't see these credentials used anywhere in the test. Are they really mandatory? If yes, we should make them configurable and provide them through the test context builder.
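
For context, the test context builder pattern referenced here looks roughly like the sketch below; the import path depends on the test-suite layout at this point in the repo's history, and the variable names are placeholders.

# Import path varies with the system-test layout; this mirrors how other
# Amazon system tests obtain externally provided values.
from providers.tests.system.amazon.aws.utils import SystemTestContextBuilder

sys_test_context_task = (
    SystemTestContextBuilder()
    .add_variable("SUS_TEST_USERNAME")  # placeholder variable names
    .add_variable("SUS_TEST_PASSWORD")
    .build()
)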

Comment on lines +69 to +72
@task
def emulate_mwaa_environment(
domain_id: str, project_id: str, environment_id: str, s3_path: str
):

Contributor commented:

If run using a container-based executor (like ECS or K8s) this will have no effect: a container will spin up, export these envs, then it will get torn down, and the next task will run in a new container. So this test will fail to run on our ECS executor test suite.

Any other way around this? Otherwise we'll need to create an image for container-based tests, or at least provide these env vars through executor_config.
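
One possible workaround, sketched under the assumption that the test runs on the AWS ECS executor (which accepts per-task container overrides through executor_config): the container name and environment variable names below are placeholders, and notebook_path, domain_id, and project_id are assumed to come from the surrounding test DAG.

run_notebook = SageMakerNotebookOperator(
    task_id="run_notebook",
    input_config={"input_path": notebook_path, "input_params": {}},
    output_config={"output_formats": ["NOTEBOOK"]},
    wait_for_completion=True,
    poll_interval=5,
    # Per-task override so the variables reach the container that actually
    # runs this task, instead of being exported by a previous task.
    executor_config={
        "overrides": {
            "containerOverrides": [
                {
                    "name": "<executor-container-name>",  # placeholder
                    "environment": [
                        {"name": "DOMAIN_ID", "value": domain_id},  # placeholder names
                        {"name": "PROJECT_ID", "value": project_id},
                    ],
                }
            ]
        }
    },
)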

notebook_path = "test_notebook.ipynb" # This should be the path to your .ipynb, .sqlnb, or .vetl file in your project.

run_notebook = SageMakerNotebookOperator(
task_id="initial",

Contributor commented:
Somewhat strange task_id

Comment on lines +136 to +137
run_notebook,
)

Contributor commented:

No teardown needed? What if things break, get stuck, or time out? Any way to manually stop a notebook execution?
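
One option, sketched below: a teardown-style task that always runs, with the actual stop/cancel call left as a placeholder, since this diff only shows start_execution on the SDK's execution client. The task would then be chained after run_notebook in the system test.

from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ALL_DONE)
def teardown_notebook_execution():
    # Placeholder body: a real teardown would need a stop/cancel API for the
    # notebook execution, which is not exposed by the operator in this PR.
    ...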

agupta01 changed the title from "Add AWS SageMaker Unified Studio Notebook Operator" to "Add AWS SageMaker Unified Studio Workflow Operator" on Jan 29, 2025