Skip to content

feat: detect Ray measurement tasks that never start, and add remote runtime setup options#998

Open
michael-johnston wants to merge 20 commits into
mainfrom
maj_ray_supervisor
Open

feat: detect Ray measurement tasks that never start, and add remote runtime setup options#998
michael-johnston wants to merge 20 commits into
mainfrom
maj_ray_supervisor

Conversation

@michael-johnston
Copy link
Copy Markdown
Member

@michael-johnston michael-johnston commented Jun 3, 2026

In ado distributed Ray operations hang when ray experiment workers do not post to the update queue or if posts to the queue are "lost". This can happen:

  1. when measurement tasks are submitted but never actually start on a worker
  2. when an unhandled exception is raised from a ray experiment worker
  3. when an experiment worker is killed/ or a worker node is lost
  4. There are network issues/gcs store issues that mean the sent request object is "lost"

This PR adds a generic ExperimentExecutorSupervisor which monitors ray tasks that execute experiments (via their ray state), and can handle many of these issues. The CustomExperiments actuator is updated to use this. Other actuators can optionally avail of it

The PR also exposes some ray runtime env options that can impact if these errors occur

Executor supervisor

  • Watches if submitted Ray tasks are marked as FAILED by Ray
    • For these it waits taskFailedGracePeriod seconds (default 10mins) in-order for the refs from the failed task to become available (if any).
      • If the ref is then available it tries to retrieve it (function exited, uncaught exception)
        - Success: it does nothing as it means the FAILED was after task exit
        - Exception: Uncaught exception. It send InvalidMeasurementResult
      • If the ref is not available (task never started, ref got lost to do some ray error) it cancels task and sends InvalidMeasurementResult
  • Watches if submitted Ray tasks are not in a pending/running/fail state within a configurable time limit taskRunningTimeoutSeconds - default is 15mins
    - Once a task is seen running this timeout no longer is applied. Subsequent issues are handled by FAILED path
  • (OPTIONAL) Watches if submitted RayRay tasks are still pending after a configurable time-limit taskPendingResourceTimeoutSeconds
    - Turn this on if an autoscaling cluster is expected to handle the load

Edge Cases Handled

  • It's possible the executor could send the result but then be marked by ray as FAILED AND the ref never become available e.g. node loss, network issues.
    - In this case without additional logic it would be possible for two results to be recorded for the entity
    - To prevent this the supervisor provides a call-back - tasks can notify it that they have sent the result, so it does not put a duplicate if it later sees the task as FAILED and can't retrieve its ref

Edge Cases Not Handled

  • If in previous scenario the result sent by the task never reaches the DiscoverySpaceManager due to ray errors that FAILED it, this result is lost and the operation will hang
  • If some issue kills a task but also stops ray marking it as FAILED, this will not be handled and the operation will hang
  • If there is some scenario where tasks are completing successfully but the error lies in transmitting the results to DiscoverySpaceManager (network error, GCS error?) this is not handled.

Summary of Ray States Handling

Task are considered dead after taskRunningTimeoutSeconds is exceeded for these states

State at timeout Verdict Reasoning
NIL Definitively stuck Task has not been accepted by the scheduler at all after 900s. Nothing normal keeps a task in NIL this long.
PENDING_ARGS_AVAIL Likely stuck Input object refs are not available. For an executor task submitted with live refs this should be milliseconds. Persisting to timeout implies upstream has failed silently.
PENDING_ARGS_FETCH / GETTING_AND_PINNING_ARGS Ambiguous Brief transfer states. Should be sub-second under normal conditions; persisting to 900s suggests a network or object-store issue. Marginal but still a bad sign.
SUBMITTED_TO_WORKER Ambiguous but suspicious The task has been accepted by a worker process but not yet started. Normally sub-second. At 900s this implies worker process is hung/overloaded, but the task has been picked up.
OTHER / lookup failure Unknown Cannot determine state; fire the running-timeout anyway as a safety net.

if taskPendingResourceTimeoutSeconds is set, tasks are considered dead if they are still in either of these states at the timeout. This is for use with autoscaling cluster, which is expected to scale to the required load.

State at timeout Verdict Reasoning
PENDING_NODE_ASSIGNMENT Likely stuck No node with matching resources has been assigned. The autoscaler has had taskPendingResourceTimeoutSeconds to provision one. Likely a a permanent resource mismatch.
PENDING_OBJ_STORE_MEM_AVAIL Likely stuck Plasma memory pressure. Might eventually resolve if other tasks free memory, but state persisting for taskPendingResourceTimeoutSeconds is considered an issue

Tasks considered dead when taskFailedGracePeriod is exceeded

