[core][refactor] Move to_resubmit_ from CoreWorker to TaskManager to avoid an abstraction leak #52779

Closed

Conversation

kevin85421
Member

@kevin85421 kevin85421 commented May 5, 2025

Why are these changes needed?

Currently, CoreWorker has a member to_resubmit_, which I consider an abstraction leak. This PR moves to_resubmit_ from CoreWorker to TaskManager.
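For reviewers skimming the diff, the state being moved is roughly the following, pieced together from the hunks quoted later in this conversation (the task_spec field name is an assumption, since only execution_time_ms appears in the quoted lines):

/// Represents a task that needs to be retried at a specific time.
struct TaskToRetry {
  /// Time when the task should be retried.
  int64_t execution_time_ms{};
  /// The task specification to retry.
  TaskSpecification task_spec;
};

/// Queue of tasks to retry, ordered by their execution time.
std::priority_queue<TaskToRetry, std::deque<TaskToRetry>, TaskToRetryDescComparator>
    to_resubmit_ ABSL_GUARDED_BY(mu_);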

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

kevin85421 added 5 commits May 5, 2025 02:16
Signed-off-by: kaihsun <[email protected]>
Signed-off-by: kaihsun <[email protected]>
Signed-off-by: kaihsun <[email protected]>
Signed-off-by: kaihsun <[email protected]>
Signed-off-by: kaihsun <[email protected]>
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
std::vector<std::shared_ptr<RayObject>> results;
ASSERT_FALSE(store_->Get({return_id}, 1, 0, ctx, false, &results).ok());
ASSERT_EQ(num_retries_, i + 1);
Member Author

FailOrRetryPendingTask will not call the retry_task_execution callback because object_recovery is false. Hence, we can't use num_retries_ for testing.

Contributor

I don't follow this. Isn't the number of attempts incremented in RetryTask and not retry_task_execution?

@@ -136,7 +136,8 @@ void Metric::Record(double value, TagsType tags) {
opencensus::stats::Record({{*measure_, value}}, std::move(combined_tags));
}

void Metric::Record(double value, std::unordered_map<std::string_view, std::string> tags) {
Member Author

This is updated by the C++ linter.

@@ -172,6 +173,29 @@ class ObjectRefStream {
int64_t total_num_object_consumed_{};
};

/// Represents a task that needs to be retried at a specific time.
Member Author

copy from core_worker.h

@@ -863,6 +901,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// error).
worker::TaskEventBuffer &task_event_buffer_;

/// Queue of tasks to retry, ordered by their execution time.
Member Author

copy from core_worker.h

void TaskManager::RetryTask(TaskSpecification &spec,
                            bool object_recovery,
                            uint32_t delay_ms) {
  spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
Member Author

copy from core_worker.cc (retry_task_callback)
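For readers following along, here is a rough sketch of what that moved callback logic plausibly looks like inside TaskManager::RetryTask; the branch structure and the retry_task_execution_callback_ member name are assumptions pieced together from the other snippets quoted in this review, not the exact code:

void TaskManager::RetryTask(TaskSpecification &spec,
                            bool object_recovery,
                            uint32_t delay_ms) {
  spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
  if (!object_recovery) {
    // Delayed retry: queue the task so a later call to PopTasksToRetry()
    // picks it up once its execution time has passed.
    absl::MutexLock lock(&mu_);
    TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
    to_resubmit_.push(std::move(task_to_retry));
  } else {
    // Object reconstruction: retry the task execution immediately.
    retry_task_execution_callback_(spec);
  }
}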

@@ -697,6 +726,15 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
std::vector<ObjectID> *ids_to_release)
ABSL_LOCKS_EXCLUDED(mu_);

/// A wrapper of `retry_task_callback_`. This function should be the only
Member Author

new function

@@ -578,6 +602,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// Record OCL metrics.
void RecordMetrics();

/// Returns a vector of task specifications that are ready to be retried.
Member Author

new function
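A plausible shape for this new helper, inferred from its doc comment and from the unit test quoted further down (the task_spec member name and the exact locking are assumptions):

std::vector<TaskSpecification> TaskManager::PopTasksToRetry() {
  absl::MutexLock lock(&mu_);
  std::vector<TaskSpecification> tasks;
  const int64_t now = current_time_ms();
  // Pop every queued task whose scheduled execution time has passed; tasks
  // scheduled for the future stay in the queue for a later call.
  while (!to_resubmit_.empty() && to_resubmit_.top().execution_time_ms <= now) {
    tasks.push_back(to_resubmit_.top().task_spec);
    to_resubmit_.pop();
  }
  return tasks;
}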

Signed-off-by: kaihsun <[email protected]>
@kevin85421 kevin85421 added the go add ONLY when ready to merge, run all tests label May 15, 2025
@kevin85421 kevin85421 marked this pull request as ready for review May 15, 2025 23:36
@kevin85421
Member Author

cc @jjyao This is something we discussed a while ago.

@kevin85421 kevin85421 changed the title [core] Move to_resubmit_ from CoreWorker to TaskManager to avoid an abstraction leak [core][refactor] Move to_resubmit_ from CoreWorker to TaskManager to avoid an abstraction leak May 16, 2025
@edoakes
Collaborator

edoakes commented May 16, 2025

@israbbani can you review this pls? (not urgent)

@edoakes edoakes requested a review from a team May 16, 2025 21:19
@israbbani israbbani self-assigned this May 19, 2025
Contributor

@israbbani israbbani left a comment

I saw this in the code recently and it's definitely an abstraction leak. Really happy to see that you're fixing it!

@@ -863,6 +901,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// error).
worker::TaskEventBuffer &task_event_buffer_;

/// Queue of tasks to retry, ordered by their execution time.
std::priority_queue<TaskToRetry, std::deque<TaskToRetry>, TaskToRetryDescComparator>
to_resubmit_ ABSL_GUARDED_BY(mu_);
Contributor

You're using resubmit and retry interchangeably, e.g. PopTasksToRetry pops from the queue to_resubmit_. I have no preference between them, but let's use either Retry or Resubmit consistently.

Comment on lines +185 to +191
/// Sorts TaskToRetry in descending order of the execution time.
/// Priority queue naturally sorts elements in descending order,
/// in order to have the tasks ordered by execution time in
/// ascending order we use a comparator that sorts elements in
/// descending order. Per docs "Priority queues are a type of container
/// adaptors, specifically designed such that its first element is always
/// the greatest of the elements it contains".
Contributor

Since you're refactoring, do you think we can make this comment more concise?
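For example, the comment and comparator could be trimmed to something like this (a sketch; the struct name is taken from the diff, the rest is assumed):

/// Orders TaskToRetry so that the std::priority_queue's top() is always the
/// task with the earliest execution time.
struct TaskToRetryDescComparator {
  bool operator()(const TaskToRetry &left, const TaskToRetry &right) const {
    return left.execution_time_ms > right.execution_time_ms;
  }
};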

Comment on lines +1039 to +1040
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
Contributor

Is this TODO still relevant? Can we figure it out and either create a ticket/issue or remove it?

@@ -863,6 +901,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// error).
worker::TaskEventBuffer &task_event_buffer_;

/// Queue of tasks to retry, ordered by their execution time.
std::priority_queue<TaskToRetry, std::deque<TaskToRetry>, TaskToRetryDescComparator>
to_resubmit_ ABSL_GUARDED_BY(mu_);
Contributor

Can we use a separate mutex to guard this priority_queue since it's updated independently of any other state in task_manager?
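A minimal sketch of that suggestion, assuming a dedicated mutex named resubmit_mu_:

/// Guards only the retry queue, which is updated independently of the rest
/// of the TaskManager state.
absl::Mutex resubmit_mu_;
std::priority_queue<TaskToRetry, std::deque<TaskToRetry>, TaskToRetryDescComparator>
    to_resubmit_ ABSL_GUARDED_BY(resubmit_mu_);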

absl::MutexLock lock(&mutex_);
TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
to_resubmit_.push(std::move(task_to_retry));
[this](TaskSpecification &spec) {
Contributor

Not from your PR, but from before: binding `this` to the scope of a lambda is also an abstraction leak. Can we try to clean this up?

/// Represents a task that needs to be retried at a specific time.
struct TaskToRetry {
/// Time when the task should be retried.
int64_t execution_time_ms{};
Contributor

I would remove the comment and change the name of the variable to something that describes its purpose more accurately.

Comment on lines +182 to +192
void AddTaskToResubmitQueue(int64_t execution_time_ms, const TaskSpecification &spec) {
  absl::MutexLock lock(&manager_.mu_);
  TaskToRetry task_to_retry = {execution_time_ms, spec};
  manager_.to_resubmit_.push(std::move(task_to_retry));
}

size_t GetResubmitQueueSize() const {
  absl::MutexLock lock(&manager_.mu_);
  return manager_.to_resubmit_.size();
}

Contributor

I know it's already a pattern that we use in the codebase, but we should not write tests against private variables and functions of a class.

Instead, use the public API of the class for testing. See the discussion here for more context. Can you use RetryTask to write the test?
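A rough sketch of what such a test might look like using only RetryTask and PopTasksToRetry; the fixture name, the large delay value, and the assumption that RetryTask needs no other pending-task bookkeeping are all made up for illustration:

TEST_F(TaskManagerTest, RetryQueueViaPublicApi) {
  // Far-future retry: must not be returned yet.
  auto spec1 = CreateTaskHelper(1, {});
  manager_.RetryTask(spec1, /*object_recovery=*/false, /*delay_ms=*/60 * 60 * 1000);
  ASSERT_TRUE(manager_.PopTasksToRetry().empty());

  // Zero-delay retry: returned on the next pop.
  auto spec2 = CreateTaskHelper(1, {});
  manager_.RetryTask(spec2, /*object_recovery=*/false, /*delay_ms=*/0);
  auto tasks = manager_.PopTasksToRetry();
  ASSERT_EQ(tasks.size(), 1);
  ASSERT_EQ(tasks[0].TaskId(), spec2.TaskId());
}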

Comment on lines +2473 to +2484
  auto spec1 = CreateTaskHelper(1, {});
  AddTaskToResubmitQueue(current_time_ms() - 100, spec1);

  auto spec2 = CreateTaskHelper(1, {});
  AddTaskToResubmitQueue(current_time_ms() + 100, spec2);

  auto tasks = manager_.PopTasksToRetry();
  ASSERT_EQ(tasks.size(), 1);
  ASSERT_EQ(tasks[0].TaskId(), spec1.TaskId());

  ASSERT_EQ(GetResubmitQueueSize(), 1);
}
Contributor

This test relies on timing and is going to be flaky.

If your testing process gets descheduled by the OS scheduler for more than 100ms, then PopTasksToRetry() can return both tasks. This is going to be non-trivial to mock, so I'd suggest using a much larger delay when you submit spec2.
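Concretely, the second entry could be scheduled far enough out that scheduler jitter cannot matter; the one-hour offset below is just an illustrative value:

auto spec2 = CreateTaskHelper(1, {});
// One hour in the future: PopTasksToRetry() cannot return this entry even if
// the test process is descheduled for a long time.
AddTaskToResubmitQueue(current_time_ms() + 60 * 60 * 1000, spec2);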

absl::MutexLock lock(&mutex_);
TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec};
to_resubmit_.push(std::move(task_to_retry));
[this](TaskSpecification &spec) {
Contributor

Should this be renamed to object_reconstruction_callback instead since it's only called on a task retry where object_reconstruction is true?


github-actions bot commented Jun 9, 2025

This pull request has been automatically marked as stale because it has not had any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jun 9, 2025
@kevin85421
Member Author

not stale

@github-actions github-actions bot removed the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jun 10, 2025
github-actions bot commented Jun 25, 2025

This pull request has been automatically marked as stale because it has not had any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jun 25, 2025

github-actions bot commented Jul 9, 2025

This pull request has been automatically closed because there has been no more activity in the 14 days since being marked stale.

Please feel free to reopen or open a new pull request if you'd still like this to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for your contribution!

@github-actions github-actions bot closed this Jul 9, 2025
Labels
go add ONLY when ready to merge, run all tests
stale The issue is stale. It will be closed within 7 days unless there are further conversation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants