1616
1717from __future__ import annotations
1818
19- import json
2019import logging
2120import subprocess
2221import time
@@ -43,7 +42,6 @@ class KubernetesSystem(System):
4342 _core_v1 : Optional [k8s .client .CoreV1Api ] = None
4443 _batch_v1 : Optional [k8s .client .BatchV1Api ] = None
4544 _custom_objects_api : Optional [k8s .client .CustomObjectsApi ] = None
46- _port_forward_process : subprocess .Popen | None = None
4745 _genai_perf_completed : bool = False
4846
4947 def __getstate__ (self ) -> dict [str , Any ]:
@@ -279,60 +277,15 @@ def are_vllm_pods_ready(self, job: KubernetesJob) -> bool:
279277
280278 return all_ready
281279
282- def _setup_port_forward (self , job : KubernetesJob ) -> None :
283- if self ._port_forward_process and self ._port_forward_process .poll () is None :
284- logging .debug ("Port forwarding is already running" )
285- return
286-
287- if not self .are_vllm_pods_ready (job ):
288- logging .debug ("Pods are not ready yet, skipping port forward" )
289- return
290-
291- cmd = f"kubectl port-forward svc/{ job .name } -frontend 8000:8000 -n { self .default_namespace } "
292- logging .debug ("Starting port forwarding" )
293- self ._port_forward_process = subprocess .Popen (cmd , shell = True , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
294-
295- logging .debug (f"Port forwarding started (pid={ self ._port_forward_process .pid } )" )
296-
297- def _check_model_server (self ) -> bool :
298- if not self ._port_forward_process :
299- logging .debug ("Port forward process is not running" )
300- return False
301-
302- server = "localhost:8000"
303- cmd = f"curl -s http://{ server } /v1/models"
304- logging .debug (f"Checking if model server is up at { server } : { cmd } " )
305- result = subprocess .run (cmd , shell = True , capture_output = True , text = True )
306-
307- if result .returncode != 0 :
308- logging .debug (
309- f"Failed to connect to model server={ server } , "
310- f"output={ result .stdout .strip ()} , "
311- f"error={ result .stderr .strip ()} "
312- )
313- return False
314-
315- try :
316- response = json .loads (result .stdout )
317- if response .get ("data" ) and len (response ["data" ]) > 0 :
318- logging .debug (f"Model server is running. Response: { result .stdout } " )
319- return True
320- else :
321- logging .debug ("Model server is up but no models are loaded yet" )
322- return False
323- except json .JSONDecodeError :
324- logging .warning ("Invalid JSON response from model server" )
325- return False
326-
327- def _get_frontend_pod_name (self ) -> str :
280+ def _get_dynamo_pod_by_role (self , role : str ) -> str :
328281 for pod in self .core_v1 .list_namespaced_pod (namespace = self .default_namespace ).items :
329282 labels = pod .metadata .labels
330283 logging .debug (f"Found pod: { pod .metadata .name } with labels: { labels } " )
331- if labels and str (labels .get ("nvidia.com/dynamo-component" , "" )).lower () == "frontend" : # v0.6.x
284+ if labels and str (labels .get ("nvidia.com/dynamo-component" , "" )).lower () == role . lower () : # v0.6.x
332285 return pod .metadata .name
333- if labels and str (labels .get ("nvidia.com/dynamo-component-type" , "" )).lower () == "frontend" : # v0.7.x
286+ if labels and str (labels .get ("nvidia.com/dynamo-component-type" , "" )).lower () == role . lower () : # v0.7.x
334287 return pod .metadata .name
335- raise RuntimeError ("No frontend pod found for the job " )
288+ raise RuntimeError (f "No pod found for the role ' { role } ' " )
336289
337290 def _run_genai_perf (self , job : KubernetesJob ) -> None :
338291 from cloudai .workloads .ai_dynamo .ai_dynamo import AIDynamoTestDefinition
@@ -352,7 +305,7 @@ def _run_genai_perf(self, job: KubernetesJob) -> None:
352305 genai_perf_cmd .extend (extra_args .split ())
353306 logging .debug (f"GenAI perf arguments: { genai_perf_cmd = } " )
354307
355- frontend_pod = self ._get_frontend_pod_name ( )
308+ frontend_pod = self ._get_dynamo_pod_by_role ( role = "frontend" )
356309
357310 logging .debug (f"Executing genai-perf in pod={ frontend_pod } cmd={ genai_perf_cmd } " )
358311 try :
@@ -402,12 +355,17 @@ def _is_dynamo_graph_deployment_running(self, job: KubernetesJob) -> bool:
402355 return False
403356
404357 if self .are_vllm_pods_ready (job ):
405- self ._setup_port_forward (job )
406- if self ._port_forward_process and self ._check_model_server ():
407- logging .debug ("vLLM server is up and models are loaded" )
408- self ._run_genai_perf (job )
409- self ._genai_perf_completed = True
410- return False
358+ self ._run_genai_perf (job )
359+ self ._genai_perf_completed = True
360+
361+ for pod_role in {"decode" , "prefill" , "frontend" }:
362+ pod_name = self ._get_dynamo_pod_by_role (pod_role )
363+ logging .debug (f"Fetching logs for { pod_role = } { pod_name = } " )
364+ logs = self .core_v1 .read_namespaced_pod_log (name = pod_name , namespace = self .default_namespace )
365+ with (job .test_run .output_path / f"{ pod_role } _pod.log" ).open ("w" ) as f :
366+ f .write (logs )
367+
368+ return False
411369
412370 deployment = cast (
413371 dict ,
@@ -485,9 +443,6 @@ def _delete_dynamo_graph_deployment(self, job_name: str) -> None:
485443 if result .returncode != 0 :
486444 logging .debug (f"Failed to delete DynamoGraphDeployment: { result .stderr } " )
487445
488- if self ._port_forward_process and self ._port_forward_process .poll () is None :
489- self ._port_forward_process .kill ()
490- self ._port_forward_process = None
491446 self ._genai_perf_completed = False
492447
493448 def create_job (self , job_spec : Dict [Any , Any ], timeout : int = 60 , interval : int = 1 ) -> str :
0 commit comments