Skip to content

Commit 6a1b089

Browse files
fjetter and gjoseph92 authored
Rewrite test_reconnect to use subproc to kill scheduler reliably (#6967)
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
1 parent 10cf945 commit 6a1b089

File tree

1 file changed

+41
-53
lines changed

1 file changed

+41
-53
lines changed

distributed/tests/test_client.py

Lines changed: 41 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@
2121
import zipfile
2222
from collections import deque
2323
from collections.abc import Generator
24-
from contextlib import contextmanager, nullcontext
24+
from contextlib import ExitStack, contextmanager, nullcontext
2525
from functools import partial
2626
from operator import add
2727
from threading import Semaphore
@@ -71,7 +71,7 @@
7171
from distributed.cluster_dump import load_cluster_dump
7272
from distributed.comm import CommClosedError
7373
from distributed.compatibility import LINUX, WINDOWS
74-
from distributed.core import Server, Status
74+
from distributed.core import Status
7575
from distributed.metrics import time
7676
from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler
7777
from distributed.sizeof import sizeof
@@ -3592,65 +3592,53 @@ async def test_scatter_raises_if_no_workers(c, s):
35923592
async def test_reconnect():
35933593
port = open_port()
35943594

3595-
async def hard_stop(s):
3596-
for pc in s.periodic_callbacks.values():
3597-
pc.stop()
3595+
stack = ExitStack()
3596+
proc = popen(["dask-scheduler", "--no-dashboard", f"--port={port}"])
3597+
stack.enter_context(proc)
3598+
async with Client(f"127.0.0.1:{port}", asynchronous=True) as c, Worker(
3599+
f"127.0.0.1:{port}"
3600+
) as w:
3601+
await c.wait_for_workers(1, timeout=10)
3602+
x = c.submit(inc, 1)
3603+
assert (await x) == 2
3604+
stack.close()
35983605

3599-
s.stop_services()
3600-
for comm in list(s.stream_comms.values()):
3601-
comm.abort()
3602-
for comm in list(s.client_comms.values()):
3603-
comm.abort()
3606+
start = time()
3607+
while c.status != "connecting":
3608+
assert time() < start + 10
3609+
await asyncio.sleep(0.01)
36043610

3605-
await s.rpc.close()
3606-
s.stop()
3607-
await Server.close(s)
3611+
assert x.status == "cancelled"
3612+
with pytest.raises(CancelledError):
3613+
await x
36083614

3609-
async with Scheduler(port=port) as s:
3610-
async with Client(f"127.0.0.1:{port}", asynchronous=True) as c:
3611-
async with Worker(f"127.0.0.1:{port}") as w:
3612-
await c.wait_for_workers(1, timeout=10)
3613-
x = c.submit(inc, 1)
3614-
assert (await x) == 2
3615-
await hard_stop(s)
3615+
with popen(["dask-scheduler", "--no-dashboard", f"--port={port}"]):
3616+
start = time()
3617+
while c.status != "running":
3618+
await asyncio.sleep(0.1)
3619+
assert time() < start + 10
36163620

3621+
await w.finished()
3622+
async with Worker(f"127.0.0.1:{port}"):
36173623
start = time()
3618-
while c.status != "connecting":
3624+
while len(await c.nthreads()) != 1:
3625+
await asyncio.sleep(0.05)
36193626
assert time() < start + 10
3620-
await asyncio.sleep(0.01)
3621-
3622-
assert x.status == "cancelled"
3623-
with pytest.raises(CancelledError):
3624-
await x
36253627

3626-
async with Scheduler(port=port) as s2:
3627-
start = time()
3628-
while c.status != "running":
3629-
await asyncio.sleep(0.1)
3630-
assert time() < start + 10
3631-
3632-
await w.finished()
3633-
async with Worker(f"127.0.0.1:{port}"):
3634-
start = time()
3635-
while len(await c.nthreads()) != 1:
3636-
await asyncio.sleep(0.05)
3637-
assert time() < start + 10
3638-
3639-
x = c.submit(inc, 1)
3640-
assert (await x) == 2
3641-
await hard_stop(s2)
3628+
x = c.submit(inc, 1)
3629+
assert (await x) == 2
36423630

3643-
start = time()
3644-
while True:
3645-
assert time() < start + 10
3646-
try:
3647-
await x
3648-
assert False
3649-
except CommClosedError:
3650-
continue
3651-
except CancelledError:
3652-
break
3653-
await c._close(fast=True)
3631+
start = time()
3632+
while True:
3633+
assert time() < start + 10
3634+
try:
3635+
await x
3636+
assert False
3637+
except CommClosedError:
3638+
continue
3639+
except CancelledError:
3640+
break
3641+
await c._close(fast=True)
36543642

36553643

36563644
class UnhandledExceptions(Exception):

0 commit comments

Comments
 (0)