State at timeout Verdict Reasoning
FAILED Stuck if also no ref after timeout AND task did not inform of completion If ray has marked as FAILED but no ref has appeared after 10mins AND the task did not inform it has sent results we can assume no result will be sent and we need to act

Remote execution

  • Adds optional runtimeEnv settings to remote execution context files:
    • setupTimeoutSeconds — how long Ray may spend setting up the job environment
    • eagerInstall — whether to install the environment when the job starts or wait until the first task runs
  • Documents these options in the remote run guide.

Tests

  • Unit tests for helper logic and configuration mapping.
  • Integration tests against a live Ray cluster for timeout, running-task, completion, and duplicate-result behaviour.

If you are expecting many jobs to start simultaneously with autoscaling it can be useful to changes these from the ray defaults.
If you are expecting many jobs to start simultaneously with autoscaling it can be useful to changes these from the ray defaults.
The most common reason for operation hangs with distributed ray is ray tasks never starting. Current actuators "fire-and-forget" so there is no code for checking if a task ever started.

LaunchSupervisor provides a generic way to monitor if launched ray tasks start and handle if they don't.
Notify the supervisor immediately after queue.put and ensure only timeout tasks that never reached RUNNING.
When manager catches an exception when monitoring measurement queue it informs subscribers.

Prior to this change it passed the exception - however this may contain non-ray serializable components causing a second exception crashing the manager.

This change fixes the problem
@michael-johnston michael-johnston marked this pull request as ready for review June 4, 2026 15:28
@AlessandroPomponio
Copy link
Copy Markdown
Member

@michael-johnston tests are failing

With pytest-xdist the pytest workers for ray supervision tests each call ray.init() in a session fixture, so parallel workers each start a separate local Ray cluster on the same machine.

The tests then query Ray’s State API to observe task scheduling state; under parallel cluster startup that API is often slow or temporarily unavailable (ConnectionError, empty results).
The supervisor under test then sees OTHER instead of real states, causing false timeouts and missed pending-resource failures. This then causes tests to fail as the required behaviour is not observed.

To avoid this this commit creates a xdist_group to keep these tests on one worker (one Ray cluster, serial execution).

tox must then use --dist loadgroup, because worksteal ignores xdist_group and the tests remain parallel.
The following problems were observed in tests in CI

* Ray State API becomes unreliable after many tests on the same xdist worker — list_tasks returns empty results or ServerUnavailable, so tasks look like OTHER instead of their real state.
* list_tasks retries were too short (3 quick attempts) for CI-level API lag, so lookups failed before the State API recovered.
* FAILED state fell through to launch timeout when grace hadn’t elapsed yet, causing false “did not start within Xs” failures on healthy or completed tasks.
* Resource timeout was blocked by seen_running — a brief false RUNNING report could prevent pending-resource failures from ever firing.
* Launch timeout didn’t re-check mark_completed in _check_pending, leaving a race where a duplicate failure could still be emitted.

This commit solves them as follows:

* Ray State API becomes unreliable and list_tasks retries too short — fixed by more list_tasks retries (8), longer backoff, and extra delay when error is ServerUnavailable.
* FAILED fell through to launch timeout — fixed by always returning after FAILED (only emit failure once grace has elapsed).
* Resource timeout blocked by seen_running — fixed by removing not pending.seen_running from resource-wait and OTHER resource-timeout checks.
* Launch timeout race with mark_completed — fixed by re-checking completed_request_ids in _check_pending before emitting launch timeout.
Issues:

* Ray State API becomes unreliable after many tests on the same xdist worker — list_tasks returns empty results or ServerUnavailable, so tasks look like OTHER instead of their real state.
* State lookup integration test was too brittle — hard-failed after 5s if PENDING_NODE_ASSIGNMENT never appeared, even when the API was just temporarily unavailable under parallel load.

Fixes:

* Ray State API becomes unreliable (state-lookup test) — fixed by polling up to 15s, only stopping on PENDING_NODE_ASSIGNMENT, and pytest.skip when the API stays unavailable under parallel load.
* State lookup integration test too brittle — same changes as above (longer window + skip instead of hard fail).
Modules that call configure_logging() at module level can be lazily imported
inside a typer CliRunner.invoke() call, at which point sys.stderr is the
runner's capture buffer. The resulting root logger handler then points to a
closed stream after the invoke returns, causing "ValueError: I/O operation on
closed file" to appear in subsequent invocations' result.output.

A _logging_configured flag ensures the full handler setup runs only once;
subsequent calls merely update the log level. Ray workers are unaffected as
each starts with a fresh interpreter.

Exposed by the --dist worksteal → --dist loadgroup change in tox.ini.
@michael-johnston
Copy link
Copy Markdown
Member Author

Everything passing now @AlessandroPomponio

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants