Skip to content

Commit abf2a70

Browse files
[core] Add task and object reconstruction status to ray memory (ray-project#22317)
Improve observability for general objects and lineage reconstruction by adding a "Status" field to `ray memory`. The value of the field can be: ``` // The task is waiting for its dependencies to be created. WAITING_FOR_DEPENDENCIES = 1; // All dependencies have been created and the task is scheduled to execute. SCHEDULED = 2; // The task finished successfully. FINISHED = 3; ``` In addition, tasks that failed or that needed to be re-executed due to lineage reconstruction will have a field listing the attempt number. Example output: ``` IP Address | PID | Type | Call Site | Status | Size | Reference Type | Object Ref 192.168.4.22 | 279475 | Driver | (task call) ... | Attempt #2: FINISHED | 10000254.0 B | LOCAL_REFERENCE | c2668a65bda616c1ffffffffffffffffffffffff0100000001000000 ```
1 parent 9261428 commit abf2a70

File tree

16 files changed

+283
-20
lines changed

16 files changed

+283
-20
lines changed

dashboard/memory_utils.py

Lines changed: 28 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,12 @@ def __init__(
9999
self.node_address = node_address
100100

101101
# object info
102+
self.task_status = object_ref.get("taskStatus", "?")
103+
if self.task_status == "NIL":
104+
self.task_status = "-"
105+
self.attempt_number = int(object_ref.get("attemptNumber", 0))
106+
if self.attempt_number > 0:
107+
self.task_status = f"Attempt #{self.attempt_number + 1}: {self.task_status}"
102108
self.object_size = int(object_ref.get("objectSize", -1))
103109
self.call_site = object_ref.get("callSite", "<Unknown>")
104110
self.object_ref = ray.ObjectRef(
@@ -177,6 +183,7 @@ def as_dict(self):
177183
"object_size": self.object_size,
178184
"reference_type": self.reference_type,
179185
"call_site": self.call_site,
186+
"task_status": self.task_status,
180187
"local_ref_count": self.local_ref_count,
181188
"pinned_in_memory": self.pinned_in_memory,
182189
"submitted_task_ref_count": self.submitted_task_ref_count,
@@ -385,9 +392,14 @@ def memory_summary(
385392
# Fetch core memory worker stats, store as a dictionary
386393
core_worker_stats = []
387394
for raylet in state.node_table():
388-
stats = node_stats_to_dict(
389-
node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"])
390-
)
395+
if not raylet["Alive"]:
396+
continue
397+
try:
398+
stats = node_stats_to_dict(
399+
node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"])
400+
)
401+
except RuntimeError:
402+
continue
391403
core_worker_stats.extend(stats["coreWorkersStats"])
392404
assert type(stats) is dict and "coreWorkersStats" in stats
393405

@@ -407,7 +419,7 @@ def memory_summary(
407419
"Mem Used by Objects",
408420
"Local References",
409421
"Pinned",
410-
"Pending Tasks",
422+
"Used by task",
411423
"Captured in Objects",
412424
"Actor Handles",
413425
]
@@ -418,15 +430,16 @@ def memory_summary(
418430
"PID",
419431
"Type",
420432
"Call Site",
433+
"Status",
421434
"Size",
422435
"Reference Type",
423436
"Object Ref",
424437
]
425438
object_ref_string = "{:<13} | {:<8} | {:<7} | {:<9} \
426-
| {:<8} | {:<14} | {:<10}\n"
439+
| {:<9} | {:<8} | {:<14} | {:<10}\n"
427440

428441
if size > line_wrap_threshold and line_wrap:
429-
object_ref_string = "{:<15} {:<5} {:<6} {:<22} {:<6} {:<18} \
442+
object_ref_string = "{:<15} {:<5} {:<6} {:<22} {:<14} {:<6} {:<18} \
430443
{:<56}\n"
431444

432445
mem += f"Grouping by {group_by}...\
@@ -469,14 +482,22 @@ def memory_summary(
469482
entry["call_site"][i : i + call_site_length]
470483
for i in range(0, len(entry["call_site"]), call_site_length)
471484
]
472-
num_lines = len(entry["call_site"])
485+
486+
task_status_length = 12
487+
entry["task_status"] = [
488+
entry["task_status"][i : i + task_status_length]
489+
for i in range(0, len(entry["task_status"]), task_status_length)
490+
]
491+
num_lines = max(len(entry["call_site"]), len(entry["task_status"]))
492+
473493
else:
474494
mem += "\n"
475495
object_ref_values = [
476496
entry["node_ip_address"],
477497
entry["pid"],
478498
entry["type"],
479499
entry["call_site"],
500+
entry["task_status"],
480501
entry["object_size"],
481502
entry["reference_type"],
482503
entry["object_ref"],

python/ray/tests/test_memstat.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66
import ray
77
from ray.cluster_utils import Cluster, cluster_not_supported
88
from ray.internal.internal_api import memory_summary
9+
from ray._private.test_utils import (
10+
wait_for_condition,
11+
Semaphore,
12+
)
13+
914

1015
# RayConfig to enable recording call sites during ObjectRef creations.
1116
ray_config = {"record_ref_creation_sites": True}
@@ -37,6 +42,11 @@
3742
OBJECT_SIZE = "object size"
3843
REFERENCE_TYPE = "reference type"
3944

45+
# Task status.
46+
WAITING_FOR_DEPENDENCIES = "WAITING_FOR_DEPENDENCIES"
47+
SCHEDULED = "SCHEDULED"
48+
FINISHED = "FINISHED"
49+
4050

4151
def data_lines(memory_str):
4252
for line in memory_str.split("\n"):
@@ -309,6 +319,56 @@ def test_memory_used_output(ray_start_regular):
309319
assert count(info, "8388861.0 B") == 2, info
310320

311321

322+
def test_task_status(ray_start_regular):
323+
address = ray_start_regular["address"]
324+
325+
@ray.remote
326+
def dep(sema, x=None):
327+
ray.get(sema.acquire.remote())
328+
return
329+
330+
# Filter out actor handle refs.
331+
def filtered_summary():
332+
return "\n".join(
333+
[
334+
line
335+
for line in memory_summary(address, line_wrap=False).split("\n")
336+
if "ACTOR_HANDLE" not in line
337+
]
338+
)
339+
340+
sema = Semaphore.remote(value=0)
341+
x = dep.remote(sema)
342+
y = dep.remote(sema, x=x)
343+
# x and its semaphore task are scheduled.
344+
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 2)
345+
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 1)
346+
347+
z = dep.remote(sema, x=x)
348+
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 2)
349+
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 2)
350+
wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 0)
351+
352+
sema.release.remote()
353+
time.sleep(2)
354+
wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 1)
355+
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 0)
356+
# y, z, and two semaphore tasks are scheduled.
357+
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 4)
358+
359+
sema.release.remote()
360+
wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 2)
361+
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 0)
362+
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 2)
363+
364+
sema.release.remote()
365+
ray.get(y)
366+
ray.get(z)
367+
wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 3)
368+
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 0)
369+
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 0)
370+
371+
312372
if __name__ == "__main__":
313373
import sys
314374

python/ray/tests/test_reconstruction.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,17 @@
1111
wait_for_condition,
1212
wait_for_pid_to_exit,
1313
SignalActor,
14+
Semaphore,
1415
)
16+
from ray.internal.internal_api import memory_summary
1517

1618
SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
1719

20+
# Task status.
21+
WAITING_FOR_DEPENDENCIES = "WAITING_FOR_DEPENDENCIES"
22+
SCHEDULED = "SCHEDULED"
23+
FINISHED = "FINISHED"
24+
1825

1926
def test_cached_object(ray_start_cluster):
2027
config = {
@@ -1014,6 +1021,85 @@ def dependent_task(x):
10141021
ray.get(obj, timeout=60)
10151022

10161023

1024+
def test_memory_util(ray_start_cluster):
1025+
config = {
1026+
"num_heartbeats_timeout": 10,
1027+
"raylet_heartbeat_period_milliseconds": 100,
1028+
"object_timeout_milliseconds": 200,
1029+
}
1030+
1031+
cluster = ray_start_cluster
1032+
# Head node with no resources.
1033+
cluster.add_node(
1034+
num_cpus=0,
1035+
resources={"head": 1},
1036+
_system_config=config,
1037+
enable_object_reconstruction=True,
1038+
)
1039+
ray.init(address=cluster.address)
1040+
# Node to place the initial object.
1041+
node_to_kill = cluster.add_node(
1042+
num_cpus=1, resources={"node1": 1}, object_store_memory=10 ** 8
1043+
)
1044+
cluster.wait_for_nodes()
1045+
1046+
@ray.remote
1047+
def large_object(sema=None):
1048+
if sema is not None:
1049+
ray.get(sema.acquire.remote())
1050+
return np.zeros(10 ** 7, dtype=np.uint8)
1051+
1052+
@ray.remote
1053+
def dependent_task(x, sema):
1054+
ray.get(sema.acquire.remote())
1055+
return x
1056+
1057+
def stats():
1058+
info = memory_summary(cluster.address, line_wrap=False)
1059+
info = info.split("\n")
1060+
reconstructing_waiting = [
1061+
line
1062+
for line in info
1063+
if "Attempt #2" in line and WAITING_FOR_DEPENDENCIES in line
1064+
]
1065+
reconstructing_scheduled = [
1066+
line for line in info if "Attempt #2" in line and SCHEDULED in line
1067+
]
1068+
reconstructing_finished = [
1069+
line for line in info if "Attempt #2" in line and FINISHED in line
1070+
]
1071+
return (
1072+
len(reconstructing_waiting),
1073+
len(reconstructing_scheduled),
1074+
len(reconstructing_finished),
1075+
)
1076+
1077+
sema = Semaphore.options(resources={"head": 1}).remote(value=0)
1078+
obj = large_object.options(resources={"node1": 1}).remote(sema)
1079+
x = dependent_task.options(resources={"node1": 1}).remote(obj, sema)
1080+
ref = dependent_task.options(resources={"node1": 1}).remote(x, sema)
1081+
ray.get(sema.release.remote())
1082+
ray.get(sema.release.remote())
1083+
ray.get(sema.release.remote())
1084+
ray.get(ref)
1085+
wait_for_condition(lambda: stats() == (0, 0, 0))
1086+
del ref
1087+
1088+
cluster.remove_node(node_to_kill, allow_graceful=False)
1089+
node_to_kill = cluster.add_node(
1090+
num_cpus=1, resources={"node1": 1}, object_store_memory=10 ** 8
1091+
)
1092+
1093+
ref = dependent_task.remote(x, sema)
1094+
wait_for_condition(lambda: stats() == (1, 1, 0))
1095+
ray.get(sema.release.remote())
1096+
wait_for_condition(lambda: stats() == (0, 1, 1))
1097+
ray.get(sema.release.remote())
1098+
ray.get(sema.release.remote())
1099+
ray.get(ref)
1100+
wait_for_condition(lambda: stats() == (0, 0, 2))
1101+
1102+
10171103
if __name__ == "__main__":
10181104
import pytest
10191105

src/mock/ray/core_worker/task_manager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
4343
MOCK_METHOD(absl::optional<TaskSpecification>, GetTaskSpec, (const TaskID &task_id),
4444
(const, override));
4545
MOCK_METHOD(bool, RetryTaskIfPossible, (const TaskID &task_id), (override));
46+
MOCK_METHOD(void, MarkDependenciesResolved, (const TaskID &task_id), (override));
4647
};
4748

