Skip to content

Commit f7e72fd

Browse files
author
Nissan Pow
committed
feat: ignore @kubernetes for local runs and add priority_class option
When @kubernetes is defined statically in source code and the user runs locally (e.g., `python flow.py run` with local datastore), the decorator now acts like @resources -- providing resource hints without redirecting execution to Kubernetes. This fixes #2588. Add priority_class parameter to @kubernetes decorator, which sets priorityClassName on the pod spec for K8s PriorityClass scheduling. Supported in direct K8s jobs, jobsets, and Argo Workflows. Configurable via METAFLOW_KUBERNETES_PRIORITY_CLASS. This fixes #1752.
1 parent a40d206 commit f7e72fd

8 files changed

Lines changed: 182 additions & 7 deletions

File tree

metaflow/metaflow_config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,8 @@
434434
KUBERNETES_DISK = from_conf("KUBERNETES_DISK", None)
435435
# Default kubernetes QoS class
436436
KUBERNETES_QOS = from_conf("KUBERNETES_QOS", "burstable")
437+
# Default kubernetes PriorityClass name
438+
KUBERNETES_PRIORITY_CLASS = from_conf("KUBERNETES_PRIORITY_CLASS", None)
437439

438440
# Architecture of kubernetes nodes - used for @conda/@pypi in metaflow-dev
439441
KUBERNETES_CONDA_ARCH = from_conf("KUBERNETES_CONDA_ARCH")

metaflow/plugins/argo/argo_workflows.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2554,6 +2554,7 @@ def _container_templates(self):
25542554
shared_memory=shared_memory,
25552555
port=port,
25562556
qos=resources["qos"],
2557+
priority_class=resources.get("priority_class"),
25572558
security_context=security_context,
25582559
)
25592560

