@@ -205,83 +205,111 @@ impl WorkerManager for RayWorkerManager {
205205 )
206206 } ) ;
207207
208- let mut state = self
209- . state
210- . lock ( )
211- . expect ( "Failed to lock RayWorkerManagerState" ) ;
212-
213- // If no desired bundles, clear outstanding autoscaler requests instead of scale-up.
214- if bundles. is_empty ( )
215- || ( requested_num_cpus <= 0.0
216- && requested_num_gpus <= 0.0
217- && requested_memory_bytes == 0 )
218- {
219- Python :: attach ( |py| -> DaftResult < ( ) > {
220- let flotilla_module = py. import ( pyo3:: intern!( py, "daft.runners.flotilla" ) ) ?;
221- flotilla_module. call_method0 ( pyo3:: intern!( py, "clear_autoscaling_requests" ) ) ?;
222- Ok ( ( ) )
223- } ) ?;
224- state. max_resources_requested = ResourceRequest :: default ( ) ;
225- return Ok ( ( ) ) ;
208+ enum AutoscaleAction {
209+ Noop ,
210+ Clear ,
211+ ScaleUp {
212+ python_bundles : Vec < HashMap < & ' static str , i64 > > ,
213+ } ,
226214 }
227215
228- let ( cluster_num_cpus, cluster_num_gpus, cluster_memory_bytes) = state
229- . ray_workers
230- . values ( )
231- . fold ( ( 0.0 , 0.0 , 0 ) , |acc, worker| {
232- (
233- acc. 0 + worker. total_num_cpus ( ) ,
234- acc. 1 + worker. total_num_gpus ( ) ,
235- acc. 2 + worker. total_memory_bytes ( ) ,
236- )
237- } ) ;
238-
239- let resource_request_greater_than_current_capacity = requested_num_cpus > cluster_num_cpus
240- || requested_num_gpus > cluster_num_gpus
241- || requested_memory_bytes > cluster_memory_bytes;
242-
243- let resource_request_greater_than_max_requested = requested_num_cpus
244- > state. max_resources_requested . num_cpus ( ) . unwrap_or ( 0.0 )
245- || requested_num_gpus > state. max_resources_requested . num_gpus ( ) . unwrap_or ( 0.0 )
246- || requested_memory_bytes > state. max_resources_requested . memory_bytes ( ) . unwrap_or ( 0 ) ;
247-
248- let cluster_is_zero_capacity =
249- cluster_num_cpus <= 0.0 && cluster_num_gpus <= 0.0 && cluster_memory_bytes == 0 ;
250- let should_bootstrap = cluster_is_zero_capacity
251- && ( requested_num_cpus > 0.0 || requested_num_gpus > 0.0 || requested_memory_bytes > 0 ) ;
216+ let mut action = AutoscaleAction :: Noop ;
252217
253- // Only autoscale if we need more capacity AND this is greater than we've seen before
254- if ( resource_request_greater_than_current_capacity
255- && resource_request_greater_than_max_requested)
256- || should_bootstrap
257218 {
258- // On scale-up demand, allow previously blacklisted workers to be reused immediately.
259- state. pending_release_blacklist . clear ( ) ;
260- state. last_refresh = None ;
261- state. max_resources_requested = ResourceRequest :: try_new_internal (
262- Some ( requested_num_cpus) ,
263- Some ( requested_num_gpus) ,
264- Some ( requested_memory_bytes) ,
265- ) ?;
266- let python_bundles = bundles
267- . iter ( )
268- . map ( |bundle| {
269- let mut dict = HashMap :: new ( ) ;
270- dict. insert ( "CPU" , bundle. num_cpus ( ) . ceil ( ) as i64 ) ;
271- dict. insert ( "GPU" , bundle. num_gpus ( ) . ceil ( ) as i64 ) ;
272- dict. insert ( "memory" , bundle. memory_bytes ( ) as i64 ) ;
273- dict
274- } )
275- . collect :: < Vec < _ > > ( ) ;
219+ let mut state = self
220+ . state
221+ . lock ( )
222+ . expect ( "Failed to lock RayWorkerManagerState" ) ;
276223
277- Python :: attach ( |py| -> DaftResult < ( ) > {
278- let flotilla_module = py. import ( pyo3:: intern!( py, "daft.runners.flotilla" ) ) ?;
279- flotilla_module
280- . call_method1 ( pyo3:: intern!( py, "try_autoscale" ) , ( python_bundles, ) ) ?;
224+ // If no desired bundles, clear outstanding autoscaler requests instead of scale-up.
225+ if bundles. is_empty ( )
226+ || ( requested_num_cpus <= 0.0
227+ && requested_num_gpus <= 0.0
228+ && requested_memory_bytes == 0 )
229+ {
230+ state. max_resources_requested = ResourceRequest :: default ( ) ;
231+ action = AutoscaleAction :: Clear ;
232+ } else {
233+ let ( cluster_num_cpus, cluster_num_gpus, cluster_memory_bytes) = state
234+ . ray_workers
235+ . values ( )
236+ . fold ( ( 0.0 , 0.0 , 0 ) , |acc, worker| {
237+ (
238+ acc. 0 + worker. total_num_cpus ( ) ,
239+ acc. 1 + worker. total_num_gpus ( ) ,
240+ acc. 2 + worker. total_memory_bytes ( ) ,
241+ )
242+ } ) ;
243+
244+ let resource_request_greater_than_current_capacity = requested_num_cpus
245+ > cluster_num_cpus
246+ || requested_num_gpus > cluster_num_gpus
247+ || requested_memory_bytes > cluster_memory_bytes;
248+
249+ let resource_request_greater_than_max_requested = requested_num_cpus
250+ > state. max_resources_requested . num_cpus ( ) . unwrap_or ( 0.0 )
251+ || requested_num_gpus > state. max_resources_requested . num_gpus ( ) . unwrap_or ( 0.0 )
252+ || requested_memory_bytes
253+ > state. max_resources_requested . memory_bytes ( ) . unwrap_or ( 0 ) ;
254+
255+ let cluster_is_zero_capacity =
256+ cluster_num_cpus <= 0.0 && cluster_num_gpus <= 0.0 && cluster_memory_bytes == 0 ;
257+ let should_bootstrap = cluster_is_zero_capacity
258+ && ( requested_num_cpus > 0.0
259+ || requested_num_gpus > 0.0
260+ || requested_memory_bytes > 0 ) ;
261+
262+ // Only autoscale if we need more capacity AND this is greater than we've seen before
263+ if ( resource_request_greater_than_current_capacity
264+ && resource_request_greater_than_max_requested)
265+ || should_bootstrap
266+ {
267+ // On scale-up demand, allow previously blacklisted workers to be reused immediately.
268+ state. pending_release_blacklist . clear ( ) ;
269+ state. last_refresh = None ;
270+ state. max_resources_requested = ResourceRequest :: try_new_internal (
271+ Some ( requested_num_cpus) ,
272+ Some ( requested_num_gpus) ,
273+ Some ( requested_memory_bytes) ,
274+ ) ?;
275+
276+ let python_bundles = bundles
277+ . iter ( )
278+ . map ( |bundle| {
279+ let mut dict = HashMap :: new ( ) ;
280+ dict. insert ( "CPU" , bundle. num_cpus ( ) . ceil ( ) as i64 ) ;
281+ dict. insert ( "GPU" , bundle. num_gpus ( ) . ceil ( ) as i64 ) ;
282+ dict. insert ( "memory" , bundle. memory_bytes ( ) as i64 ) ;
283+ dict
284+ } )
285+ . collect :: < Vec < _ > > ( ) ;
286+
287+ action = AutoscaleAction :: ScaleUp { python_bundles } ;
288+ }
289+ }
290+ }
291+
292+ match action {
293+ AutoscaleAction :: Noop => Ok ( ( ) ) ,
294+ AutoscaleAction :: Clear => {
295+ Python :: attach ( |py| -> DaftResult < ( ) > {
296+ let flotilla_module = py. import ( pyo3:: intern!( py, "daft.runners.flotilla" ) ) ?;
297+ flotilla_module
298+ . call_method0 ( pyo3:: intern!( py, "clear_autoscaling_requests" ) ) ?;
299+ Ok ( ( ) )
300+ } ) ?;
281301 Ok ( ( ) )
282- } ) ?;
302+ }
303+ AutoscaleAction :: ScaleUp { python_bundles } => {
304+ Python :: attach ( |py| -> DaftResult < ( ) > {
305+ let flotilla_module = py. import ( pyo3:: intern!( py, "daft.runners.flotilla" ) ) ?;
306+ flotilla_module
307+ . call_method1 ( pyo3:: intern!( py, "try_autoscale" ) , ( python_bundles, ) ) ?;
308+ Ok ( ( ) )
309+ } ) ?;
310+ Ok ( ( ) )
311+ }
283312 }
284- Ok ( ( ) )
285313 }
286314
287315 fn retire_idle_ray_workers (
0 commit comments