Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
RAY_CHECK(task_deps->empty());

TaskSpecification spec;
TaskEntry *task_entry = nullptr;
bool resubmit = false;
{
absl::MutexLock lock(&mu_);
Expand Down Expand Up @@ -342,7 +341,6 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
RAY_CHECK(it->second.num_retries_left == -1);
}
spec = it->second.spec;
task_entry = &(it->second);
}
}

Expand Down Expand Up @@ -386,7 +384,7 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
<< spec.AttemptNumber() << ": " << spec.DebugString();
// We should actually detect if the actor for this task is dead, but let's just assume
// it's not for now.
RetryTask(task_entry, /*object_recovery*/ true, /*delay_ms*/ 0);
retry_task_callback_(spec, /*object_recovery*/ true, /*delay_ms*/ 0);

return true;
}
Expand Down Expand Up @@ -967,7 +965,6 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
bool TaskManager::RetryTaskIfPossible(const TaskID &task_id,
const rpc::RayErrorInfo &error_info) {
TaskSpecification spec;
TaskEntry *task_entry = nullptr;
bool will_retry = false;
int32_t num_retries_left = 0;
int32_t num_oom_retries_left = 0;
Expand All @@ -980,7 +977,6 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id,
RAY_CHECK(it->second.IsPending())
<< "Tried to retry task that was not pending " << task_id;
spec = it->second.spec;
task_entry = &(it->second);
num_retries_left = it->second.num_retries_left;
num_oom_retries_left = it->second.num_oom_retries_left;
if (task_failed_due_to_oom) {
Expand All @@ -1003,7 +999,7 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id,
}
}
if (will_retry) {
MarkTaskRetryOnFailed(*task_entry, error_info);
MarkTaskRetryOnFailed(it->second, error_info);
}
}

Expand All @@ -1025,7 +1021,7 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id,
spec.AttemptNumber(),
RayConfig::instance().task_oom_retry_delay_base_ms())
: RayConfig::instance().task_retry_delay_ms();
RetryTask(task_entry, /*object_recovery*/ false, delay_ms);
retry_task_callback_(spec, /*object_recovery*/ false, delay_ms);
return true;
} else {
RAY_LOG(INFO) << "No retries left for task " << spec.TaskId()
Expand All @@ -1034,18 +1030,6 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id,
}
}

void TaskManager::RetryTask(TaskEntry *task_entry,
bool object_recovery,
uint32_t delay_ms) {
RAY_CHECK(task_entry != nullptr);
SetTaskStatus(*task_entry,
rpc::TaskStatus::PENDING_ARGS_AVAIL,
/* state_update */ std::nullopt,
/* include_task_info */ true,
task_entry->spec.AttemptNumber() + 1);
retry_task_callback_(task_entry->spec, object_recovery, delay_ms);
}

void TaskManager::FailPendingTask(const TaskID &task_id,
rpc::ErrorType error_type,
const Status *status,
Expand Down Expand Up @@ -1446,6 +1430,17 @@ void TaskManager::MarkTaskRetryOnResubmit(TaskEntry &task_entry) {
<< "Only finished tasks can be resubmitted: " << task_entry.spec.TaskId();

task_entry.MarkRetry();

// Mark the new status and also include task spec info for the new attempt.
//
// NOTE(rickyx): We only increment the AttemptNumber on the task spec when
// `retry_task_callback_` is invoked. In order to record the correct status change for
// the new task attempt, we pass the the attempt number explicitly.
SetTaskStatus(task_entry,
rpc::TaskStatus::PENDING_ARGS_AVAIL,
/* state_update */ std::nullopt,
/* include_task_info */ true,
task_entry.spec.AttemptNumber() + 1);
}

void TaskManager::MarkTaskRetryOnFailed(TaskEntry &task_entry,
Expand All @@ -1457,6 +1452,13 @@ void TaskManager::MarkTaskRetryOnFailed(TaskEntry &task_entry,
rpc::TaskStatus::FAILED,
worker::TaskStatusEvent::TaskStateUpdate(error_info));
task_entry.MarkRetry();

// Mark the new status and also include task spec info for the new attempt.
SetTaskStatus(task_entry,
rpc::TaskStatus::PENDING_ARGS_AVAIL,
/* state_update */ std::nullopt,
/* include_task_info */ true,
task_entry.spec.AttemptNumber() + 1);
}

void TaskManager::SetTaskStatus(
Expand Down
22 changes: 10 additions & 12 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -697,15 +697,6 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
std::vector<ObjectID> *ids_to_release)
ABSL_LOCKS_EXCLUDED(mu_);

/// A wrapper of `retry_task_callback_` that sets the task status
/// and calls `retry_task_callback_`. This function should be the only
/// caller of `retry_task_callback_`.
///
/// \param[in] task_entry The task entry to retry.
/// \param[in] object_recovery Whether to retry the task for object recovery.
/// \param[in] delay_ms The delay in milliseconds before retrying the task.
void RetryTask(TaskEntry *task_entry, bool object_recovery, uint32_t delay_ms);

/// Helper function to call RemoveSubmittedTaskReferences on the remaining
/// dependencies of the given task spec after the task has finished or
/// failed. The remaining dependencies are plasma objects and any ObjectIDs
Expand Down Expand Up @@ -749,28 +740,35 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// \param attempt_number The attempt number to record the task status change
/// event. If not specified, the attempt number will be the current attempt number of
/// the task.
///
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only change. Others are for reverting.

/// \note This function updates `task_entry` in place. Please only call
Copy link
Member Author

@kevin85421 kevin85421 May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this note is not equivalent to ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_).

For example, the following code satisfies ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) but does not satisfy the requirement described in this note.

Acquire lock
Retrieve TaskEntry
Release lock
...
Acquire lock
SetTaskStatus
Release lock

The following example satisfies both.

Acquire lock
Retrieve TaskEntry
SetTaskStatus
Release lock

/// this function within the same lock scope where `task_entry` is retrieved from
/// `submissible_tasks_`. If not, the task entry may be invalidated if the flat_hash_map
/// is rehashed or the element is removed from the map.
void SetTaskStatus(
TaskEntry &task_entry,
rpc::TaskStatus status,
std::optional<worker::TaskStatusEvent::TaskStateUpdate> state_update = std::nullopt,
bool include_task_info = false,
std::optional<int32_t> attempt_number = std::nullopt);
std::optional<int32_t> attempt_number = std::nullopt)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

/// Update the task entry for the task attempt to reflect retry on resubmit.
///
/// This will set the task status, update the attempt number for the task, and increment
/// the retry counter.
///
/// \param task_entry Task entry for the corresponding task attempt
void MarkTaskRetryOnResubmit(TaskEntry &task_entry);
void MarkTaskRetryOnResubmit(TaskEntry &task_entry) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

/// Update the task entry for the task attempt to reflect retry on failure.
///
/// This will set the task status, update the attempt number for the task, and increment
/// the retry counter.
///
/// \param task_entry Task entry for the corresponding task attempt
void MarkTaskRetryOnFailed(TaskEntry &task_entry, const rpc::RayErrorInfo &error_info);
void MarkTaskRetryOnFailed(TaskEntry &task_entry, const rpc::RayErrorInfo &error_info)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

/// Mark the stream is ended.
/// The end of the stream always contains a "sentinel object" passed
Expand Down