@@ -17,8 +17,7 @@
 from kubernetes import client, watch
 from reana_commons.config import REANA_RUNTIME_KUBERNETES_NAMESPACE
 from reana_commons.k8s.api_client import current_k8s_corev1_api_client
-from reana_db.database import Session
-from reana_db.models import Job, JobStatus
+from reana_db.models import JobStatus
 
 from reana_job_controller.config import (
     COMPUTE_BACKENDS,
@@ -32,10 +31,10 @@
     C4P_SSH_TIMEOUT,
     C4P_SSH_BANNER_TIMEOUT,
     C4P_SSH_AUTH_TIMEOUT,
+    USE_KUEUE,
 )
 
 from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status
-from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
 from reana_job_controller.utils import (
     SSHClient,
     singleton,
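For context, `USE_KUEUE` is presumably a boolean feature flag defined in `reana_job_controller/config.py` and read from the environment, following the usual REANA config pattern. A minimal sketch of how such a flag could be defined; the environment variable name and default value here are assumptions, not the actual REANA code:

```python
# Hypothetical sketch of the USE_KUEUE flag in reana_job_controller/config.py.
# The env variable name and default are assumptions for illustration.
import os
from distutils.util import strtobool

USE_KUEUE = bool(strtobool(os.getenv("USE_KUEUE", "False")))
"""Whether to watch Kubernetes Jobs (Kueue/MultiKueue) instead of Pods."""
```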
@@ -115,7 +114,7 @@ def get_backend_job_id(self, job_pod):
         """
         return job_pod.metadata.labels["job-name"]
 
-    def should_process_job(self, job_pod) -> bool:
+    def should_process_job_pod(self, job_pod) -> bool:
        """Decide whether the job should be processed or not.
 
         Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
@@ -141,6 +140,27 @@ def should_process_job(self, job_pod) -> bool:
 
         return is_job_in_remaining_jobs and is_job_completed
 
+    def should_process_job(self, job) -> bool:
+        """Decide whether the job should be processed or not.
+
+        Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
+
+        :param job: Compute backend job object (Kubernetes V1Job,
+            https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md)
+        """
+        remaining_jobs = self._get_remaining_jobs(
+            statuses_to_skip=[
+                JobStatus.finished.name,
+                JobStatus.failed.name,
+                JobStatus.stopped.name,
+            ]
+        )
+
+        is_job_in_remaining_jobs = job.metadata.name in remaining_jobs
+        is_job_completed = job.status.succeeded and not job.status.active
+
+        return is_job_in_remaining_jobs and is_job_completed
+
     @staticmethod
     def _get_job_container_statuses(job_pod):
         return (job_pod.status.container_statuses or []) + (
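The new `should_process_job` variant applies the same "process once, on a final state" rule to `V1Job` objects instead of pods: the job counts as completed when `status.succeeded` is set and no pods are still `active`. A minimal sketch of that predicate against hand-built kubernetes-client models, illustrative only (the job name is a placeholder):

```python
from kubernetes import client

# Build a V1Job the way the watch stream would deliver it.
job = client.V1Job(
    metadata=client.V1ObjectMeta(name="reana-run-job-123"),
    status=client.V1JobStatus(succeeded=1, active=None, failed=None),
)

# Same completion check as in should_process_job: at least one pod
# succeeded and none are still running.
is_job_completed = bool(job.status.succeeded and not job.status.active)
assert is_job_completed
```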
@@ -235,46 +255,99 @@ def watch_jobs(self, job_db, app=None):
 
         :param job_db: Dictionary which contains all current jobs.
         """
-        while True:
-            logging.info("Starting a new stream request to watch Jobs")
-            try:
-                w = watch.Watch()
-                for event in w.stream(
-                    current_k8s_corev1_api_client.list_namespaced_pod,
-                    namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-                    label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
-                ):
-                    logging.info("New Pod event received: {0}".format(event["type"]))
-                    job_pod = event["object"]
-
-                    # Each job is processed once, when reaching a final state
-                    # (either successfully or not)
-                    if self.should_process_job(job_pod):
-                        job_status = self.get_job_status(job_pod)
-                        backend_job_id = self.get_backend_job_id(job_pod)
-                        reana_job_id = self.get_reana_job_id(backend_job_id)
+        # If using MultiKueue, watch jobs instead of pods since worker pods could be
+        # running on a remote cluster that we can't directly monitor
+        if USE_KUEUE:
+            while True:
+                logging.info("Starting a new stream request to watch Jobs")
 
-                        logs = self.job_manager_cls.get_logs(
-                            backend_job_id, job_pod=job_pod
+                try:
+                    w = watch.Watch()
+                    for event in w.stream(
+                        client.BatchV1Api().list_namespaced_job,
+                        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                        label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
+                    ):
+                        logging.info(f"New Job event received: {event['type']}")
+
+                        job = event["object"]
+                        job_id = job.metadata.name
+                        job_finished = job.status.succeeded and not job.status.active and not job.status.failed
+                        job_status = (
+                            JobStatus.finished.name
+                            if job_finished
+                            else (
+                                JobStatus.failed.name
+                                if job.status.failed
+                                else JobStatus.running.name
+                            )
                         )
 
-                        if job_status == JobStatus.failed.name:
-                            self.log_disruption(
-                                event["object"].status.conditions, backend_job_id
+                        if self.should_process_job(job):
+                            reana_job_id = self.get_reana_job_id(job_id)
+
+                            if job_status == JobStatus.failed.name:
+                                self.log_disruption(
+                                    event["object"].status.conditions, job_id
+                                )
+
+                            # TODO: fetch logs from pod on remote worker when MultiKueue supports this
+                            # logs = self.job_manager_cls.get_logs(job_id)
+                            # store_job_logs(reana_job_id, logs)
+
+                            update_job_status(
+                                reana_job_id,
+                                job_status,
                             )
 
-                        store_job_logs(reana_job_id, logs)
-                        update_job_status(reana_job_id, job_status)
+                            if JobStatus.should_cleanup_job(job_status):
+                                self.clean_job(job_id)
 
-                        if JobStatus.should_cleanup_job(job_status):
-                            self.clean_job(backend_job_id)
-            except client.rest.ApiException as e:
-                logging.exception(
-                    f"Error from Kubernetes API while watching jobs pods: {e}"
-                )
-            except Exception as e:
-                logging.error(traceback.format_exc())
-                logging.error("Unexpected error: {}".format(e))
+                except client.rest.ApiException as e:
+                    logging.exception(f"Error from Kubernetes API while watching jobs: {e}")
+                except Exception as e:
+                    logging.error(traceback.format_exc())
+                    logging.error("Unexpected error: {}".format(e))
+        else:
+            while True:
+                try:
+                    w = watch.Watch()
+                    for event in w.stream(
+                        current_k8s_corev1_api_client.list_namespaced_pod,
+                        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                        label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
+                    ):
+                        logging.info("New Pod event received: {0}".format(event["type"]))
+                        job_pod = event["object"]
+
+                        # Each job is processed once, when reaching a final state
+                        # (either successfully or not)
+                        if self.should_process_job_pod(job_pod):
+                            job_status = self.get_job_status(job_pod)
+                            backend_job_id = self.get_backend_job_id(job_pod)
+                            reana_job_id = self.get_reana_job_id(backend_job_id)
+
+                            logs = self.job_manager_cls.get_logs(
+                                backend_job_id, job_pod=job_pod
+                            )
+
+                            if job_status == JobStatus.failed.name:
+                                self.log_disruption(
+                                    event["object"].status.conditions, backend_job_id
+                                )
+
+                            store_job_logs(reana_job_id, logs)
+                            update_job_status(reana_job_id, job_status)
+
+                            if JobStatus.should_cleanup_job(job_status):
+                                self.clean_job(backend_job_id)
+                except client.rest.ApiException as e:
+                    logging.exception(
+                        f"Error from Kubernetes API while watching jobs pods: {e}"
+                    )
+                except Exception as e:
+                    logging.error(traceback.format_exc())
+                    logging.error("Unexpected error: {}".format(e))
 
     def log_disruption(self, conditions, backend_job_id):
         """Log disruption message from Kubernetes event conditions.
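For reference, the Jobs watch added in the Kueue branch uses the standard kubernetes-client watch API. A self-contained sketch of the same pattern outside the class, under the assumption of a local kubeconfig; the namespace and workflow UUID are placeholder values:

```python
from kubernetes import client, config, watch

# Illustrative standalone version of the Jobs watch loop; the
# namespace and workflow UUID below are placeholders.
config.load_kube_config()

w = watch.Watch()
for event in w.stream(
    client.BatchV1Api().list_namespaced_job,
    namespace="default",
    label_selector="reana-run-job-workflow-uuid=00000000-0000-0000-0000-000000000000",
):
    job = event["object"]
    # A V1Job is finished once it has succeeded pods and none active or failed.
    if job.status.succeeded and not job.status.active and not job.status.failed:
        print(f"Job {job.metadata.name} finished")
        w.stop()
```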