Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions distributed/deploy/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ class does handle all of the logic around asynchronously cleanly setting up
Whether or not we should silence logging when setting up the cluster.
name: str, optional
A name to use when printing out the cluster, defaults to type name
shutdown_on_close: bool
Whether or not to close the cluster when the program exits
shutdown_scheduler: bool
Whether or not to shut down the scheduler when the cluster is closed

Examples
--------
Expand Down Expand Up @@ -247,6 +251,7 @@ def __init__(
name=None,
shutdown_on_close=True,
scheduler_sync_interval=1,
shutdown_scheduler=True,
):
if loop is None and asynchronous:
loop = IOLoop.current()
Expand All @@ -271,6 +276,7 @@ def __init__(
self._correct_state_waiting = None
self._name = name or type(self).__name__
self.shutdown_on_close = shutdown_on_close
self.shutdown_scheduler = shutdown_scheduler

super().__init__(
asynchronous=asynchronous,
Expand Down Expand Up @@ -450,13 +456,14 @@ async def _close(self):

if self.scheduler_comm:
async with self._lock:
with suppress(OSError):
await self.scheduler_comm.terminate()
if self.shutdown_scheduler:
with suppress(OSError):
await self.scheduler_comm.terminate()
await self.scheduler_comm.close_rpc()
else:
logger.warning("Cluster closed without starting up")

if self.scheduler:
if self.scheduler and self.shutdown_scheduler:
await self.scheduler.close()
for w in self._created:
assert w.status in {
Expand Down
25 changes: 25 additions & 0 deletions distributed/deploy/tests/test_spec_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,3 +519,28 @@ async def test_bad_close():
await cluster.close()

assert not record


@gen_test()
async def test_shutdown_scheduler_disabled():
async with SpecCluster(
workers=worker_spec,
scheduler=scheduler,
asynchronous=True,
shutdown_scheduler=False,
) as cluster:
s = cluster.scheduler
assert isinstance(s, Scheduler)

assert s.status == Status.running


@gen_test()
async def test_shutdown_scheduler():
async with SpecCluster(
workers=worker_spec, scheduler=scheduler, asynchronous=True
) as cluster:
s = cluster.scheduler
assert isinstance(s, Scheduler)

assert s.status == Status.closed
Loading