Description
Describe the issue:
When using SSHCluster, my cluster eventually deadlocks (and the dashboard stops responding).
The scheduler stack shows it is stuck reporting an unhandled asyncio task exception. My real-world cluster deadlocks fairly consistently after an hour or so of normal task execution. I'm not using run_on_scheduler
in the real world; it's just a convenient way to trigger the issue quickly.
The root of the problem seems to be that both distributed.deploy.ssh.Scheduler
and distributed.deploy.ssh.Worker
stop polling the stdout/stderr pipes shortly after startup, so the remote processes' pipes eventually fill up and any write to them blocks.
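For illustration only, here is a sketch of the kind of long-lived drain loop I would expect to avoid this, written against plain asyncio subprocesses rather than the asyncssh plumbing distributed actually uses (run_remote and _drain are hypothetical names, not distributed APIs):

import asyncio


async def _drain(stream, log):
    # Keep reading for the lifetime of the process so the child can never
    # block on a full pipe buffer.
    while True:
        line = await stream.readline()
        if not line:  # EOF: the child exited / closed the pipe
            break
        log(line.decode(errors='replace').rstrip())


async def run_remote(cmd):
    # Hypothetical stand-in for launching the remote scheduler/worker process.
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Drain both pipes continuously, not just while waiting for startup output.
    drainers = [
        asyncio.create_task(_drain(proc.stdout, print)),
        asyncio.create_task(_drain(proc.stderr, print)),
    ]
    await proc.wait()
    await asyncio.gather(*drainers)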
Minimal Complete Verifiable Example:
from dask.distributed import (
    SSHCluster,
    worker_client,
    fire_and_forget,
)


async def _raise():
    raise RuntimeError('broken')


def _thing(n):
    with worker_client() as client:
        fire_and_forget(client.run_on_scheduler(_raise, wait=False))
        fire_and_forget(client.submit(_thing, n=n+1))


def main():
    cluster = SSHCluster(
        ['localhost', 'localhost'],
        connect_options=dict(known_hosts=None),
        worker_options=dict(nthreads=1, n_workers=1),
        scheduler_options=dict(dashboard=True),
    )
    with cluster.get_client() as client:
        client.upload_file(__file__)
        client.submit(_thing, n=1).result()
        input('waiting')


if __name__ == '__main__':
    main()
(saved as deadlock.py and run with python -c 'import deadlock; deadlock.main()')
When left running, this example eventually deadlocks once the scheduler's stderr pipe buffer fills.
Below is the py-spy stack trace:
Thread 4175110 (idle): "MainThread"
    emit (logging/__init__.py:1113)
    handle (logging/__init__.py:978)
    callHandlers (logging/__init__.py:1714)
    handle (logging/__init__.py:1644)
    _log (logging/__init__.py:1634)
    error (logging/__init__.py:1518)
    default_exception_handler (asyncio/base_events.py:1785)
    call_exception_handler (asyncio/base_events.py:1811)
    _run_once (asyncio/base_events.py:1937)
    run_forever (asyncio/base_events.py:608)
    run_until_complete (asyncio/base_events.py:641)
    run (asyncio/runners.py:118)
    asyncio_run (distributed/compatibility.py:204)
    main (distributed/cli/dask_spec.py:63)
    invoke (click/core.py:788)
    invoke (click/core.py:1443)
    main (click/core.py:1082)
    __call__ (click/core.py:1161)
    <module> (distributed/cli/dask_spec.py:67)
    _run_code (<frozen runpy>:88)
    _run_module_as_main (<frozen runpy>:198)
And the corresponding kernel-side stack, showing the process is blocked writing to a pipe:
[<0>] pipe_wait+0x6f/0xc0
[<0>] pipe_write+0x17b/0x470
[<0>] new_sync_write+0x125/0x1c0
[<0>] __vfs_write+0x29/0x40
[<0>] vfs_write+0xb9/0x1a0
[<0>] ksys_write+0x67/0xe0
[<0>] __x64_sys_write+0x1a/0x20
[<0>] do_syscall_64+0x57/0x190
[<0>] entry_SYSCALL_64_after_hwframe+0x44/0xa9
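For what it's worth, this blocking behaviour is easy to reproduce outside of Dask. A standalone sketch (independent of the reproducer above) showing a child process getting stuck in pipe_write once its undrained stderr pipe fills the kernel buffer (typically 64 KiB on Linux):

import subprocess
import sys
import time

# Child writes ~1 MB to stderr; the parent never reads the pipe, so the child
# blocks in the write() syscall once the kernel pipe buffer is full.
child = subprocess.Popen(
    [
        sys.executable,
        '-c',
        "import sys\n"
        "for _ in range(1000):\n"
        "    sys.stderr.write('x' * 1024)\n",
    ],
    stderr=subprocess.PIPE,  # parent never drains this pipe
)
time.sleep(2)
# poll() returns None: the child is still alive, stuck writing to the pipe.
print('child return code after 2s:', child.poll())
child.kill()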
Anything else we need to know?:
Environment:
- Dask version: 2024.2.1
- Python version: 3.11
- Operating System: Ubuntu 20.04
- Install method (conda, pip, source): conda