Skip to content
Open
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
ce1de79
Add version 0.84.3 to legacy docs (#3949)
github-actions[bot] Sep 12, 2025
e87d336
started creating required files and mapping out the zenml config -> a…
SebastianScherer88 Sep 12, 2025
6b076d1
finished first draft of aws batch step operator
SebastianScherer88 Sep 13, 2025
c6f2a87
renaming modules and adding unit tests
SebastianScherer88 Sep 13, 2025
01017cc
added support for multinode aws batch job type
SebastianScherer88 Sep 13, 2025
b22672b
added support for multinode aws batch job type
SebastianScherer88 Sep 13, 2025
371a4ac
adding test dependency back in and fixing typo in sagemaker doc string
SebastianScherer88 Sep 14, 2025
a80f266
renaming the aws batch runtime context retrieval utility
SebastianScherer88 Sep 14, 2025
e372f85
started creating required files and mapping out the zenml config -> a…
SebastianScherer88 Sep 12, 2025
3d8c39b
finished first draft of aws batch step operator
SebastianScherer88 Sep 13, 2025
0543331
renaming modules and adding unit tests
SebastianScherer88 Sep 13, 2025
c9b5829
added support for multinode aws batch job type
SebastianScherer88 Sep 13, 2025
c787379
added support for multinode aws batch job type
SebastianScherer88 Sep 13, 2025
5fd0761
adding test dependency back in and fixing typo in sagemaker doc string
SebastianScherer88 Sep 14, 2025
5466799
renaming the aws batch runtime context retrieval utility
SebastianScherer88 Sep 14, 2025
17de12b
bounding aws integration dependency boto3 < 2
SebastianScherer88 Sep 16, 2025
1fcefac
using immutable default dict factory instead of mutable empty dict value
SebastianScherer88 Sep 16, 2025
eb6c320
removing commented out default args
SebastianScherer88 Sep 16, 2025
d1c002b
removing incorrect warning stating that step level resources specific…
SebastianScherer88 Sep 16, 2025
98e014e
increased timeout error to 1h and added batch client error handling
SebastianScherer88 Sep 16, 2025
1be5965
replicated the sagemaker orchestrator aws authentication and session …
SebastianScherer88 Sep 16, 2025
dab9340
resolving merge conflicts
SebastianScherer88 Sep 16, 2025
070ef62
fixes off the back initial functional testing
SebastianScherer88 Sep 21, 2025
a398139
more changes after successfully e2e testing single node (i.e. aws bat…
SebastianScherer88 Sep 21, 2025
1a602eb
fixed step environment settings bug
SebastianScherer88 Sep 21, 2025
69e60d1
fixed the multinode targetnode syntax
SebastianScherer88 Sep 21, 2025
0d53bce
fixed type hints for instance type
SebastianScherer88 Sep 22, 2025
739fdaa
stripping out multinode support as its not really needed given batch …
SebastianScherer88 Sep 23, 2025
9665107
fixed fargate networking bug. the container spec model didnt have a n…
SebastianScherer88 Sep 25, 2025
4e171c1
default backend is fargate bc its faster and easier to set up the infra
SebastianScherer88 Sep 25, 2025
02f9281
fixed integration tests
SebastianScherer88 Sep 25, 2025
778bdfd
Merge branch 'develop' into feature/aws-step-operator
SebastianScherer88 Sep 27, 2025
8fc2959
addressed all comments except logging
SebastianScherer88 Oct 1, 2025
835929c
Merge branch 'feature/aws-step-operator' of https://github.com/Sebast…
SebastianScherer88 Oct 1, 2025
6232f4f
buffer of 5 chars
SebastianScherer88 Oct 1, 2025
705c2a9
added validation of pipeline and step name before assembling full job…
SebastianScherer88 Oct 4, 2025
971cd68
implemented name sanitization as suggested instead of raising excepti…
SebastianScherer88 Oct 5, 2025
d2ace24
added ec2 and fargate resource validation to schemas, simplified reso…
SebastianScherer88 Oct 5, 2025
d3be040
fixed bug in fargate resource memory validation range
SebastianScherer88 Oct 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/zenml/integrations/aws/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
AWS_CONTAINER_REGISTRY_FLAVOR = "aws"
AWS_SAGEMAKER_STEP_OPERATOR_FLAVOR = "sagemaker"
AWS_SAGEMAKER_ORCHESTRATOR_FLAVOR = "sagemaker"
AWS_BATCH_STEP_OPERATOR_FLAVOR = "aws_batch"
AWS_DEPLOYER_FLAVOR = "aws"

# Service connector constants
Expand Down Expand Up @@ -66,6 +67,7 @@ def flavors(cls) -> List[Type[Flavor]]:
AWSImageBuilderFlavor,
SagemakerOrchestratorFlavor,
SagemakerStepOperatorFlavor,
AWSBatchStepOperatorFlavor
)

return [
Expand All @@ -74,4 +76,5 @@ def flavors(cls) -> List[Type[Flavor]]:
AWSImageBuilderFlavor,
SagemakerStepOperatorFlavor,
SagemakerOrchestratorFlavor,
AWSBatchStepOperatorFlavor
]
7 changes: 7 additions & 0 deletions src/zenml/integrations/aws/flavors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
SagemakerStepOperatorConfig,
SagemakerStepOperatorFlavor,
)
from zenml.integrations.aws.flavors.aws_batch_step_operator_flavor import (
AWSBatchStepOperatorConfig,
AWSBatchStepOperatorFlavor
)

