Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@
KUBERNETES_DISK = from_conf("KUBERNETES_DISK", None)
# Default kubernetes QoS class
KUBERNETES_QOS = from_conf("KUBERNETES_QOS", "burstable")
# Default container security context (JSON) for kubernetes pods
KUBERNETES_SECURITY_CONTEXT = from_conf("KUBERNETES_SECURITY_CONTEXT", "")
Comment on lines 439 to +441
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Inconsistent default: "" instead of None

All adjacent config vars (KUBERNETES_MEMORY, KUBERNETES_DISK, etc.) default to None. Using "" works because the falsiness check in the decorator handles it, but it's inconsistent and slightly harder to reason about. Consider defaulting to None to match the established pattern.

Suggested change
KUBERNETES_QOS = from_conf("KUBERNETES_QOS", "burstable")
# Default container security context (JSON) for kubernetes pods
KUBERNETES_SECURITY_CONTEXT = from_conf("KUBERNETES_SECURITY_CONTEXT", "")
KUBERNETES_SECURITY_CONTEXT = from_conf("KUBERNETES_SECURITY_CONTEXT", None)
# Default pod security context (JSON) for kubernetes pods
KUBERNETES_POD_SECURITY_CONTEXT = from_conf("KUBERNETES_POD_SECURITY_CONTEXT", None)

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

# Default pod security context (JSON) for kubernetes pods
KUBERNETES_POD_SECURITY_CONTEXT = from_conf("KUBERNETES_POD_SECURITY_CONTEXT", "")

# Architecture of kubernetes nodes - used for @conda/@pypi in metaflow-dev
KUBERNETES_CONDA_ARCH = from_conf("KUBERNETES_CONDA_ARCH")
Expand Down
27 changes: 26 additions & 1 deletion metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2581,6 +2581,15 @@ def _container_templates(self):
)
}

pod_security_context = resources.get("pod_security_context", None)
_pod_security_context = {}
if pod_security_context is not None and len(pod_security_context) > 0:
_pod_security_context = {
"security_context": kubernetes_sdk.V1PodSecurityContext(
**pod_security_context
)
}
Comment on lines +2584 to +2591
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Unused _pod_security_context variable — dead code

_pod_security_context is computed here but never referenced. For the parallel path, the raw pod_security_context dict is forwarded to KubernetesArgoJobSet (which handles it via JobSetSpec). For the non-parallel path, the pod security context is applied via pod_spec_patch() further down. This block can be removed.


# Create a ContainerTemplate for this node. Ideally, we would have
# liked to inline this ContainerTemplate and avoid scanning the workflow
# twice, but due to issues with variable substitution, we will have to
Expand Down Expand Up @@ -2639,6 +2648,7 @@ def _container_templates(self):
port=port,
qos=resources["qos"],
security_context=security_context,
pod_security_context=pod_security_context,
)

for k, v in env.items():
Expand Down Expand Up @@ -2804,6 +2814,16 @@ def _container_templates(self):
if resources["image_pull_secrets"]
else None
)
# Set pod security context via pod_spec_patch
.pod_spec_patch(
{
"securityContext": kubernetes_sdk.V1PodSecurityContext(
**pod_security_context
).to_dict()
}
if pod_security_context
else None
)
Comment on lines +2818 to +2826
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 to_dict() produces snake_case keys — pod security context silently ignored in Argo

V1PodSecurityContext.to_dict() returns Python attribute names in snake_case (e.g. fs_group, run_as_non_root), but podSpecPatch is applied as a raw JSON strategic-merge patch against the Kubernetes API, which requires camelCase (e.g. fsGroup, runAsNonRoot). Kubernetes ignores unrecognised keys, so the pod security context will be accepted without error but have no effect for all non-parallel Argo Workflow steps.

The existing code already imports to_camelcase from metaflow.util and uses it for the container spec. Apply the same treatment here:

Suggested change
.pod_spec_patch(
{
"securityContext": kubernetes_sdk.V1PodSecurityContext(
**pod_security_context
).to_dict()
}
if pod_security_context
else None
)
.pod_spec_patch(
{
"securityContext": to_camelcase(
kubernetes_sdk.V1PodSecurityContext(
**pod_security_context
).to_dict()
)
}
if pod_security_context
else None
)

# Set container
.container(
# TODO: Unify the logic with kubernetes.py
Expand Down Expand Up @@ -4503,7 +4523,12 @@ def pod_spec_patch(self, pod_spec_patch=None):
if pod_spec_patch is None:
return self

self.payload["podSpecPatch"] = json.dumps(pod_spec_patch)
if "podSpecPatch" in self.payload:
existing = json.loads(self.payload["podSpecPatch"])
existing.update(pod_spec_patch)
self.payload["podSpecPatch"] = json.dumps(existing)
else:
self.payload["podSpecPatch"] = json.dumps(pod_spec_patch)

return self

Expand Down
4 changes: 4 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def create_jobset(
num_parallel=None,
qos=None,
security_context=None,
pod_security_context=None,
):
name = "js-%s" % str(uuid4())[:6]
jobset = (
Expand Down Expand Up @@ -233,6 +234,7 @@ def create_jobset(
num_parallel=num_parallel,
qos=qos,
security_context=security_context,
pod_security_context=pod_security_context,
)
.environment_variable("METAFLOW_CODE_METADATA", code_package_metadata)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
Expand Down Expand Up @@ -499,6 +501,7 @@ def create_job_object(
qos=None,
annotations=None,
security_context=None,
pod_security_context=None,
):
if env is None:
env = {}
Expand Down Expand Up @@ -544,6 +547,7 @@ def create_job_object(
port=port,
qos=qos,
security_context=security_context,
pod_security_context=pod_security_context,
)
.environment_variable("METAFLOW_CODE_METADATA", code_package_metadata)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
Expand Down
8 changes: 8 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ def kubernetes():
type=JSONTypeClass(),
multiple=False,
)
@click.option(
"--pod-security-context",
default=None,
type=JSONTypeClass(),
multiple=False,
)
@click.pass_context
def step(
ctx,
Expand Down Expand Up @@ -192,6 +198,7 @@ def step(
labels=None,
annotations=None,
security_context=None,
pod_security_context=None,
**kwargs
):
def echo(msg, stream="stderr", job_id=None, **kwargs):
Expand Down Expand Up @@ -338,6 +345,7 @@ def _sync_metadata():
labels=labels,
annotations=annotations,
security_context=security_context,
pod_security_context=pod_security_context,
)
except Exception:
traceback.print_exc(chain=False)
Expand Down
28 changes: 28 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
KUBERNETES_NODE_SELECTOR,
KUBERNETES_PERSISTENT_VOLUME_CLAIMS,
KUBERNETES_PORT,
KUBERNETES_SECURITY_CONTEXT,
KUBERNETES_POD_SECURITY_CONTEXT,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_SHARED_MEMORY,
KUBERNETES_TOLERATIONS,
Expand Down Expand Up @@ -136,6 +138,17 @@ class KubernetesDecorator(StepDecorator):
- run_as_user: int, optional, default None
- run_as_group: int, optional, default None
- run_as_non_root: bool, optional, default None
- read_only_root_filesystem: bool, optional, default None
- capabilities: Dict[str, List[str]], optional, default None
Can also be set via METAFLOW_KUBERNETES_SECURITY_CONTEXT (JSON).
pod_security_context: Dict[str, Any], optional, default None
Pod-level security context. Applies to all containers in the pod. Allows the following keys:
- run_as_user: int, optional, default None
- run_as_group: int, optional, default None
- run_as_non_root: bool, optional, default None
- fs_group: int, optional, default None
- supplemental_groups: List[int], optional, default None
Can also be set via METAFLOW_KUBERNETES_POD_SECURITY_CONTEXT (JSON).
"""

name = "kubernetes"
Expand Down Expand Up @@ -168,6 +181,7 @@ class KubernetesDecorator(StepDecorator):
"hostname_resolution_timeout": 10 * 60,
"qos": KUBERNETES_QOS,
"security_context": None,
"pod_security_context": None,
}
package_metadata = None
package_url = None
Expand Down Expand Up @@ -310,6 +324,19 @@ def init(self):
if not self.attributes["port"]:
self.attributes["port"] = KUBERNETES_PORT

# Security context: decorator takes precedence over env var
if not self.attributes["security_context"] and KUBERNETES_SECURITY_CONTEXT:
self.attributes["security_context"] = json.loads(
KUBERNETES_SECURITY_CONTEXT
)
if (
not self.attributes["pod_security_context"]
and KUBERNETES_POD_SECURITY_CONTEXT
):
self.attributes["pod_security_context"] = json.loads(
KUBERNETES_POD_SECURITY_CONTEXT
)
Comment on lines +327 to +338
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Explicit security_context={} would be overridden by the env var

not {} evaluates to True, so a user who explicitly sets @kubernetes(security_context={}) would have the env var silently take precedence, contradicting the "decorator takes precedence" comment. The same caveat applies to pod_security_context. Consider comparing to None instead of using truthiness, which is also the pattern used for non-JSON options like port and shared_memory.


# Refer https://github.com/Netflix/metaflow/blob/master/docs/lifecycle.png
def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
# Executing Kubernetes jobs requires a non-local datastore.
Expand Down Expand Up @@ -500,6 +527,7 @@ def runtime_step_cli(
"labels",
"annotations",
"security_context",
"pod_security_context",
]:
cli_args.command_options[k] = json.dumps(v)
else:
Expand Down
8 changes: 8 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ def create_job_spec(self):
"security_context": client.V1SecurityContext(**security_context)
}

pod_security_context = self._kwargs.get("pod_security_context", {})
_pod_security_context = {}
if pod_security_context is not None and len(pod_security_context) > 0:
_pod_security_context = {
"security_context": client.V1PodSecurityContext(**pod_security_context)
}

return client.V1JobSpec(
# Retries are handled by Metaflow when it is responsible for
# executing the flow. The responsibility is moved to Kubernetes
Expand Down Expand Up @@ -277,6 +284,7 @@ def create_job_spec(self):
if self._kwargs["persistent_volume_claims"] is not None
else []
),
**_pod_security_context,
),
),
)
Expand Down
8 changes: 8 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_jobsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,13 @@ def dump(self):
_security_context = {
"security_context": client.V1SecurityContext(**security_context)
}

pod_security_context = self._kwargs.get("pod_security_context", {})
_pod_security_context = {}
if pod_security_context is not None and len(pod_security_context) > 0:
_pod_security_context = {
"security_context": client.V1PodSecurityContext(**pod_security_context)
}
return dict(
name=self.name,
template=client.api_client.ApiClient().sanitize_for_serialization(
Expand Down Expand Up @@ -784,6 +791,7 @@ def dump(self):
is not None
else []
),
**_pod_security_context,
),
),
),
Expand Down
Loading
Loading