I ran into a case today where repeatedly restarting my cluster ends up deadlocking (or at least taking long enough that things time out). My actual use case is that I'm running regularly scheduled Prefect tasks which, in part, restart a `LocalCluster` (this is to avoid buildup from a memory leak in another library).
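For context, the scheduled-task pattern looks roughly like this (a minimal sketch assuming Prefect 2.x; the task and flow names are made up, and the actual workload from the leaking library is elided):

```python
from distributed import Client
from prefect import flow, task

client = Client()  # starts a LocalCluster behind the scenes

@task
def run_workload():
    ...  # work that goes through `client`; another library leaks memory here

@task
def recycle_cluster():
    # Restart the workers between runs to shed the leaked memory.
    client.restart()

@flow
def scheduled_flow():
    run_workload()
    recycle_cluster()
```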
Here's a reproducer:
from distributed import Client

c = Client()
for i in range(20):
    print(f"{i = }")
    c.restart()
which consistently raises the following error for me locally:
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
Cell In[3], line 3
1 for i in range(20):
2 print(f"{i = }")
----> 3 c.restart()
File ~/projects/dask/distributed/distributed/client.py:3648, in Client.restart(self, timeout, wait_for_workers)
3618 def restart(self, timeout=no_default, wait_for_workers=True):
3619 """
3620 Restart all workers. Reset local state. Optionally wait for workers to return.
3621
(...)
3646 Client.restart_workers
3647 """
-> 3648 return self.sync(
3649 self._restart, timeout=timeout, wait_for_workers=wait_for_workers
3650 )
File ~/projects/dask/distributed/distributed/utils.py:358, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
356 return future
357 else:
--> 358 return sync(
359 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
360 )
File ~/projects/dask/distributed/distributed/utils.py:434, in sync(loop, func, callback_timeout, *args, **kwargs)
431 wait(10)
433 if error is not None:
--> 434 raise error
435 else:
436 return result
File ~/projects/dask/distributed/distributed/utils.py:408, in sync.<locals>.f()
406 awaitable = wait_for(awaitable, timeout)
407 future = asyncio.ensure_future(awaitable)
--> 408 result = yield future
409 except Exception as exception:
410 error = exception
File ~/mambaforge/envs/distributed/lib/python3.11/site-packages/tornado/gen.py:767, in Runner.run(self)
765 try:
766 try:
--> 767 value = future.result()
768 except Exception as e:
769 # Save the exception for later. It's important that
770 # gen.throw() not be called inside this try/except block
771 # because that makes sys.exc_info behave unexpectedly.
772 exc: Optional[Exception] = e
File ~/projects/dask/distributed/distributed/client.py:3615, in Client._restart(self, timeout, wait_for_workers)
3612 if timeout is not None:
3613 timeout = parse_timedelta(timeout, "s")
-> 3615 await self.scheduler.restart(timeout=timeout, wait_for_workers=wait_for_workers)
3616 return self
File ~/projects/dask/distributed/distributed/core.py:1395, in PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
1393 prev_name, comm.name = comm.name, "ConnectionPool." + key
1394 try:
-> 1395 return await send_recv(comm=comm, op=key, **kwargs)
1396 finally:
1397 self.pool.reuse(self.addr, comm)
File ~/projects/dask/distributed/distributed/core.py:1179, in send_recv(comm, reply, serializers, deserializers, **kwargs)
1177 _, exc, tb = clean_exception(**response)
1178 assert exc
-> 1179 raise exc.with_traceback(tb)
1180 else:
1181 raise Exception(response["exception_text"])
File ~/projects/dask/distributed/distributed/core.py:970, in _handle_comm()
968 result = handler(**msg)
969 if inspect.iscoroutine(result):
--> 970 result = await result
971 elif inspect.isawaitable(result):
972 raise RuntimeError(
973 f"Comm handler returned unknown awaitable. Expected coroutine, instead got {type(result)}"
974 )
File ~/projects/dask/distributed/distributed/utils.py:832, in wrapper()
830 async def wrapper(*args, **kwargs):
831 with self:
--> 832 return await func(*args, **kwargs)
File ~/projects/dask/distributed/distributed/scheduler.py:6292, in restart()
6284 if (n_nanny := len(nanny_workers)) < n_workers:
6285 msg += (
6286 f" The {n_workers - n_nanny} worker(s) not using Nannies were just shut "
6287 "down instead of restarted (restart is only possible with Nannies). If "
(...)
6290 "will always time out. Do not use `Client.restart` in that case."
6291 )
-> 6292 raise TimeoutError(msg) from None
6293 logger.info("Restarting finished.")
TimeoutError: Waited for 4 worker(s) to reconnect after restarting, but after 120s, only 0 have returned. Consider a longer timeout, or `wait_for_workers=False`.
A few additional things to note:

- Things don't hang consistently on the same `for`-loop iteration in the reproducer. But things do consistently hang within 20 iterations (at least for me locally).
- I don't see the same behavior when using a `coiled.Cluster` (so far I've only used a `LocalCluster` and a `coiled.Cluster`).
- I tried specifying a larger `timeout="4 minutes"`, which also eventually timed out (the `wait_for_workers=False` alternative is sketched below).
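For reference, here's roughly what the other workaround suggested by the error message would look like: restart without the blocking wait, then wait for the workers explicitly. This is just a sketch; `N_WORKERS` is a placeholder for the cluster's actual worker count, and I haven't verified whether it avoids the hang or only moves it to the explicit wait.

```python
from distributed import Client

N_WORKERS = 4  # placeholder: match however many workers the cluster starts with

c = Client(n_workers=N_WORKERS)
for i in range(20):
    print(f"{i = }")
    # Skip the blocking wait for workers to reconnect after the restart...
    c.restart(wait_for_workers=False)
    # ...and wait for them explicitly instead, so it's clearer where any
    # hang actually happens (this raises TimeoutError after 240 s).
    c.wait_for_workers(N_WORKERS, timeout=240)
```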