__all__ = [
"AWSContainerRegistryFlavor",
Expand All @@ -45,4 +49,7 @@
"SagemakerStepOperatorConfig",
"SagemakerOrchestratorFlavor",
"SagemakerOrchestratorConfig",
"AWSBatchStepOperatorFlavor",
"AWSBatchStepOperatorConfig",

]
201 changes: 201 additions & 0 deletions src/zenml/integrations/aws/flavors/aws_batch_step_operator_flavor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# Copyright (c) ZenML GmbH 2022. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""AWS Batch step operator flavor."""

from typing import TYPE_CHECKING, Dict, Optional, Type, Literal

from pydantic import Field, PositiveInt, field_validator
from zenml.utils.secret_utils import SecretField
from zenml.config.base_settings import BaseSettings
from zenml.integrations.aws import (
AWS_RESOURCE_TYPE,
AWS_BATCH_STEP_OPERATOR_FLAVOR,
)
from zenml.models import ServiceConnectorRequirements
from zenml.step_operators.base_step_operator import (
BaseStepOperatorConfig,
BaseStepOperatorFlavor,
)

if TYPE_CHECKING:
from zenml.integrations.aws.step_operators import AWSBatchStepOperator


class AWSBatchStepOperatorSettings(BaseSettings):
    """Settings for the AWS Batch step operator.

    Attributes:
        environment: Environment variables passed to the job container.
        job_queue_name: AWS Batch job queue to submit the job to. An empty
            string means no queue was set here; per the field description,
            the default queue from stack registration is used instead.
        backend: AWS Batch platform capability, `EC2` or `FARGATE`.
        assign_public_ip: Value for the network configuration's
            `assignPublicIp` field (only relevant for `FARGATE`).
        timeout_seconds: Seconds before AWS Batch times out the job.
    """

    # NOTE: the descriptions below rely on implicit concatenation of adjacent
    # string literals, so every fragment except the last must end with a
    # space to keep sentences from running into each other.
    environment: Dict[str, str] = Field(
        default_factory=dict,
        description="Environment variables to pass to the container during "
        "execution. Example: {'LOG_LEVEL': 'INFO', 'DEBUG_MODE': 'False'}",
    )
    job_queue_name: str = Field(
        default="",
        description="The AWS Batch job queue to submit the step AWS Batch job "
        "to. If not provided, falls back to the default job queue name "
        "specified at stack registration time. Must be compatible with "
        "`backend`.",
    )
    backend: Literal["EC2", "FARGATE"] = Field(
        default="FARGATE",
        description="The AWS Batch platform capability for the step AWS Batch "
        "job to be orchestrated with. Must be compatible with "
        "`job_queue_name`. Defaults to 'FARGATE'.",
    )
    assign_public_ip: Literal["ENABLED", "DISABLED"] = Field(
        default="ENABLED",
        description="Sets the network configuration's assignPublicIp field. "
        "Only relevant for FARGATE backend.",
    )
    timeout_seconds: PositiveInt = Field(
        default=3600,
        description="The number of seconds before AWS Batch times out the "
        "job.",
    )



