
Flaky test_local_tls[False] #6767

Open

Description

@gjoseph92

The relevant part is OSError: [Errno 98] Address already in use.

____________________________ test_local_tls[False] _____________________________
self = <Scheduler 'not-running', workers: 0, cores: 0, tasks: 0>
    async def start(self):
        async with self._startup_lock:
            if self.status == Status.failed:
                assert self.__startup_exc is not None
                raise self.__startup_exc
            elif self.status != Status.init:
                return self
            timeout = getattr(self, "death_timeout", None)

            async def _close_on_failure(exc: Exception) -> None:
                await self.close()
                self.status = Status.failed
                self.__startup_exc = exc

            try:
>               await asyncio.wait_for(self.start_unsafe(), timeout=timeout)
distributed/core.py:480: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
fut = <coroutine object Scheduler.start_unsafe at 0x7ffab1c3dc40>
timeout = None
    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.

        Coroutine will be wrapped in Task.

        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().

        If the wait is cancelled, the task is also cancelled.

        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        if timeout is None:
>           return await fut
/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py:442: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
self = <Scheduler 'not-running', workers: 0, cores: 0, tasks: 0>
    async def start_unsafe(self):
        """Clear out old state and restart all running coroutines"""
        await super().start_unsafe()
        enable_gc_diagnosis()

        self.clear_task_state()

        with suppress(AttributeError):
            for c in self._worker_coroutines:
                c.cancel()

        for addr in self._start_address:
>           await self.listen(
                addr,
                allow_offload=False,
                handshake_overrides={"pickle-protocol": 4, "compression": None},
                **self.security.get_listen_args("scheduler"),
            )
distributed/scheduler.py:3343: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
self = <Scheduler 'not-running', workers: 0, cores: 0, tasks: 0>
port_or_addr = 'tls://0.0.0.0:45573', allow_offload = False
kwargs = {'handshake_overrides': {'compression': None, 'pickle-protocol': 4}, 'require_encryption': True, 'ssl_context': <ssl.SSLContext object at 0x7ffab1c3d840>}
addr = 'tls://0.0.0.0:45573'
    async def listen(self, port_or_addr=None, allow_offload=True, **kwargs):
        if port_or_addr is None:
            port_or_addr = self.default_port
        if isinstance(port_or_addr, int):
            addr = unparse_host_port(self.default_ip, port_or_addr)
        elif isinstance(port_or_addr, tuple):
            addr = unparse_host_port(*port_or_addr)
        else:
            addr = port_or_addr
        assert isinstance(addr, str)
>       listener = await listen(
            addr,
            self.handle_comm,
            deserialize=self.deserialize,
            allow_offload=allow_offload,
            **kwargs,
        )
distributed/core.py:658: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    async def _():
>       await self.start()
distributed/comm/core.py:212: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
self = <distributed.comm.tcp.TLSListener object at 0x7ffab1a31b80>
    async def start(self):
        self.tcp_server = TCPServer(max_buffer_size=MAX_BUFFER_SIZE, **self.server_args)
        self.tcp_server.handle_stream = self._handle_stream
        backlog = int(dask.config.get("distributed.comm.socket-backlog"))
        for i in range(5):
            try:
                # When shuffling data between workers, there can
                # really be O(cluster size) connection requests
                # on a single worker socket, make sure the backlog
                # is large enough not to lose any.
>               sockets = netutil.bind_sockets(
                    self.port, address=self.ip, backlog=backlog
                )
distributed/comm/tcp.py:528: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
port = 45573, address = '0.0.0.0', family = <AddressFamily.AF_UNSPEC: 0>
backlog = 2048, flags = <AddressInfo.AI_PASSIVE: 1>, reuse_port = False
    def bind_sockets(
        port: int,
        address: Optional[str] = None,
        family: socket.AddressFamily = socket.AF_UNSPEC,
        backlog: int = _DEFAULT_BACKLOG,
        flags: Optional[int] = None,
        reuse_port: bool = False,
    ) -> List[socket.socket]:
        """Creates listening sockets bound to the given port and address.

        Returns a list of socket objects (multiple sockets are returned if
        the given address maps to multiple IP addresses, which is most common
        for mixed IPv4 and IPv6 use).

        Address may be either an IP address or hostname.  If it's a hostname,
        the server will listen on all IP addresses associated with the
        name.  Address may be an empty string or None to listen on all
        available interfaces.  Family may be set to either `socket.AF_INET`
        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
        both will be used if available.

        The ``backlog`` argument has the same meaning as for
        `socket.listen() <socket.socket.listen>`.

        ``flags`` is a bitmask of AI_* flags to `~socket.getaddrinfo`, like
        ``socket.AI_PASSIVE | socket.AI_NUMERICHOST``.

        ``reuse_port`` option sets ``SO_REUSEPORT`` option for every socket
        in the list. If your platform doesn't support this option ValueError will
        be raised.
        """
        if reuse_port and not hasattr(socket, "SO_REUSEPORT"):
            raise ValueError("the platform doesn't support SO_REUSEPORT")

        sockets = []
        if address == "":
            address = None
        if not socket.has_ipv6 and family == socket.AF_UNSPEC:
            # Python can be compiled with --disable-ipv6, which causes
            # operations on AF_INET6 sockets to fail, but does not
            # automatically exclude those results from getaddrinfo
            # results.
            # http://bugs.python.org/issue16208
            family = socket.AF_INET
        if flags is None:
            flags = socket.AI_PASSIVE
        bound_port = None
        unique_addresses = set()  # type: set
        for res in sorted(
            socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags),
            key=lambda x: x[0],
        ):
            if res in unique_addresses:
                continue

            unique_addresses.add(res)

            af, socktype, proto, canonname, sockaddr = res
            if (
                sys.platform == "darwin"
                and address == "localhost"
                and af == socket.AF_INET6
                and sockaddr[3] != 0
            ):
                # Mac OS X includes a link-local address fe80::1%lo0 in the
                # getaddrinfo results for 'localhost'.  However, the firewall
                # doesn't understand that this is a local address and will
                # prompt for access (often repeatedly, due to an apparent
                # bug in its ability to remember granting access to an
                # application). Skip these addresses.
                continue
            try:
                sock = socket.socket(af, socktype, proto)
            except socket.error as e:
                if errno_from_exception(e) == errno.EAFNOSUPPORT:
                    continue
                raise
            if os.name != "nt":
                try:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                except socket.error as e:
                    if errno_from_exception(e) != errno.ENOPROTOOPT:
                        # Hurd doesn't support SO_REUSEADDR.
                        raise
            if reuse_port:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            if af == socket.AF_INET6:
                # On linux, ipv6 sockets accept ipv4 too by default,
                # but this makes it impossible to bind to both
                # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
                # separate sockets *must* be used to listen for both ipv4
                # and ipv6.  For consistency, always disable ipv4 on our
                # ipv6 sockets and use a separate ipv4 socket when needed.
                #
                # Python 2.x on windows doesn't have IPPROTO_IPV6.
                if hasattr(socket, "IPPROTO_IPV6"):
                    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

            # automatic port allocation with port=None
            # should bind on the same port on IPv4 and IPv6
            host, requested_port = sockaddr[:2]
            if requested_port == 0 and bound_port is not None:
                sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))

            sock.setblocking(False)
            try:
>               sock.bind(sockaddr)
E               OSError: [Errno 98] Address already in use
/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/netutil.py:161: OSError
The above exception was the direct cause of the following exception:
self = LocalCluster(897fcf9c, '<Not Connected>', workers=0, threads=0, memory=0 B)
    async def _start(self):
        while self.status == Status.starting:
            await asyncio.sleep(0.01)
        if self.status == Status.running:
            return
        if self.status == Status.closed:
            raise ValueError("Cluster is closed")

        self._lock = asyncio.Lock()
        self.status = Status.starting

        if self.scheduler_spec is None:
            try:
                import distributed.dashboard  # noqa: F401
            except ImportError:
                pass
            else:
                options = {"dashboard": True}
            self.scheduler_spec = {"cls": Scheduler, "options": options}

        try:
            # Check if scheduler has already been created by a subclass
            if self.scheduler is None:
                cls = self.scheduler_spec["cls"]
                if isinstance(cls, str):
                    cls = import_term(cls)
                self.scheduler = cls(**self.scheduler_spec.get("options", {}))
>           self.scheduler = await self.scheduler
distributed/deploy/spec.py:298: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
self = <Scheduler 'not-running', workers: 0, cores: 0, tasks: 0>
    async def start(self):
        async with self._startup_lock:
            if self.status == Status.failed:
                assert self.__startup_exc is not None
                raise self.__startup_exc
            elif self.status != Status.init:
                return self
            timeout = getattr(self, "death_timeout", None)

            async def _close_on_failure(exc: Exception) -> None:
                await self.close()
                self.status = Status.failed
                self.__startup_exc = exc

            try:
                await asyncio.wait_for(self.start_unsafe(), timeout=timeout)
            except asyncio.TimeoutError as exc:
                await _close_on_failure(exc)
                raise asyncio.TimeoutError(
                    f"{type(self).__name__} start timed out after {timeout}s."
                ) from exc
            except Exception as exc:
                await _close_on_failure(exc)
>               raise RuntimeError(f"{type(self).__name__} failed to start.") from exc
E               RuntimeError: Scheduler failed to start.
distributed/core.py:488: RuntimeError
The above exception was the direct cause of the following exception:
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7ffab1c97a00>
temporary = False
    @pytest.mark.parametrize("temporary", [True, False])
    def test_local_tls(loop, temporary):
        port = open_port()
        if temporary:
            xfail_ssl_issue5601()
            pytest.importorskip("cryptography")
            security = True
        else:
            security = tls_only_security()
>       with LocalCluster(
            n_workers=0,
            scheduler_port=port,
            silence_logs=False,
            security=security,
            dashboard_address=":0",
            host="tls://0.0.0.0",
            loop=loop,
        ) as c:
distributed/deploy/tests/test_local.py:794: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/deploy/local.py:238: in __init__
    super().__init__(
distributed/deploy/spec.py:264: in __init__
    self.sync(self._start)
distributed/utils.py:338: in sync
    return sync(
distributed/utils.py:405: in sync
    raise exc.with_traceback(tb)
distributed/utils.py:378: in f
    result = yield future
/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:762: in run
    value = future.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
self = LocalCluster(897fcf9c, '<Not Connected>', workers=0, threads=0, memory=0 B)
    async def _start(self):
        while self.status == Status.starting:
            await asyncio.sleep(0.01)
        if self.status == Status.running:
            return
        if self.status == Status.closed:
            raise ValueError("Cluster is closed")

        self._lock = asyncio.Lock()
        self.status = Status.starting

        if self.scheduler_spec is None:
            try:
                import distributed.dashboard  # noqa: F401
            except ImportError:
                pass
            else:
                options = {"dashboard": True}
            self.scheduler_spec = {"cls": Scheduler, "options": options}

        try:
            # Check if scheduler has already been created by a subclass
            if self.scheduler is None:
                cls = self.scheduler_spec["cls"]
                if isinstance(cls, str):
                    cls = import_term(cls)
                self.scheduler = cls(**self.scheduler_spec.get("options", {}))
            self.scheduler = await self.scheduler
            self.scheduler_comm = rpc(
                getattr(self.scheduler, "external_address", None)
                or self.scheduler.address,
                connection_args=self.security.get_connection_args("client"),
            )
            await super()._start()
        except Exception as e:  # pragma: no cover
            self.status = Status.failed
            await self._close()
>           raise RuntimeError(f"Cluster failed to start: {e}") from e
E           RuntimeError: Cluster failed to start: Scheduler failed to start.
distributed/deploy/spec.py:308: RuntimeError
----------------------------- Captured stderr call -----------------------------
2022-07-16 06:09:42,736 - distributed.scheduler - INFO - State start
2022-07-16 06:09:42,738 - distributed.scheduler - INFO - Clear task state
2022-07-16 06:09:42,739 - distributed.scheduler - INFO - Scheduler closing...
2022-07-16 06:09:42,739 - distributed.scheduler - INFO - Scheduler closing all comms
2022-07-16 06:09:42,740 - distributed.deploy.spec - WARNING - Cluster closed without starting up
2022-07-16 06:09:42,740 - distributed.scheduler - INFO - Scheduler closing...
2022-07-16 06:09:42,741 - distributed.scheduler - INFO - Scheduler closing all comms

https://github.com/dask/distributed/runs/7368650132?check_suite_focus=true#step:11:2108
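For context on why this only fails intermittently: the test calls open_port() to probe a currently free port and only later hands it to LocalCluster via scheduler_port, so the scheduler's TLS listener binds that port some time after it was probed. Anything else on the CI runner (another test process doing the same dance, for instance) can grab the port inside that window, which would produce exactly this EADDRINUSE. The snippet below is a stand-alone sketch of that suspected race, not code from distributed; pick_free_port and intruder are illustrative names mirroring what an open_port()-style helper does.

# Illustrative sketch (not code from distributed): a "probe a free port,
# bind it later" pattern racing against another bind, producing the same
# EADDRINUSE seen in the traceback above.
import socket


def pick_free_port() -> int:
    # Mirrors an open_port()-style helper: bind to port 0, read the port the
    # OS chose, then close the socket, releasing the port again.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]


port = pick_free_port()

# The race window: anything else on the machine (another test process, for
# example) may bind the same port here, before we get around to it.
intruder = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
intruder.bind(("", port))
intruder.listen()

# Our delayed bind, analogous to the scheduler's TLS listener calling
# tornado's bind_sockets, now fails the same way the test does.
ours = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    ours.bind(("0.0.0.0", port))
except OSError as e:
    print(e)  # e.g. [Errno 98] Address already in use
finally:
    ours.close()
    intruder.close()

If that is indeed the cause, retrying the test's setup on EADDRINUSE or letting the scheduler pick its own port would sidestep the window, though neither is confirmed here as the intended fix.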

Labels: flaky test (Intermittent failures on CI.)