@@ -2709,6 +2710,8 @@ def _container_templates(self):
27092710
.node_selectors(resources.get("node_selector"))
27102711
# Set tolerations
27112712
.tolerations(resources.get("tolerations"))
2713+
# Set priority class
2714+
.priority_class_name(resources.get("priority_class"))
27122715
# Set image pull secrets if present. We need to use pod_spec_patch due to Argo not supporting this on a template level.
27132716
.pod_spec_patch(
27142717
{
@@ -4434,6 +4437,11 @@ def tolerations(self, tolerations):
44344437
self.payload["tolerations"] = tolerations
44354438
return self
44364439

4440+
def priority_class_name(self, priority_class_name):
4441+
if priority_class_name:
4442+
self.payload["priorityClassName"] = priority_class_name
4443+
return self
4444+
44374445
def to_json(self):
44384446
return self.payload
44394447

metaflow/plugins/kubernetes/kubernetes.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ def create_jobset(
197197
port=None,
198198
num_parallel=None,
199199
qos=None,
200+
priority_class=None,
200201
security_context=None,
201202
):
202203
name = "js-%s" % str(uuid4())[:6]
@@ -232,6 +233,7 @@ def create_jobset(
232233
port=port,
233234
num_parallel=num_parallel,
234235
qos=qos,
236+
priority_class=priority_class,
235237
security_context=security_context,
236238
)
237239
.environment_variable("METAFLOW_CODE_METADATA", code_package_metadata)
@@ -497,6 +499,7 @@ def create_job_object(
497499
port=None,
498500
name_pattern=None,
499501
qos=None,
502+
priority_class=None,
500503
annotations=None,
501504
security_context=None,
502505
):
@@ -543,6 +546,7 @@ def create_job_object(
543546
shared_memory=shared_memory,
544547
port=port,
545548
qos=qos,
549+
priority_class=priority_class,
546550
security_context=security_context,
547551
)
548552
.environment_variable("METAFLOW_CODE_METADATA", code_package_metadata)

metaflow/plugins/kubernetes/kubernetes_cli.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@ def kubernetes():
152152
type=JSONTypeClass(),
153153
multiple=False,
154154
)
155+
@click.option(
156+
"--priority-class",
157+
default=None,
158+
type=str,
159+
help="PriorityClass name for the Kubernetes pod",
160+
)
155161
@click.option(
156162
"--security-context",
157163
default=None,
@@ -189,6 +195,7 @@ def step(
189195
port=None,
190196
num_parallel=None,
191197
qos=None,
198+
priority_class=None,
192199
labels=None,
193200
annotations=None,
194201
security_context=None,
@@ -335,6 +342,7 @@ def _sync_metadata():
335342
port=port,
336343
num_parallel=num_parallel,
337344
qos=qos,
345+
priority_class=priority_class,
338346
labels=labels,
339347
annotations=annotations,
340348
security_context=security_context,

metaflow/plugins/kubernetes/kubernetes_decorator.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
KUBERNETES_SHARED_MEMORY,
3232
KUBERNETES_TOLERATIONS,
3333
KUBERNETES_QOS,
34+
KUBERNETES_PRIORITY_CLASS,
3435
KUBERNETES_CONDA_ARCH,
3536
)
3637
from metaflow.plugins.resources_decorator import ResourcesDecorator
@@ -129,6 +130,11 @@ class KubernetesDecorator(StepDecorator):
129130
qos: str, default: Burstable
130131
Quality of Service class to assign to the pod. Supported values are: Guaranteed, Burstable, BestEffort
131132
133+
priority_class : str, optional, default None
134+
Kubernetes PriorityClass name to assign to the pod. This controls the
135+
scheduling priority of the pod relative to other pods. The priority class
136+
must already exist in the cluster. See
137+
https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/
132138
security_context: Dict[str, Any], optional, default None
133139
Container security context. Applies to the task container. Allows the following keys:
134140
- privileged: bool, optional, default None
@@ -167,12 +173,14 @@ class KubernetesDecorator(StepDecorator):
167173
"executable": None,
168174
"hostname_resolution_timeout": 10 * 60,
169175
"qos": KUBERNETES_QOS,
176+
"priority_class": KUBERNETES_PRIORITY_CLASS,
170177
"security_context": None,
171178
}
172179
package_metadata = None
173180
package_url = None
174181
package_sha = None
175182
run_time_limit = None
183+
_local_mode = False
176184

177185
# Conda environment support
178186
supports_conda_environment = True
@@ -312,8 +320,17 @@ def init(self):
312320

313321
# Refer https://github.com/Netflix/metaflow/blob/master/docs/lifecycle.png
314322
def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
315-
# Executing Kubernetes jobs requires a non-local datastore.
323+
# When @kubernetes is defined statically in source code but the user
324+
# is running locally (e.g., `python flow.py run`), the decorator
325+
# should act like @resources -- just provide resource hints without
326+
# redirecting execution to Kubernetes. Execution is still redirected
327+
# to Kubernetes whenever a remote datastore (s3, azure, gs) is in use;
328+
# a non-static decorator (e.g. `--with kubernetes`) on a local datastore raises.
316329
if flow_datastore.TYPE not in ("s3", "azure", "gs"):
330+
if self.statically_defined:
331+
# Local mode: act like @resources, skip K8s-specific setup.
332+
self._local_mode = True
333+
return
317334
raise KubernetesException(
318335
"The *@kubernetes* decorator requires --datastore=s3 or --datastore=azure or --datastore=gs at the moment."
319336
)
@@ -438,6 +455,8 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
438455
# TODO: add validation to annotations as well?
439456

440457
def package_init(self, flow, step_name, environment):
458+
if self._local_mode:
459+
return
441460
try:
442461
# Kubernetes is a soft dependency.
443462
from kubernetes import client, config
@@ -452,6 +471,8 @@ def package_init(self, flow, step_name, environment):
452471
)
453472

