Skip to content

[core] Move to_resubmit_ from CoreWorker to TaskManager to avoid an abstraction leak #52779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 9 additions & 29 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,25 +719,14 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
RAY_CHECK_OK(PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
},
/* retry_task_callback= */
[this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) {
spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
if (!object_recovery) {
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
RAY_LOG(INFO) << "Will resubmit task after a " << delay_ms
<< "ms delay: " << spec.DebugString();
absl::MutexLock lock(&mutex_);
TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
to_resubmit_.push(std::move(task_to_retry));
[this](TaskSpecification &spec) {
if (spec.IsActorTask()) {
auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
actor_handle->SetResubmittedActorTaskSpec(spec);
RAY_CHECK_OK(actor_task_submitter_->SubmitTask(spec));
} else {
if (spec.IsActorTask()) {
auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
actor_handle->SetResubmittedActorTaskSpec(spec);
RAY_CHECK_OK(actor_task_submitter_->SubmitTask(spec));
} else {
RAY_CHECK(spec.IsNormalTask());
RAY_CHECK_OK(normal_task_submitter_->SubmitTask(spec));
}
RAY_CHECK(spec.IsNormalTask());
RAY_CHECK_OK(normal_task_submitter_->SubmitTask(spec));
}
},
push_error_callback,
Expand Down Expand Up @@ -1358,18 +1347,9 @@ void CoreWorker::ExitIfParentRayletDies() {

void CoreWorker::InternalHeartbeat() {
// Retry tasks.
std::vector<TaskToRetry> tasks_to_resubmit;
{
absl::MutexLock lock(&mutex_);
const auto current_time = current_time_ms();
while (!to_resubmit_.empty() && current_time > to_resubmit_.top().execution_time_ms) {
tasks_to_resubmit.emplace_back(to_resubmit_.top());
to_resubmit_.pop();
}
}
std::vector<TaskSpecification> tasks_to_retry = task_manager_->PopTasksToRetry();

for (auto &task_to_retry : tasks_to_resubmit) {
auto &spec = task_to_retry.task_spec;
for (auto &spec : tasks_to_retry) {
if (spec.IsActorTask()) {
auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
actor_handle->SetResubmittedActorTaskSpec(spec);
Expand Down
26 changes: 0 additions & 26 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,28 +138,6 @@ class TaskCounter {
int64_t num_tasks_running_ ABSL_GUARDED_BY(mu_) = 0;
};

/// A task waiting to be resubmitted, paired with the time at which the
/// resubmission is due.
struct TaskToRetry {
  /// Time, in milliseconds, at which the task should be retried. Compared
  /// against current_time_ms() when draining the retry queue.
  int64_t execution_time_ms = 0;

  /// Specification of the task to resubmit.
  TaskSpecification task_spec;
};

/// Comparator that makes a std::priority_queue of TaskToRetry behave as a
/// min-heap on `execution_time_ms`.
///
/// std::priority_queue keeps the element that compares greatest at top(). This
/// comparator reports `left` as ordered before `right` when `left` is due
/// later, so the task with the earliest execution time ends up at top().
class TaskToRetryDescComparator {
 public:
  /// Returns true when `left` is scheduled strictly later than `right`.
  /// Const-qualified so the comparator can also be invoked through a const
  /// instance; it holds no state.
  bool operator()(const TaskToRetry &left, const TaskToRetry &right) const {
    return left.execution_time_ms > right.execution_time_ms;
  }
};

/// The root class that contains all the core and language-independent functionalities
/// of the worker. This class is supposed to be used to implement app-language (Java,
/// Python, etc) workers.
Expand Down Expand Up @@ -1870,10 +1848,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
task_execution_service_work_;

// Queue of tasks to resubmit when the specified time passes.
std::priority_queue<TaskToRetry, std::deque<TaskToRetry>, TaskToRetryDescComparator>
to_resubmit_ ABSL_GUARDED_BY(mutex_);

/// Map of named actor registry. It doesn't need to hold a lock because
/// local mode is single-threaded.
absl::flat_hash_map<std::string, ActorID> local_mode_named_actor_registry_;
Expand Down
32 changes: 29 additions & 3 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1038,12 +1038,26 @@ void TaskManager::RetryTask(TaskEntry *task_entry,
bool object_recovery,
uint32_t delay_ms) {
RAY_CHECK(task_entry != nullptr);

auto &spec = task_entry->spec;
spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
SetTaskStatus(*task_entry,
rpc::TaskStatus::PENDING_ARGS_AVAIL,
/* state_update */ std::nullopt,
/* include_task_info */ true,
task_entry->spec.AttemptNumber() + 1);
retry_task_callback_(task_entry->spec, object_recovery, delay_ms);
/* include_task_info */ true);

if (!object_recovery) {
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
RAY_LOG(INFO) << "Will resubmit task after a " << delay_ms
<< "ms delay: " << spec.DebugString();
absl::MutexLock lock(&mu_);
TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
to_resubmit_.push(std::move(task_to_retry));
return;
}

retry_task_callback_(spec);
}

void TaskManager::FailPendingTask(const TaskID &task_id,
Expand Down Expand Up @@ -1585,5 +1599,17 @@ ObjectID TaskManager::TaskGeneratorId(const TaskID &task_id) const {
return it->second.spec.ReturnId(0);
}

std::vector<TaskSpecification> TaskManager::PopTasksToRetry() {
absl::MutexLock lock(&mu_);
std::vector<TaskSpecification> tasks_to_retry;
const auto current_time = current_time_ms();
while (!to_resubmit_.empty() && current_time > to_resubmit_.top().execution_time_ms) {
tasks_to_retry.emplace_back(to_resubmit_.top().task_spec);
to_resubmit_.pop();
}

return tasks_to_retry;
}

} // namespace core
} // namespace ray
37 changes: 35 additions & 2 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <deque>
#include <queue>
#include <string>
#include <tuple>
#include <unordered_map>
Expand Down Expand Up @@ -48,8 +50,7 @@ class TaskResubmissionInterface {
using TaskStatusCounter = CounterMap<std::tuple<std::string, rpc::TaskStatus, bool>>;
using PutInLocalPlasmaCallback =
std::function<void(const RayObject &object, const ObjectID &object_id)>;
using RetryTaskCallback =
std::function<void(TaskSpecification &spec, bool object_recovery, uint32_t delay_ms)>;
using RetryTaskCallback = std::function<void(TaskSpecification &spec)>;
using ReconstructObjectCallback = std::function<void(const ObjectID &object_id)>;
using PushErrorCallback = std::function<Status(const JobID &job_id,
const std::string &type,
Expand Down Expand Up @@ -172,6 +173,29 @@ class ObjectRefStream {
int64_t total_num_object_consumed_{};
};

/// A task waiting to be resubmitted, paired with the time at which the
/// resubmission is due.
struct TaskToRetry {
  /// Time, in milliseconds, at which the task should be retried. Compared
  /// against current_time_ms() when draining the retry queue.
  int64_t execution_time_ms = 0;

  /// Specification of the task to resubmit.
  TaskSpecification task_spec;
};

/// Comparator that makes a std::priority_queue of TaskToRetry behave as a
/// min-heap on `execution_time_ms`.
///
/// std::priority_queue keeps the element that compares greatest at top(). This
/// comparator reports `left` as ordered before `right` when `left` is due
/// later, so the task with the earliest execution time ends up at top().
class TaskToRetryDescComparator {
 public:
  /// Returns true when `left` is scheduled strictly later than `right`.
  /// Const-qualified so the comparator can also be invoked through a const
  /// instance; it holds no state.
  bool operator()(const TaskToRetry &left, const TaskToRetry &right) const {
    return left.execution_time_ms > right.execution_time_ms;
  }
};

class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface {
public:
TaskManager(CoreWorkerMemoryStore &in_memory_store,
Expand Down Expand Up @@ -578,6 +602,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// Record OCL metrics.
void RecordMetrics();

/// Returns a vector of task specifications that are ready to be retried.
/// This function removes tasks from the to_resubmit_ queue whose execution time
/// has passed and returns them.
std::vector<TaskSpecification> PopTasksToRetry();

private:
struct TaskEntry {
TaskEntry(TaskSpecification spec_arg,
Expand Down Expand Up @@ -865,6 +894,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// error).
worker::TaskEventBuffer &task_event_buffer_;

/// Queue of tasks to retry, ordered by their execution time.
std::priority_queue<TaskToRetry, std::deque<TaskToRetry>, TaskToRetryDescComparator>
to_resubmit_ ABSL_GUARDED_BY(mu_);

friend class TaskManagerTest;
};

Expand Down
Loading