Skip to content

Commit 72e7266

Browse files
author
Nissan Pow
committed
feat: add container and pod security context support for kubernetes workloads
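Add security_context (container-level) and pod_security_context (pod-level) parameters to the @kubernetes decorator. When a parameter is not set on the decorator, its value falls back to the JSON-encoded environment variable METAFLOW_KUBERNETES_SECURITY_CONTEXT or METAFLOW_KUBERNETES_POD_SECURITY_CONTEXT, respectively, which allows org-wide defaults. The dictionaries are passed as keyword arguments to the Kubernetes Python client's V1SecurityContext and V1PodSecurityContext, so the standard security context fields are supported using the client's snake_case names (run_as_user for runAsUser, run_as_group for runAsGroup, fs_group for fsGroup, capabilities, etc.). Closes #1262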
1 parent 32ae224 commit 72e7266

8 files changed

Lines changed: 460 additions & 1 deletion

File tree

metaflow/metaflow_config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,10 @@
434434
KUBERNETES_DISK = from_conf("KUBERNETES_DISK", None)
435435
# Default kubernetes QoS class
436436
KUBERNETES_QOS = from_conf("KUBERNETES_QOS", "burstable")
437+
# Default container security context (JSON) for kubernetes pods
438+
KUBERNETES_SECURITY_CONTEXT = from_conf("KUBERNETES_SECURITY_CONTEXT", "")
439+
# Default pod security context (JSON) for kubernetes pods
440+
KUBERNETES_POD_SECURITY_CONTEXT = from_conf("KUBERNETES_POD_SECURITY_CONTEXT", "")
437441

438442
# Architecture of kubernetes nodes - used for @conda/@pypi in metaflow-dev
439443
KUBERNETES_CONDA_ARCH = from_conf("KUBERNETES_CONDA_ARCH")

metaflow/plugins/argo/argo_workflows.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2497,6 +2497,15 @@ def _container_templates(self):
24972497
)
24982498
}
24992499

2500+
pod_security_context = resources.get("pod_security_context", None)
2501+
_pod_security_context = {}
2502+
if pod_security_context is not None and len(pod_security_context) > 0:
2503+
_pod_security_context = {
2504+
"security_context": kubernetes_sdk.V1PodSecurityContext(
2505+
**pod_security_context
2506+
)
2507+
}
2508+
25002509
# Create a ContainerTemplate for this node. Ideally, we would have
25012510
# liked to inline this ContainerTemplate and avoid scanning the workflow
25022511
# twice, but due to issues with variable substitution, we will have to
@@ -2555,6 +2564,7 @@ def _container_templates(self):
25552564
port=port,
25562565
qos=resources["qos"],
25572566
security_context=security_context,
2567+
pod_security_context=pod_security_context,
25582568
)
25592569

25602570
for k, v in env.items():
@@ -2720,6 +2730,16 @@ def _container_templates(self):
27202730
if resources["image_pull_secrets"]
27212731
else None
27222732
)
2733+
# Set pod security context via pod_spec_patch
2734+
.pod_spec_patch(
2735+
{
2736+
"securityContext": kubernetes_sdk.V1PodSecurityContext(
2737+
**pod_security_context
2738+
).to_dict()
2739+
}
2740+
if pod_security_context
2741+
else None
2742+
)
27232743
# Set container
27242744
.container(
27252745
# TODO: Unify the logic with kubernetes.py
@@ -4419,7 +4439,12 @@ def pod_spec_patch(self, pod_spec_patch=None):
44194439
if pod_spec_patch is None:
44204440
return self
44214441

4422-
self.payload["podSpecPatch"] = json.dumps(pod_spec_patch)
4442+
if "podSpecPatch" in self.payload:
4443+
existing = json.loads(self.payload["podSpecPatch"])
4444+
existing.update(pod_spec_patch)
4445+
self.payload["podSpecPatch"] = json.dumps(existing)
4446+
else:
4447+
self.payload["podSpecPatch"] = json.dumps(pod_spec_patch)
44234448

44244449
return self
44254450

metaflow/plugins/kubernetes/kubernetes.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ def create_jobset(
198198
num_parallel=None,
199199
qos=None,
200200
security_context=None,
201+
pod_security_context=None,
201202
):
202203
name = "js-%s" % str(uuid4())[:6]
203204
jobset = (
@@ -233,6 +234,7 @@ def create_jobset(
233234
num_parallel=num_parallel,
234235
qos=qos,
235236
security_context=security_context,
237+
pod_security_context=pod_security_context,
236238
)
237239
.environment_variable("METAFLOW_CODE_METADATA", code_package_metadata)
238240
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
@@ -499,6 +501,7 @@ def create_job_object(
499501
qos=None,
500502
annotations=None,
501503
security_context=None,
504+
pod_security_context=None,
502505
):
503506
if env is None:
504507
env = {}
@@ -544,6 +547,7 @@ def create_job_object(
544547
port=port,
545548
qos=qos,
546549
security_context=security_context,
550+
pod_security_context=pod_security_context,
547551
)
548552
.environment_variable("METAFLOW_CODE_METADATA", code_package_metadata)
549553
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)

metaflow/plugins/kubernetes/kubernetes_cli.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@ def kubernetes():
158158
type=JSONTypeClass(),
159159
multiple=False,
160160
)
161+
@click.option(
162+
"--pod-security-context",
163+
default=None,
164+
type=JSONTypeClass(),
165+
multiple=False,
166+
)
161167
@click.pass_context
162168
def step(
163169
ctx,
@@ -192,6 +198,7 @@ def step(
192198
labels=None,
193199
annotations=None,
194200
security_context=None,
201+
pod_security_context=None,
195202
**kwargs
196203
):
197204
def echo(msg, stream="stderr", job_id=None, **kwargs):
@@ -338,6 +345,7 @@ def _sync_metadata():
338345
labels=labels,
339346
annotations=annotations,
340347
security_context=security_context,
348+
pod_security_context=pod_security_context,
341349
)
342350
except Exception:
343351
traceback.print_exc(chain=False)

metaflow/plugins/kubernetes/kubernetes_decorator.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
KUBERNETES_NODE_SELECTOR,
2828
KUBERNETES_PERSISTENT_VOLUME_CLAIMS,
2929
KUBERNETES_PORT,
30+
KUBERNETES_SECURITY_CONTEXT,
31+
KUBERNETES_POD_SECURITY_CONTEXT,
3032
KUBERNETES_SERVICE_ACCOUNT,
3133
KUBERNETES_SHARED_MEMORY,
3234
KUBERNETES_TOLERATIONS,
@@ -136,6 +138,17 @@ class KubernetesDecorator(StepDecorator):
136138
- run_as_user: int, optional, default None
137139
- run_as_group: int, optional, default None
138140
- run_as_non_root: bool, optional, default None
141+
- read_only_root_filesystem: bool, optional, default None
142+
- capabilities: Dict[str, List[str]], optional, default None
143+
Can also be set via METAFLOW_KUBERNETES_SECURITY_CONTEXT (JSON).
144+
pod_security_context: Dict[str, Any], optional, default None
145+
Pod-level security context. Applies to all containers in the pod. Allows the following keys:
146+
- run_as_user: int, optional, default None
147+
- run_as_group: int, optional, default None
148+
- run_as_non_root: bool, optional, default None
149+
- fs_group: int, optional, default None
150+
- supplemental_groups: List[int], optional, default None
151+
Can also be set via METAFLOW_KUBERNETES_POD_SECURITY_CONTEXT (JSON).
139152
"""
140153

141154
name = "kubernetes"
@@ -168,6 +181,7 @@ class KubernetesDecorator(StepDecorator):
168181
"hostname_resolution_timeout": 10 * 60,
169182
"qos": KUBERNETES_QOS,
170183
"security_context": None,
184+
"pod_security_context": None,
171185
}
172186
package_metadata = None
173187
package_url = None
@@ -310,6 +324,19 @@ def init(self):
310324
if not self.attributes["port"]:
311325
self.attributes["port"] = KUBERNETES_PORT
312326

327+
# Security context: a non-empty value set on the decorator takes precedence; otherwise fall back to the JSON-encoded config/env var default
328+
if not self.attributes["security_context"] and KUBERNETES_SECURITY_CONTEXT:
329+
self.attributes["security_context"] = json.loads(
330+
KUBERNETES_SECURITY_CONTEXT
331+
)
332+
if (
333+
not self.attributes["pod_security_context"]
334+
and KUBERNETES_POD_SECURITY_CONTEXT
335+
):
336+
self.attributes["pod_security_context"] = json.loads(
337+
KUBERNETES_POD_SECURITY_CONTEXT
338+
)
339+
313340
# Refer https://github.com/Netflix/metaflow/blob/master/docs/lifecycle.png
314341
def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
315342
# Executing Kubernetes jobs requires a non-local datastore.
@@ -500,6 +527,7 @@ def runtime_step_cli(
500527
"labels",
501528
"annotations",
502529
"security_context",
530+
"pod_security_context",
503531
]:
504532
cli_args.command_options[k] = json.dumps(v)
505533
else:

metaflow/plugins/kubernetes/kubernetes_job.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,13 @@ def create_job_spec(self):
9393
"security_context": client.V1SecurityContext(**security_context)
9494
}
9595

96+
pod_security_context = self._kwargs.get("pod_security_context", {})
97+
_pod_security_context = {}
98+
if pod_security_context is not None and len(pod_security_context) > 0:
99+
_pod_security_context = {
100+
"security_context": client.V1PodSecurityContext(**pod_security_context)
101+
}
102+
96103
return client.V1JobSpec(
97104
# Retries are handled by Metaflow when it is responsible for
98105
# executing the flow. The responsibility is moved to Kubernetes
@@ -277,6 +284,7 @@ def create_job_spec(self):
277284
if self._kwargs["persistent_volume_claims"] is not None
278285
else []
279286
),
287+
**_pod_security_context,
280288
),
281289
),
282290
)

metaflow/plugins/kubernetes/kubernetes_jobsets.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,13 @@ def dump(self):
569569
_security_context = {
570570
"security_context": client.V1SecurityContext(**security_context)
571571
}
572+
573+
pod_security_context = self._kwargs.get("pod_security_context", {})
574+
_pod_security_context = {}
575+
if pod_security_context is not None and len(pod_security_context) > 0:
576+
_pod_security_context = {
577+
"security_context": client.V1PodSecurityContext(**pod_security_context)
578+
}
572579
return dict(
573580
name=self.name,
574581
template=client.api_client.ApiClient().sanitize_for_serialization(
@@ -784,6 +791,7 @@ def dump(self):
784791
is not None
785792
else []
786793
),
794+
**_pod_security_context,
787795
),
788796
),
789797
),

0 commit comments

Comments
 (0)