Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@
KUBERNETES_DISK = from_conf("KUBERNETES_DISK", None)
# Default kubernetes QoS class
KUBERNETES_QOS = from_conf("KUBERNETES_QOS", "burstable")
# Default kubernetes PriorityClass name
KUBERNETES_PRIORITY_CLASS = from_conf("KUBERNETES_PRIORITY_CLASS", None)

# Architecture of kubernetes nodes - used for @conda/@pypi in metaflow-dev
KUBERNETES_CONDA_ARCH = from_conf("KUBERNETES_CONDA_ARCH")
Expand Down
8 changes: 8 additions & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2638,6 +2638,7 @@ def _container_templates(self):
shared_memory=shared_memory,
port=port,
qos=resources["qos"],
priority_class=resources.get("priority_class"),
security_context=security_context,
)

Expand Down Expand Up @@ -2793,6 +2794,8 @@ def _container_templates(self):
.node_selectors(resources.get("node_selector"))
# Set tolerations
.tolerations(resources.get("tolerations"))
# Set priority class
.priority_class_name(resources.get("priority_class"))
# Set image pull secrets if present. We need to use pod_spec_patch due to Argo not supporting this on a template level.
.pod_spec_patch(
{
Expand Down Expand Up @@ -4518,6 +4521,11 @@ def tolerations(self, tolerations):
self.payload["tolerations"] = tolerations
return self

def priority_class_name(self, priority_class_name):
    """Fluent setter: record the pod's ``priorityClassName`` in the spec payload.

    Falsy values (None, empty string) leave the payload untouched so the
    cluster default priority class applies.
    """
    if not priority_class_name:
        return self
    self.payload["priorityClassName"] = priority_class_name
    return self

def to_json(self):
    """Return the underlying payload dict."""
    return self.payload

Expand Down
4 changes: 4 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def create_jobset(
port=None,
num_parallel=None,
qos=None,
priority_class=None,
security_context=None,
):
name = "js-%s" % str(uuid4())[:6]
Expand Down Expand Up @@ -232,6 +233,7 @@ def create_jobset(
port=port,
num_parallel=num_parallel,
qos=qos,
priority_class=priority_class,
security_context=security_context,
)
.environment_variable("METAFLOW_CODE_METADATA", code_package_metadata)
Expand Down Expand Up @@ -497,6 +499,7 @@ def create_job_object(
port=None,
name_pattern=None,
qos=None,
priority_class=None,
annotations=None,
security_context=None,
):
Expand Down Expand Up @@ -543,6 +546,7 @@ def create_job_object(
shared_memory=shared_memory,
port=port,
qos=qos,
priority_class=priority_class,
security_context=security_context,
)
.environment_variable("METAFLOW_CODE_METADATA", code_package_metadata)
Expand Down
8 changes: 8 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ def kubernetes():
type=JSONTypeClass(),
multiple=False,
)
@click.option(
"--priority-class",
default=None,
type=str,
help="PriorityClass name for the Kubernetes pod",
)
@click.option(
"--security-context",
default=None,
Expand Down Expand Up @@ -189,6 +195,7 @@ def step(
port=None,
num_parallel=None,
qos=None,
priority_class=None,
labels=None,
annotations=None,
security_context=None,
Expand Down Expand Up @@ -335,6 +342,7 @@ def _sync_metadata():
port=port,
num_parallel=num_parallel,
qos=qos,
priority_class=priority_class,
labels=labels,
annotations=annotations,
security_context=security_context,
Expand Down
27 changes: 26 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
KUBERNETES_SHARED_MEMORY,
KUBERNETES_TOLERATIONS,
KUBERNETES_QOS,
KUBERNETES_PRIORITY_CLASS,
KUBERNETES_CONDA_ARCH,
)
from metaflow.plugins.resources_decorator import ResourcesDecorator
Expand Down Expand Up @@ -129,6 +130,11 @@ class KubernetesDecorator(StepDecorator):
qos: str, default: Burstable
Quality of Service class to assign to the pod. Supported values are: Guaranteed, Burstable, BestEffort

priority_class : str, optional, default None
Kubernetes PriorityClass name to assign to the pod. This controls the
scheduling priority of the pod relative to other pods. The priority class
must already exist in the cluster. See
https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/

security_context: Dict[str, Any], optional, default None
Comment on lines +137 to 138
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Missing blank line in docstring

The priority_class parameter block is missing the blank line separator before security_context that all other parameters have. This breaks the visual consistency of the docstring.

Suggested change
https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/
security_context: Dict[str, Any], optional, default None
https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/
security_context: Dict[str, Any], optional, default None

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Container security context. Applies to the task container. Allows the following keys:
- privileged: bool, optional, default None
Expand Down Expand Up @@ -167,12 +173,14 @@ class KubernetesDecorator(StepDecorator):
"executable": None,
"hostname_resolution_timeout": 10 * 60,
"qos": KUBERNETES_QOS,
"priority_class": KUBERNETES_PRIORITY_CLASS,
"security_context": None,
}
package_metadata = None
package_url = None
package_sha = None
run_time_limit = None
_local_mode = False

# Conda environment support
supports_conda_environment = True
Expand Down Expand Up @@ -312,8 +320,17 @@ def init(self):

# Refer https://github.com/Netflix/metaflow/blob/master/docs/lifecycle.png
def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
# Executing Kubernetes jobs requires a non-local datastore.
# When @kubernetes is defined statically in source code but the user
# is running locally (e.g., `python flow.py run`), the decorator
# should act like @resources -- just provide resource hints without
# redirecting execution to Kubernetes. The decorator only redirects
# to Kubernetes when explicitly requested via `--with kubernetes`
# or through a deployer (Argo, Airflow, etc.).
if flow_datastore.TYPE not in ("s3", "azure", "gs"):
if self.statically_defined:
# Local mode: act like @resources, skip K8s-specific setup.
self._local_mode = True
return
raise KubernetesException(
"The *@kubernetes* decorator requires --datastore=s3 or --datastore=azure or --datastore=gs at the moment."
)
Expand Down Expand Up @@ -438,6 +455,8 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
# TODO: add validation to annotations as well?

def package_init(self, flow, step_name, environment):
if self._local_mode:
return
try:
# Kubernetes is a soft dependency.
from kubernetes import client, config
Expand All @@ -452,6 +471,8 @@ def package_init(self, flow, step_name, environment):
)

