Skip to content

Commit b3b9b23

Browse files
authored
Merge pull request #759 from NVIDIA/am/nccl-k8s
Automatically install sshd for NCCL k8s workers if not available
2 parents 22d0c84 + e6f20f0 commit b3b9b23

File tree

5 files changed

+97
-53
lines changed

5 files changed

+97
-53
lines changed

src/cloudai/_core/json_gen_strategy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def sanitize_k8s_job_name(self, job_name: str) -> str:
5353
str: The sanitized job name that complies with Kubernetes naming rules.
5454
"""
5555
sanitized_name = job_name.lower()
56-
sanitized_name = re.sub(r"[^a-z0-9.-]", "-", sanitized_name)
56+
sanitized_name = re.sub(r"[^a-z0-9-]", "-", sanitized_name)
5757
sanitized_name = re.sub(r"^[^a-z0-9]+", "", sanitized_name)
5858
sanitized_name = re.sub(r"[^a-z0-9]+$", "", sanitized_name)
5959
return sanitized_name[:253]

src/cloudai/systems/kubernetes/kubernetes_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def _submit_test(self, tr: TestRun) -> KubernetesJob:
3232
job_name = tr.name.replace(".", "-").lower()
3333
job_spec = self.get_json_gen_strategy(self.system, tr).gen_json()
3434
job_kind = job_spec.get("kind", "").lower()
35-
logging.info(f"Generated JSON string for test {tr.name}: {job_spec}")
35+
logging.debug(f"Generated JSON string for test {tr.name}: {job_spec}")
3636

3737
if self.mode == "run":
3838
k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system)

src/cloudai/systems/kubernetes/kubernetes_system.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ def _is_mpijob_running(self, job_name: str) -> bool:
166166
assert isinstance(mpijob, dict)
167167
status: dict = cast(dict, mpijob.get("status", {}))
168168
conditions = status.get("conditions", [])
169+
logging.debug(f"MPIJob '{job_name}': {conditions=} {status=}")
169170

170171
# Consider an empty conditions list as running
171172
if not conditions:
@@ -681,7 +682,7 @@ def store_logs_for_job(self, job_name: str, output_dir: Path) -> None:
681682
stdout_file.write(logs + "\n")
682683

683684
except lazy.k8s.client.ApiException as e:
684-
logging.error(f"Error retrieving logs for pod '{pod_name}': {e}")
685+
logging.debug(f"Error retrieving logs for pod '{pod_name}': {e}")
685686

686687
logging.debug(f"All logs concatenated and saved to '{stdout_file_path}'")
687688

src/cloudai/workloads/nccl_test/kubernetes_json_gen_strategy.py

Lines changed: 69 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,34 @@
1616

1717
from typing import Any, Dict, List, Union, cast
1818

19+
import yaml
20+
1921
from cloudai.core import JsonGenStrategy
22+
from cloudai.systems.kubernetes import KubernetesSystem
2023

2124
from .nccl import NCCLTestDefinition
2225

2326

2427
class NcclTestKubernetesJsonGenStrategy(JsonGenStrategy):
25-
"""JSON generation strategy for NCCL tests on Kubernetes systems."""
28+
"""
29+
JSON generation strategy for NCCL tests on Kubernetes systems.
2630
27-
SSH_PORT: int = 2222
31+
This strategy generates an MPIJob configuration for running NCCL tests.
32+
"""
2833

2934
def gen_json(self) -> dict[Any, Any]:
30-
return {
35+
k8s_system = cast(KubernetesSystem, self.system)
36+
job_name = self.sanitize_k8s_job_name(self.test_run.name)
37+
38+
deployment = {
3139
"apiVersion": "kubeflow.org/v2beta1",
3240
"kind": "MPIJob",
3341
"metadata": {
34-
"name": self.sanitize_k8s_job_name("nccl-test"),
42+
"name": job_name,
43+
"namespace": k8s_system.default_namespace,
3544
},
3645
"spec": {
37-
"slotsPerWorker": 1,
46+
"slotsPerWorker": k8s_system.gpus_per_node,
3847
"runPolicy": {"cleanPodPolicy": "Running"},
3948
"mpiReplicaSpecs": {
4049
"Launcher": self._create_launcher_spec(),
@@ -43,47 +52,53 @@ def gen_json(self) -> dict[Any, Any]:
4352
},
4453
}
4554

46-
def _create_launcher_spec(self) -> dict[str, Any]:
55+
with open(self.test_run.output_path / "deployment.yaml", "w") as f:
56+
yaml.dump(deployment, f)
57+
58+
return deployment
59+
60+
@property
61+
def container_url(self) -> str:
4762
tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test)
63+
return tdef.cmd_args.docker_image_url.replace("#", "/")
64+
65+
def _create_launcher_spec(self) -> dict[str, Any]:
4866
env_vars = self._get_merged_env_vars()
4967
return {
5068
"replicas": 1,
5169
"template": {
5270
"spec": {
53-
"hostNetwork": True,
5471
"containers": [
5572
{
56-
"image": tdef.cmd_args.docker_image_url,
73+
"image": self.container_url,
5774
"name": "nccl-test-launcher",
5875
"imagePullPolicy": "IfNotPresent",
5976
"securityContext": {"privileged": True},
6077
"env": self._generate_env_list(env_vars),
6178
"command": ["/bin/bash", "-c"],
62-
"args": [self._generate_launcher_command(env_vars)],
79+
"args": [self._generate_launcher_command()],
80+
"resources": self._prepare_launcher_resources(),
6381
}
6482
],
6583
},
6684
},
6785
}
6886

6987
def _create_worker_spec(self) -> dict[str, Any]:
70-
tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test)
7188
env_vars = self._get_merged_env_vars()
7289
return {
73-
"replicas": self.test_run.num_nodes,
90+
"replicas": self.test_run.nnodes,
7491
"template": {
7592
"spec": {
76-
"hostNetwork": True,
7793
"containers": [
7894
{
79-
"image": tdef.cmd_args.docker_image_url,
95+
"image": self.container_url,
8096
"name": "nccl-test-worker",
8197
"imagePullPolicy": "IfNotPresent",
8298
"securityContext": {"privileged": True},
83-
"ports": [{"containerPort": self.SSH_PORT, "name": "ssh"}],
8499
"env": self._generate_env_list(env_vars),
85-
"command": ["/bin/bash"],
86-
"args": ["-c", f"/usr/sbin/sshd -p {self.SSH_PORT}; sleep infinity"],
100+
"command": ["/bin/bash", "-c"],
101+
"args": [self._generate_worker_command()],
87102
"resources": self._prepare_worker_resources(),
88103
"volumeMounts": [
89104
{"mountPath": "/dev/shm", "name": "dev-shm"},
@@ -95,31 +110,54 @@ def _create_worker_spec(self) -> dict[str, Any]:
95110
},
96111
}
97112

113+
def _generate_worker_command(self) -> str:
114+
"""
115+
Generate command for worker pods that starts the SSH daemon.
116+
117+
If the SSH daemon is not installed, it will be installed and the SSH keys will be generated.
118+
"""
119+
return """
120+
set -e
121+
if ! command -v sshd &> /dev/null; then
122+
apt-get update && apt-get install -y --no-install-recommends openssh-server
123+
fi
124+
mkdir -p /var/run/sshd
125+
cat >> /etc/ssh/sshd_config << EOF
126+
PermitRootLogin yes
127+
PubkeyAuthentication yes
128+
StrictModes no
129+
EOF
130+
ssh-keygen -A
131+
exec /usr/sbin/sshd -D
132+
""".strip()
133+
98134
def _get_merged_env_vars(self) -> dict[str, str | list[str]]:
99135
final_env_vars = self.system.global_env_vars.copy()
100136
final_env_vars.update(self.test_run.test.extra_env_vars)
101137
return final_env_vars
102138

103139
def _generate_env_list(self, env_vars: Dict[str, Union[str, List[str]]]) -> List[Dict[str, str]]:
104-
env_list = [{"name": "OMPI_ALLOW_RUN_AS_ROOT", "value": "1"}]
140+
env_list = [
141+
{"name": "OMPI_ALLOW_RUN_AS_ROOT", "value": "1"},
142+
{"name": "OMPI_ALLOW_RUN_AS_ROOT_CONFIRM", "value": "1"},
143+
]
105144
for key, value in env_vars.items():
106145
if isinstance(value, list):
107146
value = ",".join(value)
108147
env_list.append({"name": key, "value": value})
109148
return env_list
110149

111-
def _generate_mpi_args(self, env_vars: Dict[str, Union[str, List[str]]]) -> List[str]:
150+
def _generate_mpi_args(self) -> List[str]:
151+
k8s_system = cast(KubernetesSystem, self.system)
152+
total_processes = self.test_run.nnodes * k8s_system.gpus_per_node
153+
112154
mpi_args = [
113-
"--allow-run-as-root",
114-
f"--mca plm_rsh_args '-p {self.SSH_PORT}'",
115-
"-c 2",
116-
"-bind-to none -map-by slot",
117-
"-mca btl tcp,self",
155+
f"-np {total_processes}",
156+
"-bind-to none",
157+
# Disable strict host key checking for SSH
158+
"-mca plm_rsh_args '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'",
118159
]
119160

120-
if "NCCL_SOCKET_IFNAME" in env_vars:
121-
mpi_args.append(f"-mca btl_tcp_if_include {env_vars['NCCL_SOCKET_IFNAME']}")
122-
123161
return mpi_args
124162

125163
def _generate_nccl_args(self, cmd_args_dict: Dict[str, Any]) -> List[str]:
@@ -136,7 +174,7 @@ def _generate_extra_args(self, extra_cmd_args: Dict[str, str]) -> List[str]:
136174
extra_args.append(f"{key} {value}" if value else key)
137175
return extra_args
138176

139-
def _generate_launcher_command(self, env_vars: dict[str, str | list[str]]) -> str:
177+
def _generate_launcher_command(self) -> str:
140178
tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test)
141179
tdef_cmd_args = tdef.cmd_args
142180

@@ -146,7 +184,7 @@ def _generate_launcher_command(self, env_vars: dict[str, str | list[str]]) -> st
146184

147185
command_parts = [
148186
"mpirun",
149-
" ".join(self._generate_mpi_args(env_vars)),
187+
" ".join(self._generate_mpi_args()),
150188
tdef_cmd_args.subtest_name,
151189
" ".join(self._generate_nccl_args(cmd_args_dict)),
152190
]
@@ -163,4 +201,6 @@ def _prepare_launcher_resources(self) -> Dict[str, Dict[str, str]]:
163201
}
164202

165203
def _prepare_worker_resources(self) -> Dict[str, Dict[str, str]]:
166-
return {"requests": {"nvidia.com/gpu": "8"}, "limits": {"nvidia.com/gpu": "8"}}
204+
k8s_system = cast(KubernetesSystem, self.system)
205+
gpu_count = str(k8s_system.gpus_per_node)
206+
return {"requests": {"nvidia.com/gpu": gpu_count}, "limits": {"nvidia.com/gpu": gpu_count}}

tests/json_gen_strategy/test_nccl_kubernetes_json_gen_strategy.py

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# limitations under the License.
1616

1717

18+
from typing import cast
19+
1820
import pytest
1921

2022
from cloudai.core import TestRun
@@ -68,12 +70,13 @@ def json_gen_strategy(
6870
return NcclTestKubernetesJsonGenStrategy(kubernetes_system, test_run)
6971

7072
def test_gen_json_basic_structure(self, basic_test_run: TestRun, k8s_system: KubernetesSystem) -> None:
71-
json_payload = self.json_gen_strategy(k8s_system, basic_test_run).gen_json()
73+
json_gen_strategy = self.json_gen_strategy(k8s_system, basic_test_run)
74+
json_payload = json_gen_strategy.gen_json()
7275

7376
assert json_payload["apiVersion"] == "kubeflow.org/v2beta1"
7477
assert json_payload["kind"] == "MPIJob"
75-
assert json_payload["metadata"]["name"] == "nccl-test"
76-
assert json_payload["spec"]["slotsPerWorker"] == 1
78+
assert json_payload["metadata"]["name"] == json_gen_strategy.sanitize_k8s_job_name(basic_test_run.name)
79+
assert json_payload["spec"]["slotsPerWorker"] == k8s_system.gpus_per_node
7780
assert json_payload["spec"]["runPolicy"]["cleanPodPolicy"] == "Running"
7881
assert "Launcher" in json_payload["spec"]["mpiReplicaSpecs"]
7982
assert "Worker" in json_payload["spec"]["mpiReplicaSpecs"]
@@ -83,7 +86,6 @@ def test_launcher_spec(self, basic_test_run: TestRun, k8s_system: KubernetesSyst
8386
launcher_spec = json_payload["spec"]["mpiReplicaSpecs"]["Launcher"]
8487

8588
assert launcher_spec["replicas"] == 1
86-
assert launcher_spec["template"]["spec"]["hostNetwork"] is True
8789

8890
container = launcher_spec["template"]["spec"]["containers"][0]
8991
assert container["image"] == "fake_image_url"
@@ -93,22 +95,24 @@ def test_launcher_spec(self, basic_test_run: TestRun, k8s_system: KubernetesSyst
9395
assert container["command"] == ["/bin/bash", "-c"]
9496

9597
def test_worker_spec(self, basic_test_run: TestRun, k8s_system: KubernetesSystem) -> None:
96-
json_payload = self.json_gen_strategy(k8s_system, basic_test_run).gen_json()
98+
json_gen_strategy = self.json_gen_strategy(k8s_system, basic_test_run)
99+
json_payload = json_gen_strategy.gen_json()
97100
worker_spec = json_payload["spec"]["mpiReplicaSpecs"]["Worker"]
98101

99-
assert worker_spec["replicas"] == 2
100-
assert worker_spec["template"]["spec"]["hostNetwork"] is True
102+
assert worker_spec["replicas"] == basic_test_run.nnodes
101103

102104
container = worker_spec["template"]["spec"]["containers"][0]
103105
assert container["image"] == "fake_image_url"
104106
assert container["name"] == "nccl-test-worker"
105107
assert container["imagePullPolicy"] == "IfNotPresent"
106108
assert container["securityContext"]["privileged"] is True
107-
assert container["ports"][0] == {"containerPort": 2222, "name": "ssh"}
108-
assert container["command"] == ["/bin/bash"]
109-
assert container["args"] == ["-c", "/usr/sbin/sshd -p 2222; sleep infinity"]
109+
assert container["command"] == ["/bin/bash", "-c"]
110+
assert container["args"] == [json_gen_strategy._generate_worker_command()]
110111

111-
assert container["resources"] == {"requests": {"nvidia.com/gpu": "8"}, "limits": {"nvidia.com/gpu": "8"}}
112+
assert container["resources"] == {
113+
"requests": {"nvidia.com/gpu": str(k8s_system.gpus_per_node)},
114+
"limits": {"nvidia.com/gpu": str(k8s_system.gpus_per_node)},
115+
}
112116

113117
assert container["volumeMounts"] == [{"mountPath": "/dev/shm", "name": "dev-shm"}]
114118
assert worker_spec["template"]["spec"]["volumes"] == [
@@ -127,18 +131,17 @@ def test_env_variables(self, test_run_with_env_vars: TestRun, k8s_system: Kubern
127131
assert env_dict["LIST_VAR"] == "item1,item2"
128132

129133
def test_launcher_command_generation(self, test_run_with_extra_args: TestRun, k8s_system: KubernetesSystem) -> None:
130-
json_payload = self.json_gen_strategy(k8s_system, test_run_with_extra_args).gen_json()
134+
json_gen_strategy = self.json_gen_strategy(k8s_system, test_run_with_extra_args)
135+
json_payload = json_gen_strategy.gen_json()
131136
launcher_args = json_payload["spec"]["mpiReplicaSpecs"]["Launcher"]["template"]["spec"]["containers"][0][
132137
"args"
133138
][0]
139+
nccl = cast(NCCLTestDefinition, json_gen_strategy.test_run.test)
134140

135141
assert "mpirun" in launcher_args
136-
assert "--allow-run-as-root" in launcher_args
137-
assert "--mca plm_rsh_args '-p 2222'" in launcher_args
138-
assert "-bind-to none -map-by slot" in launcher_args
139-
assert "all_reduce_perf" in launcher_args
140-
assert "--nthreads 4" in launcher_args
141-
assert "--ngpus 2" in launcher_args
142-
assert "--minbytes 32M" in launcher_args
143-
assert "--maxbytes 64M" in launcher_args
144-
assert "--extra-flag value" in launcher_args
142+
assert f"-np {test_run_with_extra_args.nnodes * k8s_system.gpus_per_node}" in launcher_args
143+
assert "-bind-to none" in launcher_args
144+
assert "-mca plm_rsh_args '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'" in launcher_args
145+
assert nccl.cmd_args.subtest_name in launcher_args
146+
assert f"--nthreads {nccl.cmd_args.nthreads}" in launcher_args
147+
assert f"--ngpus {nccl.cmd_args.ngpus}" in launcher_args

0 commit comments

Comments
 (0)