class AWSBatchStepOperatorConfig(
    BaseStepOperatorConfig, AWSBatchStepOperatorSettings
):
    """Config for the AWS Batch step operator.

    Combines the step-level settings inherited from
    `AWSBatchStepOperatorSettings` with stack-level values: the IAM roles,
    the default job queue and the AWS authentication options.

    Note: AWS Batch is used with ECS (not EKS) so that users can avoid the
    complexity of setting up an EKS cluster. The compute platform is selected
    through the inherited `backend` setting ('EC2' or 'FARGATE', defaulting
    to 'FARGATE').
    """

    # Required values: these fields declare no default.
    execution_role: str = Field(
        description="The IAM role arn of the ECS execution role."
    )
    job_role: str = Field(
        description="The IAM role arn of the ECS job role."
    )
    default_job_queue_name: str = Field(
        description="The default AWS Batch job queue to submit AWS Batch jobs to."
    )

    # Optional authentication values: all default to None.
    aws_access_key_id: Optional[str] = SecretField(
        default=None,
        description="The AWS access key ID to use to authenticate to AWS. "
        "If not provided, the value from the default AWS config will be used.",
    )
    aws_secret_access_key: Optional[str] = SecretField(
        default=None,
        description="The AWS secret access key to use to authenticate to AWS. "
        "If not provided, the value from the default AWS config will be used.",
    )
    aws_profile: Optional[str] = Field(
        default=None,
        description="The AWS profile to use for authentication if not using "
        "service connectors or explicit credentials. If not provided, the "
        "default profile will be used.",
    )
    aws_auth_role_arn: Optional[str] = Field(
        default=None,
        description="The ARN of an intermediate IAM role to assume when "
        "authenticating to AWS.",
    )
    region: Optional[str] = Field(
        default=None,
        description="The AWS region where the processing job will be run. "
        "If not provided, the value from the default AWS config will be used.",
    )

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        # This property unconditionally reports the component as remote.
        return True


class AWSBatchStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor definition of the AWS Batch step operator."""

    @property
    def name(self) -> str:
        """The flavor's name.

        Returns:
            The AWS Batch step operator flavor name constant.
        """
        flavor_name = AWS_BATCH_STEP_OPERATOR_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector requirements of this flavor.

        The requirements are used to filter the available service connector
        types down to those compatible with this flavor.

        Returns:
            Requirements restricted to the AWS resource type.
        """
        requirements = ServiceConnectorRequirements(
            resource_type=AWS_RESOURCE_TYPE
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the documentation page for this flavor.

        Returns:
            The default generated docs URL.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK documentation page for this flavor.

        Returns:
            The default generated SDK docs URL.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """URL of the logo representing this flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        # Adjacent literals are concatenated into a single URL string.
        return (
            "https://public-flavor-logos.s3.eu-central-1.amazonaws.com"
            "/step_operator/aws_batch.png"
        )

    @property
    def config_class(self) -> Type[AWSBatchStepOperatorConfig]:
        """Config class of this flavor.

        Returns:
            The `AWSBatchStepOperatorConfig` class.
        """
        return AWSBatchStepOperatorConfig

    @property
    def implementation_class(self) -> Type["AWSBatchStepOperator"]:
        """Implementation class of this flavor.

        Returns:
            The `AWSBatchStepOperator` class.
        """
        # Imported inside the property, i.e. only when it is accessed; at
        # module level the name is imported solely under TYPE_CHECKING.
        from zenml.integrations.aws.step_operators import (
            AWSBatchStepOperator,
        )

        return AWSBatchStepOperator
8 changes: 5 additions & 3 deletions src/zenml/integrations/aws/step_operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# permissions and limitations under the License.
"""Initialization of the AWS step operators (Sagemaker and AWS Batch)."""

from zenml.integrations.aws.step_operators.sagemaker_step_operator import ( # noqa
from zenml.integrations.aws.step_operators.sagemaker_step_operator import ( # noqa: F401
SagemakerStepOperator,
)

__all__ = ["SagemakerStepOperator"]
from zenml.integrations.aws.step_operators.aws_batch_step_operator import ( # noqa: F401
AWSBatchStepOperator,
)
__all__ = ["SagemakerStepOperator","AWSBatchStepOperator"]
Loading