454473
def runtime_init(self, flow, graph, package, run_id):
474+
if self._local_mode:
475+
return
455476
# Set some more internal state.
456477
self.flow = flow
457478
self.graph = graph
@@ -461,6 +482,8 @@ def runtime_init(self, flow, graph, package, run_id):
461482
def runtime_task_created(
462483
self, task_datastore, task_id, split_index, input_paths, is_cloned, ubf_context
463484
):
485+
if self._local_mode:
486+
return
464487
# To execute the Kubernetes job, the job container needs to have
465488
# access to the code package. We store the package in the datastore
466489
# which the pod is able to download as part of its entrypoint.
@@ -470,6 +493,8 @@ def runtime_task_created(
470493
def runtime_step_cli(
471494
self, cli_args, retry_count, max_user_code_retries, ubf_context
472495
):
496+
if self._local_mode:
497+
return
473498
if retry_count <= max_user_code_retries:
474499
# After all attempts to run the user code have failed, we don't need
475500
# to execute on Kubernetes anymore. We can execute possible fallback

metaflow/plugins/kubernetes/kubernetes_job.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,7 @@ def create_job_spec(self):
221221
client.V1LocalObjectReference(secret)
222222
for secret in self._kwargs.get("image_pull_secrets") or []
223223
],
224-
# TODO (savin): Support preemption policies
225-
# preemption_policy=?,
226-
#
224+
priority_class_name=self._kwargs.get("priority_class"),
227225
# A Container in a Pod may fail for a number of
228226
# reasons, such as because the process in it exited
229227
# with a non-zero exit code, or the Container was

metaflow/plugins/kubernetes/kubernetes_jobsets.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -724,9 +724,7 @@ def dump(self):
724724
for secret in self._kwargs.get("image_pull_secrets")
725725
or []
726726
],
727-
# TODO (savin): Support preemption policies
728-
# preemption_policy=?,
729-
#
727+
priority_class_name=self._kwargs.get("priority_class"),
730728
# A Container in a Pod may fail for a number of
731729
# reasons, such as because the process in it exited
732730
# with a non-zero exit code, or the Container was

test/unit/test_kubernetes.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
from unittest.mock import MagicMock
2+
13
import pytest
24

35
from metaflow.plugins.kubernetes.kube_utils import (
46
KubernetesException,
57
validate_kube_labels,
68
parse_kube_keyvalue_list,
79
)
10+
from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator
811

912

