 if TYPE_CHECKING:
     import kubernetes as k8s

+
 from pydantic import BaseModel, ConfigDict

 from cloudai.core import BaseJob, System
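Note: `kubernetes` is imported only under `TYPE_CHECKING`, so the annotations below cost nothing at runtime; the `lazy.k8s.client.ApiException` reference further down suggests runtime access goes through a lazy loader. A minimal sketch of that pattern, assuming a `lazy` helper like this (the helper itself is an assumption, not this repo's actual implementation):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import kubernetes as k8s  # visible to type checkers only


class _LazyModules:
    """Defers the heavy kubernetes import until first runtime use."""

    @property
    def k8s(self):
        import kubernetes  # imported on first access, not at module load

        return kubernetes


lazy = _LazyModules()  # lazy.k8s.client.ApiException resolves on demand
```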
@@ -65,7 +66,7 @@ class KubernetesSystem(BaseModel, System):
     _core_v1: Optional[k8s.client.CoreV1Api] = None
     _batch_v1: Optional[k8s.client.BatchV1Api] = None
     _custom_objects_api: Optional[k8s.client.CustomObjectsApi] = None
-    _port_forward_process = None
+    _port_forward_process: subprocess.Popen | None = None
     _genai_perf_completed: bool = False

     def __getstate__(self) -> dict[str, Any]:
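Typing `_port_forward_process` as `subprocess.Popen | None` lets checkers catch misuse of the handle. Since the class is a Pydantic `BaseModel`, an underscore-prefixed annotated attribute becomes a private attribute; a minimal sketch of that behavior under Pydantic v2 (class and field names here are illustrative):

```python
import subprocess
from typing import Optional

from pydantic import BaseModel


class ProcessHolder(BaseModel):
    # Underscore-prefixed annotated attributes are treated as private
    # attributes: excluded from validation and serialization.
    _proc: Optional[subprocess.Popen] = None

    def stop(self) -> None:
        # poll() is None while the child process is still running.
        if self._proc and self._proc.poll() is None:
            self._proc.kill()
        self._proc = None
```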
@@ -252,7 +253,7 @@ def _is_batch_job_running(self, job_name: str) -> bool:
         )
         raise

-    def are_vllm_pods_ready(self) -> bool:
+    def are_vllm_pods_ready(self, job: KubernetesJob) -> bool:
         cmd = ["kubectl", "get", "pods", "-n", self.default_namespace]
         try:
             result = subprocess.run(cmd, capture_output=True, text=True, check=True)
@@ -272,7 +273,7 @@ def are_vllm_pods_ready(self) -> bool:
                 continue

             pod_name = columns[0]
-            if "vllm-v1-agg" not in pod_name:
+            if job.name not in pod_name:
                 continue

             vllm_pods_found = True
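For reference, `are_vllm_pods_ready` filters `kubectl get pods` rows by the job name and checks the READY ("ready/total") and STATUS columns. A standalone sketch of that parsing, reconstructed from the visible context (column layout assumed from standard kubectl output):

```python
def pods_ready(kubectl_output: str, name_fragment: str) -> bool:
    """True if at least one matching pod exists and all are Running and ready."""
    found = False
    for line in kubectl_output.strip().splitlines()[1:]:  # skip the header row
        columns = line.split()
        if len(columns) < 3:
            continue
        if name_fragment not in columns[0]:  # NAME column
            continue
        found = True
        ready, status = columns[1], columns[2]  # e.g. "1/1", "Running"
        current, desired = ready.split("/")
        if status != "Running" or current != desired:
            return False
    return found
```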
@@ -296,36 +297,42 @@ def are_vllm_pods_ready(self) -> bool:
                 all_ready = False

         if not vllm_pods_found:
-            logging.warning("No vLLM pods found")
+            logging.debug("No vLLM pods found")
             return False

         return all_ready

-    def _setup_port_forward(self) -> None:
+    def _setup_port_forward(self, job: KubernetesJob) -> None:
         if self._port_forward_process and self._port_forward_process.poll() is None:
             logging.debug("Port forwarding is already running")
             return

-        if not self.are_vllm_pods_ready():
+        if not self.are_vllm_pods_ready(job):
             logging.debug("Pods are not ready yet, skipping port forward")
             return

-        get_pod_cmd = (
-            f"kubectl get pods -n {self.default_namespace} --no-headers | "
-            "grep vllm-v1-agg-frontend | "
-            "awk 'NR==1{print $1}'"
-        )
-        cmd = f"kubectl port-forward pod/$({get_pod_cmd}) 8000:8000 -n {self.default_namespace}"
+        cmd = f"kubectl port-forward svc/{job.name}-frontend 8000:8000 -n {self.default_namespace}"
         logging.debug("Starting port forwarding")
         self._port_forward_process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        logging.debug("Port forwarding started")
+
+        logging.debug(f"Port forwarding started (pid={self._port_forward_process.pid})")

     def _check_model_server(self) -> bool:
-        cmd = "curl -s http://localhost:8000/v1/models"
+        if not self._port_forward_process:
+            logging.debug("Port forward process is not running")
+            return False
+
+        server = "localhost:8000"
+        cmd = f"curl -s http://{server}/v1/models"
+        logging.debug(f"Checking if model server is up at {server}: {cmd}")
         result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

         if result.returncode != 0:
-            logging.debug("Failed to connect to model server")
+            logging.debug(
+                f"Failed to connect to model server={server}, "
+                f"output={result.stdout.strip()}, "
+                f"error={result.stderr.strip()}"
+            )
             return False

         try:
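`_check_model_server` shells out to curl; an equivalent probe can stay in-process with the standard library. A hedged alternative sketch (endpoint and port taken from the diff; the "data" key follows the OpenAI-compatible schema vLLM serves):

```python
import json
import urllib.request


def model_server_ready(base_url: str = "http://localhost:8000") -> bool:
    try:
        with urllib.request.urlopen(f"{base_url}/v1/models", timeout=5) as resp:
            payload = json.load(resp)
    except (OSError, ValueError):
        return False  # connection refused, timeout, or non-JSON body
    return bool(payload.get("data"))  # non-empty once models are loaded
```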
@@ -374,18 +381,23 @@ def _run_genai_perf(self, job: KubernetesJob) -> None:
         args_str = " ".join(args)

         venv_path = python_exec.venv_path.absolute()
-        cmd = f". {venv_path}/bin/activate && genai-perf profile {args_str}"
-        logging.debug("Running GenAI performance test with command: ")
-        logging.debug(cmd)
+        cmd = f"{venv_path}/bin/genai-perf profile {args_str}"
+        logging.debug(f"Running GenAI performance test: {cmd}")
+        result: subprocess.CompletedProcess | None = None
         try:
             result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True)
             logging.debug("GenAI performance test completed successfully")
-            logging.debug(f"Output: {result.stdout}")
         except subprocess.CalledProcessError as e:
             logging.error(f"GenAI performance test failed: {e.stderr}")
+
+        if result:
+            with (job.test_run.output_path / "stdout.txt").open("w") as f:
+                f.write(result.stdout)
+            with (job.test_run.output_path / "stderr.txt").open("w") as f:
+                f.write(result.stderr)

     def _check_deployment_conditions(self, conditions: list) -> bool:
+        logging.debug(f"Checking deployment conditions: {conditions}")
         if not conditions:
             return True

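The body of `_check_deployment_conditions` beyond the early return is not shown in this hunk. For orientation only, a hypothetical check over Kubernetes-style condition dicts (the `Available`/`True` convention is an assumption about what the method verifies):

```python
def check_deployment_conditions(conditions: list[dict]) -> bool:
    if not conditions:
        return True  # mirrors the early return above
    # Kubernetes conditions carry "type" and "status" string fields.
    return any(
        c.get("type") == "Available" and c.get("status") == "True"
        for c in conditions
    )
```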
@@ -401,8 +413,8 @@ def _is_dynamo_graph_deployment_running(self, job: KubernetesJob) -> bool:
         if self._genai_perf_completed:
             return False

-        if self.are_vllm_pods_ready():
-            self._setup_port_forward()
+        if self.are_vllm_pods_ready(job):
+            self._setup_port_forward(job)
             if self._port_forward_process and self._check_model_server():
                 logging.debug("vLLM server is up and models are loaded")
                 self._run_genai_perf(job)
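Read as a whole, `_is_dynamo_graph_deployment_running` doubles as the benchmark driver: each status poll advances the flow one step, and returning False ends the job. A condensed, illustrative sketch of that flow (method body abbreviated; `_genai_perf_completed` is presumably set inside `_run_genai_perf`):

```python
def _is_running(self, job) -> bool:  # condensed, illustrative only
    if self._genai_perf_completed:
        return False  # benchmark finished -> report job as done
    if self.are_vllm_pods_ready(job):
        self._setup_port_forward(job)
        if self._port_forward_process and self._check_model_server():
            self._run_genai_perf(job)  # expected to set _genai_perf_completed
    return True  # keep polling until the benchmark has run
```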
@@ -438,7 +450,7 @@ def delete_job(self, job_name: str, job_kind: str) -> None:
         elif "job" in job_kind.lower():
             self._delete_batch_job(job_name)
         elif "dynamographdeployment" in job_kind.lower():
-            pass
+            self._delete_dynamo_graph_deployment(job_name)
         else:
             error_message = f"Unsupported job kind: '{job_kind}'."
             logging.error(error_message)
@@ -480,11 +492,14 @@ def _delete_batch_job(self, job_name: str) -> None:

     def _delete_dynamo_graph_deployment(self, job_name: str) -> None:
         logging.debug(f"Deleting DynamoGraphDeployment '{job_name}'")
-        cmd = f"kubectl delete dgd vllm-v1-agg -n {self.default_namespace}"
+        cmd = f"kubectl delete dgd {job_name} -n {self.default_namespace}"
         result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
         if result.returncode != 0:
-            raise subprocess.SubprocessError(f"Failed to delete DynamoGraphDeployment: {result.stderr}")
-        logging.debug("DynamoGraphDeployment deleted successfully")
+            logging.debug(f"Failed to delete DynamoGraphDeployment: {result.stderr}")
+
+        if self._port_forward_process and self._port_forward_process.poll() is None:
+            self._port_forward_process.kill()
+        self._port_forward_process = None

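Deletion now logs instead of raising, keeping teardown best-effort. If the only failure to tolerate is an already-gone object, kubectl's own flag handles that directly; a hedged one-line alternative (`--ignore-not-found` is a standard `kubectl delete` flag):

```python
# Suppresses the non-zero exit status when the deployment no longer exists.
cmd = f"kubectl delete dgd {job_name} -n {self.default_namespace} --ignore-not-found"
```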
     def create_job(self, job_spec: Dict[Any, Any], timeout: int = 60, interval: int = 1) -> str:
         """
@@ -559,15 +574,20 @@ def _create_mpi_job(self, job_spec: Dict[Any, Any]) -> str:
         return job_name

     def _create_dynamo_graph_deployment(self, job_spec: Dict[Any, Any]) -> str:
-        api_response = self.custom_objects_api.create_namespaced_custom_object(
-            group="nvidia.com",
-            version="v1alpha1",
-            namespace=self.default_namespace,
-            plural="dynamographdeployments",
-            body=job_spec,
-        )
+        try:
+            api_response = self.custom_objects_api.create_namespaced_custom_object(
+                group="nvidia.com",
+                version="v1alpha1",
+                namespace=self.default_namespace,
+                plural="dynamographdeployments",
+                body=job_spec,
+            )
+        except lazy.k8s.client.ApiException as e:
+            logging.error(f"An error occurred while creating DynamoGraphDeployment: {e.reason}")
+            self._delete_dynamo_graph_deployment(job_spec["metadata"]["name"])
+            raise

-        job_name: str = api_response["metadata"]["name"]
+        job_name = str(api_response["metadata"]["name"])
         logging.debug(f"DynamoGraphDeployment '{job_name}' created with status: {api_response.get('status')}")
         return job_name

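For context, a minimal hypothetical `job_spec` for the call above; the error path relies only on `metadata.name`, which it reuses for cleanup (group, version, and plural are taken from the diff; the `spec` contents are deployment-specific and elided here):

```python
job_spec = {
    "apiVersion": "nvidia.com/v1alpha1",
    "kind": "DynamoGraphDeployment",
    "metadata": {"name": "my-deployment"},  # name reused by the cleanup path
    "spec": {},  # deployment-specific fields elided
}
```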