Skip to content

Conversation

@machichima
Copy link
Contributor

@machichima machichima commented Nov 22, 2025

Description

Enable sync actors to detect task cancellation during execution via ray.get_runtime_context().is_canceled() API for graceful termination

Main changes:

Record task cancellation in executor:

  • core_worker.cc
    • Track canceled task with canceled_tasks_ and expose it to Python API through IsTaskCanceled()
  • runtime_context.py
    • Add is_canceled() API that calls CoreWorker::IsTaskCanceled() following path worker.py -> _raylet.pyx -> core_worker.cc

Raise cancellation error when using ray.get() (on submitter side)

  • actor_task_submitter.cc
    • For sync actor task that completed without error but marked as canceled (Get from TaskManager::IsTaskCanceled()), explicitly set TASK_CANCELLED error
  • task_manager.cc
    • Add IsTaskCanceled() to expose is_canceled_ flag

Related issues

Related to #58213

Additional information

Tested with following example:

import ray
import time

ray.init()


@ray.remote(max_concurrency=10)
class ThreadedActor:
    def __init__(self):
        self.counter = 0

    def long_running_task(self, duration=10):
        for i in range(duration):
            print("counter: ", self.counter)
            if ray.get_runtime_context().is_canceled():
                return "canceled"

            self.counter += 1
            time.sleep(0.1)

        return "completed"


if __name__ == "__main__":
    actor = ThreadedActor.remote()
    task_ref = actor.long_running_task.remote(duration=10)

    time.sleep(0.3)
    ray.cancel(task_ref)

    try:
        result = ray.get(task_ref)
        print(f"Result: {result}")
    except ray.exceptions.TaskCancelledError as e:
        print(f"Cancelled: {e}")

    ray.shutdown()

Result screenshot:

image

@machichima machichima force-pushed the 58213-cancel-sync-actor-from-cpp branch from 00c14b3 to 4064df3 Compare November 23, 2025 01:59
@machichima machichima force-pushed the 58213-cancel-sync-actor-from-cpp branch from 4064df3 to 80a610d Compare November 23, 2025 02:00
@machichima machichima marked this pull request as ready for review November 23, 2025 13:04
@machichima machichima requested a review from a team as a code owner November 23, 2025 13:04
@ray-gardener ray-gardener bot added usability core Issues that should be addressed in Ray Core community-contribution Contributed by the community labels Nov 23, 2025
Comment on lines +677 to +684
RAY_LOG(INFO) << "Task " << task_id << " completed but was cancelled, failing it";
rpc::RayErrorInfo error_info;
std::ostringstream error_message;
error_message << "Task: " << task_id.Hex() << " was cancelled.";
error_info.set_error_message(error_message.str());
error_info.set_error_type(rpc::ErrorType::TASK_CANCELLED);
task_manager_.FailPendingTask(
task_id, rpc::ErrorType::TASK_CANCELLED, nullptr, &error_info);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to make ray.get() also raising TaskCanceled error for canceled sync task here.

I don't think this the best way handling it. Please let me know if there's a better way or where can I look into 🙏

@machichima machichima changed the title [Feat] cancel sync actor by getting TaskManager is_canceled_ [Feat] cancel sync actor by checking is_canceled() Nov 23, 2025
@machichima
Copy link
Contributor Author

cc @edoakes @Sparks0219 PTAL. Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community core Issues that should be addressed in Ray Core usability

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant