_________________________________ test_simple __________________________________
args = (), kwds = {}
    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)
../../../miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:75:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:75: in inner
    return func(*args, **kwds)
distributed/utils_test.py:1074: in test_func
    return _run_and_close_tornado(async_fn_outer)
distributed/utils_test.py:376: in _run_and_close_tornado
    return asyncio.run(inner_fn())
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/runners.py:44: in run
    return loop.run_until_complete(main)
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/base_events.py:616: in run_until_complete
    return future.result()
distributed/utils_test.py:373: in inner_fn
    return await async_fn(*args, **kwargs)
distributed/utils_test.py:1071: in async_fn_outer
    return await asyncio.wait_for(async_fn(), timeout=timeout * 2)
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    async def async_fn():
        result = None
        with dask.config.set(config):
            workers = []
            s = False
            for _ in range(60):
                try:
                    s, ws = await start_cluster(
                        nthreads,
                        scheduler,
                        security=security,
                        Worker=Worker,
                        scheduler_kwargs=scheduler_kwargs,
                        worker_kwargs=worker_kwargs,
                    )
                except Exception as e:
                    logger.error(
                        "Failed to start gen_cluster: "
                        f"{e.__class__.__name__}: {e}; retrying",
                        exc_info=True,
                    )
                    await asyncio.sleep(1)
                else:
                    workers[:] = ws
                    args = [s] + workers
                    break
            if s is False:
                raise Exception("Could not start cluster")
            if client:
                c = await Client(
                    s.address,
                    security=security,
                    asynchronous=True,
                    **client_kwargs,
                )
                args = [c] + args
            try:
                coro = func(*args, *outer_args, **kwargs)
                task = asyncio.create_task(coro)
                coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
                result = await coro2
                validate_state(s, *workers)
            except asyncio.TimeoutError:
                assert task
                buffer = io.StringIO()
                # This stack indicates where the coro/test is suspended
                task.print_stack(file=buffer)
                if cluster_dump_directory:
                    await dump_cluster_state(
                        s,
                        ws,
                        output_dir=cluster_dump_directory,
                        func_name=func.__name__,
                    )
                task.cancel()
                while not task.cancelled():
                    await asyncio.sleep(0.01)
                # Hopefully, the hang has been caused by inconsistent
                # state, which should be much more meaningful than the
                # timeout
                validate_state(s, *workers)
                # Remove as much of the traceback as possible; it's
                # uninteresting boilerplate from utils_test and asyncio
                # and not from the code being tested.
>               raise asyncio.TimeoutError(
                    f"Test timeout after {timeout}s.\n"
                    "========== Test stack trace starts here ==========\n"
                    f"{buffer.getvalue()}"
                ) from None
E asyncio.exceptions.TimeoutError: Test timeout after 30s.
E ========== Test stack trace starts here ==========
E Stack for <Task pending name='Task-19374' coro=<test_simple() running at /Users/runner/work/distributed/distributed/distributed/dashboard/tests/test_scheduler_bokeh.py:75> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe615435820>()]>> (most recent call last):
E File "/Users/runner/work/distributed/distributed/distributed/dashboard/tests/test_scheduler_bokeh.py", line 75, in test_simple
E response = await http_client.fetch(
distributed/utils_test.py:1002: TimeoutError
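
The timeout error above is produced by the shield/cancel pattern visible in the `async_fn` frame. The sketch below is a condensed, self-contained approximation of that flow, not the actual `gen_cluster` implementation: cluster startup, state validation, and cluster dumps are omitted, and `hanging_test` / `run_with_timeout` are hypothetical stand-ins for the decorated test body and the surrounding harness.

```python
import asyncio
import io


async def hanging_test() -> None:
    # Hypothetical stand-in for a test body that never completes,
    # e.g. an HTTP fetch against the dashboard that hangs.
    await asyncio.Event().wait()


async def run_with_timeout(timeout: float = 0.1) -> None:
    task = asyncio.create_task(hanging_test())
    try:
        # shield() keeps the inner task alive when wait_for() times out,
        # so its stack can still be captured afterwards.
        await asyncio.wait_for(asyncio.shield(task), timeout)
    except asyncio.TimeoutError:
        buffer = io.StringIO()
        task.print_stack(file=buffer)  # where the test is suspended
        task.cancel()
        while not task.cancelled():
            await asyncio.sleep(0.01)
        # Re-raise with the captured stack and drop the boilerplate frames.
        raise asyncio.TimeoutError(
            f"Test timeout after {timeout}s.\n"
            "========== Test stack trace starts here ==========\n"
            f"{buffer.getvalue()}"
        ) from None


if __name__ == "__main__":
    asyncio.run(run_with_timeout())
```

The shield is what lets the test task survive the `wait_for()` timeout long enough for `print_stack()` to record where it is suspended; in this failure that is the `await http_client.fetch(` call at line 75 of test_scheduler_bokeh.py.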
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_simple.yaml
----------------------------- Captured stderr call -----------------------------
2022-08-05 18:57:30,388 - distributed.utils_perf - WARNING - full garbage collections took 85% CPU time recently (threshold: 10%)
2022-08-05 18:57:30,568 - distributed.utils_perf - WARNING - full garbage collections took 97% CPU time recently (threshold: 10%)
2022-08-05 18:57:30,745 - distributed.utils_perf - WARNING - full garbage collections took 97% CPU time recently (threshold: 10%)
2022-08-05 18:57:31,274 - distributed.utils_perf - WARNING - full garbage collections took 93% CPU time recently (threshold: 10%)
2022-08-05 18:57:31,732 - distributed.utils_perf - WARNING - full garbage collections took 84% CPU time recently (threshold: 10%)
2022-08-05 18:57:32,727 - distributed.utils_perf - WARNING - full garbage collections took 71% CPU time recently (threshold: 10%)
2022-08-05 18:57:33,460 - distributed.utils_perf - WARNING - full garbage collections took 64% CPU time recently (threshold: 10%)
2022-08-05 18:57:33,808 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fe69076d880>>, <Task finished name='Task-19538' coro=<Hardware.__init__.<locals>.f() done, defined at /Users/runner/work/distributed/distributed/distributed/dashboard/components/scheduler.py:672> exception=CommClosedError('in <TCP (closed) ConnectionPool.benchmark_network local=tcp://127.0.0.1:52705 remote=tcp://127.0.0.1:52647>: Stream is closed')>)
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/runner/work/distributed/distributed/distributed/dashboard/components/scheduler.py", line 673, in f
    result = await self.scheduler.benchmark_hardware()
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 6508, in benchmark_hardware
    responses = await asyncio.gather(*futures)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 1154, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 919, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.benchmark_network local=tcp://127.0.0.1:52705 remote=tcp://127.0.0.1:52647>: Stream is closed
https://github.com/dask/distributed/runs/7696512528?check_suite_focus=true#step:11:1962
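
The `CommClosedError` in the captured stderr comes from re-raising the transport-level `StreamClosedError` as a comm-level error with `raise ... from exc`, which is what produces the "direct cause of the following exception" chaining above. A minimal sketch of that chaining, using stand-in exception classes rather than the real tornado/distributed ones:

```python
# Stand-ins for tornado.iostream.StreamClosedError and
# distributed.comm.core.CommClosedError; only the chaining pattern is real.
class StreamClosedError(Exception):
    pass


class CommClosedError(Exception):
    pass


def convert_stream_closed_error(obj: str, exc: Exception) -> None:
    # Re-raise with an explicit cause, so the traceback shows
    # "The above exception was the direct cause of the following exception".
    raise CommClosedError(f"in {obj}: {exc}") from exc


def read(obj: str) -> None:
    try:
        raise StreamClosedError("Stream is closed")
    except StreamClosedError as e:
        convert_stream_closed_error(obj, e)


if __name__ == "__main__":
    read("<TCP (closed) ConnectionPool.benchmark_network>")
```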