Skip to content

Commit f5b2ceb

Browse files
authored
Merge pull request #752 from NVIDIA/am/k8s-dse
Enhancements for Dynamo with k8s
2 parents 67fe5f8 + 452c179 commit f5b2ceb

File tree

5 files changed

+45
-62
lines changed

5 files changed

+45
-62
lines changed

src/cloudai/_core/json_gen_strategy.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
from abc import ABC, abstractmethod
1919
from typing import Any, Dict
2020

21+
import toml
22+
2123
from .system import System
2224
from .test_scenario import TestRun
2325

@@ -29,6 +31,8 @@ class JsonGenStrategy(ABC):
2931
It specifies how to generate JSON job specifications based on system and test parameters.
3032
"""
3133

34+
TEST_RUN_DUMP_FILE_NAME: str = "test-run.toml"
35+
3236
def __init__(self, system: System, test_run: TestRun) -> None:
3337
self.system = system
3438
self.test_run = test_run
@@ -54,6 +58,14 @@ def sanitize_k8s_job_name(self, job_name: str) -> str:
5458
sanitized_name = re.sub(r"[^a-z0-9]+$", "", sanitized_name)
5559
return sanitized_name[:253]
5660

61+
def store_test_run(self) -> None:
    """Persist this strategy's TestRun as TOML in the run's output directory.

    Writes ``TEST_RUN_DUMP_FILE_NAME`` ("test-run.toml") under
    ``self.test_run.output_path``. JSON-based launch has no shell command
    line, so the command fields are recorded as placeholders ("" / "n/a").
    """
    # Function-scope import — presumably to avoid a circular import between
    # this core module and cloudai.models; NOTE(review): confirm.
    from cloudai.models.scenario import TestRunDetails

    dump_path = self.test_run.output_path / self.TEST_RUN_DUMP_FILE_NAME
    with dump_path.open("w") as dump_file:
        details = TestRunDetails.from_test_run(self.test_run, test_cmd="", full_cmd="n/a")
        toml.dump(details.model_dump(), dump_file)
68+
5769
@abstractmethod
5870
def gen_json(self) -> Dict[Any, Any]:
5971
"""

src/cloudai/cli/cli.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ def setup_logging(log_file: str, log_level: str) -> None:
7474
"handlers": ["debug_file"],
7575
"propagate": False,
7676
},
77+
"kubernetes": {
78+
"handlers": [],
79+
"propagate": False,
80+
},
7781
},
7882
}
7983
logging.config.dictConfig(LOGGING_CONFIG)

src/cloudai/systems/kubernetes/kubernetes_runner.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ def _submit_test(self, tr: TestRun) -> KubernetesJob:
4242

4343
return job
4444

45+
def on_job_submit(self, tr: TestRun) -> None:
    """Runner hook fired right after *tr* is submitted.

    Delegates to the JSON generation strategy so the test run's
    configuration is dumped into its output directory for later inspection.
    """
    strategy = self.get_json_gen_strategy(self.system, tr)
    strategy.store_test_run()
48+
4549
def on_job_completion(self, job: BaseJob) -> None:
4650
k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system)
4751
k_job = cast(KubernetesJob, job)

src/cloudai/systems/kubernetes/kubernetes_system.py

Lines changed: 24 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
from __future__ import annotations
1818

19-
import json
2019
import logging
2120
import subprocess
2221
import time
@@ -43,7 +42,6 @@ class KubernetesSystem(System):
4342
_core_v1: Optional[k8s.client.CoreV1Api] = None
4443
_batch_v1: Optional[k8s.client.BatchV1Api] = None
4544
_custom_objects_api: Optional[k8s.client.CustomObjectsApi] = None
46-
_port_forward_process: subprocess.Popen | None = None
4745
_genai_perf_completed: bool = False
4846

4947
def __getstate__(self) -> dict[str, Any]:
@@ -279,60 +277,15 @@ def are_vllm_pods_ready(self, job: KubernetesJob) -> bool:
279277

280278
return all_ready
281279

282-
def _setup_port_forward(self, job: KubernetesJob) -> None:
283-
if self._port_forward_process and self._port_forward_process.poll() is None:
284-
logging.debug("Port forwarding is already running")
285-
return
286-
287-
if not self.are_vllm_pods_ready(job):
288-
logging.debug("Pods are not ready yet, skipping port forward")
289-
return
290-
291-
cmd = f"kubectl port-forward svc/{job.name}-frontend 8000:8000 -n {self.default_namespace}"
292-
logging.debug("Starting port forwarding")
293-
self._port_forward_process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
294-
295-
logging.debug(f"Port forwarding started (pid={self._port_forward_process.pid})")
296-
297-
def _check_model_server(self) -> bool:
298-
if not self._port_forward_process:
299-
logging.debug("Port forward process is not running")
300-
return False
301-
302-
server = "localhost:8000"
303-
cmd = f"curl -s http://{server}/v1/models"
304-
logging.debug(f"Checking if model server is up at {server}: {cmd}")
305-
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
306-
307-
if result.returncode != 0:
308-
logging.debug(
309-
f"Failed to connect to model server={server}, "
310-
f"output={result.stdout.strip()}, "
311-
f"error={result.stderr.strip()}"
312-
)
313-
return False
314-
315-
try:
316-
response = json.loads(result.stdout)
317-
if response.get("data") and len(response["data"]) > 0:
318-
logging.debug(f"Model server is running. Response: {result.stdout}")
319-
return True
320-
else:
321-
logging.debug("Model server is up but no models are loaded yet")
322-
return False
323-
except json.JSONDecodeError:
324-
logging.warning("Invalid JSON response from model server")
325-
return False
326-
327-
def _get_frontend_pod_name(self) -> str:
280+
def _get_dynamo_pod_by_role(self, role: str) -> str:
328281
for pod in self.core_v1.list_namespaced_pod(namespace=self.default_namespace).items:
329282
labels = pod.metadata.labels
330283
logging.debug(f"Found pod: {pod.metadata.name} with labels: {labels}")
331-
if labels and str(labels.get("nvidia.com/dynamo-component", "")).lower() == "frontend": # v0.6.x
284+
if labels and str(labels.get("nvidia.com/dynamo-component", "")).lower() == role.lower(): # v0.6.x
332285
return pod.metadata.name
333-
if labels and str(labels.get("nvidia.com/dynamo-component-type", "")).lower() == "frontend": # v0.7.x
286+
if labels and str(labels.get("nvidia.com/dynamo-component-type", "")).lower() == role.lower(): # v0.7.x
334287
return pod.metadata.name
335-
raise RuntimeError("No frontend pod found for the job")
288+
raise RuntimeError(f"No pod found for the role '{role}'")
336289

337290
def _run_genai_perf(self, job: KubernetesJob) -> None:
338291
from cloudai.workloads.ai_dynamo.ai_dynamo import AIDynamoTestDefinition
@@ -352,7 +305,7 @@ def _run_genai_perf(self, job: KubernetesJob) -> None:
352305
genai_perf_cmd.extend(extra_args.split())
353306
logging.debug(f"GenAI perf arguments: {genai_perf_cmd=}")
354307

355-
frontend_pod = self._get_frontend_pod_name()
308+
frontend_pod = self._get_dynamo_pod_by_role(role="frontend")
356309

357310
logging.debug(f"Executing genai-perf in pod={frontend_pod} cmd={genai_perf_cmd}")
358311
try:
@@ -402,12 +355,20 @@ def _is_dynamo_graph_deployment_running(self, job: KubernetesJob) -> bool:
402355
return False
403356

404357
if self.are_vllm_pods_ready(job):
405-
self._setup_port_forward(job)
406-
if self._port_forward_process and self._check_model_server():
407-
logging.debug("vLLM server is up and models are loaded")
408-
self._run_genai_perf(job)
409-
self._genai_perf_completed = True
410-
return False
358+
self._run_genai_perf(job)
359+
self._genai_perf_completed = True
360+
361+
for pod_role in {"decode", "prefill", "frontend"}:
362+
try:
363+
pod_name = self._get_dynamo_pod_by_role(pod_role)
364+
logging.debug(f"Fetching logs for {pod_role=} {pod_name=}")
365+
logs = self.core_v1.read_namespaced_pod_log(name=pod_name, namespace=self.default_namespace)
366+
with (job.test_run.output_path / f"{pod_role}_pod.log").open("w") as f:
367+
f.write(logs)
368+
except Exception as e:
369+
logging.debug(f"Error fetching logs for role '{pod_role}': {e}")
370+
371+
return False
411372

412373
deployment = cast(
413374
dict,
@@ -485,9 +446,7 @@ def _delete_dynamo_graph_deployment(self, job_name: str) -> None:
485446
if result.returncode != 0:
486447
logging.debug(f"Failed to delete DynamoGraphDeployment: {result.stderr}")
487448

488-
if self._port_forward_process and self._port_forward_process.poll() is None:
489-
self._port_forward_process.kill()
490-
self._port_forward_process = None
449+
self._genai_perf_completed = False
491450

492451
def create_job(self, job_spec: Dict[Any, Any], timeout: int = 60, interval: int = 1) -> str:
493452
"""
@@ -562,6 +521,10 @@ def _create_mpi_job(self, job_spec: Dict[Any, Any]) -> str:
562521
return job_name
563522

564523
def _create_dynamo_graph_deployment(self, job_spec: Dict[Any, Any]) -> str:
524+
logging.debug(f"Attempting to delete existing job='{job_spec['metadata']['name']}' before creation.")
525+
self._delete_dynamo_graph_deployment(job_spec["metadata"]["name"])
526+
527+
logging.debug("Creating DynamoGraphDeployment with spec")
565528
try:
566529
api_response = self.custom_objects_api.create_namespaced_custom_object(
567530
group="nvidia.com",

src/cloudai/workloads/ai_dynamo/ai_dynamo.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def was_run_successful(self, tr: TestRun) -> JobStatusResult:
154154
output_path = tr.output_path
155155
csv_files = list(output_path.rglob(CSV_FILES_PATTERN))
156156
json_files = list(output_path.rglob(JSON_FILES_PATTERN))
157-
logging.debug(f"Found CSV files: {csv_files}, JSON files: {json_files}")
157+
logging.debug(f"Found CSV files in {output_path.absolute()}: {csv_files}, JSON files: {json_files}")
158158
has_results = len(csv_files) > 0 and len(json_files) > 0
159159
if not has_results:
160160
return JobStatusResult(False, "No result files found in the output directory.")

0 commit comments

Comments (0)