From d9c0bef3012f81014ca3666fd28d58a9e4ff253d Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Sun, 27 Apr 2025 18:13:52 -0700 Subject: [PATCH 1/4] update Signed-off-by: Kai-Hsun Chen --- src/ray/core_worker/task_manager.cc | 88 ++++++++++++++--------------- src/ray/core_worker/task_manager.h | 11 +++- 2 files changed, 52 insertions(+), 47 deletions(-) diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index b50fc77bbff20..4bcbc099a3c3c 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -888,10 +888,11 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, it->second.num_successful_executions++; if (is_application_error) { - SetTaskStatus(it->second, - rpc::TaskStatus::FAILED, - gcs::GetRayErrorInfo(rpc::ErrorType::TASK_EXECUTION_EXCEPTION, - reply.task_execution_error())); + SetTaskStatus( + it->second, + rpc::TaskStatus::FAILED, + worker::TaskStatusEvent::TaskStateUpdate(gcs::GetRayErrorInfo( + rpc::ErrorType::TASK_EXECUTION_EXCEPTION, reply.task_execution_error()))); } else { SetTaskStatus(it->second, rpc::TaskStatus::FINISHED); } @@ -1060,12 +1061,10 @@ void TaskManager::FailPendingTask(const TaskID &task_id, // to exit and not be marked as failure. SetTaskStatus(it->second, rpc::TaskStatus::FINISHED); } else { - SetTaskStatus(it->second, - rpc::TaskStatus::FAILED, - (ray_error_info == nullptr - ? gcs::GetRayErrorInfo( - error_type, (status != nullptr ? status->ToString() : "")) - : *ray_error_info)); + auto state_update = worker::TaskStatusEvent::TaskStateUpdate(gcs::GetRayErrorInfo( + error_type, (status != nullptr ? status->ToString() : ""))); + + SetTaskStatus(it->second, rpc::TaskStatus::FAILED, state_update); } submissible_tasks_.erase(it); num_pending_tasks_--; @@ -1417,15 +1416,9 @@ void TaskManager::MarkTaskWaitingForExecution(const TaskID &task_id, RAY_CHECK(it->second.GetStatus() == rpc::TaskStatus::PENDING_NODE_ASSIGNMENT) << ", task ID = " << it->first << ", status = " << it->second.GetStatus(); it->second.SetNodeId(node_id); - it->second.SetStatus(rpc::TaskStatus::SUBMITTED_TO_WORKER); - RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded( - it->second.spec.TaskId(), - it->second.spec.JobId(), - it->second.spec.AttemptNumber(), - it->second.spec, - rpc::TaskStatus::SUBMITTED_TO_WORKER, - /* include_task_info */ false, - worker::TaskStatusEvent::TaskStateUpdate(node_id, worker_id))); + SetTaskStatus(it->second, + rpc::TaskStatus::SUBMITTED_TO_WORKER, + worker::TaskStatusEvent::TaskStateUpdate(node_id, worker_id)); } void TaskManager::MarkTaskRetryOnResubmit(TaskEntry &task_entry) { @@ -1435,17 +1428,15 @@ void TaskManager::MarkTaskRetryOnResubmit(TaskEntry &task_entry) { task_entry.MarkRetry(); // Mark the new status and also include task spec info for the new attempt. - task_entry.SetStatus(rpc::TaskStatus::PENDING_ARGS_AVAIL); + // // NOTE(rickyx): We only increment the AttemptNumber on the task spec when // `retry_task_callback_` is invoked. In order to record the correct status change for // the new task attempt, we pass the the attempt number explicitly. - RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded( - task_entry.spec.TaskId(), - task_entry.spec.JobId(), - task_entry.spec.AttemptNumber() + 1, - task_entry.spec, - rpc::TaskStatus::PENDING_ARGS_AVAIL, - /* include_task_info */ true)); + SetTaskStatus(task_entry, + rpc::TaskStatus::PENDING_ARGS_AVAIL, + /* state_update */ std::nullopt, + /* include_task_info */ true, + task_entry.spec.AttemptNumber() + 1); } void TaskManager::MarkTaskRetryOnFailed(TaskEntry &task_entry, @@ -1453,33 +1444,40 @@ void TaskManager::MarkTaskRetryOnFailed(TaskEntry &task_entry, RAY_CHECK(task_entry.IsPending()); // Record the old attempt status as FAILED. - SetTaskStatus(task_entry, rpc::TaskStatus::FAILED, error_info); + SetTaskStatus(task_entry, + rpc::TaskStatus::FAILED, + worker::TaskStatusEvent::TaskStateUpdate(error_info)); task_entry.MarkRetry(); // Mark the new status and also include task spec info for the new attempt. - task_entry.SetStatus(rpc::TaskStatus::PENDING_ARGS_AVAIL); - RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded( - task_entry.spec.TaskId(), - task_entry.spec.JobId(), - task_entry.spec.AttemptNumber() + 1, - task_entry.spec, - rpc::TaskStatus::PENDING_ARGS_AVAIL, - /* include_task_info */ true)); + SetTaskStatus(task_entry, + rpc::TaskStatus::PENDING_ARGS_AVAIL, + /* state_update */ std::nullopt, + /* include_task_info */ true, + task_entry.spec.AttemptNumber() + 1); } void TaskManager::SetTaskStatus( TaskEntry &task_entry, rpc::TaskStatus status, - const std::optional &error_info) { + std::optional state_update, + bool include_task_info, + std::optional attempt_number) { + RAY_LOG(DEBUG).WithField(task_entry.spec.TaskId()) + << "Setting task status from " << task_entry.GetStatus() << " to " << status; task_entry.SetStatus(status); - RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded( - task_entry.spec.TaskId(), - task_entry.spec.JobId(), - task_entry.spec.AttemptNumber(), - task_entry.spec, - status, - /* include_task_info */ false, - worker::TaskStatusEvent::TaskStateUpdate(error_info))); + + int32_t attempt_number_to_record = + attempt_number.value_or(task_entry.spec.AttemptNumber()); + auto state_update_to_record = + state_update.value_or(worker::TaskStatusEvent::TaskStateUpdate()); + RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded(task_entry.spec.TaskId(), + task_entry.spec.JobId(), + attempt_number_to_record, + task_entry.spec, + status, + include_task_info, + state_update_to_record)); } std::unordered_map diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 8b0599280ba60..f5c864a743812 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -734,11 +734,18 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// /// \param task_entry corresponding TaskEntry of a task to record the event. /// \param status new status. - /// \param error_info Optional error info for task execution. + /// \param state_update The state update for the task status change event. + /// \param include_task_info Whether to include task info in the task status change + /// event. + /// \param attempt_number The attempt number to record the task status change + /// event. If not specified, the attempt number will be the current attempt number of + /// the task. void SetTaskStatus( TaskEntry &task_entry, rpc::TaskStatus status, - const std::optional &error_info = absl::nullopt); + std::optional state_update = std::nullopt, + bool include_task_info = false, + std::optional attempt_number = std::nullopt); /// Update the task entry for the task attempt to reflect retry on resubmit. /// From acc7997f6a5bf96c8eac829d84226157ade237c0 Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Sun, 27 Apr 2025 18:24:42 -0700 Subject: [PATCH 2/4] update Signed-off-by: Kai-Hsun Chen --- src/ray/core_worker/task_manager.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 4bcbc099a3c3c..3aebf466c3448 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -1061,10 +1061,14 @@ void TaskManager::FailPendingTask(const TaskID &task_id, // to exit and not be marked as failure. SetTaskStatus(it->second, rpc::TaskStatus::FINISHED); } else { - auto state_update = worker::TaskStatusEvent::TaskStateUpdate(gcs::GetRayErrorInfo( - error_type, (status != nullptr ? status->ToString() : ""))); - - SetTaskStatus(it->second, rpc::TaskStatus::FAILED, state_update); + auto error_info = + (ray_error_info == nullptr + ? gcs::GetRayErrorInfo(error_type, + (status != nullptr ? status->ToString() : "")) + : *ray_error_info); + SetTaskStatus(it->second, + rpc::TaskStatus::FAILED, + worker::TaskStatusEvent::TaskStateUpdate(error_info)); } submissible_tasks_.erase(it); num_pending_tasks_--; From 5cbcb8b0b2e812cb76321d2acbf0144e4d2564a3 Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Mon, 28 Apr 2025 13:47:39 -0700 Subject: [PATCH 3/4] address comments Signed-off-by: Kai-Hsun Chen --- src/ray/core_worker/task_manager.cc | 6 +++--- src/ray/core_worker/transport/actor_task_submitter.cc | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 3aebf466c3448..9622541c4d58d 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -1061,7 +1061,7 @@ void TaskManager::FailPendingTask(const TaskID &task_id, // to exit and not be marked as failure. SetTaskStatus(it->second, rpc::TaskStatus::FINISHED); } else { - auto error_info = + const auto error_info = (ray_error_info == nullptr ? gcs::GetRayErrorInfo(error_type, (status != nullptr ? status->ToString() : "")) @@ -1471,9 +1471,9 @@ void TaskManager::SetTaskStatus( << "Setting task status from " << task_entry.GetStatus() << " to " << status; task_entry.SetStatus(status); - int32_t attempt_number_to_record = + const int32_t attempt_number_to_record = attempt_number.value_or(task_entry.spec.AttemptNumber()); - auto state_update_to_record = + const auto state_update_to_record = state_update.value_or(worker::TaskStatusEvent::TaskStateUpdate()); RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded(task_entry.spec.TaskId(), task_entry.spec.JobId(), diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index b922003dac145..b0f0d5a9b7425 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -522,6 +522,7 @@ void ActorTaskSubmitter::CheckTimeoutTasks() { } void ActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) { + RAY_LOG(INFO).WithField(actor_id) << "SendPendingTasks"; auto it = client_queues_.find(actor_id); RAY_CHECK(it != client_queues_.end()); auto &client_queue = it->second; @@ -569,6 +570,7 @@ void ActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) { } void ActorTaskSubmitter::ResendOutOfOrderCompletedTasks(const ActorID &actor_id) { + RAY_LOG(INFO).WithField(actor_id) << "ResendOutOfOrderCompletedTasks"; auto it = client_queues_.find(actor_id); RAY_CHECK(it != client_queues_.end()); if (!it->second.rpc_client) { From c5f4356916f82e8e0d8d625207c0c69f0bbf38f5 Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Mon, 28 Apr 2025 13:48:37 -0700 Subject: [PATCH 4/4] address comments Signed-off-by: Kai-Hsun Chen --- src/ray/core_worker/transport/actor_task_submitter.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index b0f0d5a9b7425..b922003dac145 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -522,7 +522,6 @@ void ActorTaskSubmitter::CheckTimeoutTasks() { } void ActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) { - RAY_LOG(INFO).WithField(actor_id) << "SendPendingTasks"; auto it = client_queues_.find(actor_id); RAY_CHECK(it != client_queues_.end()); auto &client_queue = it->second; @@ -570,7 +569,6 @@ void ActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) { } void ActorTaskSubmitter::ResendOutOfOrderCompletedTasks(const ActorID &actor_id) { - RAY_LOG(INFO).WithField(actor_id) << "ResendOutOfOrderCompletedTasks"; auto it = client_queues_.find(actor_id); RAY_CHECK(it != client_queues_.end()); if (!it->second.rpc_client) {