Skip to content

Commit 17e7b9a

Browse files
Merge pull request #947 from linsword13/gke-wm
Add a workflow manager to support running hpc apps in k8s cluster
2 parents 5d866b0 + 8f77f27 commit 17e7b9a

File tree

10 files changed

+365
-0
lines changed

10 files changed

+365
-0
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Copyright 2022-2025 The Ramble Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5+
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6+
# option. This file may not be copied, modified, or distributed
7+
# except according to those terms.
8+
9+
import os
10+
11+
import pytest
12+
13+
import ramble.workspace
14+
from ramble.main import RambleCommand
15+
16+
workspace = RambleCommand("workspace")
17+
18+
pytestmark = pytest.mark.usefixtures(
19+
"mutable_config",
20+
"mutable_mock_workspace_path",
21+
)
22+
23+
24+
def test_gke_mpi_workflow(request):
25+
workspace_name = request.node.name
26+
test_config = """
27+
ramble:
28+
env_vars:
29+
set:
30+
OMP_NUM_THREADS: '{n_threads}'
31+
variants:
32+
workflow_manager: gke-mpi
33+
variables:
34+
mpi_command: mpirun -n {n_ranks}
35+
processes_per_node: 1
36+
n_nodes: 2
37+
container_image: docker.pkg.dev/myproject/myimage
38+
extra_metadata: |
39+
a: 1
40+
b: 2
41+
extra_container_config_files: |
42+
{experiment_run_dir}/app_config.txt
43+
applications:
44+
hostname:
45+
workloads:
46+
parallel:
47+
experiments:
48+
generated: {}
49+
"""
50+
with ramble.workspace.create(workspace_name) as ws:
51+
ws.write()
52+
config_path = os.path.join(ws.config_dir, ramble.workspace.config_file_name)
53+
with open(config_path, "w+") as f:
54+
f.write(test_config)
55+
ws._re_read()
56+
workspace("setup", "--dry-run", global_args=["-D", ws.root])
57+
58+
run_path = os.path.join(ws.experiment_dir, "hostname", "parallel", "generated")
59+
files = [f for f in os.listdir(run_path) if os.path.isfile(os.path.join(run_path, f))]
60+
assert "batch_submit" in files
61+
assert "batch_query" in files
62+
assert "batch_cancel" in files
63+
assert "gke_mpi.yaml" in files
64+
assert "kustomization.yaml" in files
65+
assert "launcher_execute_script" in files
66+
assert "worker_execute_script" in files
67+
assert "batch_print_deployment" in files
68+
with open(os.path.join(run_path, "batch_submit")) as f:
69+
content = f.read()
70+
assert f"kubectl apply --kustomize {run_path}" in content
71+
with open(os.path.join(run_path, "batch_query")) as f:
72+
content = f.read()
73+
assert "kubectl describe mpijobs hostname-parallel-generated" in content
74+
with open(os.path.join(run_path, "batch_cancel")) as f:
75+
content = f.read()
76+
assert "kubectl delete mpijobs hostname-parallel-generated" in content
77+
with open(os.path.join(run_path, "gke_mpi.yaml")) as f:
78+
content = f.read()
79+
assert "kind: MPIJob" in content
80+
assert "name: hostname-parallel-generated" in content
81+
assert "replicas: 2" in content
82+
assert "image: docker.pkg.dev/myproject/myimage" in content
83+
with open(os.path.join(run_path, "kustomization.yaml")) as f:
84+
content = f.read()
85+
assert "files:" in content
86+
assert os.path.join(run_path, "app_config.txt") in content
87+
with open(os.path.join(run_path, "launcher_execute_script")) as f:
88+
content = f.read()
89+
assert "hostname" in content
90+
with open(os.path.join(run_path, "worker_execute_script")) as f:
91+
content = f.read()
92+
assert "sshd" in content
93+
with open(os.path.join(run_path, "batch_print_deployment")) as f:
94+
content = f.read()
95+
assert "kubectl kustomize" in content
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
kubectl delete mpijobs {job_name}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
kubectl kustomize {experiment_run_dir}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#!/bin/bash
2+
echo "========================"
3+
echo "Print out mpi job status"
4+
echo "========================"
5+
echo ""
6+
kubectl describe mpijobs {job_name}
7+
8+
lname=$(kubectl get pods | grep '{job_name}-launcher' | awk '{print $1}')
9+
if [ ! -z "$lname" ]; then
10+
echo " "
11+
echo "=========================="
12+
echo "Print out the launcher log"
13+
echo "=========================="
14+
echo " "
15+
kubectl logs $lname | tee {experiment_run_dir}/launcher.log
16+
fi
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
kubectl apply --kustomize {experiment_run_dir}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
apiVersion: kubeflow.org/v2beta1
2+
kind: MPIJob
3+
metadata:
4+
name: {job_name}
5+
{extra_metadata_section}
6+
spec:
7+
slotsPerWorker: {cores_per_node}
8+
runPolicy:
9+
cleanPodPolicy: Running
10+
mpiReplicaSpecs:
11+
Launcher:
12+
replicas: 1
13+
template:
14+
spec:
15+
hostPID: true
16+
hostIPC: true
17+
dnsPolicy: ClusterFirstWithHostNet
18+
volumes:
19+
- name: config
20+
configMap:
21+
name: gke-mpi-config
22+
containers:
23+
- image: {container_image}
24+
name: mpi-launcher
25+
volumeMounts:
26+
- name: config
27+
mountPath: /config
28+
command: ["bash", "{launcher_script_path}"]
29+
securityContext:
30+
privileged: true
31+
Worker:
32+
replicas: {n_nodes}
33+
template:
34+
spec:
35+
containers:
36+
- image: {container_image}
37+
name: mpi-worker
38+
securityContext:
39+
privileged: true
40+
volumeMounts:
41+
- name: config
42+
mountPath: /config
43+
command: ["bash", "{worker_script_path}"]
44+
volumes:
45+
- name: config
46+
configMap:
47+
name: gke-mpi-config
48+
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
resources:
2+
- {gke_mpi_yaml}
3+
{config_map_gen_section}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
2+
mkdir -p {container_work_dir} && cd {container_work_dir}
3+
# important to resolve symlink
4+
cp --remove-destination -r -L /config/* .
5+
6+
{unformatted_command_without_logs}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/bin/bash
2+
mkdir -p {container_work_dir} && cd {container_work_dir}
3+
# important to resolve symlink
4+
cp --remove-destination -r -L /config/* .
5+
/usr/sbin/sshd -De -f /etc/ssh/sshd_config
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
# Copyright 2022-2025 The Ramble Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5+
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6+
# option. This file may not be copied, modified, or distributed
7+
# except according to those terms.
8+
9+
import os
10+
import shutil
11+
import textwrap
12+
13+
from ramble.wmkit import *
14+
15+
from spack.util.executable import Executable, ProcessError
16+
17+
18+
class GkeMpi(WorkflowManagerBase):
19+
"""GKE workflow manager that uses the MPI operator"""
20+
21+
name = "gke-mpi"
22+
23+
maintainers("linsword13")
24+
25+
tags("workflow", "gke", "mpi")
26+
27+
workflow_manager_variable(
28+
name="job_name",
29+
default="{application_name}-{workload_name}-{experiment_name}",
30+
description="GKE job name",
31+
)
32+
33+
workflow_manager_variable(
34+
name="extra_metadata",
35+
default="",
36+
description="Extra line-separated key:val pairs for the metadata section",
37+
)
38+
39+
workflow_manager_variable(
40+
name="cores_per_node",
41+
default="{processes_per_node}",
42+
description="Cores per node",
43+
)
44+
45+
workflow_manager_variable(
46+
name="container_image",
47+
default="",
48+
description="url to the container image",
49+
)
50+
51+
workflow_manager_variable(
52+
name="extra_container_config_files",
53+
default="",
54+
description="extra line-separated list of config files to be mapped to containers",
55+
)
56+
57+
workflow_manager_variable(
58+
name="container_work_dir",
59+
default="/config",
60+
description="working directory inside the container",
61+
)
62+
63+
workflow_manager_variable(
64+
name="launcher_execute_script_template",
65+
default="launcher_execute_script.tpl",
66+
description="execute script template for the launcher",
67+
)
68+
69+
register_template(
70+
name="launcher_execute_script",
71+
src_path="{launcher_execute_script_template}",
72+
)
73+
74+
workflow_manager_variable(
75+
name="worker_execute_script_template",
76+
default="worker_execute_script.tpl",
77+
description="execute script template for the workers",
78+
)
79+
80+
register_template(
81+
name="worker_execute_script",
82+
src_path="{worker_execute_script_template}",
83+
)
84+
85+
register_template(
86+
name="gke_mpi_yaml",
87+
src_path="gke_mpi.yaml.tpl",
88+
dest_path="gke_mpi.yaml",
89+
extra_vars_func="gke_mpi_yaml_vars",
90+
)
91+
92+
def _gke_mpi_yaml_vars(self):
93+
expander = self.app_inst.expander
94+
extra_metadata_str = expander.expand_var_name("extra_metadata")
95+
launcher_script = expander.expand_var_name("launcher_execute_script")
96+
worker_script = expander.expand_var_name("worker_execute_script")
97+
if extra_metadata_str:
98+
extra_metadata_section = textwrap.indent(
99+
extra_metadata_str, " " * 2
100+
)
101+
else:
102+
extra_metadata_section = ""
103+
104+
return {
105+
"extra_metadata_section": extra_metadata_section,
106+
"launcher_script_path": os.path.join(
107+
"/config", os.path.basename(launcher_script)
108+
),
109+
"worker_script_path": os.path.join(
110+
"/config", os.path.basename(worker_script)
111+
),
112+
}
113+
114+
register_template(
115+
name="kustomization.yaml",
116+
src_path="kustomization.yaml.tpl",
117+
dest_path="kustomization.yaml",
118+
extra_vars_func="kustomization_yaml_vars",
119+
)
120+
121+
def _kustomization_yaml_vars(self):
122+
files = ["{launcher_execute_script}", "{worker_execute_script}"]
123+
expander = self.app_inst.expander
124+
extra_files_str = expander.expand_var_name(
125+
"extra_container_config_files"
126+
)
127+
if extra_files_str:
128+
files.extend(extra_files_str.split("\n"))
129+
file_lines = "\n".join(
130+
[expander.expand_var(f.lstrip("- ")) for f in files]
131+
)
132+
lines = [
133+
"configMapGenerator:",
134+
"- name: gke-mpi-config",
135+
" files:",
136+
textwrap.indent(file_lines, " - "),
137+
"generatorOptions:",
138+
# For some reason kustomization does not apply the generated name properly.
139+
# So disable the suffix as a workaround.
140+
" disableNameSuffixHash: true",
141+
]
142+
config_map_gen_section = "\n".join(lines)
143+
return {
144+
"config_map_gen_section": config_map_gen_section,
145+
}
146+
147+
register_template(
148+
name="batch_submit",
149+
src_path="batch_submit.tpl",
150+
dest_path="batch_submit",
151+
)
152+
153+
register_template(
154+
name="batch_query",
155+
src_path="batch_query.tpl",
156+
dest_path="batch_query",
157+
)
158+
159+
register_template(
160+
name="batch_cancel",
161+
src_path="batch_cancel.tpl",
162+
dest_path="batch_cancel",
163+
)
164+
165+
# A convenience for printing the deployment config
166+
register_template(
167+
name="batch_print_deployment",
168+
src_path="batch_print_deployment.tpl",
169+
dest_path="batch_print_deployment",
170+
)
171+
172+
def _prepare_analysis(self, workspace):
173+
if workspace.dry_run:
174+
return
175+
expander = self.app_inst.expander
176+
query_script = expander.expand_var_name("batch_query")
177+
query_cmd = Executable(query_script)
178+
try:
179+
query_cmd(output=os.devnull)
180+
except ProcessError as e:
181+
logger.warn(f"batch_query returns error {e}")
182+
run_dir = expander.expand_var_name("experiment_run_dir")
183+
launcher_log = os.path.join(run_dir, "launcher.log")
184+
if os.path.exists(launcher_log):
185+
log_file = expander.expand_var_name("log_file")
186+
shutil.copy2(launcher_log, log_file)

0 commit comments

Comments
 (0)