2727 get_workspace_volume ,
2828 get_reana_shared_volume ,
2929)
30+ from reana_commons .job_utils import kubernetes_memory_to_bytes
3031
32+ from reana_workflow_controller .config import DASK_AUTOSCALER_ENABLED
3133from reana_workflow_controller .k8s import create_dask_dashboard_ingress
3234
3335
@@ -64,7 +66,7 @@ def __init__(
6466 self .user_id = user_id
6567
6668 self .cluster_spec = workflow_spec .get ("resources" , {}).get ("dask" , [])
67- self .cluster_body , self . autoscaler_body = self ._load_dask_templates ()
69+ self .cluster_body = self ._load_dask_cluster_template ()
6870 self .cluster_image = self .cluster_spec ["image" ]
6971 self .dask_scheduler_uri = (
7072 f"{ self .cluster_name } -scheduler.default.svc.cluster.local:8786"
@@ -77,29 +79,43 @@ def __init__(
7779 )
7880 self .kubernetes_uid = WORKFLOW_RUNTIME_USER_UID
7981
80- def _load_dask_templates (self ):
81- """Load Dask templates from YAML files."""
82+ if DASK_AUTOSCALER_ENABLED :
83+ self .autoscaler_name = f"dask-autoscaler-{ cluster_name } "
84+ self .autoscaler_body = self ._load_dask_autoscaler_template ()
85+
86+ def _load_dask_cluster_template (self ):
87+ """Load Dask cluster template from YAML file."""
8288 with open (
8389 "reana_workflow_controller/templates/dask_cluster.yaml" , "r"
84- ) as dask_cluster_yaml , open (
85- "reana_workflow_controller/templates/dask_autoscaler.yaml" , "r"
86- ) as dask_autoscaler_yaml :
90+ ) as dask_cluster_yaml :
8791 dask_cluster_body = yaml .safe_load (dask_cluster_yaml )
88- dask_autoscaler_body = yaml .safe_load (dask_autoscaler_yaml )
8992 dask_cluster_body ["spec" ]["worker" ]["spec" ]["initContainers" ] = []
9093 dask_cluster_body ["spec" ]["worker" ]["spec" ]["containers" ][0 ]["env" ] = []
9194 dask_cluster_body ["spec" ]["worker" ]["spec" ]["containers" ][0 ][
9295 "volumeMounts"
9396 ] = []
9497 dask_cluster_body ["spec" ]["worker" ]["spec" ]["volumes" ] = []
9598
96- return dask_cluster_body , dask_autoscaler_body
99+ return dask_cluster_body
100+
101+ def _load_dask_autoscaler_template (self ):
102+ """Load Dask autoscaler template from YAML file."""
103+ with open (
104+ "reana_workflow_controller/templates/dask_autoscaler.yaml" , "r"
105+ ) as dask_autoscaler_yaml :
106+ dask_autoscaler_body = yaml .safe_load (dask_autoscaler_yaml )
107+
108+ return dask_autoscaler_body
97109
98110 def create_dask_resources (self ):
99111 """Create necessary Dask resources for the workflow."""
100112 self ._prepare_cluster ()
101113 self ._create_dask_cluster ()
102- self ._create_dask_autoscaler ()
114+
115+ if DASK_AUTOSCALER_ENABLED :
116+ self ._prepare_autoscaler ()
117+ self ._create_dask_autoscaler ()
118+
103119 create_dask_dashboard_ingress (self .cluster_name , self .workflow_id )
104120
105121 def _prepare_cluster (self ):
@@ -113,16 +129,10 @@ def _prepare_cluster(self):
113129 # Add the name of the cluster, used in scheduler service name
114130 self .cluster_body ["metadata" ] = {"name" : self .cluster_name }
115131
116- # Add the name of the dask autoscaler
117- self .autoscaler_body ["metadata" ] = {"name" : self .autoscaler_name }
118-
119132 self .cluster_body ["spec" ]["scheduler" ]["service" ]["selector" ][
120133 "dask.org/cluster-name"
121134 ] = self .cluster_name
122135
123- # Connect autoscaler to the cluster
124- self .autoscaler_body ["spec" ]["cluster" ] = self .cluster_name
125-
126136 # Add image to worker and scheduler
127137 self .cluster_body ["spec" ]["worker" ]["spec" ]["containers" ][0 ][
128138 "image"
@@ -141,8 +151,9 @@ def _prepare_cluster(self):
141151 "limits" : {"memory" : f"{ self .single_worker_memory } " , "cpu" : "1" }
142152 }
143153
144- # Set max limit on autoscaler
145- self .autoscaler_body ["spec" ]["maximum" ] = self .num_of_workers
154+ self .cluster_body ["spec" ]["worker" ]["replicas" ] = (
155+ 0 if DASK_AUTOSCALER_ENABLED else self .num_of_workers
156+ )
146157
147158 # Add DASK SCHEDULER URI env variable
148159 self .cluster_body ["spec" ]["worker" ]["spec" ]["containers" ][0 ]["env" ].append (
@@ -174,6 +185,17 @@ def _prepare_cluster(self):
174185 if rucio :
175186 self ._add_rucio_init_container ()
176187
188+ def _prepare_autoscaler (self ):
189+ """Prepare Dask autoscaler body."""
190+ # Add the name of the dask autoscaler
191+ self .autoscaler_body ["metadata" ] = {"name" : self .autoscaler_name }
192+
193+ # Connect autoscaler to the cluster
194+ self .autoscaler_body ["spec" ]["cluster" ] = self .cluster_name
195+
196+ # Set max limit on autoscaler
197+ self .autoscaler_body ["spec" ]["maximum" ] = self .num_of_workers
198+
177199 def _add_image_pull_secrets (self ):
178200 """Attach the configured image pull secrets to scheduler and worker containers."""
179201 image_pull_secrets = []
0 commit comments