diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 87e8fc7ba076a..cd94829a124e4 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -719,25 +719,14 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
         RAY_CHECK_OK(PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
       },
       /* retry_task_callback= */
-      [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) {
-        spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
-        if (!object_recovery) {
-          // Retry after a delay to emulate the existing Raylet reconstruction
-          // behaviour. TODO(ekl) backoff exponentially.
-          RAY_LOG(INFO) << "Will resubmit task after a " << delay_ms
-                        << "ms delay: " << spec.DebugString();
-          absl::MutexLock lock(&mutex_);
-          TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
-          to_resubmit_.push(std::move(task_to_retry));
+      [this](TaskSpecification &spec) {
+        if (spec.IsActorTask()) {
+          auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
+          actor_handle->SetResubmittedActorTaskSpec(spec);
+          RAY_CHECK_OK(actor_task_submitter_->SubmitTask(spec));
         } else {
-          if (spec.IsActorTask()) {
-            auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
-            actor_handle->SetResubmittedActorTaskSpec(spec);
-            RAY_CHECK_OK(actor_task_submitter_->SubmitTask(spec));
-          } else {
-            RAY_CHECK(spec.IsNormalTask());
-            RAY_CHECK_OK(normal_task_submitter_->SubmitTask(spec));
-          }
+          RAY_CHECK(spec.IsNormalTask());
+          RAY_CHECK_OK(normal_task_submitter_->SubmitTask(spec));
         }
       },
       push_error_callback,
@@ -1358,18 +1347,9 @@ void CoreWorker::ExitIfParentRayletDies() {
 
 void CoreWorker::InternalHeartbeat() {
   // Retry tasks.
-  std::vector<TaskToRetry> tasks_to_resubmit;
-  {
-    absl::MutexLock lock(&mutex_);
-    const auto current_time = current_time_ms();
-    while (!to_resubmit_.empty() && current_time > to_resubmit_.top().execution_time_ms) {
-      tasks_to_resubmit.emplace_back(to_resubmit_.top());
-      to_resubmit_.pop();
-    }
-  }
+  std::vector<TaskSpecification> tasks_to_retry = task_manager_->PopTasksToRetry();
 
-  for (auto &task_to_retry : tasks_to_resubmit) {
-    auto &spec = task_to_retry.task_spec;
+  for (auto &spec : tasks_to_retry) {
     if (spec.IsActorTask()) {
       auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
       actor_handle->SetResubmittedActorTaskSpec(spec);
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 0b899406df65b..dc57f2c2e3e6b 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -138,28 +138,6 @@ class TaskCounter {
   int64_t num_tasks_running_ ABSL_GUARDED_BY(mu_) = 0;
 };
 
-struct TaskToRetry {
-  /// Time when the task should be retried.
-  int64_t execution_time_ms{};
-
-  /// The details of the task.
-  TaskSpecification task_spec;
-};
-
-/// Sorts TaskToRetry in descending order of the execution time.
-/// Priority queue naturally sorts elements in descending order,
-/// in order to have the tasks ordered by execution time in
-/// ascending order we use a comparator that sorts elements in
-/// descending order. Per docs "Priority queues are a type of container
-/// adaptors, specifically designed such that its first element is always
-/// the greatest of the elements it contains".
-class TaskToRetryDescComparator {
- public:
-  bool operator()(const TaskToRetry &left, const TaskToRetry &right) {
-    return left.execution_time_ms > right.execution_time_ms;
-  }
-};
-
 /// The root class that contains all the core and language-independent functionalities
 /// of the worker. This class is supposed to be used to implement app-language (Java,
 /// Python, etc) workers.
@@ -1870,10 +1848,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
       task_execution_service_work_;
 
-  // Queue of tasks to resubmit when the specified time passes.
-  std::priority_queue<TaskToRetry, std::vector<TaskToRetry>, TaskToRetryDescComparator>
-      to_resubmit_ ABSL_GUARDED_BY(mutex_);
-
   /// Map of named actor registry. It doesn't need to hold a lock because
   /// local mode is single-threaded.
   absl::flat_hash_map<std::string, ActorID> local_mode_named_actor_registry_;
diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index 454d622f44c05..f777806d42045 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -1038,12 +1038,26 @@ void TaskManager::RetryTask(TaskEntry *task_entry,
                             bool object_recovery,
                             uint32_t delay_ms) {
   RAY_CHECK(task_entry != nullptr);
+
+  auto &spec = task_entry->spec;
+  spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
   SetTaskStatus(*task_entry,
                 rpc::TaskStatus::PENDING_ARGS_AVAIL,
                 /* state_update */ std::nullopt,
-                /* include_task_info */ true,
-                task_entry->spec.AttemptNumber() + 1);
-  retry_task_callback_(task_entry->spec, object_recovery, delay_ms);
+                /* include_task_info */ true);
+
+  if (!object_recovery) {
+    // Retry after a delay to emulate the existing Raylet reconstruction
+    // behaviour. TODO(ekl) backoff exponentially.
+    RAY_LOG(INFO) << "Will resubmit task after a " << delay_ms
+                  << "ms delay: " << spec.DebugString();
+    absl::MutexLock lock(&mu_);
+    TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
+    to_resubmit_.push(std::move(task_to_retry));
+    return;
+  }
+
+  retry_task_callback_(spec);
 }
 
 void TaskManager::FailPendingTask(const TaskID &task_id,
@@ -1585,5 +1599,17 @@ ObjectID TaskManager::TaskGeneratorId(const TaskID &task_id) const {
   return it->second.spec.ReturnId(0);
 }
 
+std::vector<TaskSpecification> TaskManager::PopTasksToRetry() {
+  absl::MutexLock lock(&mu_);
+  std::vector<TaskSpecification> tasks_to_retry;
+  const auto current_time = current_time_ms();
+  while (!to_resubmit_.empty() && current_time > to_resubmit_.top().execution_time_ms) {
+    tasks_to_retry.emplace_back(to_resubmit_.top().task_spec);
+    to_resubmit_.pop();
+  }
+
+  return tasks_to_retry;
+}
+
 }  // namespace core
 }  // namespace ray
diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h
index 3b4fc718e6965..2c8ff5d66050d 100644
--- a/src/ray/core_worker/task_manager.h
+++ b/src/ray/core_worker/task_manager.h
@@ -14,6 +14,8 @@
 
 #pragma once
 
+#include <queue>
+#include
 #include
 #include
 #include
@@ -48,8 +50,7 @@ class TaskResubmissionInterface {
 
 using TaskStatusCounter = CounterMap<std::tuple<std::string, rpc::TaskStatus, bool>>;
 using PutInLocalPlasmaCallback =
     std::function<void(const RayObject &object, const ObjectID &object_id)>;
-using RetryTaskCallback =
-    std::function<void(TaskSpecification &spec, bool object_recovery, uint32_t delay_ms)>;
+using RetryTaskCallback = std::function<void(TaskSpecification &spec)>;
 using ReconstructObjectCallback = std::function<void(const ObjectID &object_id)>;
 using PushErrorCallback = std::function<Status(const JobID &job_id,
                                                const std::string &type,
                                                const std::string &error_message,
                                                double timestamp)>;
+
+struct TaskToRetry {
+  /// Time when the task should be retried.
+  int64_t execution_time_ms{};
+
+  /// The details of the task.
+  TaskSpecification task_spec;
+};
+
+/// Sorts TaskToRetry in descending order of the execution time.
+/// Priority queue naturally sorts elements in descending order,
+/// in order to have the tasks ordered by execution time in
+/// ascending order we use a comparator that sorts elements in
+/// descending order. Per docs "Priority queues are a type of container
+/// adaptors, specifically designed such that its first element is always
+/// the greatest of the elements it contains".
+class TaskToRetryDescComparator {
+ public:
+  bool operator()(const TaskToRetry &left, const TaskToRetry &right) {
+    return left.execution_time_ms > right.execution_time_ms;
+  }
+};
+
 class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface {
  public:
   TaskManager(CoreWorkerMemoryStore &in_memory_store,
@@ -578,6 +602,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface {
   /// Record OCL metrics.
   void RecordMetrics();
 
+  /// Returns a vector of task specifications that are ready to be retried.
+  /// This function removes tasks from the to_resubmit_ queue whose execution time
+  /// has passed and returns them.
+  std::vector<TaskSpecification> PopTasksToRetry();
+
  private:
   struct TaskEntry {
     TaskEntry(TaskSpecification spec_arg,
@@ -865,6 +894,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface {
   /// error).
   worker::TaskEventBuffer &task_event_buffer_;
 
+  /// Queue of tasks to retry, ordered by their execution time.
+  std::priority_queue<TaskToRetry, std::vector<TaskToRetry>, TaskToRetryDescComparator>
+      to_resubmit_ ABSL_GUARDED_BY(mu_);
+
   friend class TaskManagerTest;
 };
 
diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc
index 0f485db8f8578..bac86e2cd6909 100644
--- a/src/ray/core_worker/test/task_manager_test.cc
+++ b/src/ray/core_worker/test/task_manager_test.cc
@@ -72,12 +72,14 @@ rpc::ReportGeneratorItemReturnsRequest GetIntermediateTaskReturn(
     const ObjectID &generator_id,
     const ObjectID &dynamic_return_id,
     std::shared_ptr<Buffer> data,
-    bool set_in_plasma) {
+    bool set_in_plasma,
+    uint64_t attempt_number = 0) {
   rpc::ReportGeneratorItemReturnsRequest request;
   rpc::Address addr;
   request.mutable_worker_addr()->CopyFrom(addr);
   request.set_item_index(idx);
   request.set_generator_id(generator_id.Binary());
+  request.set_attempt_number(attempt_number);
   auto dynamic_return_object = request.add_dynamic_return_objects();
   dynamic_return_object->set_object_id(dynamic_return_id.Binary());
   dynamic_return_object->set_data(data->Data(), data->Size());
@@ -137,10 +139,8 @@ class TaskManagerTest : public ::testing::Test {
             [this](const RayObject &object, const ObjectID &object_id) {
               stored_in_plasma.insert(object_id);
             },
-            [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) {
+            [this](TaskSpecification &spec) {
              num_retries_++;
-              last_delay_ms_ = delay_ms;
-              last_object_recovery_ = object_recovery;
              return Status::OK();
            },
            [](const JobID &job_id,
@@ -179,6 +179,13 @@ class TaskManagerTest : public ::testing::Test {
     manager_.CompletePendingTask(spec.TaskId(), reply, caller_address, false);
   }
 
+  void AddTaskToResubmitQueue(int64_t execution_time_ms, const TaskSpecification &spec) {
+    TaskToRetry task_to_retry = {execution_time_ms, spec};
+    manager_.to_resubmit_.push(std::move(task_to_retry));
+  }
+
+  size_t GetResubmitQueueSize() const { return manager_.to_resubmit_.size(); }
+
   bool lineage_pinning_enabled_;
   rpc::Address addr_;
   std::shared_ptr publisher_;
@@ -190,7 +197,6 @@ class TaskManagerTest : public ::testing::Test {
   bool all_nodes_alive_ = true;
   TaskManager manager_;
   int num_retries_ = 0;
-  uint32_t last_delay_ms_ = 0;
   bool last_object_recovery_ = false;
   std::unordered_set<ObjectID> stored_in_plasma;
 };
@@ -365,18 +371,17 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) {
   WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0));
   ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));
 
+  bool will_retry = false;
   auto error = rpc::ErrorType::WORKER_DIED;
   for (int i = 0; i < num_retries; i++) {
     RAY_LOG(INFO) << "Retry " << i;
-    manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+    will_retry = manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+    ASSERT_TRUE(will_retry);
     ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
     ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));
     ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
     std::vector<std::shared_ptr<RayObject>> results;
     ASSERT_FALSE(store_->Get({return_id}, 1, 0, ctx, false, &results).ok());
-    ASSERT_EQ(num_retries_, i + 1);
-    ASSERT_EQ(last_delay_ms_, RayConfig::instance().task_retry_delay_ms());
-    ASSERT_EQ(last_object_recovery_, false);
   }
 
   manager_.FailOrRetryPendingTask(spec.TaskId(), error);
@@ -475,24 +480,20 @@ TEST_F(TaskManagerTest, TestTaskOomAndNonOomKillReturnsLastError) {
   manager_.AddPendingTask(caller_address, spec, "", num_retries);
   auto return_id = spec.ReturnId(0);
-  ASSERT_EQ(num_retries_, 0);
 
   ray::rpc::ErrorType error;
+  bool will_retry = false;
 
   error = rpc::ErrorType::OUT_OF_MEMORY;
-  manager_.FailOrRetryPendingTask(spec.TaskId(), error);
-  ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, RayConfig::instance().task_oom_retry_delay_base_ms());
-  ASSERT_EQ(last_object_recovery_, false);
+  will_retry = manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+  ASSERT_TRUE(will_retry);
 
   error = rpc::ErrorType::WORKER_DIED;
-  manager_.FailOrRetryPendingTask(spec.TaskId(), error);
-  ASSERT_EQ(num_retries_, 2);
-  ASSERT_EQ(last_delay_ms_, RayConfig::instance().task_retry_delay_ms());
-  ASSERT_EQ(last_object_recovery_, false);
+  will_retry = manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+  ASSERT_TRUE(will_retry);
 
   error = rpc::ErrorType::WORKER_DIED;
-  manager_.FailOrRetryPendingTask(spec.TaskId(), error);
-  ASSERT_EQ(num_retries_, 2);
+  will_retry = manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+  ASSERT_FALSE(will_retry);
 
   std::vector<std::shared_ptr<RayObject>> results;
   WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0));
@@ -511,9 +512,11 @@ TEST_F(TaskManagerTest, TestTaskOomInfiniteRetry) {
   int num_retries = 1;
   manager_.AddPendingTask(caller_address, spec, "", num_retries);
 
+  bool will_retry = false;
   for (int i = 0; i < 10000; i++) {
-    ASSERT_EQ(num_retries_, i);
-    manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::OUT_OF_MEMORY);
+    will_retry =
+        manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::OUT_OF_MEMORY);
+    ASSERT_TRUE(will_retry);
   }
 
   manager_.MarkTaskCanceled(spec.TaskId());
@@ -988,8 +991,6 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
   ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
   ASSERT_EQ(resubmitted_task_deps, spec.GetDependencyIds());
   ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, 0);
-  ASSERT_EQ(last_object_recovery_, true);
   resubmitted_task_deps.clear();
 
   // The return ID goes out of scope.
@@ -1051,8 +1052,6 @@ TEST_F(TaskManagerLineageTest, TestResubmittedTaskNondeterministicReturns) {
   std::vector<ObjectID> resubmitted_task_deps;
   ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
   ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, 0);
-  ASSERT_EQ(last_object_recovery_, true);
 
   // The re-executed task completes again. One of the return objects is now
   // returned directly.
@@ -1116,8 +1115,6 @@ TEST_F(TaskManagerLineageTest, TestResubmittedTaskFails) {
   std::vector<ObjectID> resubmitted_task_deps;
   ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
   ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, 0);
-  ASSERT_EQ(last_object_recovery_, true);
 
   // The re-executed task fails due to worker crashed.
   {
@@ -1237,8 +1234,6 @@ TEST_F(TaskManagerLineageTest, TestResubmittedDynamicReturnsTaskFails) {
   std::vector<ObjectID> resubmitted_task_deps;
   ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
   ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, 0);
-  ASSERT_EQ(last_object_recovery_, true);
 
   // Dereference the generator to a list of its internal ObjectRefs.
   for (const auto &dynamic_return_id : dynamic_return_ids) {
@@ -2417,13 +2412,17 @@ TEST_F(TaskManagerTest, TestBackpressureAfterReconstruction) {
   // it is already replied and the second one should be backpressured.
   /// 1 generate, 0 consumed, 2 threshold -> should signal immediately.
   dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 2);
+  // `FailOrRetryPendingTask` will retry the task and increase the attempt number
+  // of the task. Hence, the attempt number is set to 1 so that the object is not
+  // considered a stale object.
   req = GetIntermediateTaskReturn(
       /*idx*/ 0,
       /*finished*/ false,
       generator_id,
       /*dynamic_return_id*/ dynamic_return_id,
       /*data*/ data,
-      /*set_in_plasma*/ false);
+      /*set_in_plasma*/ false,
+      /*attempt_number*/ 1);
   bool retry_signal_called = false;
   ASSERT_FALSE(manager_.HandleReportGeneratorItemReturns(
       req,
@@ -2444,7 +2443,8 @@
       generator_id,
       /*dynamic_return_id*/ dynamic_return_id,
       /*data*/ data,
-      /*set_in_plasma*/ false);
+      /*set_in_plasma*/ false,
+      /*attempt_number*/ 1);
   retry_signal_called = false;
   ASSERT_FALSE(manager_.HandleReportGeneratorItemReturns(
       req,
@@ -2465,6 +2465,20 @@ TEST_F(TaskManagerTest, TestBackpressureAfterReconstruction) {
   CompletePendingStreamingTask(spec, caller_address, 2);
 }
 
+TEST_F(TaskManagerTest, TestPopTasksToRetry) {
+  auto spec1 = CreateTaskHelper(1, {});
+  AddTaskToResubmitQueue(current_time_ms() - 100, spec1);
+
+  auto spec2 = CreateTaskHelper(1, {});
+  AddTaskToResubmitQueue(current_time_ms() + 100, spec2);
+
+  auto tasks = manager_.PopTasksToRetry();
+  ASSERT_EQ(tasks.size(), 1);
+  ASSERT_EQ(tasks[0].TaskId(), spec1.TaskId());
+
+  ASSERT_EQ(GetResubmitQueueSize(), 1);
+}
+
 }  // namespace core
 }  // namespace ray
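For context, a minimal standalone sketch of the pattern the patch consolidates into TaskManager: a mutex-guarded std::priority_queue of delayed retries, drained by a periodic heartbeat that pops only the entries whose retry time has passed. All names below (RetryQueue, RetryEntry, EarliestFirst, PopReady, now_ms) are hypothetical stand-ins rather than Ray APIs, and TaskSpecification is reduced to a plain std::string.

// Standalone sketch (hypothetical names, not Ray code): a delayed-retry queue
// drained by a periodic tick, mirroring the role of TaskManager::PopTasksToRetry().
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// Milliseconds since an arbitrary epoch, standing in for Ray's current_time_ms().
int64_t now_ms() {
  using namespace std::chrono;
  return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
}

struct RetryEntry {
  int64_t execution_time_ms = 0;  // When the task becomes eligible for retry.
  std::string task_spec;          // Stand-in for TaskSpecification.
};

// std::priority_queue is a max-heap, so a ">" comparison puts the *earliest*
// deadline on top -- the same trick TaskToRetryDescComparator uses.
struct EarliestFirst {
  bool operator()(const RetryEntry &left, const RetryEntry &right) const {
    return left.execution_time_ms > right.execution_time_ms;
  }
};

class RetryQueue {
 public:
  // Schedule a retry delay_ms from now (how a non-recovery retry is enqueued).
  void Schedule(std::string task_spec, int64_t delay_ms) {
    std::lock_guard<std::mutex> lock(mu_);
    to_resubmit_.push(RetryEntry{now_ms() + delay_ms, std::move(task_spec)});
  }

  // Pop every entry whose deadline has passed; the caller resubmits them.
  // This is the role InternalHeartbeat() plays via PopTasksToRetry().
  std::vector<std::string> PopReady() {
    std::lock_guard<std::mutex> lock(mu_);
    std::vector<std::string> ready;
    const int64_t current = now_ms();
    while (!to_resubmit_.empty() && current > to_resubmit_.top().execution_time_ms) {
      ready.push_back(to_resubmit_.top().task_spec);
      to_resubmit_.pop();
    }
    return ready;
  }

 private:
  std::mutex mu_;
  std::priority_queue<RetryEntry, std::vector<RetryEntry>, EarliestFirst> to_resubmit_;
};

}  // namespace sketch

int main() {
  sketch::RetryQueue queue;
  queue.Schedule("task_a", /*delay_ms=*/-100);   // Already due.
  queue.Schedule("task_b", /*delay_ms=*/60000);  // Due in a minute; stays queued.
  for (const auto &spec : queue.PopReady()) {
    std::cout << "resubmitting " << spec << "\n";  // Prints only task_a.
  }
  return 0;
}

The greater-than comparator is the same device documented on TaskToRetryDescComparator above: ordering the max-heap "descending" by execution_time_ms keeps the earliest deadline at top(), so the drain loop can stop at the first entry whose time has not yet arrived.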