Skip to content

Commit fa8c07d

Browse files
robertnishihara authored and pcmoritz committed
Sleep for half a second at exit in order to avoid losing log messages… (#4254)
1 parent 30bf8e4 commit fa8c07d

File tree

2 files changed

+49
-2
lines changed

2 files changed

+49
-2
lines changed

python/ray/tests/test_multi_node.py

+35
Original file line number | Diff line number | Diff line change
@@ -183,6 +183,41 @@ def f():
183183
assert "success" in out
184184

185185

186+
def test_receive_late_worker_logs():
187+
# Make sure that log messages from tasks appear in the stdout even if the
188+
# script exits quickly.
189+
log_message = "some helpful debugging message"
190+
191+
# Define a driver that creates a task that prints something, ensures that
192+
# the task runs, and then exits.
193+
driver_script = """
194+
import ray
195+
import random
196+
import time
197+
198+
log_message = "{}"
199+
200+
@ray.remote
201+
class Actor(object):
202+
def log(self):
203+
print(log_message)
204+
205+
@ray.remote
206+
def f():
207+
print(log_message)
208+
209+
ray.init(num_cpus=2)
210+
211+
a = Actor.remote()
212+
ray.get([a.log.remote(), f.remote()])
213+
ray.get([a.log.remote(), f.remote()])
214+
""".format(log_message)
215+
216+
for _ in range(2):
217+
out = run_string_as_driver(driver_script)
218+
assert out.count(log_message) == 4
219+
220+
186221
@pytest.fixture
187222
def ray_start_head_with_resources():
188223
out = run_and_get_output(

python/ray/worker.py

+14 -2
Original file line number | Diff line number | Diff line change
@@ -1541,7 +1541,7 @@ def init(redis_address=None,
15411541
_post_init_hooks = []
15421542

15431543

1544-
def shutdown():
1544+
def shutdown(exiting_interpreter=False):
15451545
"""Disconnect the worker, and terminate processes started by ray.init().
15461546
15471547
This will automatically run at the end when a Python process that uses Ray
@@ -1553,7 +1553,17 @@ def shutdown():
15531553
defined remote functions or actors after calling ray.shutdown(), then you
15541554
need to redefine them. If they were defined in an imported module, then you
15551555
will need to reload the module.
1556+
1557+
Args:
1558+
exiting_interpreter (bool): True if this is called by the atexit hook
1559+
and false otherwise. If we are exiting the interpreter, we will
1560+
wait a little while to print any extra error messages.
15561561
"""
1562+
if exiting_interpreter and global_worker.mode == SCRIPT_MODE:
1563+
# This is a duration to sleep before shutting down everything in order
1564+
# to make sure that log messages finish printing.
1565+
time.sleep(0.5)
1566+
15571567
disconnect()
15581568

15591569
# Shut down the Ray processes.
@@ -1565,7 +1575,7 @@ def shutdown():
15651575
global_worker.set_mode(None)
15661576

15671577

1568-
atexit.register(shutdown)
1578+
atexit.register(shutdown, True)
15691579

15701580
# Define a custom excepthook so that if the driver exits with an exception, we
15711581
# can push that exception to Redis.
@@ -1670,6 +1680,8 @@ def print_error_messages_raylet(task_error_queue, threads_stopped):
16701680
# messages originating from the worker.
16711681
while t + UNCAUGHT_ERROR_GRACE_PERIOD > time.time():
16721682
threads_stopped.wait(timeout=1)
1683+
if threads_stopped.is_set():
1684+
break
16731685
if t < last_task_error_raise_time + UNCAUGHT_ERROR_GRACE_PERIOD:
16741686
logger.debug("Suppressing error from worker: {}".format(error))
16751687
else:

0 commit comments

Comments
 (0)