Flaky test_scale_up_down #4905

@fjetter

Description

The test distributed/deploy/tests/test_slow_adaptive.py::test_scale_up_down appears to leave RPC comms open, which intermittently trips the check_active_rpc check in the cleanup fixture's teardown:

https://github.com/dask/distributed/pull/4784/checks?check_run_id=2803492944
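
For context, here is a minimal sketch (not the actual distributed.utils_test implementation) of the kind of check the cleanup fixture runs at teardown: it snapshots the set of active RPC comms before the test and fails if new entries are still open after a grace period. The check_active_rpc_sketch name and the run_test/active_comms parameters are illustrative placeholders.

    # Illustrative sketch only -- not the real distributed.utils_test code.
    # Snapshot which RPC comms are active before the test, then wait for
    # anything the test opened to close again; if comms remain, fail with
    # the "some RPCs left active by test" message seen in the log below.
    import asyncio

    async def check_active_rpc_sketch(run_test, active_comms, timeout=5.0):
        active_before = set(active_comms)         # snapshot before the test body
        await run_test()                          # run the test body
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while set(active_comms) - active_before:  # comms the test left open
            if loop.time() > deadline:
                raise AssertionError(
                    "some RPCs left active by test: %s"
                    % (set(active_comms) - active_before)
                )
            await asyncio.sleep(0.05)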

Details
___________________ ERROR at teardown of test_scale_up_down ____________________

    @pytest.fixture
    def cleanup():
        with clean():
>           yield

distributed/utils_test.py:1563: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.7/contextlib.py:119: in __exit__
    next(self.gen)
distributed/utils_test.py:1557: in clean
    del thread_state.on_event_loop_thread
../../../miniconda3/envs/dask-distributed/lib/python3.7/contextlib.py:119: in __exit__
    next(self.gen)
distributed/utils_test.py:522: in check_active_rpc
    loop.run_sync(wait)
../../../miniconda3/envs/dask-distributed/lib/python3.7/site-packages/tornado/ioloop.py:571: in run_sync
    self.start()
../../../miniconda3/envs/dask-distributed/lib/python3.7/site-packages/tornado/platform/asyncio.py:132: in start
    self.asyncio_loop.run_forever()
../../../miniconda3/envs/dask-distributed/lib/python3.7/asyncio/base_events.py:541: in run_forever
    self._run_once()
../../../miniconda3/envs/dask-distributed/lib/python3.7/asyncio/base_events.py:1786: in _run_once
    handle._run()
../../../miniconda3/envs/dask-distributed/lib/python3.7/asyncio/events.py:88: in _run
    self._context.run(self._callback, *self._args)
distributed/utils_test.py:519: in wait
    fail_func=fail,
distributed/utils_test.py:1079: in async_wait_for
    fail_func()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def fail():
        pytest.fail(
>           "some RPCs left active by test: %s" % (set(rpc.active) - active_before)
        )
E       Failed: some RPCs left active by test: {<rpc to 'tcp://10.79.1.237:53211', 2 comms>}

distributed/utils_test.py:512: Failed
=================================== FAILURES ===================================
______________________________ test_scale_up_down ______________________________

cleanup = None

    @pytest.mark.asyncio
    async def test_scale_up_down(cleanup):
        start = time()
        async with SpecCluster(
            scheduler=scheduler,
            workers={
                "slow": {"cls": SlowWorker, "options": {"delay": 5}},
                "fast": {"cls": Worker, "options": {}},
            },
            asynchronous=True,
        ) as cluster:
            cluster.scale(1)  # remove a worker, hopefully the one we don't have
            await cluster
    
            assert list(cluster.worker_spec) == ["fast"]
    
            cluster.scale(0)
            await cluster
>           assert not cluster.worker_spec

distributed/deploy/tests/test_slow_adaptive.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/deploy/cluster.py:428: in __aexit__
    await f
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = SpecCluster(4c0eba94, 'tcp://10.79.1.237:53211', workers=0, threads=0, memory=0 B)

    async def _close(self):
        while self.status == Status.closing:
            await asyncio.sleep(0.1)
        if self.status == Status.closed:
            return
        if self.status == Status.running or self.status == Status.failed:
            self.status = Status.closing
    
            # Need to call stop here before we close all servers to avoid having
            # dangling tasks in the ioloop
            with suppress(AttributeError):
                self._adaptive.stop()
    
            f = self.scale(0)
            if isawaitable(f):
                await f
            await self._correct_state()
            for future in self._futures:
                await future
            async with self._lock:
                with suppress(CommClosedError, OSError):
                    if self.scheduler_comm:
                        await self.scheduler_comm.close(close_workers=True)
                    else:
                        logger.warning("Cluster closed without starting up")
    
            await self.scheduler.close()
            for w in self._created:
>               assert w.status == Status.closed, w.status
E               AssertionError: closed
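
A possible explanation for the confusing "AssertionError: closed" above (an assumption; not confirmed in this issue): if one of the created workers still reports its status as the plain string "closed" rather than the Status.closed enum member, the equality check fails while the assertion message still renders as just "closed". A hypothetical illustration:

    # Hypothetical illustration, not distributed code: a plain string status
    # never compares equal to an Enum member, yet str() of the string makes
    # the assertion message read simply "closed".
    from enum import Enum

    class Status(Enum):
        closed = "closed"

    w_status = "closed"                         # worker still using a string status
    assert w_status == Status.closed, w_status  # -> AssertionError: closed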

    Labels

    adaptive: All things relating to adaptive scaling
    flaky test: Intermittent failures on CI.
