
Commit 5f308c7

feat(job-manager): add kubernetes_queue option for MultiKueue (#494)
Also adds monitoring of job status (rather than just pod status) to the job_monitor to support MultiKueue, since the status of remote pods cannot be seen from the local cluster. Closes #493
1 parent f014442 commit 5f308c7
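
Why watch Jobs instead of Pods: with MultiKueue the workload is admitted on the management cluster but its pods run on a remote worker cluster, so only the Job object's status is synced back and pod events never arrive locally. A minimal sketch of the two watch styles this commit switches between, assuming a reachable kubeconfig; the namespace and workflow UUID are placeholders (the real code uses REANA_RUNTIME_KUBERNETES_NAMESPACE and the workflow's UUID):

    from kubernetes import client, config, watch

    config.load_kube_config()  # use load_incluster_config() when running in-cluster
    use_kueue = True  # hypothetical toggle mirroring the USE_KUEUE config flag
    selector = "reana-run-job-workflow-uuid=<workflow-uuid>"  # placeholder UUID

    w = watch.Watch()
    if use_kueue:
        # Job watch: only the Job object's status is synced back from the
        # remote MultiKueue worker cluster, so completion is read from it.
        stream = w.stream(
            client.BatchV1Api().list_namespaced_job,
            namespace="default",
            label_selector=selector,
        )
    else:
        # Pod watch: pod events are only visible when pods run locally.
        stream = w.stream(
            client.CoreV1Api().list_namespaced_pod,
            namespace="default",
            label_selector=selector,
        )

    for event in stream:
        print(event["type"], event["object"].metadata.name)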

File tree

6 files changed: +149 -52 lines changed

docs/openapi.json
reana_job_controller/config.py
reana_job_controller/job_monitor.py
reana_job_controller/kubernetes_job_manager.py
reana_job_controller/schemas.py
tests/test_job_monitor.py

docs/openapi.json

Lines changed: 3 additions & 0 deletions

@@ -85,6 +85,9 @@
         "kubernetes_memory_limit": {
           "type": "string"
         },
+        "kubernetes_queue": {
+          "type": "string"
+        },
         "kubernetes_uid": {
           "format": "int32",
           "type": "integer"

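For illustration, a job specification exercising the new optional field could look like the following sketch. Only the kubernetes_* field names come from the schema above; the remaining fields and all values are hypothetical:

    import json

    job_spec = {
        "docker_img": "docker.io/library/python:3.12",  # hypothetical image
        "cmd": "python analyze.py",                     # hypothetical command
        "kubernetes_memory_limit": "4Gi",
        # New in this commit: selects the Kueue LocalQueue "flavour" for the job.
        "kubernetes_queue": "gpu",                      # hypothetical queue name
    }
    print(json.dumps(job_spec, indent=2))
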
reana_job_controller/config.py

Lines changed: 1 addition & 1 deletion

@@ -180,7 +180,7 @@
 USE_KUEUE = bool(strtobool(os.getenv("USE_KUEUE", "False")))
 """Whether to use Kueue to manage job execution."""

-KUEUE_LOCAL_QUEUE_NAME = "local-queue-job"
+KUEUE_LOCAL_QUEUE_NAME = "job"
 """Name of the local queue to be used by Kueue."""

 REANA_USER_ID = os.getenv("REANA_USER_ID")

reana_job_controller/job_monitor.py

Lines changed: 119 additions & 38 deletions

@@ -17,8 +17,7 @@
 from kubernetes import client, watch
 from reana_commons.config import REANA_RUNTIME_KUBERNETES_NAMESPACE
 from reana_commons.k8s.api_client import current_k8s_corev1_api_client
-from reana_db.database import Session
-from reana_db.models import Job, JobStatus
+from reana_db.models import JobStatus

 from reana_job_controller.config import (
     COMPUTE_BACKENDS,
@@ -32,10 +31,10 @@
     C4P_SSH_TIMEOUT,
     C4P_SSH_BANNER_TIMEOUT,
     C4P_SSH_AUTH_TIMEOUT,
+    USE_KUEUE,
 )

 from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status
-from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
 from reana_job_controller.utils import (
     SSHClient,
     singleton,
@@ -115,7 +114,7 @@ def get_backend_job_id(self, job_pod):
         """
         return job_pod.metadata.labels["job-name"]

-    def should_process_job(self, job_pod) -> bool:
+    def should_process_job_pod(self, job_pod) -> bool:
         """Decide whether the job should be processed or not.

         Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
@@ -141,6 +140,27 @@ def should_process_job(self, job_pod) -> bool:

         return is_job_in_remaining_jobs and is_job_completed

+    def should_process_job(self, job) -> bool:
+        """Decide whether the job should be processed or not.
+
+        Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
+
+        :param job: Compute backend job object (Kubernetes V1Job
+            https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md)
+        """
+        remaining_jobs = self._get_remaining_jobs(
+            statuses_to_skip=[
+                JobStatus.finished.name,
+                JobStatus.failed.name,
+                JobStatus.stopped.name,
+            ]
+        )
+
+        is_job_in_remaining_jobs = job.metadata.name in remaining_jobs
+        is_job_completed = job.status.succeeded and not job.status.active
+
+        return is_job_in_remaining_jobs and is_job_completed
+
     @staticmethod
     def _get_job_container_statuses(job_pod):
         return (job_pod.status.container_statuses or []) + (
@@ -235,46 +255,107 @@ def watch_jobs(self, job_db, app=None):

         :param job_db: Dictionary which contains all current jobs.
         """
-        while True:
-            logging.info("Starting a new stream request to watch Jobs")
-            try:
-                w = watch.Watch()
-                for event in w.stream(
-                    current_k8s_corev1_api_client.list_namespaced_pod,
-                    namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-                    label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
-                ):
-                    logging.info("New Pod event received: {0}".format(event["type"]))
-                    job_pod = event["object"]
-
-                    # Each job is processed once, when reaching a final state
-                    # (either successfully or not)
-                    if self.should_process_job(job_pod):
-                        job_status = self.get_job_status(job_pod)
-                        backend_job_id = self.get_backend_job_id(job_pod)
-                        reana_job_id = self.get_reana_job_id(backend_job_id)
-
-                        logs = self.job_manager_cls.get_logs(
-                            backend_job_id, job_pod=job_pod
-                        )
-
-                        if job_status == JobStatus.failed.name:
-                            self.log_disruption(
-                                event["object"].status.conditions, backend_job_id
-                            )
-
-                        store_job_logs(reana_job_id, logs)
-                        update_job_status(reana_job_id, job_status)
-
-                        if JobStatus.should_cleanup_job(job_status):
-                            self.clean_job(backend_job_id)
-            except client.rest.ApiException as e:
-                logging.exception(
-                    f"Error from Kubernetes API while watching jobs pods: {e}"
-                )
-            except Exception as e:
-                logging.error(traceback.format_exc())
-                logging.error("Unexpected error: {}".format(e))
+        # If using MultiKueue, watch jobs instead of pods since worker pods could be
+        # running on a remote cluster that we can't directly monitor
+        if USE_KUEUE:
+            while True:
+                logging.info("Starting a new stream request to watch Jobs")
+
+                try:
+                    w = watch.Watch()
+                    for event in w.stream(
+                        client.BatchV1Api().list_namespaced_job,
+                        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                        label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
+                    ):
+                        logging.info(f"New Job event received: {event['type']}")
+
+                        job = event["object"]
+                        job_id = job.metadata.name
+                        job_finished = (
+                            job.status.succeeded
+                            and not job.status.active
+                            and not job.status.failed
+                        )
+                        job_status = (
+                            JobStatus.finished.name
+                            if job_finished
+                            else (
+                                JobStatus.failed.name
+                                if job.status.failed
+                                else JobStatus.running.name
+                            )
+                        )
+
+                        if self.should_process_job(job):
+                            reana_job_id = self.get_reana_job_id(job_id)
+
+                            if job_status == JobStatus.failed.name:
+                                self.log_disruption(
+                                    event["object"].status.conditions, job_id
+                                )
+
+                            # TODO: fetch logs from pod on remote worker when MultiKueue supports this
+                            # logs = self.job_manager_cls.get_logs(job_id)
+                            # store_job_logs(reana_job_id, logs)
+
+                            update_job_status(
+                                reana_job_id,
+                                job_status,
+                            )
+
+                            if JobStatus.should_cleanup_job(job_status):
+                                self.clean_job(job_id)
+                except client.rest.ApiException as e:
+                    logging.exception(
+                        f"Error from Kubernetes API while watching jobs: {e}"
+                    )
+                except Exception as e:
+                    logging.error(traceback.format_exc())
+                    logging.error("Unexpected error: {}".format(e))
+        else:
+            while True:
+                try:
+                    w = watch.Watch()
+                    for event in w.stream(
+                        current_k8s_corev1_api_client.list_namespaced_pod,
+                        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                        label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
+                    ):
+                        logging.info(
+                            "New Pod event received: {0}".format(event["type"])
+                        )
+                        job_pod = event["object"]
+
+                        # Each job is processed once, when reaching a final state
+                        # (either successfully or not)
+                        if self.should_process_job_pod(job_pod):
+                            job_status = self.get_job_status(job_pod)
+                            backend_job_id = self.get_backend_job_id(job_pod)
+                            reana_job_id = self.get_reana_job_id(backend_job_id)
+
+                            logs = self.job_manager_cls.get_logs(
+                                backend_job_id, job_pod=job_pod
+                            )
+
+                            if job_status == JobStatus.failed.name:
+                                self.log_disruption(
+                                    event["object"].status.conditions, backend_job_id
+                                )
+
+                            store_job_logs(reana_job_id, logs)
+                            update_job_status(reana_job_id, job_status)
+
+                            if JobStatus.should_cleanup_job(job_status):
+                                self.clean_job(backend_job_id)
+                except client.rest.ApiException as e:
+                    logging.exception(
+                        f"Error from Kubernetes API while watching jobs pods: {e}"
+                    )
+                except Exception as e:
+                    logging.error(traceback.format_exc())
+                    logging.error("Unexpected error: {}".format(e))

     def log_disruption(self, conditions, backend_job_id):
         """Log disruption message from Kubernetes event conditions.

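The status mapping in the new USE_KUEUE branch boils down to a small pure function. A dependency-free sketch, with SimpleNamespace standing in for kubernetes.client.V1JobStatus (whose active/succeeded/failed counters are integers or None) and plain strings standing in for the JobStatus enum names used by the real code:

    from types import SimpleNamespace

    def job_status_from_v1job(status) -> str:
        # Mirrors the mapping in the USE_KUEUE branch of watch_jobs() above.
        if status.succeeded and not status.active and not status.failed:
            return "finished"
        if status.failed:
            return "failed"
        return "running"

    # None counters are falsy, just like 0:
    print(job_status_from_v1job(SimpleNamespace(active=1, succeeded=None, failed=None)))  # running
    print(job_status_from_v1job(SimpleNamespace(active=None, succeeded=1, failed=None)))  # finished
    print(job_status_from_v1job(SimpleNamespace(active=None, succeeded=None, failed=1)))  # failed
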
reana_job_controller/kubernetes_job_manager.py

Lines changed: 21 additions & 12 deletions

@@ -67,6 +67,13 @@ class KubernetesJobManager(JobManager):
     MAX_NUM_JOB_RESTARTS = 0
     """Maximum number of job restarts in case of internal failures."""

+    @property
+    def secrets(self):
+        """Get cached secrets if present, otherwise fetch them from k8s."""
+        if self._secrets is None:
+            self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
+        return self._secrets
+
     def __init__(
         self,
         docker_img=None,
@@ -81,6 +88,7 @@ def __init__(
         kerberos=False,
         kubernetes_uid=None,
         kubernetes_memory_limit=None,
+        kubernetes_queue=None,
         voms_proxy=False,
         rucio=False,
         kubernetes_job_timeout: Optional[int] = None,
@@ -113,6 +121,8 @@ def __init__(
         :type kubernetes_uid: int
         :param kubernetes_memory_limit: Memory limit for job container.
         :type kubernetes_memory_limit: str
+        :param kubernetes_queue: Name of the MultiKueue LocalQueue to send jobs to, if Kueue is enabled.
+        :type kubernetes_queue: str
         :param kubernetes_job_timeout: Job timeout in seconds.
         :type kubernetes_job_timeout: int
         :param voms_proxy: Decides if a voms-proxy certificate should be
@@ -142,17 +152,11 @@ def __init__(
         self.rucio = rucio
         self.set_user_id(kubernetes_uid)
         self.set_memory_limit(kubernetes_memory_limit)
+        self.kubernetes_queue = kubernetes_queue
         self.workflow_uuid = workflow_uuid
         self.kubernetes_job_timeout = kubernetes_job_timeout
         self._secrets: Optional[UserSecrets] = secrets

-    @property
-    def secrets(self):
-        """Get cached secrets if present, otherwise fetch them from k8s."""
-        if self._secrets is None:
-            self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
-        return self._secrets
-
     @JobManager.execution_hook
     def execute(self):
         """Execute a job in Kubernetes."""
@@ -164,11 +168,16 @@ def execute(self):
             "metadata": {
                 "name": backend_job_id,
                 "namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
-                "labels": (
-                    {"kueue.x-k8s.io/queue-name": KUEUE_LOCAL_QUEUE_NAME}
-                    if USE_KUEUE
-                    else {}
-                ),
+                "labels": {
+                    "reana-run-job-workflow-uuid": self.workflow_uuid,
+                    **(
+                        {
+                            "kueue.x-k8s.io/queue-name": f"{self.kubernetes_queue}-{KUEUE_LOCAL_QUEUE_NAME}-queue"
+                        }
+                        if self.kubernetes_queue and USE_KUEUE
+                        else {}
+                    ),
+                },
             },
             "spec": {
                 "backoffLimit": KubernetesJobManager.MAX_NUM_JOB_RESTARTS,

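Two things change in the Job metadata above: every run-job now carries the reana-run-job-workflow-uuid label, which is the same label the new Job watch in job_monitor.py selects on, and the Kueue queue-name label is attached only when a queue was requested and USE_KUEUE is set. This also explains the config.py rename: KUEUE_LOCAL_QUEUE_NAME is no longer a full queue name but the middle segment of one. A standalone sketch of the resulting labels, with hypothetical values:

    USE_KUEUE = True
    KUEUE_LOCAL_QUEUE_NAME = "job"  # default from config.py after this commit

    workflow_uuid = "123e4567-e89b-12d3-a456-426614174000"  # hypothetical
    kubernetes_queue = "gpu"  # hypothetical; None when the job requested no queue

    labels = {
        "reana-run-job-workflow-uuid": workflow_uuid,
        **(
            {"kueue.x-k8s.io/queue-name": f"{kubernetes_queue}-{KUEUE_LOCAL_QUEUE_NAME}-queue"}
            if kubernetes_queue and USE_KUEUE
            else {}
        ),
    }
    print(labels["kueue.x-k8s.io/queue-name"])  # -> gpu-job-queue
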
reana_job_controller/schemas.py

Lines changed: 1 addition & 0 deletions

@@ -50,6 +50,7 @@ class JobRequest(Schema):
     rucio = fields.Bool(required=False)
     kubernetes_uid = fields.Int(required=False)
     kubernetes_memory_limit = fields.Str(required=False)
+    kubernetes_queue = fields.Str(required=False)
     kubernetes_job_timeout = fields.Int(required=False)
     unpacked_img = fields.Bool(required=False)
     htcondor_max_runtime = fields.Str(required=False)

tests/test_job_monitor.py

Lines changed: 4 additions & 1 deletion

@@ -107,7 +107,10 @@ def test_kubernetes_should_process_job(
         "Succeeded", "Completed", job_id=backend_job_id
     )

-    assert bool(job_monitor_k8s.should_process_job(job_pod_event)) == should_process
+    assert (
+        bool(job_monitor_k8s.should_process_job_pod(job_pod_event))
+        == should_process
+    )


 @pytest.mark.parametrize(
