Skip to content

Commit 3a8e37e

Browse files
committed
feat(dask): introduce labels and node selector to Dask resources (#628)
This commit introduces support for adding labels and a node selector to Dask resources, allowing finer control over resource scheduling in K8s. The following changes are introduced:
- `reana-run-dask-owner-uuid` and `reana-run-dask-workflow-uuid` labels for Dask scheduler and worker pods, in addition to the cluster and autoscaler custom K8s resources.
- A runtime_jobs node selector for Dask scheduler and worker pods if `node_label_runtimejobs` is defined in `values.yaml`.
Closes #623
1 parent a71ad83 commit 3a8e37e

2 files changed

Lines changed: 38 additions & 8 deletions

File tree

reana_workflow_controller/dask.py

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# This file is part of REANA.
2-
# Copyright (C) 2024 CERN.
2+
# Copyright (C) 2024, 2025 CERN.
33
#
44
# REANA is free software; you can redistribute it and/or modify it
55
# under the terms of the MIT License; see LICENSE file for more details.
@@ -24,6 +24,7 @@
2424
REANA_JOB_HOSTPATH_MOUNTS,
2525
WORKFLOW_RUNTIME_USER_UID,
2626
REANA_RUNTIME_KUBERNETES_NAMESPACE,
27+
REANA_RUNTIME_JOBS_KUBERNETES_NODE_LABEL,
2728
)
2829
from reana_commons.k8s.api_client import (
2930
current_k8s_networking_api_client,
@@ -73,8 +74,8 @@ def __init__(
7374
self.single_worker_memory = single_worker_memory
7475
self.workflow_spec = workflow_spec
7576
self.workflow_workspace = workflow_workspace
76-
self.workflow_id = workflow_workspace.split("/")[-1]
77-
self.user_id = user_id
77+
self.workflow_id = str(workflow_id)
78+
self.user_id = str(user_id)
7879

7980
self.cluster_spec = workflow_spec.get("resources", {}).get("dask", [])
8081
self.cluster_body = self._load_dask_cluster_template()
@@ -128,7 +129,7 @@ def create_dask_resources(self):
128129
self._prepare_autoscaler()
129130
self._create_dask_autoscaler()
130131

131-
create_dask_dashboard_ingress(self.workflow_id)
132+
create_dask_dashboard_ingress(self.workflow_id, self.user_id)
132133

133134
except Exception as e:
134135
logging.error(
@@ -145,7 +146,13 @@ def _prepare_cluster(self):
145146
self._add_eos_volume()
146147

147148
# Add the name of the cluster, used in scheduler service name
148-
self.cluster_body["metadata"] = {"name": self.cluster_name}
149+
self.cluster_body["metadata"] = {
150+
"name": self.cluster_name,
151+
"labels": {
152+
"reana-run-dask-owner-uuid": self.user_id,
153+
"reana-run-dask-workflow-uuid": self.workflow_id,
154+
},
155+
}
149156

150157
# self.cluster_body["spec"]["worker"]["spec"]["metadata"] = {"name": "amcik"}
151158

@@ -180,6 +187,15 @@ def _prepare_cluster(self):
180187
{"name": "DASK_SCHEDULER_URI", "value": self.dask_scheduler_uri},
181188
)
182189

190+
# Add kubernetes node label if exists
191+
if REANA_RUNTIME_JOBS_KUBERNETES_NODE_LABEL:
192+
self.cluster_body["spec"]["scheduler"]["spec"][
193+
"nodeSelector"
194+
] = REANA_RUNTIME_JOBS_KUBERNETES_NODE_LABEL
195+
self.cluster_body["spec"]["worker"]["spec"][
196+
"nodeSelector"
197+
] = REANA_RUNTIME_JOBS_KUBERNETES_NODE_LABEL
198+
183199
# Add secrets
184200
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].extend(
185201
self.secret_env_vars
@@ -208,7 +224,13 @@ def _prepare_cluster(self):
208224
def _prepare_autoscaler(self):
209225
"""Prepare Dask autoscaler body."""
210226
# Add the name of the dask autoscaler
211-
self.autoscaler_body["metadata"] = {"name": self.autoscaler_name}
227+
self.autoscaler_body["metadata"] = {
228+
"name": self.autoscaler_name,
229+
"labels": {
230+
"reana-run-dask-owner-uuid": self.user_id,
231+
"reana-run-dask-workflow-uuid": self.workflow_id,
232+
},
233+
}
212234

213235
# Connect autoscaler to the cluster
214236
self.autoscaler_body["spec"]["cluster"] = self.cluster_name
@@ -612,7 +634,7 @@ def delete_dask_cluster(workflow_id, user_id) -> None:
612634
)
613635

614636

615-
def create_dask_dashboard_ingress(workflow_id):
637+
def create_dask_dashboard_ingress(workflow_id, user_id):
616638
"""Create K8S Ingress object for Dask dashboard."""
617639
# Define the middleware spec
618640
middleware_spec = {
@@ -622,6 +644,10 @@ def create_dask_dashboard_ingress(workflow_id):
622644
"name": get_dask_component_name(
623645
workflow_id, "dashboard_ingress_middleware"
624646
),
647+
"labels": {
648+
"reana-run-dask-owner-uuid": user_id,
649+
"reana-run-dask-workflow-uuid": workflow_id,
650+
},
625651
"namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
626652
},
627653
"spec": {
@@ -641,6 +667,10 @@ def create_dask_dashboard_ingress(workflow_id):
641667
**REANA_INGRESS_ANNOTATIONS,
642668
"traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-{get_dask_component_name(workflow_id, 'dashboard_ingress_middleware')}@kubernetescrd",
643669
},
670+
labels={
671+
"reana-run-dask-owner-uuid": user_id,
672+
"reana-run-dask-workflow-uuid": workflow_id,
673+
},
644674
),
645675
spec=client.V1IngressSpec(
646676
rules=[

tests/test_dask.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def test_create_dask_resources(dask_resource_manager):
174174
mock_create_cluster.assert_called_once()
175175
mock_create_autoscaler.assert_called_once()
176176
mock_create_dashboard_ingress.assert_called_once_with(
177-
dask_resource_manager.workflow_id
177+
dask_resource_manager.workflow_id, dask_resource_manager.user_id
178178
)
179179

180180

0 commit comments

Comments
 (0)