5 changes: 4 additions & 1 deletion src/zenml/config/compiler.py
@@ -634,7 +634,10 @@ def _compute_pipeline_spec(
         Raises:
             ValueError: If the pipeline has no steps.
         """
-        if not step_specs:
+        from zenml.pipelines.dynamic_pipeline import DynamicPipeline
+
+        if not step_specs and not isinstance(pipeline, DynamicPipeline):
+            # Static pipelines require at least one step
             raise ValueError(
                 f"Pipeline '{pipeline.name}' cannot be compiled because it has "
                 f"no steps. Please make sure that your steps are decorated "
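Note on the compiler change above: static pipelines must still declare at least one step at compile time, while a dynamic pipeline is allowed to produce an empty step-spec list because its steps are only created at runtime. Below is a minimal sketch of that guard; Pipeline, DynamicPipeline, and validate_step_specs are simplified stand-ins, not the real ZenML classes.

# Illustrative sketch only: these classes are stand-ins, not the ZenML implementations.
from typing import Any, List


class Pipeline:
    def __init__(self, name: str) -> None:
        self.name = name


class DynamicPipeline(Pipeline):
    """A pipeline whose steps are only known while it runs."""


def validate_step_specs(pipeline: Pipeline, step_specs: List[Any]) -> None:
    # Static pipelines need at least one step spec at compile time;
    # dynamic pipelines may legitimately have none yet.
    if not step_specs and not isinstance(pipeline, DynamicPipeline):
        raise ValueError(
            f"Pipeline '{pipeline.name}' cannot be compiled because it has no steps."
        )


validate_step_specs(DynamicPipeline("dyn"), [])  # passes: steps arrive at runtime
# validate_step_specs(Pipeline("static"), [])    # would raise ValueError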
@@ -43,7 +43,7 @@

 if TYPE_CHECKING:
     from zenml.config.step_run_info import StepRunInfo
-    from zenml.models import PipelineSnapshotBase
+    from zenml.models import PipelineSnapshotBase, PipelineSnapshotResponse

 logger = get_logger(__name__)

@@ -128,6 +128,13 @@ def get_docker_builds(
             The required Docker builds.
         """
         builds = []
+        # TODO: only needed for dynamic pipelines
+        build = BuildConfiguration(
+            key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY,
+            settings=snapshot.pipeline_configuration.docker_settings,
+        )
+        builds.append(build)
+
         for step_name, step in snapshot.step_configurations.items():
             if step.config.uses_step_operator(self.name):
                 build = BuildConfiguration(
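Note on get_docker_builds above: the new pipeline-level BuildConfiguration registers an image under KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY, and run_dynamic_pipeline later resolves that image through snapshot.build.get_image(component_key=...). The sketch below only illustrates that key-to-image lookup; ToyBuild and the key's string value are assumptions, not the real ZenML build model.

# Toy model of the build-key lookup; ToyBuild is hypothetical and only mirrors
# the idea that each BuildConfiguration key maps to one pushed image reference.
from typing import Dict

# Placeholder value for illustration; the real constant lives in the ZenML codebase.
KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY = "kubernetes_step_operator"


class ToyBuild:
    def __init__(self, images: Dict[str, str]) -> None:
        self._images = images

    def get_image(self, component_key: str) -> str:
        # Raises KeyError if no image was built for this key.
        return self._images[component_key]


build = ToyBuild(
    {KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY: "registry.example.com/zenml-steps:latest"}
)
print(build.get_image(component_key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY))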
@@ -286,3 +293,76 @@ def launch(
             stream_logs=True,
         )
         logger.info("Step operator job completed.")
+
+    def run_dynamic_pipeline(
+        self,
+        command: List[str],
+        snapshot: "PipelineSnapshotResponse",
+        environment: Dict[str, str],
+        sync: bool = False,
+    ) -> None:
+        settings = self.get_settings(snapshot)
+        command, args = command[:3], command[3:]
+
+        image = snapshot.build.get_image(
+            component_key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY
+        )
+
+        # We set some default minimum memory resource requests for the step pod
+        # here if the user has not specified any, because the step pod takes up
+        # some memory resources itself and, if not specified, the pod will be
+        # scheduled on any node regardless of available memory and risk
+        # negatively impacting or even crashing the node due to memory pressure.
+        pod_settings = kube_utils.apply_default_resource_requests(
+            memory="400Mi",
+            pod_settings=settings.pod_settings,
+        )
+
+        pod_manifest = build_pod_manifest(
+            pod_name=None,
+            image_name=image,
+            command=command,
+            args=args,
+            env=environment,
+            privileged=settings.privileged,
+            pod_settings=pod_settings,
+            service_account_name=settings.service_account_name,
+        )
+
+        job_name = settings.job_name_prefix or ""
+        random_prefix = "".join(random.choices("0123456789abcdef", k=8))
+        job_name += f"-{random_prefix}-step-operator"
+        # The job name will be used as a label on the pods, so we need to make
+        # sure it doesn't exceed the label length limit
+        job_name = kube_utils.sanitize_label(job_name)
+
+        job_manifest = build_job_manifest(
+            job_name=job_name,
+            pod_template=pod_template_manifest_from_pod(pod_manifest),
+            # The orchestrator already handles retries, so we don't need to
+            # retry the step operator job.
+            backoff_limit=0,
+            ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
+            active_deadline_seconds=settings.active_deadline_seconds,
+        )
+
+        kube_utils.create_job(
+            batch_api=self._k8s_batch_api,
+            namespace=self.config.kubernetes_namespace,
+            job_manifest=job_manifest,
+        )
+
+        if sync:
+            logger.info(
+                "Waiting for step operator job `%s` to finish...",
+                job_name,
+            )
+            kube_utils.wait_for_job_to_finish(
+                batch_api=self._k8s_batch_api,
+                core_api=self._k8s_core_api,
+                namespace=self.config.kubernetes_namespace,
+                job_name=job_name,
+                fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons,
+                stream_logs=True,
+            )
+            logger.info("Step operator job completed.")
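Note on the job naming in run_dynamic_pipeline above: a random hex suffix keeps concurrently created jobs from colliding, and the result is passed through kube_utils.sanitize_label because the job name is reused as a pod label, and Kubernetes label values are limited to 63 characters drawn from a restricted character set. The sketch below shows one way such a sanitizer can behave; it is an assumption, not the actual kube_utils.sanitize_label implementation.

# Illustrative sanitizer; the real kube_utils.sanitize_label may differ in details.
import random
import re


def sanitize_label_sketch(value: str) -> str:
    # Kubernetes label values: at most 63 chars, alphanumerics plus '-', '_', '.',
    # starting and ending with an alphanumeric character.
    value = re.sub(r"[^a-z0-9._-]", "-", value.lower())
    value = value[:63]
    return value.strip("-_.")


job_name = "my-prefix"
random_suffix = "".join(random.choices("0123456789abcdef", k=8))
print(sanitize_label_sketch(f"{job_name}-{random_suffix}-step-operator"))
# e.g. my-prefix-3fa91c2b-step-operator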
1 change: 1 addition & 0 deletions src/zenml/models/v2/core/pipeline_snapshot.py
@@ -203,6 +203,7 @@ class PipelineSnapshotUpdate(BaseUpdate):
     remove_tags: Optional[List[str]] = Field(
         default=None, title="Tags to remove from the snapshot."
     )
+    add_steps: Optional[Dict[str, Step]] = None

     @field_validator("name")
     @classmethod
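Note on the new add_steps field above: a dynamic pipeline registers step configurations on an already-existing snapshot while it runs, so the update model needs an optional field whose None default means "leave the steps unchanged". The sketch below mirrors that pattern with hypothetical ToyStep and ToySnapshotUpdate models rather than the real PipelineSnapshotUpdate.

# Hypothetical stand-ins for the optional-update-field pattern.
from typing import Dict, Optional

from pydantic import BaseModel


class ToyStep(BaseModel):
    name: str


class ToySnapshotUpdate(BaseModel):
    # None means "do not touch the snapshot's steps"; a dict means
    # "merge these step configurations into the existing snapshot".
    add_steps: Optional[Dict[str, ToyStep]] = None


update = ToySnapshotUpdate(add_steps={"trainer": ToyStep(name="trainer")})
print(update.model_dump(exclude_none=True))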
20 changes: 19 additions & 1 deletion src/zenml/orchestrators/publish_utils.py
@@ -138,6 +138,23 @@ def publish_failed_pipeline_run(
     )


+def publish_successful_pipeline_run(
+    pipeline_run_id: "UUID",
+) -> "PipelineRunResponse":
+    """Publishes a successful pipeline run.
+
+    Args:
+        pipeline_run_id: The ID of the pipeline run to update.
+    """
+    return Client().zen_store.update_run(
+        run_id=pipeline_run_id,
+        run_update=PipelineRunUpdate(
+            status=ExecutionStatus.COMPLETED,
+            end_time=utc_now(),
+        ),
+    )
+
+
 def publish_pipeline_run_status_update(
     pipeline_run_id: "UUID",
     status: ExecutionStatus,
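Note on publish_successful_pipeline_run above: it mirrors the existing publish_failed_pipeline_run and presumably gets called once a dynamic pipeline's driver knows that no further steps will be created. A hypothetical call site, assuming an installed ZenML client connected to a store and an existing run ID:

# Hypothetical usage; the run ID below is a placeholder.
from uuid import UUID

from zenml.orchestrators.publish_utils import publish_successful_pipeline_run

run_id = UUID("12345678-1234-5678-1234-567812345678")
response = publish_successful_pipeline_run(pipeline_run_id=run_id)
print(response.status)  # ExecutionStatus.COMPLETED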
@@ -224,7 +241,8 @@ def get_pipeline_run_status(

     # Any other state is completed
     else:
-        return ExecutionStatus.COMPLETED
+        # In dynamic pipelines, the number of steps isn't known ahead of time
+        return run_status


 def publish_pipeline_run_metadata(
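Note on the get_pipeline_run_status change above: previously, a run with no failed or running steps was marked COMPLETED; for dynamic pipelines the total number of steps isn't known ahead of time, so the run keeps its current status and is only completed explicitly (see publish_successful_pipeline_run earlier in this diff). The sketch below illustrates that decision with stand-in types, not the real ZenML function.

# Simplified stand-ins for the status-resolution idea behind the change above.
from enum import Enum
from typing import List


class ExecutionStatus(str, Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def resolve_run_status(
    run_status: ExecutionStatus, step_statuses: List[ExecutionStatus]
) -> ExecutionStatus:
    if any(s is ExecutionStatus.FAILED for s in step_statuses):
        return ExecutionStatus.FAILED
    if any(s is ExecutionStatus.RUNNING for s in step_statuses):
        return ExecutionStatus.RUNNING
    # All steps seen so far have finished. For a static pipeline the total step
    # count is known, so this could mean COMPLETED; for a dynamic pipeline more
    # steps may still be created, so keep the run's current status and let an
    # explicit publish call mark it completed.
    return run_status


print(resolve_run_status(ExecutionStatus.RUNNING, [ExecutionStatus.COMPLETED]))
# -> ExecutionStatus.RUNNING (not prematurely COMPLETED)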