def runtime_init(self, flow, graph, package, run_id):
if self._local_mode:
return
# Set some more internal state.
self.flow = flow
self.graph = graph
Expand All @@ -461,6 +482,8 @@ def runtime_init(self, flow, graph, package, run_id):
def runtime_task_created(
self, task_datastore, task_id, split_index, input_paths, is_cloned, ubf_context
):
if self._local_mode:
return
# To execute the Kubernetes job, the job container needs to have
# access to the code package. We store the package in the datastore
# which the pod is able to download as part of it's entrypoint.
Expand All @@ -470,6 +493,8 @@ def runtime_task_created(
def runtime_step_cli(
self, cli_args, retry_count, max_user_code_retries, ubf_context
):
if self._local_mode:
return
if retry_count <= max_user_code_retries:
# After all attempts to run the user code have failed, we don't need
# to execute on Kubernetes anymore. We can execute possible fallback
Expand Down
4 changes: 1 addition & 3 deletions metaflow/plugins/kubernetes/kubernetes_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,7 @@ def create_job_spec(self):
client.V1LocalObjectReference(secret)
for secret in self._kwargs.get("image_pull_secrets") or []
],
# TODO (savin): Support preemption policies
# preemption_policy=?,
#
priority_class_name=self._kwargs.get("priority_class"),
# A Container in a Pod may fail for a number of
# reasons, such as because the process in it exited
# with a non-zero exit code, or the Container was
Expand Down
4 changes: 1 addition & 3 deletions metaflow/plugins/kubernetes/kubernetes_jobsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,9 +724,7 @@ def dump(self):
for secret in self._kwargs.get("image_pull_secrets")
or []
],
# TODO (savin): Support preemption policies
# preemption_policy=?,
#
priority_class_name=self._kwargs.get("priority_class"),
# A Container in a Pod may fail for a number of
# reasons, such as because the process in it exited
# with a non-zero exit code, or the Container was
Expand Down
139 changes: 136 additions & 3 deletions test/unit/test_kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from unittest.mock import MagicMock

import pytest

from metaflow.plugins.kubernetes.kube_utils import (
KubernetesException,
KubernetesException as KubeUtilsException,
validate_kube_labels,
parse_kube_keyvalue_list,
)
from metaflow.plugins.kubernetes.kubernetes import KubernetesException
from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator


@pytest.mark.parametrize(
Expand Down Expand Up @@ -67,7 +71,7 @@ def test_kubernetes_decorator_validate_kube_labels(labels):
)
def test_kubernetes_decorator_validate_kube_labels_fail(labels):
"""Fail if label contains invalid characters or is too long"""
with pytest.raises(KubernetesException):
with pytest.raises(KubeUtilsException):
validate_kube_labels(labels)


Expand All @@ -93,5 +97,134 @@ def test_kubernetes_parse_keyvalue_list(items, requires_both, expected):
],
)
def test_kubernetes_parse_keyvalue_list_fail(items, requires_both):
    """Malformed KEY=VALUE items must raise.

    Renamed from ``test_kubernetes_parse_keyvalue_list``: the original name
    duplicated the passing-case test defined above, so this definition
    shadowed it and pytest silently skipped the first test. The ``_fail``
    suffix matches the convention used by
    ``test_kubernetes_decorator_validate_kube_labels_fail``.
    """
    with pytest.raises(KubeUtilsException):
        parse_kube_keyvalue_list(items, requires_both)


