Skip to content

Commit f4c52e9

Browse files
authored
drop deprecated tornado.netutil.ExecutorResolver (#6031)
ExecutorResolver was deprecated in tornado v5.0 and causes a number of problems for us: - it results in a thread leak as the executor is never shutdown and so requires a .warmup() from the resource tracker - .warmup() instantiates the resolver and attaches an io_loop tornadoweb/tornado@43bd9ce this results in a DeprecationWarning on 3.10 because there's no loop running this swaps it for a backport of DefaultLoopResolver from tornado v6.2 that uses get_running_loop().getaddrinfo for resolving DNS
1 parent 64615ed commit f4c52e9

File tree

3 files changed

+29
-46
lines changed

3 files changed

+29
-46
lines changed

distributed/comm/tcp.py

+27-32
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from __future__ import annotations
2+
3+
import asyncio
14
import ctypes
25
import errno
36
import functools
@@ -7,7 +10,7 @@
710
import sys
811
import weakref
912
from ssl import SSLCertVerificationError, SSLError
10-
from typing import ClassVar
13+
from typing import Any, ClassVar
1114

1215
from tornado import gen
1316

@@ -43,7 +46,6 @@
4346
)
4447
from distributed.protocol.utils import pack_frames_prelude, unpack_frames
4548
from distributed.system import MEMORY_LIMIT
46-
from distributed.threadpoolexecutor import ThreadPoolExecutor
4749
from distributed.utils import ensure_ip, get_ip, get_ipv6, nbytes
4850

4951
logger = logging.getLogger(__name__)
@@ -402,38 +404,31 @@ def _check_encryption(self, address, connection_args):
402404
)
403405

404406

405-
class BaseTCPConnector(Connector, RequireEncryptionMixin):
406-
_executor: ClassVar[ThreadPoolExecutor] = ThreadPoolExecutor(
407-
2, thread_name_prefix="TCP-Executor"
408-
)
409-
_client: ClassVar[TCPClient]
410-
411-
@classmethod
412-
def warmup(cls) -> None:
413-
"""Pre-start threads and sockets to avoid catching them in checks for thread and
414-
fd leaks
415-
"""
416-
ex = cls._executor
417-
while len(ex._threads) < ex._max_workers:
418-
ex._adjust_thread_count()
419-
cls._get_client()
420-
421-
@classmethod
422-
def _get_client(cls):
423-
if not hasattr(cls, "_client"):
424-
resolver = netutil.ExecutorResolver(
425-
close_executor=False, executor=cls._executor
407+
class _DefaultLoopResolver(netutil.Resolver):
408+
"""
409+
Resolver implementation using `asyncio.loop.getaddrinfo`.
410+
backport from Tornado 6.2+
411+
https://github.com/tornadoweb/tornado/blob/3de78b7a15ba7134917a18b0755ea24d7f8fde94/tornado/netutil.py#L416-L432
412+
"""
413+
414+
async def resolve(
415+
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC
416+
) -> list[tuple[int, Any]]:
417+
# On Solaris, getaddrinfo fails if the given port is not found
418+
# in /etc/services and no socket type is given, so we must pass
419+
# one here. The socket type used here doesn't seem to actually
420+
# matter (we discard the one we get back in the results),
421+
# so the addresses we return should still be usable with SOCK_DGRAM.
422+
return [
423+
(fam, address)
424+
for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
425+
host, port, family=family, type=socket.SOCK_STREAM
426426
)
427-
cls._client = TCPClient(resolver=resolver)
428-
return cls._client
427+
]
429428

430-
@property
431-
def client(self):
432-
# The `TCPClient` is cached on the class itself to avoid creating
433-
# excess `ThreadPoolExecutor`s. We delay creation until inside an async
434-
# function to avoid accessing an IOLoop from a context where a backing
435-
# event loop doesn't exist.
436-
return self._get_client()
429+
430+
class BaseTCPConnector(Connector, RequireEncryptionMixin):
431+
client: ClassVar[TCPClient] = TCPClient(resolver=_DefaultLoopResolver())
437432

438433
async def connect(self, address, deserialize=True, **connection_args):
439434
self._check_encryption(address, connection_args)

distributed/pytest_resourceleaks.py

+1-10
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ def test1():
3131
flag would solve this issue. See pytest_rerunfailures code for inspiration.
3232
3333
- The @gen_cluster fixture leaks 2 fds on the first test decorated with it within a test
34-
suite; this is likely caused by an incomplete warmup routine of
35-
distributed.comm.tcp.BaseTCPConnector.
36-
This issue would also be fixed by rerunning failing tests.
34+
suite; This issue would also be fixed by rerunning failing tests.
3735
3836
- The @pytest.mark.flaky decorator (pytest_rerunfailures) completely disables this
3937
plugin for the decorated tests.
@@ -57,7 +55,6 @@ def test1():
5755
import psutil
5856
import pytest
5957

60-
from distributed.comm.tcp import BaseTCPConnector
6158
from distributed.compatibility import WINDOWS
6259
from distributed.metrics import time
6360

@@ -157,9 +154,6 @@ def format(self, before: int, after: int) -> str:
157154

158155

159156
class FDChecker(ResourceChecker, name="fds"):
160-
def __init__(self):
161-
BaseTCPConnector.warmup()
162-
163157
def measure(self) -> int:
164158
if WINDOWS:
165159
# Don't use num_handles(); you'll get tens of thousands of reported leaks
@@ -187,9 +181,6 @@ def format(self, before: int, after: int) -> str:
187181

188182

189183
class ActiveThreadsChecker(ResourceChecker, name="threads"):
190-
def __init__(self):
191-
BaseTCPConnector.warmup()
192-
193184
def measure(self) -> set[threading.Thread]:
194185
return set(threading.enumerate())
195186

distributed/utils_test.py

+1-4
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
from distributed import versions as version_module
4848
from distributed.client import Client, _global_clients, default_client
4949
from distributed.comm import Comm
50-
from distributed.comm.tcp import TCP, BaseTCPConnector
50+
from distributed.comm.tcp import TCP
5151
from distributed.compatibility import WINDOWS
5252
from distributed.config import initialize_logging
5353
from distributed.core import CommClosedError, ConnectionPool, Status, connect, rpc
@@ -1589,9 +1589,6 @@ def save_sys_modules():
15891589
@contextmanager
15901590
def check_thread_leak():
15911591
"""Context manager to ensure we haven't leaked any threads"""
1592-
# "TCP-Executor" threads are never stopped once they are started
1593-
BaseTCPConnector.warmup()
1594-
15951592
active_threads_start = threading.enumerate()
15961593

15971594
yield

0 commit comments

Comments
 (0)