diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index 09488969edc..d92568a6620 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -380,8 +380,8 @@ async def _correct_state_internal(self) -> None:
                 workers.append(worker)
             if workers:
                 worker_futs = [asyncio.ensure_future(w) for w in workers]
-                await asyncio.wait(worker_futs)
                 self.workers.update(dict(zip(to_open, workers)))
+                await asyncio.wait(worker_futs)
                 for w in workers:
                     w._cluster = weakref.ref(self)
                 # Collect exceptions from failed workers. This must happen after all
diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py
index 2ddb0f0bbf2..eb9b8cb47a6 100644
--- a/distributed/deploy/tests/test_local.py
+++ b/distributed/deploy/tests/test_local.py
@@ -19,7 +19,7 @@
 from distributed.core import Status
 from distributed.metrics import time
 from distributed.system import MEMORY_LIMIT
-from distributed.utils import TimeoutError, open_port, sync
+from distributed.utils import Deadline, TimeoutError, open_port, sync
 from distributed.utils_test import (
     assert_can_connect_from_everywhere_4,
     assert_can_connect_from_everywhere_4_6,
@@ -1255,6 +1255,59 @@ def setup(self, worker=None):
         import my_nonexistent_library  # noqa
 
 
+class SlowPlugin:
+    def __init__(self, delay=0.1):
+        self.delay = delay
+
+    def setup(self, worker=None):
+        sleep(self.delay)
+
+
+@pytest.mark.slow()
+def test_localcluster_plan_requested_observed():
+    with LocalCluster(
+        n_workers=0,
+        threads_per_worker=1,
+        processes=True,
+        # FIXME: Ideally this would work with an IPC Event or a file to
+        # synchronize instead of sleeping
+        plugins={SlowPlugin(delay=2)},
+        dashboard_address=":0",
+    ) as cluster:
+        assert len(cluster.plan) == 0
+        assert len(cluster.requested) == 0
+        assert len(cluster.observed) == 0
+
+        cluster.scale(1)
+        assert len(cluster.plan) == 1
+        assert len(cluster.requested) == 0
+        assert len(cluster.observed) == 0
+
+        # This should trigger almost immediately, once the event loop has had
+        # a chance to run a tick
+        dl = Deadline.after(1)
+        while not cluster.requested and dl.remaining:
+            sleep(0.01)
+
+        # The worker is requested. For LocalCluster this means that the
+        # process is up, but for a generic SpecCluster implementation it
+        # merely means that an additional worker has been asked for and is
+        # not yet up and running
+        assert not cluster.scheduler_info["workers"]
+        assert len(cluster.plan) == 1
+        assert len(cluster.requested) == 1
+        assert len(cluster.observed) == 0
+
+        with Client(cluster) as client:
+            client.wait_for_workers(1)
+
+        # The worker is fully functional and registered to the scheduler
+        assert cluster.scheduler_info["workers"]
+        assert len(cluster.requested) == 1
+        assert len(cluster.plan) == 1
+        assert len(cluster.observed) == 1
+
+
 @pytest.mark.slow
 def test_localcluster_start_exception(loop):
     with raises_with_cause(
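
For context on why the one-line reordering in spec.py is enough to fix the test above: SpecCluster derives `requested` from `self.workers`, so updating that dict before awaiting worker startup lets code polling on the same event loop observe workers that are still coming up. The following is a minimal, self-contained sketch of that ordering, not distributed's actual internals; `MiniCluster` and all of its members are illustrative assumptions.

import asyncio


class MiniCluster:
    # Toy stand-in for SpecCluster's bookkeeping (illustrative only):
    # worker_spec is the "plan", workers is what has been "requested".
    def __init__(self):
        self.worker_spec = {}
        self.workers = {}

    @property
    def plan(self):
        return set(self.worker_spec)

    @property
    def requested(self):
        return set(self.workers)

    async def _start_worker(self, name):
        await asyncio.sleep(0.1)  # stand-in for slow worker startup

    async def correct_state(self):
        to_open = set(self.worker_spec) - set(self.workers)
        futs = [asyncio.ensure_future(self._start_worker(n)) for n in to_open]
        # The patch's key change: record workers as requested *before*
        # blocking on their startup, so callers polling `requested` on the
        # same event loop can see workers that are still starting.
        self.workers.update(dict(zip(to_open, futs)))
        if futs:
            await asyncio.wait(futs)


async def main():
    c = MiniCluster()
    c.worker_spec["w0"] = {}  # plan one worker
    task = asyncio.create_task(c.correct_state())
    await asyncio.sleep(0.01)  # a single event-loop tick is enough
    # The worker is requested (bookkeeping updated) while startup is pending:
    assert c.plan == {"w0"} and c.requested == {"w0"}
    await task


asyncio.run(main())

With the pre-patch ordering (await before the update), the assertion in main() would fail: `requested` would stay empty until startup completed, which is exactly the window test_localcluster_plan_requested_observed exercises via the SlowPlugin delay.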