class TestKubernetesDecoratorLocalMode:
    """Issue #2588: a statically defined @kubernetes must degrade gracefully
    (acting like @resources) when the run uses a local datastore."""

    def _make_decorator(self, statically_defined=True, **attrs):
        # Empty attrs collapse to None so the decorator uses its defaults.
        return KubernetesDecorator(
            attributes=attrs or None, statically_defined=statically_defined
        )

    def test_static_decorator_enters_local_mode_with_local_datastore(self):
        """Source-defined @kubernetes + local datastore -> silent local mode."""
        decorator = self._make_decorator(statically_defined=True)
        datastore = MagicMock()
        datastore.TYPE = "local"
        # No exception expected: the decorator should just flag local mode.
        decorator.step_init(
            flow=MagicMock(),
            graph=MagicMock(),
            step="my_step",
            decos=[],
            environment=MagicMock(),
            flow_datastore=datastore,
            logger=MagicMock(),
        )
        assert decorator._local_mode is True

    def test_dynamic_decorator_raises_with_local_datastore(self):
        """`--with kubernetes` + local datastore -> hard error."""
        decorator = self._make_decorator(statically_defined=False)
        datastore = MagicMock()
        datastore.TYPE = "local"
        with pytest.raises(KubernetesException, match="--datastore=s3"):
            decorator.step_init(
                flow=MagicMock(),
                graph=MagicMock(),
                step="my_step",
                decos=[],
                environment=MagicMock(),
                flow_datastore=datastore,
                logger=MagicMock(),
            )

    def test_local_mode_skips_runtime_step_cli(self):
        """Local mode must not rewrite the step command to run on Kubernetes."""
        decorator = self._make_decorator(statically_defined=True)
        decorator._local_mode = True
        cli_args = MagicMock(commands=["step"])
        decorator.runtime_step_cli(
            cli_args, retry_count=0, max_user_code_retries=3, ubf_context=None
        )
        # Still a plain local "step" invocation, not ["kubernetes", "step"].
        assert cli_args.commands == ["step"]

    def test_local_mode_skips_package_init(self):
        """package_init is a no-op locally (kubernetes client may be absent)."""
        decorator = self._make_decorator(statically_defined=True)
        decorator._local_mode = True
        decorator.package_init(
            flow=MagicMock(), step_name="my_step", environment=MagicMock()
        )

    def test_local_mode_skips_runtime_init(self):
        """runtime_init is a no-op locally."""
        decorator = self._make_decorator(statically_defined=True)
        decorator._local_mode = True
        decorator.runtime_init(
            flow=MagicMock(), graph=MagicMock(), package=MagicMock(), run_id="123"
        )
        # The early return means no internal state (e.g. self.flow) is set.
        assert not hasattr(decorator, "flow")

    def test_local_mode_skips_runtime_task_created(self):
        """runtime_task_created is a no-op locally."""
        decorator = self._make_decorator(statically_defined=True)
        decorator._local_mode = True
        # Would fail without the early return: flow_datastore/package unset.
        decorator.runtime_task_created(
            task_datastore=MagicMock(),
            task_id="1",
            split_index=None,
            input_paths=[],
            is_cloned=False,
            ubf_context=None,
        )

    def test_s3_datastore_does_not_enter_local_mode(self):
        """A remote (s3) datastore keeps the decorator on the Kubernetes path."""
        decorator = self._make_decorator(statically_defined=True)
        datastore = MagicMock()
        datastore.TYPE = "s3"
        try:
            # Normal K8s setup may fail on later validation (QoS, etc.) --
            # that is fine; the assertion only concerns _local_mode.
            decorator.step_init(
                flow=MagicMock(),
                graph=MagicMock(),
                step="my_step",
                decos=[],
                environment=MagicMock(),
                flow_datastore=datastore,
                logger=MagicMock(),
            )
        except Exception:
            pass
        assert decorator._local_mode is False


class TestKubernetesDecoratorPriorityClass:
    """Issue #1752: @kubernetes exposes a ``priority_class`` attribute."""

    def test_priority_class_default_is_from_config(self):
        """The attribute default mirrors KUBERNETES_PRIORITY_CLASS from config."""
        from metaflow.metaflow_config import KUBERNETES_PRIORITY_CLASS

        decorator = KubernetesDecorator()
        assert decorator.attributes["priority_class"] == KUBERNETES_PRIORITY_CLASS

    def test_priority_class_can_be_set(self):
        """An explicit attribute value overrides the default."""
        decorator = KubernetesDecorator(
            attributes={"priority_class": "high-priority"}
        )
        assert decorator.attributes["priority_class"] == "high-priority"

    def test_priority_class_in_defaults(self):
        """The option is declared in the decorator's defaults mapping."""
        assert "priority_class" in KubernetesDecorator.defaults
Loading
Loading