diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index b50fc77bbff20..9622541c4d58d 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -888,10 +888,11 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, it->second.num_successful_executions++; if (is_application_error) { - SetTaskStatus(it->second, - rpc::TaskStatus::FAILED, - gcs::GetRayErrorInfo(rpc::ErrorType::TASK_EXECUTION_EXCEPTION, - reply.task_execution_error())); + SetTaskStatus( + it->second, + rpc::TaskStatus::FAILED, + worker::TaskStatusEvent::TaskStateUpdate(gcs::GetRayErrorInfo( + rpc::ErrorType::TASK_EXECUTION_EXCEPTION, reply.task_execution_error()))); } else { SetTaskStatus(it->second, rpc::TaskStatus::FINISHED); } @@ -1060,12 +1061,14 @@ void TaskManager::FailPendingTask(const TaskID &task_id, // to exit and not be marked as failure. SetTaskStatus(it->second, rpc::TaskStatus::FINISHED); } else { + const auto error_info = + (ray_error_info == nullptr + ? gcs::GetRayErrorInfo(error_type, + (status != nullptr ? status->ToString() : "")) + : *ray_error_info); SetTaskStatus(it->second, rpc::TaskStatus::FAILED, - (ray_error_info == nullptr - ? gcs::GetRayErrorInfo( - error_type, (status != nullptr ? status->ToString() : "")) - : *ray_error_info)); + worker::TaskStatusEvent::TaskStateUpdate(error_info)); } submissible_tasks_.erase(it); num_pending_tasks_--; @@ -1417,15 +1420,9 @@ void TaskManager::MarkTaskWaitingForExecution(const TaskID &task_id, RAY_CHECK(it->second.GetStatus() == rpc::TaskStatus::PENDING_NODE_ASSIGNMENT) << ", task ID = " << it->first << ", status = " << it->second.GetStatus(); it->second.SetNodeId(node_id); - it->second.SetStatus(rpc::TaskStatus::SUBMITTED_TO_WORKER); - RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded( - it->second.spec.TaskId(), - it->second.spec.JobId(), - it->second.spec.AttemptNumber(), - it->second.spec, - rpc::TaskStatus::SUBMITTED_TO_WORKER, - /* include_task_info */ false, - worker::TaskStatusEvent::TaskStateUpdate(node_id, worker_id))); + SetTaskStatus(it->second, + rpc::TaskStatus::SUBMITTED_TO_WORKER, + worker::TaskStatusEvent::TaskStateUpdate(node_id, worker_id)); } void TaskManager::MarkTaskRetryOnResubmit(TaskEntry &task_entry) { @@ -1435,17 +1432,15 @@ void TaskManager::MarkTaskRetryOnResubmit(TaskEntry &task_entry) { task_entry.MarkRetry(); // Mark the new status and also include task spec info for the new attempt. - task_entry.SetStatus(rpc::TaskStatus::PENDING_ARGS_AVAIL); + // // NOTE(rickyx): We only increment the AttemptNumber on the task spec when // `retry_task_callback_` is invoked. In order to record the correct status change for // the new task attempt, we pass the the attempt number explicitly. - RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded( - task_entry.spec.TaskId(), - task_entry.spec.JobId(), - task_entry.spec.AttemptNumber() + 1, - task_entry.spec, - rpc::TaskStatus::PENDING_ARGS_AVAIL, - /* include_task_info */ true)); + SetTaskStatus(task_entry, + rpc::TaskStatus::PENDING_ARGS_AVAIL, + /* state_update */ std::nullopt, + /* include_task_info */ true, + task_entry.spec.AttemptNumber() + 1); } void TaskManager::MarkTaskRetryOnFailed(TaskEntry &task_entry, @@ -1453,33 +1448,40 @@ void TaskManager::MarkTaskRetryOnFailed(TaskEntry &task_entry, RAY_CHECK(task_entry.IsPending()); // Record the old attempt status as FAILED. - SetTaskStatus(task_entry, rpc::TaskStatus::FAILED, error_info); + SetTaskStatus(task_entry, + rpc::TaskStatus::FAILED, + worker::TaskStatusEvent::TaskStateUpdate(error_info)); task_entry.MarkRetry(); // Mark the new status and also include task spec info for the new attempt. - task_entry.SetStatus(rpc::TaskStatus::PENDING_ARGS_AVAIL); - RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded( - task_entry.spec.TaskId(), - task_entry.spec.JobId(), - task_entry.spec.AttemptNumber() + 1, - task_entry.spec, - rpc::TaskStatus::PENDING_ARGS_AVAIL, - /* include_task_info */ true)); + SetTaskStatus(task_entry, + rpc::TaskStatus::PENDING_ARGS_AVAIL, + /* state_update */ std::nullopt, + /* include_task_info */ true, + task_entry.spec.AttemptNumber() + 1); } void TaskManager::SetTaskStatus( TaskEntry &task_entry, rpc::TaskStatus status, - const std::optional &error_info) { + std::optional state_update, + bool include_task_info, + std::optional attempt_number) { + RAY_LOG(DEBUG).WithField(task_entry.spec.TaskId()) + << "Setting task status from " << task_entry.GetStatus() << " to " << status; task_entry.SetStatus(status); - RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded( - task_entry.spec.TaskId(), - task_entry.spec.JobId(), - task_entry.spec.AttemptNumber(), - task_entry.spec, - status, - /* include_task_info */ false, - worker::TaskStatusEvent::TaskStateUpdate(error_info))); + + const int32_t attempt_number_to_record = + attempt_number.value_or(task_entry.spec.AttemptNumber()); + const auto state_update_to_record = + state_update.value_or(worker::TaskStatusEvent::TaskStateUpdate()); + RAY_UNUSED(task_event_buffer_.RecordTaskStatusEventIfNeeded(task_entry.spec.TaskId(), + task_entry.spec.JobId(), + attempt_number_to_record, + task_entry.spec, + status, + include_task_info, + state_update_to_record)); } std::unordered_map diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 8b0599280ba60..f5c864a743812 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -734,11 +734,18 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// /// \param task_entry corresponding TaskEntry of a task to record the event. /// \param status new status. - /// \param error_info Optional error info for task execution. + /// \param state_update The state update for the task status change event. + /// \param include_task_info Whether to include task info in the task status change + /// event. + /// \param attempt_number The attempt number to record the task status change + /// event. If not specified, the attempt number will be the current attempt number of + /// the task. void SetTaskStatus( TaskEntry &task_entry, rpc::TaskStatus status, - const std::optional &error_info = absl::nullopt); + std::optional state_update = std::nullopt, + bool include_task_info = false, + std::optional attempt_number = std::nullopt); /// Update the task entry for the task attempt to reflect retry on resubmit. ///