Commit 618147f

Revert "Updating zero capacity resource semantics (#4555)"
This reverts commit 0f42f87.
1 parent 8f37d49 commit 618147f

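This revert restores the pre-#4555 semantics: zero-capacity resources are
kept as explicit entries (a node reports "GPU: 0" rather than deleting the
key), so internal code can index resource maps directly again. A minimal
sketch of the user-visible effect, assuming a machine without GPUs and the
ray.global_state API of this era:

import ray

ray.init(num_cpus=2)

# "GPU" is present with capacity 0 instead of being dropped from the map,
# which is what lets the updated tests below index it directly.
client = ray.global_state.client_table()[0]
assert client["Resources"]["GPU"] == 0
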
File tree

17 files changed: +191 −246 lines changed

java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java

+1 −7

@@ -7,19 +7,13 @@
  * The options class for RayCall or ActorCreation.
  */
 public abstract class BaseTaskOptions {
-  public final Map<String, Double> resources;
+  public Map<String, Double> resources;

   public BaseTaskOptions() {
     resources = new HashMap<>();
   }

   public BaseTaskOptions(Map<String, Double> resources) {
-    for (Map.Entry<String, Double> entry : resources.entrySet()) {
-      if (entry.getValue().compareTo(0.0) <= 0) {
-        throw new IllegalArgumentException(String.format("Resource capacity should be " +
-            "positive, but got resource %s = %f.", entry.getKey(), entry.getValue()));
-      }
-    }
     this.resources = resources;
   }

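A rough Python analogue of the constructor check removed above, for
illustration only (the helper is hypothetical): #4555 rejected zero or
negative capacities when options were built, and this revert drops that
check, so zero-valued entries such as "GPU": 0.0 flow through again.

def validate_resources(resources):
    # The removed #4555-era rule: every capacity had to be positive.
    for name, quantity in resources.items():
        if quantity <= 0.0:
            raise ValueError(
                "Resource capacity should be positive, "
                "but got resource {} = {}.".format(name, quantity))
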
java/example.conf

+1 −1

@@ -14,7 +14,7 @@ ray {
   run-mode = CLUSTER

   // Available resources on this node.
-  resources: "CPU:4"
+  resources: "CPU:4,GPU:0"

   // The address of the redis server to connect, in format `ip:port`.
   // If not provided, Ray processes will be started locally, including

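The resources string maps each label to a capacity. A sketch of the
equivalent parse in Python (an assumption about the format, inferred from
the "CPU:4,GPU:0" example above, not the runtime's actual parser):

def parse_resource_string(s):
    # "CPU:4,GPU:0" -> {"CPU": 4.0, "GPU": 0.0}
    return {name: float(value)
            for name, value in (item.split(":") for item in s.split(","))}

assert parse_resource_string("CPU:4,GPU:0") == {"CPU": 4.0, "GPU": 0.0}
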
java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java

+6 −0

@@ -32,6 +32,7 @@
 import org.ray.runtime.task.ArgumentsBuilder;
 import org.ray.runtime.task.TaskLanguage;
 import org.ray.runtime.task.TaskSpec;
+import org.ray.runtime.util.ResourceUtil;
 import org.ray.runtime.util.UniqueIdUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -356,6 +357,11 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
       resources = new HashMap<>(taskOptions.resources);
     }

+    if (!resources.containsKey(ResourceUtil.CPU_LITERAL)
+        && !resources.containsKey(ResourceUtil.CPU_LITERAL.toLowerCase())) {
+      resources.put(ResourceUtil.CPU_LITERAL, 0.0);
+    }
+
     int maxActorReconstruction = 0;
     if (taskOptions instanceof ActorCreationOptions) {
       maxActorReconstruction = ((ActorCreationOptions) taskOptions).maxReconstructions;

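Note the cross-language asymmetry this hunk restores: a Java task that
omits a CPU requirement defaults to CPU 0.0 here, while a Python task
defaults to CPU 1.0 (see task.pxi below). A minimal sketch of the rule,
with the helper name invented for illustration:

def with_default_cpu(resources, default=0.0):
    # Insert the CPU key only if neither "CPU" nor "cpu" was supplied,
    # so the label always exists downstream.
    if "CPU" not in resources and "cpu" not in resources:
        resources = dict(resources, CPU=default)
    return resources
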
java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java

+4 −0

@@ -104,6 +104,10 @@ public RayConfig(Config config) {
           + "setting it to the number of CPU cores: {}", numCpu);
       resources.put("CPU", numCpu * 1.0);
     }
+    if (!resources.containsKey("GPU")) {
+      LOGGER.warn("No GPU resource is set in configuration, setting it to 0");
+      resources.put("GPU", 0.0);
+    }
   }
   // Driver id.
   String driverId = config.getString("ray.driver.id");

+1 −1

@@ -1,5 +1,5 @@
 ray {
   run-mode = SINGLE_PROCESS
-  resources = "CPU:4"
+  resources = "CPU:4,GPU:0"
   redis.address = ""
 }

java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java

+6 −14

@@ -46,38 +46,30 @@ public Integer echo(Integer number) {
   @Test
   public void testMethods() {
     TestUtils.skipTestUnderSingleProcess();
-    CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0));
+    CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 0.0));

     // This is a case that can satisfy required resources.
     // The static resources for test are "CPU:4,RES-A:4".
     RayObject<Integer> result1 = Ray.call(ResourcesManagementTest::echo, 100, callOptions1);
     Assert.assertEquals(100, (int) result1.get());

-    CallOptions callOptions2 = new CallOptions(ImmutableMap.of("CPU", 4.0));
+    CallOptions callOptions2 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 2.0));

     // This is a case that can't satisfy required resources.
     // The static resources for test are "CPU:4,RES-A:4".
     final RayObject<Integer> result2 = Ray.call(ResourcesManagementTest::echo, 200, callOptions2);
     WaitResult<Integer> waitResult = Ray.wait(ImmutableList.of(result2), 1, 1000);

-    Assert.assertEquals(1, waitResult.getReady().size());
-    Assert.assertEquals(0, waitResult.getUnready().size());
-
-    try {
-      CallOptions callOptions3 = new CallOptions(ImmutableMap.of("CPU", 0.0));
-      Assert.fail();
-    } catch (RuntimeException e) {
-      // We should receive a RuntimeException indicates that we should not
-      // pass a zero capacity resource.
-    }
+    Assert.assertEquals(0, waitResult.getReady().size());
+    Assert.assertEquals(1, waitResult.getUnready().size());
   }

   @Test
   public void testActors() {
     TestUtils.skipTestUnderSingleProcess();

     ActorCreationOptions actorCreationOptions1 =
-        new ActorCreationOptions(ImmutableMap.of("CPU", 2.0));
+        new ActorCreationOptions(ImmutableMap.of("CPU", 2.0, "GPU", 0.0));

     // This is a case that can satisfy required resources.
     // The static resources for test are "CPU:4,RES-A:4".
@@ -88,7 +80,7 @@ public void testActors() {
     // This is a case that can't satisfy required resources.
     // The static resources for test are "CPU:4,RES-A:4".
     ActorCreationOptions actorCreationOptions2 =
-        new ActorCreationOptions(ImmutableMap.of("CPU", 8.0));
+        new ActorCreationOptions(ImmutableMap.of("CPU", 8.0, "GPU", 0.0));

     RayActor<ResourcesManagementTest.Echo> echo2 =
         Ray.createActor(Echo::new, actorCreationOptions2);

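A Python analogue of the updated expectation, as a sketch (ray.wait timeout
units varied across releases of this era, so treat the literal as
illustrative): a task demanding more GPUs than the cluster has simply stays
unready instead of failing at options-construction time.

import ray

ray.init(num_cpus=4, num_gpus=0)

@ray.remote(num_gpus=2)
def echo(x):
    return x

# The request is infeasible, so the wait times out with the task unready.
ready, unready = ray.wait([echo.remote(200)], num_returns=1, timeout=1)
assert len(ready) == 0 and len(unready) == 1
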
python/ray/includes/task.pxi

+2 −0

@@ -44,6 +44,8 @@ cdef class Task:
         # Parse the resource map.
         if resource_map is not None:
             required_resources = resource_map_from_dict(resource_map)
+        if required_resources.count(b"CPU") == 0:
+            required_resources[b"CPU"] = 1.0
         if placement_resource_map is not None:
             required_placement_resources = (
                 resource_map_from_dict(placement_resource_map))

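With the default restored above, a Python task whose resource map omits
"CPU" is scheduled as requiring one full CPU, so the two declarations below
are equivalent (sketch, assuming the remote-function API of this era):

import ray

@ray.remote
def f():
    return 1

@ray.remote(num_cpus=1)  # the same requirement, stated explicitly
def g():
    return 1
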
python/ray/services.py

+3 −15

@@ -1029,25 +1029,14 @@ def check_and_update_resources(num_cpus, num_gpus, resources):
     if gpu_ids is not None:
         resources["GPU"] = min(resources["GPU"], len(gpu_ids))

-    resources = {
-        resource_label: resource_quantity
-        for resource_label, resource_quantity in resources.items()
-        if resource_quantity != 0
-    }
-
     # Check types.
     for _, resource_quantity in resources.items():
         assert (isinstance(resource_quantity, int)
                 or isinstance(resource_quantity, float))
         if (isinstance(resource_quantity, float)
                 and not resource_quantity.is_integer()):
-            raise ValueError(
-                "Resource quantities must all be whole numbers. Received {}.".
-                format(resources))
-        if resource_quantity < 0:
-            raise ValueError(
-                "Resource quantities must be nonnegative. Received {}.".format(
-                    resources))
+            raise ValueError("Resource quantities must all be whole numbers.")
+
         if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY:
             raise ValueError("Resource quantities must be at most {}.".format(
                 ray_constants.MAX_RESOURCE_QUANTITY))
@@ -1124,9 +1113,8 @@ def start_raylet(redis_address,

     # Limit the number of workers that can be started in parallel by the
     # raylet. However, make sure it is at least 1.
-    num_cpus_static = static_resources.get("CPU", 0)
     maximum_startup_concurrency = max(
-        1, min(multiprocessing.cpu_count(), num_cpus_static))
+        1, min(multiprocessing.cpu_count(), static_resources["CPU"]))

     # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
     resource_argument = ",".join(

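The key behavioral change above: zero-quantity entries are no longer
filtered out before the type checks run. A sketch of the difference using
plain dicts (no Ray required):

resources = {"CPU": 4, "GPU": 0}

# The #4555 behavior, now removed: zero entries were silently dropped.
filtered = {k: v for k, v in resources.items() if v != 0}
assert filtered == {"CPU": 4}

# Restored behavior: the zero entry survives, which is why the code above
# can index static_resources["CPU"] directly instead of using .get().
assert resources == {"CPU": 4, "GPU": 0}
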
python/ray/tests/test_basic.py

+3 −29

@@ -1948,15 +1948,15 @@ def run_lots_of_tasks():
         store_names = []
         store_names += [
             client["ObjectStoreSocketName"] for client in client_table
-            if client["Resources"].get("GPU", 0) == 0
+            if client["Resources"]["GPU"] == 0
         ]
         store_names += [
             client["ObjectStoreSocketName"] for client in client_table
-            if client["Resources"].get("GPU", 0) == 5
+            if client["Resources"]["GPU"] == 5
         ]
         store_names += [
             client["ObjectStoreSocketName"] for client in client_table
-            if client["Resources"].get("GPU", 0) == 1
+            if client["Resources"]["GPU"] == 1
         ]
         assert len(store_names) == 3

@@ -2126,32 +2126,6 @@ def f():
     ray.get(results)


-def test_zero_capacity_deletion_semantics(shutdown_only):
-    ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1})
-
-    def test():
-        resources = ray.global_state.available_resources()
-        retry_count = 0
-
-        while resources and retry_count < 5:
-            time.sleep(0.1)
-            resources = ray.global_state.available_resources()
-            retry_count += 1
-
-        if retry_count >= 5:
-            raise RuntimeError("Resources were available even after retries.")
-
-        return resources
-
-    function = ray.remote(
-        num_cpus=2, num_gpus=1, resources={"test_resource": 1})(test)
-    cluster_resources = ray.get(function.remote())
-
-    # All cluster resources should be utilized and
-    # cluster_resources must be empty
-    assert cluster_resources == {}
-
-
 @pytest.fixture
 def save_gpu_ids_shutdown_only():
     # Record the curent value of this environment variable so that we can

python/ray/tests/test_global_state.py

+2 −2

@@ -51,8 +51,8 @@ def cpu_task():

     while not resource_used:
         available_resources = ray.global_state.available_resources()
-        resource_used = available_resources.get(
-            "CPU", 0) == cluster_resources.get("CPU", 0) - 1
+        resource_used = available_resources[
+            "CPU"] == cluster_resources["CPU"] - 1

     assert resource_used

python/ray/tune/ray_trial_executor.py

+8 −13

@@ -11,7 +11,7 @@
 import traceback

 import ray
-from ray.tune.error import AbortTrialExecution
+from ray.tune.error import TuneError, AbortTrialExecution
 from ray.tune.logger import NoopLogger
 from ray.tune.trial import Trial, Resources, Checkpoint
 from ray.tune.trial_executor import TrialExecutor
@@ -363,22 +363,17 @@ def _update_avail_resources(self, num_retries=5):
             resources = ray.services.check_and_update_resources(
                 None, None, None)
             if not resources:
-                logger.warning(
-                    "Cluster resources not detected or are 0. Retrying...")
+                logger.warning("Cluster resources not detected. Retrying...")
                 time.sleep(0.5)

-        if not resources:
-            # NOTE: This hides the possibility that Ray may be waiting for
-            # clients to connect.
-            resources.setdefault("CPU", 0)
-            resources.setdefault("GPU", 0)
-            logger.warning("Cluster resources cannot be detected or are 0. "
-                           "You can resume this experiment by passing in "
-                           "`resume=True` to `run`.")
+        if not resources or "CPU" not in resources:
+            raise TuneError("Cluster resources cannot be detected. "
+                            "You can resume this experiment by passing in "
+                            "`resume=True` to `run`.")

         resources = resources.copy()
-        num_cpus = resources.pop("CPU", 0)
-        num_gpus = resources.pop("GPU", 0)
+        num_cpus = resources.pop("CPU")
+        num_gpus = resources.pop("GPU")
         custom_resources = resources

         self._avail_resources = Resources(

python/ray/worker.py

+1 −8

@@ -653,13 +653,6 @@ def submit_task(self,
                 raise ValueError(
                     "Resource quantities must all be whole numbers.")

-            # Remove any resources with zero quantity requirements
-            resources = {
-                resource_label: resource_quantity
-                for resource_label, resource_quantity in resources.items()
-                if resource_quantity > 0
-            }
-
             if placement_resources is None:
                 placement_resources = {}

@@ -1877,7 +1870,7 @@ def connect(node,
             nil_actor_counter,  # actor_counter.
             [],  # new_actor_handles.
             [],  # execution_dependencies.
-            {},  # resource_map.
+            {"CPU": 0},  # resource_map.
             {},  # placement_resource_map.
         )

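A small illustration of the driver-side change above (plain Python, names
mine): the revert prefers an explicit zero entry over an absent label when
the driver connects.

implicit = {}          # pre-revert resource_map: "CPU" absent entirely
explicit = {"CPU": 0}  # post-revert: label present with zero quantity

# Same quantity either way, but only the explicit form supports direct
# indexing, which the raylet-side accounting below now relies on.
assert implicit.get("CPU", 0) == explicit["CPU"]
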
src/ray/raylet/node_manager.cc

+14 −19

@@ -1469,16 +1469,15 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr<LocalClientConnection>
   // Get the CPU resources required by the running task.
   const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
   double required_cpus = required_resources.GetNumCpus();
-  std::unordered_map<std::string, double> cpu_resources;
-  if (required_cpus > 0) {
-    cpu_resources[kCPU_ResourceLabel] = required_cpus;
-  }
+  const std::unordered_map<std::string, double> cpu_resources = {
+      {kCPU_ResourceLabel, required_cpus}};

   // Release the CPU resources.
   auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
   local_available_resources_.Release(cpu_resource_ids);
-  cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
-      ResourceSet(cpu_resources));
+  RAY_CHECK(
+      cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
+          ResourceSet(cpu_resources)));
   worker->MarkBlocked();

   // Try dispatching tasks since we may have released some resources.
@@ -1522,11 +1521,9 @@ void NodeManager::HandleTaskUnblocked(
   // Get the CPU resources required by the running task.
   const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
   double required_cpus = required_resources.GetNumCpus();
-  std::unordered_map<std::string, double> cpu_resources_map;
-  if (required_cpus > 0) {
-    cpu_resources_map[kCPU_ResourceLabel] = required_cpus;
-  }
-  const ResourceSet cpu_resources(cpu_resources_map);
+  const ResourceSet cpu_resources(
+      std::unordered_map<std::string, double>({{kCPU_ResourceLabel, required_cpus}}));
+
   // Check if we can reacquire the CPU resources.
   bool oversubscribed = !local_available_resources_.Contains(cpu_resources);

@@ -1536,8 +1533,9 @@
     // reacquire here may be different from the ones that the task started with.
     auto const resource_ids = local_available_resources_.Acquire(cpu_resources);
     worker->AcquireTaskCpuResources(resource_ids);
-    cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
-        cpu_resources);
+    RAY_CHECK(
+        cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
+            cpu_resources));
   } else {
     // In this case, we simply don't reacquire the CPU resources for the worker.
     // The worker can keep running and when the task finishes, it will simply
@@ -1629,7 +1627,7 @@ bool NodeManager::AssignTask(const Task &task) {
   auto acquired_resources =
       local_available_resources_.Acquire(spec.GetRequiredResources());
   const auto &my_client_id = gcs_client_->client_table().GetLocalClientId();
-  cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources());
+  RAY_CHECK(cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources()));

   if (spec.IsActorCreationTask()) {
     // Check that we are not placing an actor creation task on a node with 0 CPUs.
@@ -1743,8 +1741,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
   // Release task's resources. The worker's lifetime resources are still held.
   auto const &task_resources = worker.GetTaskResourceIds();
   local_available_resources_.Release(task_resources);
-  cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
-      task_resources.ToResourceSet());
+  RAY_CHECK(cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
+      task_resources.ToResourceSet()));
   worker.ResetTaskResourceIds();

   // If this was an actor or actor creation task, handle the actor's new state.
@@ -2036,9 +2034,6 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,

   RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
                 << node_manager_id;
-
-  // TODO(romilb): We should probably revert the load subtraction from
-  // SchedulingPolicy::Schedule()
   // Mark the failed task as pending to let other raylets know that we still
   // have the task. TaskDependencyManager::TaskPending() is assumed to be
   // idempotent.

src/ray/raylet/scheduling_policy.cc

+3 −5

@@ -49,13 +49,11 @@ std::unordered_map<TaskID, ClientID> SchedulingPolicy::Schedule(
     const auto &node_resources = client_resource_pair.second;
     ResourceSet available_node_resources =
         ResourceSet(node_resources.GetAvailableResources());
-    // TODO(romilb): Why do we need to subtract load from available resources?
-    // Even if we don't the code path below for choosing a dst_client_id would be
-    // similar.
-    available_node_resources.SubtractResources(node_resources.GetLoadResources());
+    available_node_resources.SubtractResourcesStrict(node_resources.GetLoadResources());
     RAY_LOG(DEBUG) << "client_id " << node_client_id
                    << " avail: " << node_resources.GetAvailableResources().ToString()
-                   << " load: " << node_resources.GetLoadResources().ToString();
+                   << " load: " << node_resources.GetLoadResources().ToString()
+                   << " avail-load: " << available_node_resources.ToString();

     if (resource_demand.IsSubset(available_node_resources)) {
       // This node is a feasible candidate.
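The RAY_CHECKs in node_manager.cc and the SubtractResourcesStrict call here
lean on the same restored invariant: every label being acquired, released,
or subtracted already exists in the target ResourceSet, because
zero-capacity entries are no longer deleted. An illustrative Python sketch
of that strict accounting (plain dicts; not Ray's actual C++ ResourceSet):

def subtract_strict(avail, load):
    # Strict variant: a label missing from `avail` is a fatal error, the
    # moral equivalent of a failed RAY_CHECK.
    for label, quantity in load.items():
        assert label in avail, "unknown resource label: " + label
        avail[label] -= quantity
    return avail

print(subtract_strict({"CPU": 4.0, "GPU": 0.0}, {"GPU": 0.0}))  # fine
# subtract_strict({"CPU": 4.0}, {"GPU": 0.0}) would assert: under the old
# semantics "GPU: 0" was deleted, so the strict form could not be used.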