4849
} // namespace core

src/ray/common/id.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,14 @@ ActorID TaskID::ActorId() const {
212212
reinterpret_cast<const char *>(id_ + kUniqueBytesLength), ActorID::Size()));
213213
}
214214

215+
bool TaskID::IsForActorCreationTask() const {
216+
static std::string nil_data(kUniqueBytesLength, 0);
217+
FillNil(&nil_data);
218+
bool unique_bytes_nil = std::memcmp(id_, nil_data.data(), kUniqueBytesLength) == 0;
219+
bool actor_id_nil = ActorId().IsNil();
220+
return unique_bytes_nil && !actor_id_nil;
221+
}
222+
215223
JobID TaskID::JobId() const { return ActorId().JobId(); }
216224

217225
TaskID TaskID::ComputeDriverTaskId(const WorkerID &driver_id) {

src/ray/common/id.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,9 @@ class TaskID : public BaseID<TaskID> {
240240
/// \return The `ActorID` of the actor which creates this task.
241241
ActorID ActorId() const;
242242

243+
/// Returns whether this is the ID of an actor creation task.
244+
bool IsForActorCreationTask() const;
245+
243246
/// Get the id of the job to which this task belongs.
244247
///
245248
/// \return The `JobID` of the job which creates this task.

src/ray/common/id_test.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ TEST(TaskIDTest, TestTaskID) {
6666
const TaskID task_id_1 =
6767
TaskID::ForActorTask(kDefaultJobId, kDefaultDriverTaskId, 1, actor_id);
6868
ASSERT_EQ(actor_id, task_id_1.ActorId());
69+
ASSERT_FALSE(task_id_1.IsForActorCreationTask());
70+
71+
auto actor_creation_task_id = TaskID::ForActorCreationTask(actor_id);
72+
ASSERT_TRUE(actor_creation_task_id.IsForActorCreationTask());
73+
74+
ASSERT_FALSE(TaskID::Nil().IsForActorCreationTask());
75+
ASSERT_FALSE(TaskID::FromRandom(kDefaultJobId).IsForActorCreationTask());
6976
}
7077
}
7178

src/ray/core_worker/core_worker.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2855,6 +2855,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &
28552855
if (request.include_memory_info()) {
28562856
reference_counter_->AddObjectRefStats(plasma_store_provider_->UsedObjectsList(),
28572857
stats);
2858+
task_manager_->AddTaskStatusInfo(stats);
28582859
}
28592860

28602861
send_reply_callback(Status::OK(), nullptr, nullptr);

src/ray/core_worker/object_recovery_manager.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ namespace ray {
2020
namespace core {
2121

2222
bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
23+
if (object_id.TaskId().IsForActorCreationTask()) {
24+
// The GCS manages all actor restarts, so we should never try to
25+
// reconstruct an actor here.
26+
return true;
27+
}
2328
// Check the ReferenceCounter to see if there is a location for the object.
2429
bool owned_by_us = false;
2530
NodeID pinned_at;

src/ray/core_worker/reference_count.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@ void ReferenceCounter::AddObjectRefStats(
151151
for (const auto &obj_id : ref.second.contained_in_owned) {
152152
ref_proto->add_contained_in_owned(obj_id.Binary());
153153
}
154+
155+
if (ref.second.owned_by_us && !ref.second.pending_creation) {
156+
// For finished tasks only, we set the status here instead of in the
157+
// TaskManager in case the task spec has already been GCed.
158+
ref_proto->set_task_status(rpc::TaskStatus::FINISHED);
159+
}
154160
}
155161
// Also include any unreferenced objects that are pinned in memory.
156162
for (const auto &entry : pinned_objects) {

0 commit comments

Comments (0)