Skip to content

[core] Reset next_task_reply_position when the actor restarts to avoid submitting the same task twice #52759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

kevin85421
Copy link
Member

@kevin85421 kevin85421 commented May 2, 2025

Why are these changes needed?

Currently, we don't update the next_task_reply_position correctly. Hence, some tasks may belong to both out_of_order_completed_tasks and requests.

That is, when a actor restarts and calls ConnectActor, some tasks may be submitted by both ResendOutOfOrderCompletedTasks (out_of_order_completed_tasks) and SendPendingTasks (requests).

ResendOutOfOrderCompletedTasks(actor_id);
SendPendingTasks(actor_id);

However, the Ray task status transitions to a different state during the first submission (ResendOutOfOrderCompletedTasks), so the second task submission (i.e., SendPendingTasks) fails on the following line.

RAY_CHECK(it->second.GetStatus() == rpc::TaskStatus::PENDING_NODE_ASSIGNMENT)

Example

Currently, we don't update the next_task_reply_position correctly. Hence, some tasks may belong to both out_of_order_completed_tasks and requests.

Add more details for the above statement. For example,

  • Driver sends 100 tasks (seq_no: 0-99) to Actor A.
  • 4 tasks finish and the driver receives the responses of seq_no (0 ~ 3). The driver calls SequentialActorSubmitQueue::MarkSeqnoCompleted 4 times to update next_task_reply_position to 4.
  • Kill the worker Pod
  • Reconstruct the objects by emplacing the first four tasks into the submission queue. Additionally, actor restarts also trigger the emplacement of the remaining 96 tasks. (seq_no: 100 ~ 199)
  • 4 tasks finish and the driver receives the responses of seq_no (100 ~ 103). However, because next_task_reply_position is 4, these 4 tasks are put into out_of_order_completed_tasks.
  • Kill the worker Pod
  • Reconstruct the objects by emplacing the first four tasks into the submission queue. Additionally, actor restarts also trigger the emplacement of the remaining 96 tasks. (seq_no: 200 ~ 299). At the same time, the tasks (100 - 103) in the out_of_order_completed_tasks have already been updated to seq_no (200 - 203).
  • Actor restarts and calls ConnectActor
    • tasks (200 - 203) are in both out_of_order_completed_tasks and requests.

Reproduction

Run the test without this PR

pytest -vs test_actor_lineage_reconstruction.py::test_mix_actor_restart_task_retry_and_lineage_reconstruction
Screenshot 2025-05-03 at 1 30 52 AM

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Kai-Hsun Chen <[email protected]>
Signed-off-by: Kai-Hsun Chen <[email protected]>
Signed-off-by: Kai-Hsun Chen <[email protected]>
Signed-off-by: Kai-Hsun Chen <[email protected]>
@kevin85421 kevin85421 added the go add ONLY when ready to merge, run all tests label May 3, 2025
@kevin85421
Copy link
Member Author

Some changes in this PR revert those introduced in #52249.

@edoakes
Copy link
Collaborator

edoakes commented May 5, 2025

didn't read closely yet, but cpp unit tests?

void SequentialActorSubmitQueue::OnClientConnected() {
auto head = requests.begin();
if (head != requests.end()) {
RAY_CHECK(head->first >= next_task_reply_position);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline: we might be able to completely remove next_task_reply_position and out_of_order_completed_tasks

Comment on lines +61 to +63
/// Called when client is connected/reconnected. Resets `next_task_reply_position` to
/// the smallest sequence number of the tasks in `requests`.
void OnClientConnected() override;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this API need to be thread safe? Or is all concurrency control intended to be handled by the caller?


iter = 10
for i in range(iter):
time.sleep(2)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please try to write the test in a way that doesn't rely on time.sleep here or inside of f. if you need help with how to do this, lmk

@kevin85421
Copy link
Member Author

@edoakes I discuss with @jjyao offline. We plan to remove next_task_reply_position and out_of_order_completed_tasks completely instead. I will let you know if this PR needs review.

@kevin85421
Copy link
Member Author

Use #52833 instead.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-backlog go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants