11import logging
22import math
33import time
4- from collections import defaultdict
4+ from collections import Counter , defaultdict
55from dataclasses import dataclass
6- from logging import getLogger
7- from typing import TYPE_CHECKING , Callable , Dict , Optional
6+ from typing import TYPE_CHECKING , Any , Callable , Dict , List , Optional
87
98import ray
109from .base_autoscaling_coordinator import AutoscalingCoordinator
1615 RollingLogicalUtilizationGauge ,
1716)
1817from .util import cap_resource_request_to_limits
19- from ray ._private .ray_constants import env_float , env_integer
18+ from ray ._private .ray_constants import env_bool , env_float , env_integer
2019from ray .data ._internal .cluster_autoscaler import ClusterAutoscaler
2120from ray .data ._internal .execution .interfaces .execution_options import ExecutionResources
21+ from ray .data ._internal .execution .util import memory_string
2222
2323if TYPE_CHECKING :
2424 from ray .data ._internal .execution .resource_manager import ResourceManager
2525
26- logger = getLogger (__name__ )
26+ logger = logging . getLogger (__name__ )
2727
2828
2929@dataclass (frozen = True )
@@ -40,13 +40,28 @@ def __post_init__(self):
4040 assert isinstance (self .mem , int )
4141 assert self .mem >= 0
4242
43+ def __str__ (self ):
44+ return (
45+ "{"
46+ + f"CPU: { self .cpu } , GPU: { self .gpu } , memory: { memory_string (self .mem )} "
47+ + "}"
48+ )
49+
4350 @classmethod
4451 def of (cls , * , cpu = 0 , gpu = 0 , mem = 0 ):
4552 cpu = math .floor (cpu )
4653 gpu = math .floor (gpu )
4754 mem = math .floor (mem )
4855 return cls (cpu = cpu , gpu = gpu , mem = mem )
4956
57+ @classmethod
58+ def from_bundle (cls , bundle : Dict [str , Any ]) -> "_NodeResourceSpec" :
59+ return _NodeResourceSpec .of (
60+ cpu = bundle .get ("CPU" , 0 ),
61+ gpu = bundle .get ("GPU" , 0 ),
62+ mem = bundle .get ("memory" , 0 ),
63+ )
64+
5065 def to_bundle (self ):
5166 return {"CPU" : self .cpu , "GPU" : self .gpu , "memory" : self .mem }
5267
@@ -60,10 +75,9 @@ def _get_node_resource_spec_and_count() -> Dict[_NodeResourceSpec, int]:
6075 for node_group_config in cluster_config .node_group_configs :
6176 if not node_group_config .resources or node_group_config .max_count == 0 :
6277 continue
63- node_resource_spec = _NodeResourceSpec .of (
64- cpu = node_group_config .resources .get ("CPU" , 0 ),
65- gpu = node_group_config .resources .get ("GPU" , 0 ),
66- mem = node_group_config .resources .get ("memory" , 0 ),
78+
79+ node_resource_spec = _NodeResourceSpec .from_bundle (
80+ node_group_config .resources
6781 )
6882 nodes_resource_spec_count [node_resource_spec ] = 0
6983
@@ -75,9 +89,7 @@ def _get_node_resource_spec_and_count() -> Dict[_NodeResourceSpec, int]:
7589 ]
7690
7791 for r in node_resources :
78- node_resource_spec = _NodeResourceSpec .of (
79- cpu = r .get ("CPU" , 0 ), gpu = r .get ("GPU" , 0 ), mem = r .get ("memory" , 0 )
80- )
92+ node_resource_spec = _NodeResourceSpec .from_bundle (r )
8193 nodes_resource_spec_count [node_resource_spec ] += 1
8294
8395 return nodes_resource_spec_count
@@ -128,12 +140,16 @@ class DefaultClusterAutoscalerV2(ClusterAutoscaler):
128140 "RAY_DATA_AUTOSCALING_REQUEST_EXPIRE_TIME_S" ,
129141 180 ,
130142 )
143+ # Whether to disable INFO-level logs.
144+ RAY_DATA_DISABLE_AUTOSCALER_LOGGING = env_bool (
145+ "RAY_DATA_DISABLE_AUTOSCALER_LOGGING" , False
146+ )
131147
132148 def __init__ (
133149 self ,
134150 resource_manager : "ResourceManager" ,
135- resource_limits : ExecutionResources ,
136151 execution_id : str ,
152+ resource_limits : ExecutionResources = ExecutionResources .inf (),
137153 resource_utilization_calculator : Optional [ResourceUtilizationGauge ] = None ,
138154 cluster_scaling_up_util_threshold : float = DEFAULT_CLUSTER_SCALING_UP_UTIL_THRESHOLD , # noqa: E501
139155 cluster_scaling_up_delta : float = DEFAULT_CLUSTER_SCALING_UP_DELTA ,
@@ -160,7 +176,7 @@ def __init__(
160176 self ._resource_utilization_calculator = resource_utilization_calculator
161177 # Threshold of cluster utilization to trigger scaling up.
162178 self ._cluster_scaling_up_util_threshold = cluster_scaling_up_util_threshold
163- self ._cluster_scaling_up_delta = cluster_scaling_up_delta
179+ self ._cluster_scaling_up_delta = int ( math . ceil ( cluster_scaling_up_delta ))
164180 self ._min_gap_between_autoscaling_requests_s = (
165181 min_gap_between_autoscaling_requests_s
166182 )
@@ -204,30 +220,18 @@ def try_trigger_scaling(self):
204220
205221 # We separate active bundles (existing nodes) from pending bundles (scale-up delta)
206222 # to ensure existing nodes' resources are never crowded out by scale-up requests.
207- active_bundles = []
208- pending_bundles = []
209- debug_msg = ""
210- if logger .isEnabledFor (logging .DEBUG ):
211- debug_msg = (
212- "Scaling up cluster. Current utilization: "
213- f"CPU={ util .cpu :.2f} , GPU={ util .gpu :.2f} , object_store_memory={ util .object_store_memory :.2f} ."
214- " Requesting resources:"
215- )
216223 # TODO(hchen): We scale up all nodes by the same delta for now.
217224 # We may want to distinguish different node types based on their individual
218225 # utilization.
226+ active_bundles = []
227+ pending_bundles = []
219228 node_resource_spec_count = self ._get_node_counts ()
220229 for node_resource_spec , count in node_resource_spec_count .items ():
221230 bundle = node_resource_spec .to_bundle ()
222231 # Bundles for existing nodes -> active (must include)
223232 active_bundles .extend ([bundle ] * count )
224233 # Bundles for scale-up delta -> pending (best-effort)
225- delta_count = int (math .ceil (self ._cluster_scaling_up_delta ))
226- pending_bundles .extend ([bundle ] * delta_count )
227- if logger .isEnabledFor (logging .DEBUG ):
228- num_to_request = count + delta_count
229- debug_msg += f" [{ bundle } : { count } -> { num_to_request } ]"
230- logger .debug (debug_msg )
234+ pending_bundles .extend ([bundle ] * self ._cluster_scaling_up_delta )
231235
232236 # Cap the resource request to respect user-configured limits.
233237 # Active bundles (existing nodes) are always included; pending bundles
@@ -236,8 +240,42 @@ def try_trigger_scaling(self):
236240 active_bundles , pending_bundles , self ._resource_limits
237241 )
238242
243+ if resource_request != active_bundles :
244+ self ._log_resource_request (util , active_bundles , resource_request )
245+
239246 self ._send_resource_request (resource_request )
240247
248+ def _log_resource_request (
249+ self ,
250+ current_utilization : ExecutionResources ,
251+ active_bundles : List [Dict [str , float ]],
252+ resource_request : List [Dict [str , float ]],
253+ ) -> None :
254+ message = (
255+ "The utilization of one or more logical resource is higher than the "
256+ f"specified threshold of { self ._cluster_scaling_up_util_threshold :.0%} : "
257+ f"CPU={ current_utilization .cpu :.0%} , GPU={ current_utilization .gpu :.0%} , "
258+ f"object_store_memory={ current_utilization .object_store_memory :.0%} . "
259+ f"Requesting { self ._cluster_scaling_up_delta } node(s) of each shape:"
260+ )
261+
262+ current_node_counts = Counter (
263+ [_NodeResourceSpec .from_bundle (bundle ) for bundle in active_bundles ]
264+ )
265+ requested_node_counts = Counter (
266+ [_NodeResourceSpec .from_bundle (bundle ) for bundle in resource_request ]
267+ )
268+ for node_spec , requested_count in requested_node_counts .items ():
269+ current_count = current_node_counts .get (node_spec , 0 )
270+ message += f" [{ node_spec } : { current_count } -> { requested_count } ]"
271+
272+ if self .RAY_DATA_DISABLE_AUTOSCALER_LOGGING :
273+ level = logging .DEBUG
274+ else :
275+ level = logging .INFO
276+
277+ logger .log (level , message )
278+
241279 def _send_resource_request (self , resource_request ):
242280 # Make autoscaler resource request.
243281 self ._autoscaling_coordinator .request_resources (
0 commit comments