|
19 | 19 | DeploymentDownscaleRequest, |
20 | 20 | DeploymentSchedulingInfo, |
21 | 21 | ReplicaSchedulingRequest, |
| 22 | + ReplicaSchedulingRequestStatus, |
22 | 23 | Resources, |
23 | 24 | SpreadDeploymentSchedulingPolicy, |
24 | 25 | ) |
@@ -1424,6 +1425,161 @@ def on_scheduled(actor_handle, placement_group): |
1424 | 1425 | downscales={}, |
1425 | 1426 | ) |
1426 | 1427 |
|
def test_actor_creation_failure_does_not_decrement_resources(self):
    """Check that a failed actor creation leaves room for the next replica.

    Two 1-CPU replicas are scheduled in one batch against a single 2-CPU
    node. Actor creation raises for the first replica; the second must
    still be scheduled onto that node, which is how this test observes
    that available resources were not decremented for the failed one.
    """

    deployment_id = DeploymentID(name="deployment1")
    target_node_id = NodeID.from_random().hex()

    # One node with exactly 2 CPUs: enough for two 1-CPU replicas.
    node_cache = MockClusterNodeInfoCache()
    node_cache.add_node(target_node_id, {"CPU": 2})

    scheduler = default_impl.create_deployment_scheduler(
        node_cache,
        head_node_id_override="fake-head-node-id",
        create_placement_group_fn_override=None,
    )
    scheduler.on_deployment_created(
        deployment_id, SpreadDeploymentSchedulingPolicy()
    )
    replica_config = ReplicaConfig.create(
        dummy, ray_actor_options={"num_cpus": 1}
    )
    scheduler.on_deployment_deployed(deployment_id, replica_config)

    # Counts .remote() invocations so that only the very first one raises.
    remote_calls = 0

    class FailOnceMockActorClass(MockActorClass):
        def remote(self, *args):
            nonlocal remote_calls
            remote_calls += 1
            if remote_calls > 1:
                return super().remote(*args)
            raise RuntimeError("Simulated actor creation failure")

    scheduled_callback = Mock()

    def build_request(unique_id, actor_def):
        # Literals are created per call so the two requests share no state.
        return ReplicaSchedulingRequest(
            replica_id=ReplicaID(
                unique_id=unique_id, deployment_id=deployment_id
            ),
            actor_def=actor_def,
            actor_resources={"CPU": 1},
            actor_options={},
            actor_init_args=(),
            on_scheduled=scheduled_callback,
        )

    failing_request = build_request("r0", FailOnceMockActorClass())
    healthy_request = build_request("r1", MockActorClass())

    scheduler.schedule(
        upscales={deployment_id: [failing_request, healthy_request]},
        downscales={},
    )

    statuses = ReplicaSchedulingRequestStatus

    # The replica backed by the failing actor class reports the failure.
    assert failing_request.status == statuses.ACTOR_CREATION_FAILED

    # The other replica succeeds and is the only one handed to the
    # callback, pinned to the single node in the cluster.
    assert healthy_request.status == statuses.SUCCEEDED
    assert scheduled_callback.call_count == 1
    scheduled_handle = scheduled_callback.call_args_list[0].args[0]
    strategy = scheduled_handle._options["scheduling_strategy"]
    assert isinstance(strategy, NodeAffinitySchedulingStrategy)
    assert strategy.node_id == target_node_id
| 1501 | + |
def test_pg_creation_failure_does_not_decrement_resources(self):
    """Check that a failed placement group leaves room for the next replica.

    Two replicas, each requesting a 1-CPU STRICT_PACK placement group, are
    scheduled in one batch against a single 2-CPU node. Placement group
    creation raises for the first call; the second replica must still be
    scheduled, which is how this test observes that available resources
    were not decremented for the failed one.
    """

    deployment_id = DeploymentID(name="deployment1")
    target_node_id = NodeID.from_random().hex()

    # One node with exactly 2 CPUs: enough for two replicas with 1-CPU PGs.
    node_cache = MockClusterNodeInfoCache()
    node_cache.add_node(target_node_id, {"CPU": 2})

    # Counts placement group creations so that only the first one raises.
    pg_calls = 0

    def fail_once_create_pg(request):
        nonlocal pg_calls
        pg_calls += 1
        if pg_calls > 1:
            return MockPlacementGroup(request)
        raise RuntimeError("Simulated PG creation failure")

    scheduler = default_impl.create_deployment_scheduler(
        node_cache,
        head_node_id_override="fake-head-node-id",
        create_placement_group_fn_override=fail_once_create_pg,
    )
    scheduler.on_deployment_created(
        deployment_id, SpreadDeploymentSchedulingPolicy()
    )
    replica_config = ReplicaConfig.create(
        dummy,
        ray_actor_options={"num_cpus": 0},
        placement_group_bundles=[{"CPU": 1}],
        placement_group_strategy="STRICT_PACK",
    )
    scheduler.on_deployment_deployed(deployment_id, replica_config)

    scheduled_callback = Mock()

    def build_request(unique_id):
        # Literals are created per call so the two requests share no state;
        # the actor name mirrors the replica's unique id ("r0" / "r1").
        return ReplicaSchedulingRequest(
            replica_id=ReplicaID(
                unique_id=unique_id, deployment_id=deployment_id
            ),
            actor_def=MockActorClass(),
            actor_resources={"CPU": 0},
            placement_group_bundles=[{"CPU": 1}],
            placement_group_strategy="STRICT_PACK",
            actor_options={"name": unique_id},
            actor_init_args=(),
            on_scheduled=scheduled_callback,
        )

    failing_request = build_request("r0")
    healthy_request = build_request("r1")

    scheduler.schedule(
        upscales={deployment_id: [failing_request, healthy_request]},
        downscales={},
    )

    statuses = ReplicaSchedulingRequestStatus

    # The first replica fails at placement group creation.
    assert failing_request.status == statuses.PLACEMENT_GROUP_CREATION_FAILED

    # The second replica still succeeds and is the only one handed to the
    # callback, scheduled through a placement group strategy.
    assert healthy_request.status == statuses.SUCCEEDED
    assert scheduled_callback.call_count == 1
    scheduled_handle = scheduled_callback.call_args_list[0].args[0]
    strategy = scheduled_handle._options["scheduling_strategy"]
    assert isinstance(strategy, PlacementGroupSchedulingStrategy)
| 1582 | + |
1427 | 1583 |
|
if __name__ == "__main__":
    # Run this file's tests verbosely with output capturing disabled, and
    # propagate pytest's result as the process exit status.
    exit_code = pytest.main(["-v", "-s", __file__])
    sys.exit(exit_code)
0 commit comments