@@ -261,155 +261,6 @@ def add_nodes_to_cluster(cluster):
261261 )
262262
263263
264- @pytest .mark .parametrize ("multi_bundle" , [True , False ])
265- @pytest .mark .parametrize ("even_pack" , [True , False ])
266- @pytest .mark .parametrize ("scheduling_strategy" , ["SPREAD" , "STRICT_PACK" , "PACK" ])
267- def test_placement_group_max_cpu_frac (
268- ray_start_cluster , multi_bundle , even_pack , scheduling_strategy
269- ):
270- cluster = ray_start_cluster
271- cluster .add_node (num_cpus = 4 )
272- cluster .wait_for_nodes ()
273- ray .init (address = cluster .address )
274-
275- if multi_bundle :
276- bundles = [{"CPU" : 1 }] * 3
277- else :
278- bundles = [{"CPU" : 3 }]
279-
280- # Input validation - max_cpu_fraction_per_node must be between 0 and 1.
281- with pytest .raises (ValueError ):
282- ray .util .placement_group (bundles , _max_cpu_fraction_per_node = - 1 )
283- with pytest .raises (ValueError ):
284- ray .util .placement_group (bundles , _max_cpu_fraction_per_node = 2 )
285-
286- pg = ray .util .placement_group (
287- bundles , strategy = scheduling_strategy , _max_cpu_fraction_per_node = 0.5
288- )
289-
290- # Placement group will never be scheduled since it would violate the max CPU
291- # fraction reservation.
292- with pytest .raises (ray .exceptions .GetTimeoutError ):
293- ray .get (pg .ready (), timeout = 5 )
294-
295- # Add new node with enough CPU cores to scheduled placement group bundle while
296- # adhering to the max CPU fraction constraint.
297- if even_pack :
298- num_cpus = 6
299- else :
300- num_cpus = 8
301- cluster .add_node (num_cpus = num_cpus )
302- cluster .wait_for_nodes ()
303- # The placement group should be schedulable so this shouldn't raise.
304- ray .get (pg .ready (), timeout = 5 )
305-
306-
307- def test_placement_group_max_cpu_frac_multiple_pgs (ray_start_cluster ):
308- """
309- Make sure when there's more than 1 pg, they respect the fraction.
310- """
311- cluster = ray_start_cluster
312- cluster .add_node (num_cpus = 8 )
313- cluster .wait_for_nodes ()
314- ray .init (address = cluster .address )
315-
316- # This pg should be scheduable.
317- pg = ray .util .placement_group ([{"CPU" : 4 }], _max_cpu_fraction_per_node = 0.5 )
318- ray .get (pg .ready ())
319-
320- # When we schedule another placement group, it shouldn't be scheduled.
321- pg2 = ray .util .placement_group ([{"CPU" : 4 }], _max_cpu_fraction_per_node = 0.5 )
322- with pytest .raises (ray .exceptions .GetTimeoutError ):
323- ray .get (pg2 .ready (), timeout = 5 )
324-
325- # When you add a new node, it is finally schedulable.
326- cluster .add_node (num_cpus = 8 )
327- ray .get (pg2 .ready ())
328-
329-
330- def test_placement_group_max_cpu_frac_edge_cases (ray_start_cluster ):
331- """
332- _max_cpu_fraction_per_node <= 0 ---> should raise error (always)
333- _max_cpu_fraction_per_node = 0.999 --->
334- should exclude 1 CPU (this is already the case)
335- _max_cpu_fraction_per_node = 0.001 --->
336- should exclude 3 CPUs (not currently the case, we'll exclude all 4 CPUs).
337-
338- Related: https://github.com/ray-project/ray/issues/26635
339- """
340- cluster = ray_start_cluster
341- cluster .add_node (num_cpus = 4 )
342- cluster .wait_for_nodes ()
343- ray .init (address = cluster .address )
344-
345- """
346- 0 or 1 is not allowed.
347- """
348- with pytest .raises (ValueError ):
349- ray .util .placement_group ([{"CPU" : 1 }], _max_cpu_fraction_per_node = 0 )
350-
351- """
352- Make sure when _max_cpu_fraction_per_node = 0.999, 1 CPU is always excluded.
353- """
354- pg = ray .util .placement_group (
355- [{"CPU" : 1 } for _ in range (4 )], _max_cpu_fraction_per_node = 0.999
356- )
357- # Since 1 CPU is excluded, we cannot schedule this pg.
358- with pytest .raises (ray .exceptions .GetTimeoutError ):
359- ray .get (pg .ready (), timeout = 5 )
360- ray .util .remove_placement_group (pg )
361-
362- # Since 1 CPU is excluded, we can schedule 1 num_cpus actor after creating
363- # CPU: 1 * 3 bundle placement groups.
364- @ray .remote (num_cpus = 1 )
365- class A :
366- def ready (self ):
367- pass
368-
369- # Try actor creation -> pg creation.
370- a = A .remote ()
371- ray .get (a .ready .remote ())
372- pg = ray .util .placement_group (
373- [{"CPU" : 1 } for _ in range (3 )], _max_cpu_fraction_per_node = 0.999
374- )
375- ray .get (pg .ready ())
376-
377- ray .kill (a )
378- ray .util .remove_placement_group (pg )
379-
380- # Make sure the opposite order also works. pg creation -> actor creation.
381- pg = ray .util .placement_group (
382- [{"CPU" : 1 } for _ in range (3 )], _max_cpu_fraction_per_node = 0.999
383- )
384- a = A .remote ()
385- ray .get (a .ready .remote ())
386- ray .get (pg .ready ())
387-
388- ray .kill (a )
389- ray .util .remove_placement_group (pg )
390-
391- """
392- _max_cpu_fraction_per_node = 0.001 --->
393- should exclude 3 CPUs (not currently the case, we'll exclude all 4 CPUs).
394- """
395- # We can schedule up to 1 pg.
396- pg = ray .util .placement_group ([{"CPU" : 1 }], _max_cpu_fraction_per_node = 0.001 )
397- ray .get (pg .ready ())
398- # Cannot schedule any more PG.
399- pg2 = ray .util .placement_group ([{"CPU" : 1 }], _max_cpu_fraction_per_node = 0.001 )
400- with pytest .raises (ray .exceptions .GetTimeoutError ):
401- ray .get (pg2 .ready (), timeout = 5 )
402-
403- # Since 3 CPUs are excluded, we can schedule actors.
404- actors = [A .remote () for _ in range (3 )]
405- ray .get ([a .ready .remote () for a in actors ])
406-
407- # Once pg 1 is removed, pg 2 can be created since there's 1 CPU that can be
408- # used for this pg.
409- ray .util .remove_placement_group (pg )
410- ray .get (pg2 .ready ())
411-
412-
413264def test_placement_group_parallel_submission (ray_start_cluster ):
414265 NUM_PARALLEL_PGS = 5
415266 cluster = ray_start_cluster
0 commit comments