From 640cf4f577cad841b32f3f8a5623d4b376868e88 Mon Sep 17 00:00:00 2001
From: kaihsun
Date: Mon, 5 May 2025 02:16:51 +0000
Subject: [PATCH 1/5] refactor

Signed-off-by: kaihsun
---
 src/ray/core_worker/core_worker.cc  | 38 +++++++----------------
 src/ray/core_worker/core_worker.h   | 26 --------------------
 src/ray/core_worker/task_manager.cc | 30 ++++++++++++++++++++---
 src/ray/core_worker/task_manager.h  | 37 ++++++++++++++++++++++++++--
 4 files changed, 71 insertions(+), 60 deletions(-)

diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 87e8fc7ba076..cd94829a124e 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -719,25 +719,14 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
         RAY_CHECK_OK(PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
       },
       /* retry_task_callback= */
-      [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) {
-        spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
-        if (!object_recovery) {
-          // Retry after a delay to emulate the existing Raylet reconstruction
-          // behaviour. TODO(ekl) backoff exponentially.
-          RAY_LOG(INFO) << "Will resubmit task after a " << delay_ms
-                        << "ms delay: " << spec.DebugString();
-          absl::MutexLock lock(&mutex_);
-          TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
-          to_resubmit_.push(std::move(task_to_retry));
+      [this](TaskSpecification &spec) {
+        if (spec.IsActorTask()) {
+          auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
+          actor_handle->SetResubmittedActorTaskSpec(spec);
+          RAY_CHECK_OK(actor_task_submitter_->SubmitTask(spec));
         } else {
-          if (spec.IsActorTask()) {
-            auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
-            actor_handle->SetResubmittedActorTaskSpec(spec);
-            RAY_CHECK_OK(actor_task_submitter_->SubmitTask(spec));
-          } else {
-            RAY_CHECK(spec.IsNormalTask());
-            RAY_CHECK_OK(normal_task_submitter_->SubmitTask(spec));
-          }
+          RAY_CHECK(spec.IsNormalTask());
+          RAY_CHECK_OK(normal_task_submitter_->SubmitTask(spec));
         }
       },
       push_error_callback,
@@ -1358,18 +1347,9 @@ void CoreWorker::ExitIfParentRayletDies() {
 
 void CoreWorker::InternalHeartbeat() {
   // Retry tasks.
-  std::vector<TaskToRetry> tasks_to_resubmit;
-  {
-    absl::MutexLock lock(&mutex_);
-    const auto current_time = current_time_ms();
-    while (!to_resubmit_.empty() && current_time > to_resubmit_.top().execution_time_ms) {
-      tasks_to_resubmit.emplace_back(to_resubmit_.top());
-      to_resubmit_.pop();
-    }
-  }
+  std::vector<TaskSpecification> tasks_to_retry = task_manager_->PopTasksToRetry();
 
-  for (auto &task_to_retry : tasks_to_resubmit) {
-    auto &spec = task_to_retry.task_spec;
+  for (auto &spec : tasks_to_retry) {
     if (spec.IsActorTask()) {
       auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
       actor_handle->SetResubmittedActorTaskSpec(spec);
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 0b899406df65..dc57f2c2e3e6 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -138,28 +138,6 @@ class TaskCounter {
   int64_t num_tasks_running_ ABSL_GUARDED_BY(mu_) = 0;
 };
 
-struct TaskToRetry {
-  /// Time when the task should be retried.
-  int64_t execution_time_ms{};
-
-  /// The details of the task.
-  TaskSpecification task_spec;
-};
-
-/// Sorts TaskToRetry in descending order of the execution time.
-/// Priority queue naturally sorts elements in descending order,
-/// in order to have the tasks ordered by execution time in
-/// ascending order we use a comparator that sorts elements in
-/// descending order. Per docs "Priority queues are a type of container
-/// adaptors, specifically designed such that its first element is always
-/// the greatest of the elements it contains".
-class TaskToRetryDescComparator {
- public:
-  bool operator()(const TaskToRetry &left, const TaskToRetry &right) {
-    return left.execution_time_ms > right.execution_time_ms;
-  }
-};
-
 /// The root class that contains all the core and language-independent functionalities
 /// of the worker. This class is supposed to be used to implement app-language (Java,
 /// Python, etc) workers.
@@ -1870,10 +1848,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   boost::asio::executor_work_guard task_execution_service_work_;
 
-  // Queue of tasks to resubmit when the specified time passes.
-  std::priority_queue<TaskToRetry, std::vector<TaskToRetry>, TaskToRetryDescComparator>
-      to_resubmit_ ABSL_GUARDED_BY(mutex_);
-
   /// Map of named actor registry. It doesn't need to hold a lock because
   /// local mode is single-threaded.
   absl::flat_hash_map local_mode_named_actor_registry_;
diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index 454d622f44c0..dc35ac95e74c 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -1038,12 +1038,24 @@ void TaskManager::RetryTask(TaskEntry *task_entry,
                             bool object_recovery,
                             uint32_t delay_ms) {
   RAY_CHECK(task_entry != nullptr);
+
+  auto &spec = task_entry->spec;
+  spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
+  if (!object_recovery) {
+    // Retry after a delay to emulate the existing Raylet reconstruction
+    // behaviour. TODO(ekl) backoff exponentially.
+    RAY_LOG(INFO) << "Will resubmit task after a " << delay_ms
+                  << "ms delay: " << spec.DebugString();
+    absl::MutexLock lock(&mu_);
+    TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
+    to_resubmit_.push(std::move(task_to_retry));
+  }
+
   SetTaskStatus(*task_entry,
                 rpc::TaskStatus::PENDING_ARGS_AVAIL,
                 /* state_update */ std::nullopt,
-                /* include_task_info */ true,
-                task_entry->spec.AttemptNumber() + 1);
-  retry_task_callback_(task_entry->spec, object_recovery, delay_ms);
+                /* include_task_info */ true);
+  retry_task_callback_(spec);
 }
 
 void TaskManager::FailPendingTask(const TaskID &task_id,
@@ -1585,5 +1597,17 @@ ObjectID TaskManager::TaskGeneratorId(const TaskID &task_id) const {
   return it->second.spec.ReturnId(0);
 }
 
+std::vector<TaskSpecification> TaskManager::PopTasksToRetry() {
+  absl::MutexLock lock(&mu_);
+  std::vector<TaskSpecification> tasks_to_retry;
+  const auto current_time = current_time_ms();
+  while (!to_resubmit_.empty() && current_time > to_resubmit_.top().execution_time_ms) {
+    tasks_to_retry.emplace_back(to_resubmit_.top().task_spec);
+    to_resubmit_.pop();
+  }
+
+  return tasks_to_retry;
+}
+
 }  // namespace core
 }  // namespace ray
diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h
index 3b4fc718e696..2c8ff5d66050 100644
--- a/src/ray/core_worker/task_manager.h
+++ b/src/ray/core_worker/task_manager.h
@@ -14,6 +14,8 @@
 #pragma once
 
+#include
+#include
 #include
 #include
 #include
@@ -48,8 +50,7 @@ class TaskResubmissionInterface {
 using TaskStatusCounter = CounterMap>;
 using PutInLocalPlasmaCallback = std::function;
-using RetryTaskCallback =
-    std::function<void(TaskSpecification &spec, bool object_recovery, uint32_t delay_ms)>;
+using RetryTaskCallback = std::function<void(TaskSpecification &spec)>;
 using ReconstructObjectCallback = std::function;
 using PushErrorCallback = std::function;
 
+struct TaskToRetry {
+  /// Time when the task should be retried.
+  int64_t execution_time_ms{};
+
+  /// The details of the task.
+  TaskSpecification task_spec;
+};
+
+/// Sorts TaskToRetry in descending order of the execution time.
+/// Priority queue naturally sorts elements in descending order,
+/// in order to have the tasks ordered by execution time in
+/// ascending order we use a comparator that sorts elements in
+/// descending order. Per docs "Priority queues are a type of container
+/// adaptors, specifically designed such that its first element is always
+/// the greatest of the elements it contains".
+class TaskToRetryDescComparator {
+ public:
+  bool operator()(const TaskToRetry &left, const TaskToRetry &right) {
+    return left.execution_time_ms > right.execution_time_ms;
+  }
+};
+
 class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface {
  public:
   TaskManager(CoreWorkerMemoryStore &in_memory_store,
@@ -578,6 +602,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
   /// Record OCL metrics.
   void RecordMetrics();
 
+  /// Returns a vector of task specifications that are ready to be retried.
+  /// This function removes tasks from the to_resubmit_ queue whose execution time
+  /// has passed and returns them.
+  std::vector<TaskSpecification> PopTasksToRetry();
+
  private:
   struct TaskEntry {
     TaskEntry(TaskSpecification spec_arg,
@@ -865,6 +894,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
   /// error).
   worker::TaskEventBuffer &task_event_buffer_;
 
+  /// Queue of tasks to retry, ordered by their execution time.
+  std::priority_queue<TaskToRetry, std::vector<TaskToRetry>, TaskToRetryDescComparator>
+      to_resubmit_ ABSL_GUARDED_BY(mu_);
+
   friend class TaskManagerTest;
 };
 
From a12d6ddcca6021536b3f403179c3635f73045ec7 Mon Sep 17 00:00:00 2001
From: kaihsun
Date: Mon, 5 May 2025 03:59:37 +0000
Subject: [PATCH 2/5] refactor

Signed-off-by: kaihsun
---
 src/ray/core_worker/task_manager.cc           |  6 ++++
 src/ray/core_worker/test/task_manager_test.cc | 33 +++++++------------
 2 files changed, 17 insertions(+), 22 deletions(-)

diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index dc35ac95e74c..37204867098d 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -681,6 +681,8 @@ bool TaskManager::HandleReportGeneratorItemReturns(
       // fails, we may receive a report from the first executor after the
       // second attempt has started. In this case, we should ignore the first
      // attempt.
+      RAY_LOG(INFO)
+          << "[debug][NotFound] Stale object reports from the previous attempt.";
       execution_signal_callback(
           Status::NotFound("Stale object reports from the previous attempt."), -1);
       return false;
@@ -696,6 +698,7 @@ bool TaskManager::HandleReportGeneratorItemReturns(
   auto stream_it = object_ref_streams_.find(generator_id);
   if (stream_it == object_ref_streams_.end()) {
     // Stream has been already deleted. Do not handle it.
+    RAY_LOG(INFO) << "[debug][NotFound] Stream is already deleted.";
     execution_signal_callback(Status::NotFound("Stream is already deleted"), -1);
     return false;
   }
@@ -729,6 +732,7 @@ bool TaskManager::HandleReportGeneratorItemReturns(
   auto total_consumed = stream_it->second.TotalNumObjectConsumed();
   if (stream_it->second.IsObjectConsumed(item_index)) {
+    RAY_LOG(INFO) << "[debug][OK] Object is already consumed.";
     execution_signal_callback(Status::OK(), total_consumed);
     return false;
   }
@@ -745,12 +749,14 @@
                   << ". threshold: " << backpressure_threshold;
     auto signal_it = ref_stream_execution_signal_callbacks_.find(generator_id);
     if (signal_it == ref_stream_execution_signal_callbacks_.end()) {
+      RAY_LOG(INFO) << "[debug][NotFound] Stream is deleted.";
       execution_signal_callback(Status::NotFound("Stream is deleted."), -1);
     } else {
       signal_it->second.push_back(execution_signal_callback);
     }
   } else {
     // No need to backpressure.
+    RAY_LOG(INFO) << "[debug][OK] No need to backpressure.";
     execution_signal_callback(Status::OK(), total_consumed);
   }
   return num_objects_written != 0;
diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc
index 0f485db8f857..5cb2563e18c1 100644
--- a/src/ray/core_worker/test/task_manager_test.cc
+++ b/src/ray/core_worker/test/task_manager_test.cc
@@ -72,12 +72,14 @@ rpc::ReportGeneratorItemReturnsRequest GetIntermediateTaskReturn(
     const ObjectID &generator_id,
     const ObjectID &dynamic_return_id,
     std::shared_ptr data,
-    bool set_in_plasma) {
+    bool set_in_plasma,
+    uint64_t attempt_number = 0) {
   rpc::ReportGeneratorItemReturnsRequest request;
   rpc::Address addr;
   request.mutable_worker_addr()->CopyFrom(addr);
   request.set_item_index(idx);
   request.set_generator_id(generator_id.Binary());
+  request.set_attempt_number(attempt_number);
   auto dynamic_return_object = request.add_dynamic_return_objects();
   dynamic_return_object->set_object_id(dynamic_return_id.Binary());
   dynamic_return_object->set_data(data->Data(), data->Size());
@@ -137,10 +139,8 @@ class TaskManagerTest : public ::testing::Test {
             [this](const RayObject &object, const ObjectID &object_id) {
               stored_in_plasma.insert(object_id);
             },
-            [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) {
+            [this](TaskSpecification &spec) {
               num_retries_++;
-              last_delay_ms_ = delay_ms;
-              last_object_recovery_ = object_recovery;
               return Status::OK();
             },
             [](const JobID &job_id,
@@ -190,7 +190,6 @@ class TaskManagerTest : public ::testing::Test {
   bool all_nodes_alive_ = true;
   TaskManager manager_;
   int num_retries_ = 0;
-  uint32_t last_delay_ms_ = 0;
   bool last_object_recovery_ = false;
   std::unordered_set stored_in_plasma;
 };
@@ -375,8 +374,6 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) {
     std::vector> results;
     ASSERT_FALSE(store_->Get({return_id}, 1, 0, ctx, false, &results).ok());
     ASSERT_EQ(num_retries_, i + 1);
-    ASSERT_EQ(last_delay_ms_, RayConfig::instance().task_retry_delay_ms());
-    ASSERT_EQ(last_object_recovery_, false);
   }
 
   manager_.FailOrRetryPendingTask(spec.TaskId(), error);
@@ -481,15 +478,10 @@ TEST_F(TaskManagerTest, TestTaskOomAndNonOomKillReturnsLastError) {
   error = rpc::ErrorType::OUT_OF_MEMORY;
   manager_.FailOrRetryPendingTask(spec.TaskId(), error);
   ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, RayConfig::instance().task_oom_retry_delay_base_ms());
-  ASSERT_EQ(last_object_recovery_, false);
 
   error = rpc::ErrorType::WORKER_DIED;
   manager_.FailOrRetryPendingTask(spec.TaskId(), error);
   ASSERT_EQ(num_retries_, 2);
-  ASSERT_EQ(last_delay_ms_, RayConfig::instance().task_retry_delay_ms());
-  ASSERT_EQ(last_object_recovery_, false);
-
   error = rpc::ErrorType::WORKER_DIED;
   manager_.FailOrRetryPendingTask(spec.TaskId(), error);
   ASSERT_EQ(num_retries_, 2);
@@ -988,8 +980,6 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
   ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
   ASSERT_EQ(resubmitted_task_deps, spec.GetDependencyIds());
   ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, 0);
-  ASSERT_EQ(last_object_recovery_, true);
   resubmitted_task_deps.clear();
 
   // The return ID goes out of scope.
@@ -1051,8 +1041,6 @@ TEST_F(TaskManagerLineageTest, TestResubmittedTaskNondeterministicReturns) {
   std::vector resubmitted_task_deps;
   ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
   ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, 0);
-  ASSERT_EQ(last_object_recovery_, true);
 
   // The re-executed task completes again. One of the return objects is now
   // returned directly.
@@ -1116,8 +1104,6 @@ TEST_F(TaskManagerLineageTest, TestResubmittedTaskFails) {
   std::vector resubmitted_task_deps;
   ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
   ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, 0);
-  ASSERT_EQ(last_object_recovery_, true);
 
   // The re-executed task fails due to worker crashed.
   {
@@ -1237,8 +1223,6 @@ TEST_F(TaskManagerLineageTest, TestResubmittedDynamicReturnsTaskFails) {
   std::vector resubmitted_task_deps;
   ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
   ASSERT_EQ(num_retries_, 1);
-  ASSERT_EQ(last_delay_ms_, 0);
-  ASSERT_EQ(last_object_recovery_, true);
 
   // Dereference the generator to a list of its internal ObjectRefs.
   for (const auto &dynamic_return_id : dynamic_return_ids) {
@@ -2417,13 +2401,17 @@ TEST_F(TaskManagerTest, TestBackpressureAfterReconstruction) {
   // it is already replied and the second one should be backpressured.
   /// 1 generate, 0 consumed, 2 threshold -> should signal immediately.
   dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 2);
+  // `FailOrRetryPendingTask` will retry the task and increase the attempt number
+  // of the task. Hence, the attempt number is set to 1 so that the object is not
+  // considered a stale object.
   req = GetIntermediateTaskReturn(
       /*idx*/ 0,
       /*finished*/ false,
       generator_id,
       /*dynamic_return_id*/ dynamic_return_id,
       /*data*/ data,
-      /*set_in_plasma*/ false);
+      /*set_in_plasma*/ false,
+      /*attempt_number*/ 1);
   bool retry_signal_called = false;
   ASSERT_FALSE(manager_.HandleReportGeneratorItemReturns(
       req,
@@ -2444,7 +2432,8 @@ TEST_F(TaskManagerTest, TestBackpressureAfterReconstruction) {
       generator_id,
       /*dynamic_return_id*/ dynamic_return_id,
       /*data*/ data,
-      /*set_in_plasma*/ false);
+      /*set_in_plasma*/ false,
+      /*attempt_number*/ 1);
   retry_signal_called = false;
   ASSERT_FALSE(manager_.HandleReportGeneratorItemReturns(
       req,

From 69e0dfcace5dd4b2fd4eac608454405d9a60f376 Mon Sep 17 00:00:00 2001
From: kaihsun
Date: Mon, 5 May 2025 04:06:50 +0000
Subject: [PATCH 3/5] refactor

Signed-off-by: kaihsun
---
 src/ray/core_worker/task_manager.cc | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index 37204867098d..dc35ac95e74c 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -681,8 +681,6 @@ bool TaskManager::HandleReportGeneratorItemReturns(
       // fails, we may receive a report from the first executor after the
       // second attempt has started. In this case, we should ignore the first
       // attempt.
-      RAY_LOG(INFO)
-          << "[debug][NotFound] Stale object reports from the previous attempt.";
       execution_signal_callback(
           Status::NotFound("Stale object reports from the previous attempt."), -1);
       return false;
@@ -698,7 +696,6 @@ bool TaskManager::HandleReportGeneratorItemReturns(
   auto stream_it = object_ref_streams_.find(generator_id);
   if (stream_it == object_ref_streams_.end()) {
     // Stream has been already deleted. Do not handle it.
-    RAY_LOG(INFO) << "[debug][NotFound] Stream is already deleted.";
     execution_signal_callback(Status::NotFound("Stream is already deleted"), -1);
     return false;
   }
@@ -732,7 +729,6 @@ bool TaskManager::HandleReportGeneratorItemReturns(
   auto total_consumed = stream_it->second.TotalNumObjectConsumed();
   if (stream_it->second.IsObjectConsumed(item_index)) {
-    RAY_LOG(INFO) << "[debug][OK] Object is already consumed.";
     execution_signal_callback(Status::OK(), total_consumed);
     return false;
   }
@@ -749,14 +745,12 @@ bool TaskManager::HandleReportGeneratorItemReturns(
                   << ". threshold: " << backpressure_threshold;
     auto signal_it = ref_stream_execution_signal_callbacks_.find(generator_id);
     if (signal_it == ref_stream_execution_signal_callbacks_.end()) {
-      RAY_LOG(INFO) << "[debug][NotFound] Stream is deleted.";
       execution_signal_callback(Status::NotFound("Stream is deleted."), -1);
     } else {
       signal_it->second.push_back(execution_signal_callback);
     }
   } else {
     // No need to backpressure.
-    RAY_LOG(INFO) << "[debug][OK] No need to backpressure.";
     execution_signal_callback(Status::OK(), total_consumed);
   }
   return num_objects_written != 0;

From 72ab5cad302b82d349c9dbf7e70e484fea739f58 Mon Sep 17 00:00:00 2001
From: kaihsun
Date: Mon, 5 May 2025 04:29:18 +0000
Subject: [PATCH 4/5] add unit tests

Signed-off-by: kaihsun
---
 src/ray/core_worker/test/task_manager_test.cc | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc
index 5cb2563e18c1..2988198987dd 100644
--- a/src/ray/core_worker/test/task_manager_test.cc
+++ b/src/ray/core_worker/test/task_manager_test.cc
@@ -179,6 +179,13 @@ class TaskManagerTest : public ::testing::Test {
     manager_.CompletePendingTask(spec.TaskId(), reply, caller_address, false);
   }
 
+  void AddTaskToResubmitQueue(int64_t execution_time_ms, const TaskSpecification &spec) {
+    TaskToRetry task_to_retry = {execution_time_ms, spec};
+    manager_.to_resubmit_.push(std::move(task_to_retry));
+  }
+
+  size_t GetResubmitQueueSize() const { return manager_.to_resubmit_.size(); }
+
   bool lineage_pinning_enabled_;
   rpc::Address addr_;
   std::shared_ptr publisher_;
@@ -2454,6 +2461,20 @@ TEST_F(TaskManagerTest, TestBackpressureAfterReconstruction) {
   CompletePendingStreamingTask(spec, caller_address, 2);
 }
 
+TEST_F(TaskManagerTest, TestPopTasksToRetry) {
+  auto spec1 = CreateTaskHelper(1, {});
+  AddTaskToResubmitQueue(current_time_ms() - 100, spec1);
+
+  auto spec2 = CreateTaskHelper(1, {});
+  AddTaskToResubmitQueue(current_time_ms() + 100, spec2);
+
+  auto tasks = manager_.PopTasksToRetry();
+  ASSERT_EQ(tasks.size(), 1);
+  ASSERT_EQ(tasks[0].TaskId(), spec1.TaskId());
+
+  ASSERT_EQ(GetResubmitQueueSize(), 1);
+}
+
 }  // namespace core
 }  // namespace ray

From 097fb787ebc5cd3a8dfadc099b55227d97d228e1 Mon Sep 17 00:00:00 2001
From: kaihsun
Date: Mon, 5 May 2025 08:52:12 +0000
Subject: [PATCH 5/5] fix tests

Signed-off-by: kaihsun
---
 src/ray/core_worker/task_manager.cc           | 10 ++++---
 src/ray/core_worker/test/task_manager_test.cc | 26 +++++++++++--------
 2 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index dc35ac95e74c..f777806d4204 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -1041,6 +1041,11 @@ void TaskManager::RetryTask(TaskEntry *task_entry,
   auto &spec = task_entry->spec;
   spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
+  SetTaskStatus(*task_entry,
+                rpc::TaskStatus::PENDING_ARGS_AVAIL,
+                /* state_update */ std::nullopt,
+                /* include_task_info */ true);
+
   if (!object_recovery) {
     // Retry after a delay to emulate the existing Raylet reconstruction
     // behaviour. TODO(ekl) backoff exponentially.
@@ -1049,12 +1054,9 @@
     RAY_LOG(INFO) << "Will resubmit task after a " << delay_ms
                   << "ms delay: " << spec.DebugString();
     absl::MutexLock lock(&mu_);
     TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
     to_resubmit_.push(std::move(task_to_retry));
+    return;
   }
 
-  SetTaskStatus(*task_entry,
-                rpc::TaskStatus::PENDING_ARGS_AVAIL,
-                /* state_update */ std::nullopt,
-                /* include_task_info */ true);
   retry_task_callback_(spec);
 }
diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc
index 2988198987dd..bac86e2cd690 100644
--- a/src/ray/core_worker/test/task_manager_test.cc
+++ b/src/ray/core_worker/test/task_manager_test.cc
@@ -371,16 +371,17 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) {
   WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0));
   ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));
 
+  bool will_retry = false;
   auto error = rpc::ErrorType::WORKER_DIED;
   for (int i = 0; i < num_retries; i++) {
     RAY_LOG(INFO) << "Retry " << i;
-    manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+    will_retry = manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+    ASSERT_TRUE(will_retry);
     ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
     ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));
     ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
     std::vector> results;
     ASSERT_FALSE(store_->Get({return_id}, 1, 0, ctx, false, &results).ok());
-    ASSERT_EQ(num_retries_, i + 1);
   }
 
   manager_.FailOrRetryPendingTask(spec.TaskId(), error);
@@ -479,19 +480,20 @@ TEST_F(TaskManagerTest, TestTaskOomAndNonOomKillReturnsLastError) {
   manager_.AddPendingTask(caller_address, spec, "", num_retries);
   auto return_id = spec.ReturnId(0);
 
-  ASSERT_EQ(num_retries_, 0);
   ray::rpc::ErrorType error;
+  bool will_retry = false;
 
   error = rpc::ErrorType::OUT_OF_MEMORY;
-  manager_.FailOrRetryPendingTask(spec.TaskId(), error);
-  ASSERT_EQ(num_retries_, 1);
+  will_retry = manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+  ASSERT_TRUE(will_retry);
 
   error = rpc::ErrorType::WORKER_DIED;
-  manager_.FailOrRetryPendingTask(spec.TaskId(), error);
-  ASSERT_EQ(num_retries_, 2);
+  will_retry = manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+  ASSERT_TRUE(will_retry);
+
   error = rpc::ErrorType::WORKER_DIED;
-  manager_.FailOrRetryPendingTask(spec.TaskId(), error);
-  ASSERT_EQ(num_retries_, 2);
+  will_retry = manager_.FailOrRetryPendingTask(spec.TaskId(), error);
+  ASSERT_FALSE(will_retry);
 
   std::vector> results;
   WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0));
@@ -510,9 +512,11 @@ TEST_F(TaskManagerTest, TestTaskOomInfiniteRetry) {
   int num_retries = 1;
   manager_.AddPendingTask(caller_address, spec, "", num_retries);
 
+  bool will_retry = false;
  for (int i = 0; i < 10000; i++) {
-    ASSERT_EQ(num_retries_, i);
-    manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::OUT_OF_MEMORY);
+    will_retry =
+        manager_.FailOrRetryPendingTask(spec.TaskId(), rpc::ErrorType::OUT_OF_MEMORY);
+    ASSERT_TRUE(will_retry);
  }
 
   manager_.MarkTaskCanceled(spec.TaskId());
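
---

For context on the mechanism the series relocates, below is a minimal standalone sketch of the delayed-retry queue pattern. It is illustrative code only, not Ray source: Task, RetryQueue, Push, and the free-standing PopTasksToRetry here are hypothetical stand-ins for TaskSpecification, TaskManager's to_resubmit_ queue, the enqueue path in TaskManager::RetryTask, and TaskManager::PopTasksToRetry as they appear in the patches above.

// Illustrative sketch of a time-ordered retry queue (not Ray source).
#include <cstdint>
#include <iostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct Task {
  std::string name;
};

struct TaskToRetry {
  int64_t execution_time_ms;  // Absolute time at which the task becomes due.
  Task task;
};

// std::priority_queue is a max-heap, so a greater-than comparator makes the
// entry with the smallest execution time (the next task due) surface first.
struct TaskToRetryDescComparator {
  bool operator()(const TaskToRetry &left, const TaskToRetry &right) const {
    return left.execution_time_ms > right.execution_time_ms;
  }
};

class RetryQueue {
 public:
  // Schedules a task to be retried delay_ms after now_ms.
  void Push(Task task, int64_t now_ms, int64_t delay_ms) {
    queue_.push(TaskToRetry{now_ms + delay_ms, std::move(task)});
  }

  // Drains every entry whose execution time has already passed, mirroring
  // what a periodic heartbeat would call; later entries stay queued.
  std::vector<Task> PopTasksToRetry(int64_t now_ms) {
    std::vector<Task> due;
    while (!queue_.empty() && now_ms > queue_.top().execution_time_ms) {
      due.push_back(queue_.top().task);
      queue_.pop();
    }
    return due;
  }

 private:
  std::priority_queue<TaskToRetry, std::vector<TaskToRetry>, TaskToRetryDescComparator>
      queue_;
};

int main() {
  RetryQueue queue;
  const int64_t now_ms = 1000;
  queue.Push({"already_due"}, now_ms, -100);  // Became due 100 ms ago.
  queue.Push({"not_yet_due"}, now_ms, 100);   // Not due for another 100 ms.
  for (const auto &task : queue.PopTasksToRetry(now_ms)) {
    std::cout << "retrying: " << task.name << "\n";  // Prints only "already_due".
  }
  return 0;
}

The greater-than comparison is what keeps the soonest deadline at the top of the heap, which is why the comment carried from core_worker.h into task_manager.h describes the comparator as sorting in "descending" order even though tasks pop in ascending execution-time order.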