1013
@pytest.mark.parametrize(
@@ -95,3 +98,132 @@ def test_kubernetes_parse_keyvalue_list(items, requires_both, expected):
9598
def test_kubernetes_parse_keyvalue_list(items, requires_both):
9699
with pytest.raises(KubernetesException):
97100
parse_kube_keyvalue_list(items, requires_both)
101+
102+
103+
class TestKubernetesDecoratorLocalMode:
104+
"""Tests for issue #2588: @kubernetes should be ignored for local runs."""
105+
106+
def _make_decorator(self, statically_defined=True, **attrs):
107+
deco = KubernetesDecorator(
108+
attributes=attrs or None, statically_defined=statically_defined
109+
)
110+
return deco
111+
112+
def test_static_decorator_enters_local_mode_with_local_datastore(self):
113+
"""When @kubernetes is in source code and datastore is local, enter local mode."""
114+
deco = self._make_decorator(statically_defined=True)
115+
flow_datastore = MagicMock()
116+
flow_datastore.TYPE = "local"
117+
# step_init should not raise; it should silently enter local mode
118+
deco.step_init(
119+
flow=MagicMock(),
120+
graph=MagicMock(),
121+
step="my_step",
122+
decos=[],
123+
environment=MagicMock(),
124+
flow_datastore=flow_datastore,
125+
logger=MagicMock(),
126+
)
127+
assert deco._local_mode is True
128+
129+
def test_dynamic_decorator_raises_with_local_datastore(self):
130+
"""When @kubernetes is added via --with and datastore is local, raise error."""
131+
deco = self._make_decorator(statically_defined=False)
132+
flow_datastore = MagicMock()
133+
flow_datastore.TYPE = "local"
134+
with pytest.raises(KubernetesException, match="--datastore=s3"):
135+
deco.step_init(
136+
flow=MagicMock(),
137+
graph=MagicMock(),
138+
step="my_step",
139+
decos=[],
140+
environment=MagicMock(),
141+
flow_datastore=flow_datastore,
142+
logger=MagicMock(),
143+
)
144+
145+
def test_local_mode_skips_runtime_step_cli(self):
146+
"""In local mode, runtime_step_cli should not redirect to kubernetes."""
147+
deco = self._make_decorator(statically_defined=True)
148+
deco._local_mode = True
149+
cli_args = MagicMock()
150+
cli_args.commands = ["step"]
151+
deco.runtime_step_cli(
152+
cli_args, retry_count=0, max_user_code_retries=3, ubf_context=None
153+
)
154+
# commands should NOT have been changed to ["kubernetes", "step"]
155+
assert cli_args.commands == ["step"]
156+
157+
def test_local_mode_skips_package_init(self):
158+
"""In local mode, package_init should be a no-op (no kubernetes import needed)."""
159+
deco = self._make_decorator(statically_defined=True)
160+
deco._local_mode = True
161+
# Should not raise even if kubernetes package is not installed
162+
deco.package_init(
163+
flow=MagicMock(), step_name="my_step", environment=MagicMock()
164+
)
165+
166+
def test_local_mode_skips_runtime_init(self):
167+
"""In local mode, runtime_init should be a no-op."""
168+
deco = self._make_decorator(statically_defined=True)
169+
deco._local_mode = True
170+
deco.runtime_init(
171+
flow=MagicMock(), graph=MagicMock(), package=MagicMock(), run_id="123"
172+
)
173+
# Should not set self.flow etc.
174+
assert not hasattr(deco, "flow")
175+
176+
def test_local_mode_skips_runtime_task_created(self):
177+
"""In local mode, runtime_task_created should be a no-op."""
178+
deco = self._make_decorator(statically_defined=True)
179+
deco._local_mode = True
180+
# Should not raise even though flow_datastore and package are not set
181+
deco.runtime_task_created(
182+
task_datastore=MagicMock(),
183+
task_id="1",
184+
split_index=None,
185+
input_paths=[],
186+
is_cloned=False,
187+
ubf_context=None,
188+
)
189+
190+
def test_s3_datastore_does_not_enter_local_mode(self):
191+
"""With S3 datastore, even a static decorator should NOT enter local mode."""
192+
deco = self._make_decorator(statically_defined=True)
193+
flow_datastore = MagicMock()
194+
flow_datastore.TYPE = "s3"
195+
# This will proceed with normal K8s setup - it will fail at QoS
196+
# validation or other checks, but should NOT set _local_mode
197+
try:
198+
deco.step_init(
199+
flow=MagicMock(),
200+
graph=MagicMock(),
201+
step="my_step",
202+
decos=[],
203+
environment=MagicMock(),
204+
flow_datastore=flow_datastore,
205+
logger=MagicMock(),
206+
)
207+
except Exception:
208+
pass # Expected to fail on later validation
209+
assert deco._local_mode is False
210+
211+
212+
class TestKubernetesDecoratorPriorityClass:
213+
"""Tests for issue #1752: priority_class option for @kubernetes."""
214+
215+
def test_priority_class_default_is_from_config(self):
216+
"""priority_class should default to KUBERNETES_PRIORITY_CLASS config value."""
217+
from metaflow.metaflow_config import KUBERNETES_PRIORITY_CLASS
218+
219+
deco = KubernetesDecorator()
220+
assert deco.attributes["priority_class"] == KUBERNETES_PRIORITY_CLASS
221+
222+
def test_priority_class_can_be_set(self):
223+
"""priority_class should be settable via decorator attributes."""
224+
deco = KubernetesDecorator(attributes={"priority_class": "high-priority"})
225+
assert deco.attributes["priority_class"] == "high-priority"
226+
227+
def test_priority_class_in_defaults(self):
228+
"""priority_class should be in the decorator defaults."""
229+
assert "priority_class" in KubernetesDecorator.defaults

0 commit comments

Comments
 (0)