diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index e5e03a401a6..f67a7808a53 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -437,6 +437,8 @@ KUBERNETES_DISK = from_conf("KUBERNETES_DISK", None) # Default kubernetes QoS class KUBERNETES_QOS = from_conf("KUBERNETES_QOS", "burstable") +# Default kubernetes PriorityClass name +KUBERNETES_PRIORITY_CLASS = from_conf("KUBERNETES_PRIORITY_CLASS", None) # Architecture of kubernetes nodes - used for @conda/@pypi in metaflow-dev KUBERNETES_CONDA_ARCH = from_conf("KUBERNETES_CONDA_ARCH") diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index b7b25c8c69d..345ba5e5488 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -2638,6 +2638,7 @@ def _container_templates(self): shared_memory=shared_memory, port=port, qos=resources["qos"], + priority_class=resources.get("priority_class"), security_context=security_context, ) @@ -2793,6 +2794,8 @@ def _container_templates(self): .node_selectors(resources.get("node_selector")) # Set tolerations .tolerations(resources.get("tolerations")) + # Set priority class + .priority_class_name(resources.get("priority_class")) # Set image pull secrets if present. We need to use pod_spec_patch due to Argo not supporting this on a template level. .pod_spec_patch( { @@ -4518,6 +4521,11 @@ def tolerations(self, tolerations): self.payload["tolerations"] = tolerations return self + def priority_class_name(self, priority_class_name): + if priority_class_name: + self.payload["priorityClassName"] = priority_class_name + return self + def to_json(self): return self.payload diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index c19b3efe3b9..6714cb4d9a5 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -197,6 +197,7 @@ def create_jobset( port=None, num_parallel=None, qos=None, + priority_class=None, security_context=None, ): name = "js-%s" % str(uuid4())[:6] @@ -232,6 +233,7 @@ def create_jobset( port=port, num_parallel=num_parallel, qos=qos, + priority_class=priority_class, security_context=security_context, ) .environment_variable("METAFLOW_CODE_METADATA", code_package_metadata) @@ -497,6 +499,7 @@ def create_job_object( port=None, name_pattern=None, qos=None, + priority_class=None, annotations=None, security_context=None, ): @@ -543,6 +546,7 @@ def create_job_object( shared_memory=shared_memory, port=port, qos=qos, + priority_class=priority_class, security_context=security_context, ) .environment_variable("METAFLOW_CODE_METADATA", code_package_metadata) diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index e15f7b06cb9..2088d664eb7 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -152,6 +152,12 @@ def kubernetes(): type=JSONTypeClass(), multiple=False, ) +@click.option( + "--priority-class", + default=None, + type=str, + help="PriorityClass name for the Kubernetes pod", +) @click.option( "--security-context", default=None, @@ -189,6 +195,7 @@ def step( port=None, num_parallel=None, qos=None, + priority_class=None, labels=None, annotations=None, security_context=None, @@ -335,6 +342,7 @@ def _sync_metadata(): port=port, num_parallel=num_parallel, qos=qos, + priority_class=priority_class, labels=labels, annotations=annotations, security_context=security_context, diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index bd3ae7e12c4..c1da496a757 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -31,6 +31,7 @@ KUBERNETES_SHARED_MEMORY, KUBERNETES_TOLERATIONS, KUBERNETES_QOS, + KUBERNETES_PRIORITY_CLASS, KUBERNETES_CONDA_ARCH, ) from metaflow.plugins.resources_decorator import ResourcesDecorator @@ -129,6 +130,11 @@ class KubernetesDecorator(StepDecorator): qos: str, default: Burstable Quality of Service class to assign to the pod. Supported values are: Guaranteed, Burstable, BestEffort + priority_class : str, optional, default None + Kubernetes PriorityClass name to assign to the pod. This controls the + scheduling priority of the pod relative to other pods. The priority class + must already exist in the cluster. See + https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/ security_context: Dict[str, Any], optional, default None Container security context. Applies to the task container. Allows the following keys: - privileged: bool, optional, default None @@ -167,12 +173,14 @@ class KubernetesDecorator(StepDecorator): "executable": None, "hostname_resolution_timeout": 10 * 60, "qos": KUBERNETES_QOS, + "priority_class": KUBERNETES_PRIORITY_CLASS, "security_context": None, } package_metadata = None package_url = None package_sha = None run_time_limit = None + _local_mode = False # Conda environment support supports_conda_environment = True @@ -312,8 +320,17 @@ def init(self): # Refer https://github.com/Netflix/metaflow/blob/master/docs/lifecycle.png def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger): - # Executing Kubernetes jobs requires a non-local datastore. + # When @kubernetes is defined statically in source code but the user + # is running locally (e.g., `python flow.py run`), the decorator + # should act like @resources -- just provide resource hints without + # redirecting execution to Kubernetes. The decorator only redirects + # to Kubernetes when explicitly requested via `--with kubernetes` + # or through a deployer (Argo, Airflow, etc.). if flow_datastore.TYPE not in ("s3", "azure", "gs"): + if self.statically_defined: + # Local mode: act like @resources, skip K8s-specific setup. + self._local_mode = True + return raise KubernetesException( "The *@kubernetes* decorator requires --datastore=s3 or --datastore=azure or --datastore=gs at the moment." ) @@ -438,6 +455,8 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge # TODO: add validation to annotations as well? def package_init(self, flow, step_name, environment): + if self._local_mode: + return try: # Kubernetes is a soft dependency. from kubernetes import client, config @@ -452,6 +471,8 @@ def package_init(self, flow, step_name, environment): ) def runtime_init(self, flow, graph, package, run_id): + if self._local_mode: + return # Set some more internal state. self.flow = flow self.graph = graph @@ -461,6 +482,8 @@ def runtime_init(self, flow, graph, package, run_id): def runtime_task_created( self, task_datastore, task_id, split_index, input_paths, is_cloned, ubf_context ): + if self._local_mode: + return # To execute the Kubernetes job, the job container needs to have # access to the code package. We store the package in the datastore # which the pod is able to download as part of it's entrypoint. @@ -470,6 +493,8 @@ def runtime_task_created( def runtime_step_cli( self, cli_args, retry_count, max_user_code_retries, ubf_context ): + if self._local_mode: + return if retry_count <= max_user_code_retries: # After all attempts to run the user code have failed, we don't need # to execute on Kubernetes anymore. We can execute possible fallback diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index b81777bcc7b..77ae58af078 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -221,9 +221,7 @@ def create_job_spec(self): client.V1LocalObjectReference(secret) for secret in self._kwargs.get("image_pull_secrets") or [] ], - # TODO (savin): Support preemption policies - # preemption_policy=?, - # + priority_class_name=self._kwargs.get("priority_class"), # A Container in a Pod may fail for a number of # reasons, such as because the process in it exited # with a non-zero exit code, or the Container was diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index da0f0fc3130..98cf3906ac5 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -724,9 +724,7 @@ def dump(self): for secret in self._kwargs.get("image_pull_secrets") or [] ], - # TODO (savin): Support preemption policies - # preemption_policy=?, - # + priority_class_name=self._kwargs.get("priority_class"), # A Container in a Pod may fail for a number of # reasons, such as because the process in it exited # with a non-zero exit code, or the Container was diff --git a/test/unit/test_kubernetes.py b/test/unit/test_kubernetes.py index 6ede190d428..687fcc2d706 100644 --- a/test/unit/test_kubernetes.py +++ b/test/unit/test_kubernetes.py @@ -1,10 +1,14 @@ +from unittest.mock import MagicMock + import pytest from metaflow.plugins.kubernetes.kube_utils import ( - KubernetesException, + KubernetesException as KubeUtilsException, validate_kube_labels, parse_kube_keyvalue_list, ) +from metaflow.plugins.kubernetes.kubernetes import KubernetesException +from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator @pytest.mark.parametrize( @@ -67,7 +71,7 @@ def test_kubernetes_decorator_validate_kube_labels(labels): ) def test_kubernetes_decorator_validate_kube_labels_fail(labels): """Fail if label contains invalid characters or is too long""" - with pytest.raises(KubernetesException): + with pytest.raises(KubeUtilsException): validate_kube_labels(labels) @@ -93,5 +97,134 @@ def test_kubernetes_parse_keyvalue_list(items, requires_both, expected): ], ) def test_kubernetes_parse_keyvalue_list(items, requires_both): - with pytest.raises(KubernetesException): + with pytest.raises(KubeUtilsException): parse_kube_keyvalue_list(items, requires_both) + + +class TestKubernetesDecoratorLocalMode: + """Tests for issue #2588: @kubernetes should be ignored for local runs.""" + + def _make_decorator(self, statically_defined=True, **attrs): + deco = KubernetesDecorator( + attributes=attrs or None, statically_defined=statically_defined + ) + return deco + + def test_static_decorator_enters_local_mode_with_local_datastore(self): + """When @kubernetes is in source code and datastore is local, enter local mode.""" + deco = self._make_decorator(statically_defined=True) + flow_datastore = MagicMock() + flow_datastore.TYPE = "local" + # step_init should not raise; it should silently enter local mode + deco.step_init( + flow=MagicMock(), + graph=MagicMock(), + step="my_step", + decos=[], + environment=MagicMock(), + flow_datastore=flow_datastore, + logger=MagicMock(), + ) + assert deco._local_mode is True + + def test_dynamic_decorator_raises_with_local_datastore(self): + """When @kubernetes is added via --with and datastore is local, raise error.""" + deco = self._make_decorator(statically_defined=False) + flow_datastore = MagicMock() + flow_datastore.TYPE = "local" + with pytest.raises(KubernetesException, match="--datastore=s3"): + deco.step_init( + flow=MagicMock(), + graph=MagicMock(), + step="my_step", + decos=[], + environment=MagicMock(), + flow_datastore=flow_datastore, + logger=MagicMock(), + ) + + def test_local_mode_skips_runtime_step_cli(self): + """In local mode, runtime_step_cli should not redirect to kubernetes.""" + deco = self._make_decorator(statically_defined=True) + deco._local_mode = True + cli_args = MagicMock() + cli_args.commands = ["step"] + deco.runtime_step_cli( + cli_args, retry_count=0, max_user_code_retries=3, ubf_context=None + ) + # commands should NOT have been changed to ["kubernetes", "step"] + assert cli_args.commands == ["step"] + + def test_local_mode_skips_package_init(self): + """In local mode, package_init should be a no-op (no kubernetes import needed).""" + deco = self._make_decorator(statically_defined=True) + deco._local_mode = True + # Should not raise even if kubernetes package is not installed + deco.package_init( + flow=MagicMock(), step_name="my_step", environment=MagicMock() + ) + + def test_local_mode_skips_runtime_init(self): + """In local mode, runtime_init should be a no-op.""" + deco = self._make_decorator(statically_defined=True) + deco._local_mode = True + deco.runtime_init( + flow=MagicMock(), graph=MagicMock(), package=MagicMock(), run_id="123" + ) + # Should not set self.flow etc. + assert not hasattr(deco, "flow") + + def test_local_mode_skips_runtime_task_created(self): + """In local mode, runtime_task_created should be a no-op.""" + deco = self._make_decorator(statically_defined=True) + deco._local_mode = True + # Should not raise even though flow_datastore and package are not set + deco.runtime_task_created( + task_datastore=MagicMock(), + task_id="1", + split_index=None, + input_paths=[], + is_cloned=False, + ubf_context=None, + ) + + def test_s3_datastore_does_not_enter_local_mode(self): + """With S3 datastore, even a static decorator should NOT enter local mode.""" + deco = self._make_decorator(statically_defined=True) + flow_datastore = MagicMock() + flow_datastore.TYPE = "s3" + # This will proceed with normal K8s setup - it will fail at QoS + # validation or other checks, but should NOT set _local_mode + try: + deco.step_init( + flow=MagicMock(), + graph=MagicMock(), + step="my_step", + decos=[], + environment=MagicMock(), + flow_datastore=flow_datastore, + logger=MagicMock(), + ) + except Exception: + pass # Expected to fail on later validation + assert deco._local_mode is False + + +class TestKubernetesDecoratorPriorityClass: + """Tests for issue #1752: priority_class option for @kubernetes.""" + + def test_priority_class_default_is_from_config(self): + """priority_class should default to KUBERNETES_PRIORITY_CLASS config value.""" + from metaflow.metaflow_config import KUBERNETES_PRIORITY_CLASS + + deco = KubernetesDecorator() + assert deco.attributes["priority_class"] == KUBERNETES_PRIORITY_CLASS + + def test_priority_class_can_be_set(self): + """priority_class should be settable via decorator attributes.""" + deco = KubernetesDecorator(attributes={"priority_class": "high-priority"}) + assert deco.attributes["priority_class"] == "high-priority" + + def test_priority_class_in_defaults(self): + """priority_class should be in the decorator defaults.""" + assert "priority_class" in KubernetesDecorator.defaults diff --git a/test/unit/test_kubernetes_priority_class.py b/test/unit/test_kubernetes_priority_class.py new file mode 100644 index 00000000000..d27f7dde64d --- /dev/null +++ b/test/unit/test_kubernetes_priority_class.py @@ -0,0 +1,114 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator + + +def test_priority_class_in_defaults(): + """priority_class key exists in KubernetesDecorator.defaults.""" + assert "priority_class" in KubernetesDecorator.defaults + + +def test_priority_class_default_from_config(): + """Default value comes from KUBERNETES_PRIORITY_CLASS config variable.""" + from metaflow.metaflow_config import KUBERNETES_PRIORITY_CLASS + + assert KubernetesDecorator.defaults["priority_class"] == KUBERNETES_PRIORITY_CLASS + + +def test_priority_class_attribute_set_via_decorator(): + """Explicit priority_class value is preserved in attributes.""" + deco = KubernetesDecorator(attributes={"priority_class": "high-priority"}) + assert deco.attributes["priority_class"] == "high-priority" + + +def test_priority_class_flows_to_pod_spec(): + """priority_class kwarg becomes priorityClassName in V1PodSpec.""" + from metaflow.plugins.kubernetes.kubernetes_job import KubernetesJob + + mock_client = MagicMock() + # The client.get() returns the kubernetes.client module mock + k8s_module = MagicMock() + mock_client.get.return_value = k8s_module + + kwargs = dict( + use_tmpfs=False, + tmpfs_size=None, + tmpfs_path="/metaflow_temp", + shared_memory=None, + qos="Burstable", + cpu="1", + memory="4096", + disk="10240", + gpu=None, + gpu_vendor="nvidia", + image="python:3.9", + image_pull_policy="IfNotPresent", + image_pull_secrets=[], + command=["echo", "hello"], + step_name="my_step", + namespace="default", + timeout_in_seconds=300, + annotations={}, + labels={}, + port=None, + secrets=[], + node_selector=None, + tolerations=[], + persistent_volume_claims=None, + priority_class="batch-high", + service_account="default", + security_context={}, + ) + job = KubernetesJob(client=mock_client, **kwargs) + job.create_job_spec() + + # V1PodSpec should have been called with priority_class_name="batch-high" + k8s_module.V1PodSpec.assert_called_once() + call_kwargs = k8s_module.V1PodSpec.call_args + assert call_kwargs[1]["priority_class_name"] == "batch-high" + + +def test_priority_class_none_passes_none_to_pod_spec(): + """When priority_class is None, priorityClassName is None in V1PodSpec.""" + from metaflow.plugins.kubernetes.kubernetes_job import KubernetesJob + + mock_client = MagicMock() + k8s_module = MagicMock() + mock_client.get.return_value = k8s_module + + kwargs = dict( + use_tmpfs=False, + tmpfs_size=None, + tmpfs_path="/metaflow_temp", + shared_memory=None, + qos="Burstable", + cpu="1", + memory="4096", + disk="10240", + gpu=None, + gpu_vendor="nvidia", + image="python:3.9", + image_pull_policy="IfNotPresent", + image_pull_secrets=[], + command=["echo", "hello"], + step_name="my_step", + namespace="default", + timeout_in_seconds=300, + annotations={}, + labels={}, + port=None, + secrets=[], + node_selector=None, + tolerations=[], + persistent_volume_claims=None, + priority_class=None, + service_account="default", + security_context={}, + ) + job = KubernetesJob(client=mock_client, **kwargs) + job.create_job_spec() + + call_kwargs = k8s_module.V1PodSpec.call_args + assert call_kwargs[1]["priority_class_name"] is None