[core] Deflake sigint cgraph test #52623


Merged · 2 commits · Apr 28, 2025

Changes from all commits
41 changes: 24 additions & 17 deletions python/ray/dag/tests/experimental/test_dag_error_handling.py
@@ -22,9 +22,9 @@
from ray._private.test_utils import (
    run_string_as_driver_nonblocking,
    wait_for_pid_to_exit,
    SignalActor,
)
import signal
import psutil

from ray.dag.tests.experimental.actor_defs import Actor

@@ -315,14 +315,25 @@ def test_buffered_get_timeout(ray_start_regular):


def test_get_with_zero_timeout(ray_start_regular):
    a = Actor.remote(0)
    @ray.remote
    class Actor:
        def __init__(self, signal_actor):
            self.signal_actor = signal_actor

        def send(self, x):
            self.signal_actor.send.remote()
            return x

    signal_actor = SignalActor.remote()
    a = Actor.remote(signal_actor)
    with InputNode() as inp:
        dag = a.inc.bind(inp)
        dag = a.send.bind(inp)

    compiled_dag = dag.experimental_compile()
    ref = compiled_dag.execute(1)
    # Give enough time for DAG execution result to be ready
    time.sleep(2)
    ray.get(signal_actor.wait.remote())
    time.sleep(0.1)
Comment on lines +335 to +336
Collaborator

this still looks inherently flaky. do we actually need to have an explicit integration test for the timeout=0 case?

Contributor Author

This is the only test in regular core or cgraphs where we test for success with a 0 timeout. It is potentially behavior that could break, but I don't think we guarantee it anywhere. But it seems like something we should guarantee?

I do think this should never be flaky even without the 0.1 sleep because the time for line 325 to finish should never be more than the time it takes for:

  • the remote signal actor function to schedule
  • execute and flick the asyncio event
  • the result to actually get to the ray.get at 335
  • the 0.1 second sleep
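
For context on "flick the asyncio event" in the list above: SignalActor from ray._private.test_utils is, roughly, an actor wrapping an asyncio.Event. A minimal sketch of that idea follows; the real helper has extra options (e.g. clear/should_wait flags), so this is only an approximation, not its actual implementation:

import asyncio
import ray

# Rough sketch of a SignalActor-style helper; the real
# ray._private.test_utils.SignalActor has more options, this only shows the idea.
@ray.remote(num_cpus=0)
class SignalActorSketch:
    def __init__(self):
        self.ready_event = asyncio.Event()

    def send(self):
        # Set ("flick") the asyncio event so any pending wait() call returns.
        self.ready_event.set()

    async def wait(self):
        await self.ready_event.wait()

So once ray.get(signal_actor.wait.remote()) returns in the new test, the DAG actor has already issued the signal from inside send(), and the result only has to travel back through the compiled DAG channel; the extra 0.1 s sleep covers that last hop.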

Contributor Author

@dayshah Apr 27, 2025

This is also a valid test case we could add for regular core to make sure we don't break this behavior that may be depended on. Way simpler for regular core test though because we can reuse the same actor.

Collaborator

Hm agree that testing to maintain the guarantee makes sense.

If I were to write this test for Ray Core, I'd write it as: ray.wait for obj to be ready, then assert ray.get(timeout=0) returns OK. That is likely the pattern that users would follow if they're using timeout=0.

Given we don't support ray.wait in cgraph (unless that has been added), I'm OK with leaving the test as-is. But please monitor and make sure it doesn't become flaky :)
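
For illustration, a minimal sketch of the ray.wait-then-get pattern suggested above, written as a plain Ray Core check rather than a cgraph test; the Counter actor and the values used here are hypothetical and not part of this PR:

import ray

ray.init()

@ray.remote
class Counter:
    def inc(self, x):
        return x + 1

counter = Counter.remote()
ref = counter.inc.remote(1)

# Block until the object is known to be ready, so the zero-timeout get
# below cannot race with task execution.
ready, _ = ray.wait([ref], num_returns=1)
assert ready == [ref]

# With the result already available, timeout=0 should return it immediately
# instead of raising GetTimeoutError.
assert ray.get(ref, timeout=0) == 2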

    # Use timeout=0 to either get result immediately or raise an exception
    result = ray.get(ref, timeout=0)
    assert result == 1
@@ -840,7 +851,6 @@ def add(self, a, b):
dag.experimental_compile()


@pytest.mark.skipif(sys.platform == "win32", reason="Sigint not supported on Windows")
def test_sigint_get_dagref(ray_start_cluster):
    driver_script = """
import ray
@@ -852,27 +862,24 @@ def test_sigint_get_dagref(ray_start_cluster):
@ray.remote
class Actor:
    def sleep(self, x):
        while(True):
            time.sleep(x)
        time.sleep(x)

a = Actor.remote()
with InputNode() as inp:
    dag = a.sleep.bind(inp)
compiled_dag = dag.experimental_compile()
ref = compiled_dag.execute(1)
ray.get(ref, timeout=100)
ref = compiled_dag.execute(100)
print("executing", flush=True)
ray.get(ref)
"""
    driver_proc = run_string_as_driver_nonblocking(
        driver_script, env={"RAY_CGRAPH_teardown_timeout": "5"}
        driver_script, env={"RAY_CGRAPH_teardown_timeout": "0"}
    )
    pid = driver_proc.pid
    # wait for graph execution to start
    time.sleep(5)
    proc = psutil.Process(pid)
    assert proc.status() == psutil.STATUS_RUNNING
    os.kill(pid, signal.SIGINT) # ctrl+c
    # teardown will kill actors after 5 second timeout
    wait_for_pid_to_exit(pid, 10)
    assert driver_proc.stdout.readline() == b"executing\n"
    driver_proc.send_signal(signal.SIGINT) # ctrl+c
    # teardown will kill actors after timeout
    wait_for_pid_to_exit(driver_proc.pid, 10)


if __name__ == "__main__":