Skip to content

Commit 7eb82f0

Browse files
authored
Add support for shared_memory for Kubernetes workloads. (#1709)
* Add support for shared_memory for Kubernetes workloads. This includes support for Kubernetes jobs as well as Argo workflows.
1 parent aaf70e2 commit 7eb82f0

File tree

7 files changed

+72
-10
lines changed

7 files changed

+72
-10
lines changed

metaflow/metaflow_config.py

+2
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,8 @@
299299
)
300300
# Toggle for trying to fetch EC2 instance metadata
301301
KUBERNETES_FETCH_EC2_METADATA = from_conf("KUBERNETES_FETCH_EC2_METADATA", False)
302+
# Shared memory in MB to use for this step
303+
KUBERNETES_SHARED_MEMORY = from_conf("KUBERNETES_SHARED_MEMORY", None)
302304

303305
ARGO_WORKFLOWS_KUBERNETES_SECRETS = from_conf("ARGO_WORKFLOWS_KUBERNETES_SECRETS", "")
304306
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP = from_conf("ARGO_WORKFLOWS_ENV_VARS_TO_SKIP", "")

metaflow/plugins/argo/argo_workflows.py

+15
Original file line numberDiff line numberDiff line change
@@ -1368,6 +1368,9 @@ def _container_templates(self):
13681368
tmpfs_size = resources["tmpfs_size"]
13691369
tmpfs_path = resources["tmpfs_path"]
13701370
tmpfs_tempdir = resources["tmpfs_tempdir"]
1371+
# Set shared_memory to 0 if it isn't specified. This results
1372+
# in Kubernetes using it's default value when the pod is created.
1373+
shared_memory = resources.get("shared_memory", 0)
13711374

13721375
tmpfs_enabled = use_tmpfs or (tmpfs_size and not use_tmpfs)
13731376

@@ -1412,6 +1415,7 @@ def _container_templates(self):
14121415
medium="Memory",
14131416
size_limit=tmpfs_size if tmpfs_enabled else 0,
14141417
)
1418+
.empty_dir_volume("dhsm", medium="Memory", size_limit=shared_memory)
14151419
.pvc_volumes(resources.get("persistent_volume_claims"))
14161420
# Set node selectors
14171421
.node_selectors(resources.get("node_selector"))
@@ -1505,6 +1509,17 @@ def _container_templates(self):
15051509
if tmpfs_enabled
15061510
else []
15071511
)
1512+
# Support shared_memory
1513+
+ (
1514+
[
1515+
kubernetes_sdk.V1VolumeMount(
1516+
name="dhsm",
1517+
mount_path="/dev/shm",
1518+
)
1519+
]
1520+
if shared_memory
1521+
else []
1522+
)
15081523
# Support persistent volume claims.
15091524
+ (
15101525
[

metaflow/plugins/kubernetes/kubernetes.py

+2
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ def create_job(
174174
persistent_volume_claims=None,
175175
tolerations=None,
176176
labels=None,
177+
shared_memory=None,
177178
):
178179
if env is None:
179180
env = {}
@@ -213,6 +214,7 @@ def create_job(
213214
tmpfs_size=tmpfs_size,
214215
tmpfs_path=tmpfs_path,
215216
persistent_volume_claims=persistent_volume_claims,
217+
shared_memory=shared_memory,
216218
)
217219
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
218220
.environment_variable("METAFLOW_CODE_URL", code_package_url)

metaflow/plugins/kubernetes/kubernetes_cli.py

+3
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def kubernetes():
107107
type=JSONTypeClass(),
108108
multiple=False,
109109
)
110+
@click.option("--shared-memory", default=None, help="Size of shared memory in MiB")
110111
@click.pass_context
111112
def step(
112113
ctx,
@@ -132,6 +133,7 @@ def step(
132133
run_time_limit=None,
133134
persistent_volume_claims=None,
134135
tolerations=None,
136+
shared_memory=None,
135137
**kwargs
136138
):
137139
def echo(msg, stream="stderr", job_id=None, **kwargs):
@@ -245,6 +247,7 @@ def _sync_metadata():
245247
env=env,
246248
persistent_volume_claims=persistent_volume_claims,
247249
tolerations=tolerations,
250+
shared_memory=shared_memory,
248251
)
249252
except Exception as e:
250253
traceback.print_exc(chain=False)

metaflow/plugins/kubernetes/kubernetes_decorator.py

+17
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
KUBERNETES_PERSISTENT_VOLUME_CLAIMS,
2121
KUBERNETES_TOLERATIONS,
2222
KUBERNETES_SERVICE_ACCOUNT,
23+
KUBERNETES_SHARED_MEMORY,
2324
)
2425
from metaflow.plugins.resources_decorator import ResourcesDecorator
2526
from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task
@@ -87,6 +88,8 @@ class KubernetesDecorator(StepDecorator):
8788
persistent_volume_claims : Dict[str, str], optional, default None
8889
A map (dictionary) of persistent volumes to be mounted to the pod for this step. The map is from persistent
8990
volumes to the path to which the volume is to be mounted, e.g., `{'pvc-name': '/path/to/mount/on'}`.
91+
shared_memory: int, optional
92+
Shared memory size (in MiB) required for this step
9093
"""
9194

9295
name = "kubernetes"
@@ -109,6 +112,7 @@ class KubernetesDecorator(StepDecorator):
109112
"tmpfs_size": None,
110113
"tmpfs_path": "/metaflow_temp",
111114
"persistent_volume_claims": None, # e.g., {"pvc-name": "/mnt/vol", "another-pvc": "/mnt/vol2"}
115+
"shared_memory": None,
112116
}
113117
package_url = None
114118
package_sha = None
@@ -194,6 +198,8 @@ def __init__(self, attributes=None, statically_defined=False):
194198
if not self.attributes["tmpfs_size"]:
195199
# default tmpfs behavior - https://man7.org/linux/man-pages/man5/tmpfs.5.html
196200
self.attributes["tmpfs_size"] = int(self.attributes["memory"]) // 2
201+
if not self.attributes["shared_memory"]:
202+
self.attributes["shared_memory"] = KUBERNETES_SHARED_MEMORY
197203

198204
# Refer https://github.com/Netflix/metaflow/blob/master/docs/lifecycle.png
199205
def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
@@ -289,6 +295,17 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
289295
)
290296
)
291297

298+
if self.attributes["shared_memory"]:
299+
if not (
300+
isinstance(self.attributes["shared_memory"], int)
301+
and int(self.attributes["shared_memory"]) > 0
302+
):
303+
raise KubernetesException(
304+
"Invalid shared_memory value: *{size}* for step *{step}* (should be an integer greater than 0)".format(
305+
size=self.attributes["shared_memory"], step=step
306+
)
307+
)
308+
292309
def package_init(self, flow, step_name, environment):
293310
try:
294311
# Kubernetes is a soft dependency.

metaflow/plugins/kubernetes/kubernetes_job.py

+27
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ def create(self):
7777
use_tmpfs = self._kwargs["use_tmpfs"]
7878
tmpfs_size = self._kwargs["tmpfs_size"]
7979
tmpfs_enabled = use_tmpfs or (tmpfs_size and not use_tmpfs)
80+
shared_memory = (
81+
int(self._kwargs["shared_memory"])
82+
if self._kwargs["shared_memory"]
83+
else None
84+
)
8085

8186
self._job = client.V1Job(
8287
api_version="batch/v1",
@@ -184,6 +189,15 @@ def create(self):
184189
if tmpfs_enabled
185190
else []
186191
)
192+
+ (
193+
[
194+
client.V1VolumeMount(
195+
mount_path="/dev/shm", name="dhsm"
196+
)
197+
]
198+
if shared_memory
199+
else []
200+
)
187201
+ (
188202
[
189203
client.V1VolumeMount(
@@ -232,6 +246,19 @@ def create(self):
232246
if tmpfs_enabled
233247
else []
234248
)
249+
+ (
250+
[
251+
client.V1Volume(
252+
name="dhsm",
253+
empty_dir=client.V1EmptyDirVolumeSource(
254+
medium="Memory",
255+
size_limit="{}Mi".format(shared_memory),
256+
),
257+
)
258+
]
259+
if shared_memory
260+
else []
261+
)
235262
+ (
236263
[
237264
client.V1Volume(

stubs/test/test_stubs.yml

+6-10
Original file line numberDiff line numberDiff line change
@@ -501,18 +501,14 @@
501501
...
502502
out: |
503503
main:3: error: .*incompatible type.*\[arg-type\]
504+
main:4: error: .*Too many positional arguments for "kubernetes"\s+\[misc\]
504505
main:7: error: .*incompatible type.*\[arg-type\]
505-
main:12: error: No overload variant of "kubernetes" matches argument.*\[call-overload\]
506-
main:12: note: Possible overload variants
507-
main:12: note: .*def.*kubernetes.*
508-
main:12: note: .*def.*kubernetes.*
509-
main:12: note: .*def.*kubernetes.*
510-
main:17: error: No overload variant of "kubernetes" matches argument.*\[call-overload\]
511-
main:17: note: Possible overload variants
512-
main:17: note: .*def.*kubernetes.*
513-
main:17: note: .*def.*kubernetes.*
514-
main:17: note: .*def.*kubernetes.*
506+
main:12: error: .*Too many positional arguments for "kubernetes"\s+\[misc\]
507+
main:17: error: .*Unexpected keyword argument "foo" for "kubernetes"\s+\[call-arg\]
508+
main:22: error: .*Too many positional arguments for "kubernetes"\s+\[misc\]
515509
main:22: error: .*incompatible type.*\[arg-type\]
510+
main:26: error: .*Too many positional arguments for "kubernetes"\s+\[misc\]
511+
main:26: error: .*incompatible type.*\[arg-type\]
516512
517513
- case: environment_decorator_validity
518514
regex: yes

0 commit comments

Comments
 (0)