From 5606c8db91bf250a98cb8b6f696034454593fe1c Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Sat, 15 Nov 2025 01:08:34 +0100 Subject: [PATCH 01/11] =?UTF-8?q?flake8,=20bugbear,=20pyupgrade=20?= =?UTF-8?q?=E2=86=92=20ruff?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .flake8 | 41 ----------------------------------------- .pre-commit-config.yaml | 21 ++++----------------- pyproject.toml | 26 ++++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 58 deletions(-) delete mode 100644 .flake8 diff --git a/.flake8 b/.flake8 deleted file mode 100644 index 4d442b908a..0000000000 --- a/.flake8 +++ /dev/null @@ -1,41 +0,0 @@ -# flake8 doesn't support pyproject.toml yet https://github.com/PyCQA/flake8/issues/234 -[flake8] -# References: -# https://flake8.readthedocs.io/en/latest/user/configuration.html -# https://flake8.readthedocs.io/en/latest/user/error-codes.html - -# Note: there cannot be spaces after commas here -exclude = __init__.py,distributed/_concurrent_futures_thread.py -ignore = - # Ignores below are aligned with black https://github.com/psf/black/blob/main/.flake8 - # Whitespace before ':' - E203 - # Too many leading '#' for block comment - E266 - # Line too long, ignored in favor of B950 - E501 - # Line break occurred before a binary operator - W503 - # Line too long - B950 - # No explicit stacklevel in warnings.warn. FIXME we should correct this in the code - B028 - # do not compare types, for exact checks use `is` / `is not`, for instance checks use `isinstance()` - E721 - # multiple statements on one line; required for black compat - E701, E704 - -per-file-ignores = - **/tests/*: - # Module level import not at top of file (to silence on pytest.importorskip) - # See https://github.com/PyCQA/pycodestyle/issues/472 - E402, - # Do not use variables named 'I', 'O', or 'l' - E741, - # Local variable name is assigned to but never used - F841, - # Do not call assert False since python -O removes these calls - B011, - -# B950 will only trigger at 88 -max-line-length = 80 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6838051c78..59b56b11f9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,35 +1,22 @@ -# NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive -# dependency. Make sure to update flake8-bugbear manually on a regular basis. repos: - repo: https://github.com/MarcoGorelli/absolufy-imports rev: v0.3.1 hooks: - id: absolufy-imports name: absolufy-imports + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.14.5 + hooks: + - id: ruff-check - repo: https://github.com/pycqa/isort rev: 5.13.2 hooks: - id: isort language_version: python3 - - repo: https://github.com/asottile/pyupgrade - rev: v3.17.0 - hooks: - - id: pyupgrade - args: - - --py39-plus - repo: https://github.com/psf/black-pre-commit-mirror rev: 25.11.0 hooks: - id: black - - repo: https://github.com/pycqa/flake8 - rev: 7.1.1 - hooks: - - id: flake8 - language_version: python3 - additional_dependencies: - # NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive - # dependency. Make sure to update flake8-bugbear manually on a regular basis. - - flake8-bugbear==24.8.19 - repo: https://github.com/codespell-project/codespell rev: v2.3.0 hooks: diff --git a/pyproject.toml b/pyproject.toml index d19fc195d7..e45b54ff35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,6 +73,32 @@ distributed = [ "py.typed", ] +[tool.ruff] +line-length = 120 + +[tool.ruff.lint] +extend-select = [ + "B", + "UP", +] +ignore = [ + "B011", # Do not `assert False`, raise `AssertionError()` + "B018", # Found useless expression + "B028", # FIXME: No explicit `stacklevel` keyword argument found + "B904", # Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling + "B905", # `zip()` without an explicit `strict=` parameter + "F841", # Local variable `futures` is assigned to but never used + "E402", # Module level import not at top of file + "E714", # Test for object identity should be `is not` + "E721", # Use `is` and `is not` for type comparisons, or `isinstance()` for isinstance checks + "E741", # Ambiguous variable name: `l` +] + +[tool.ruff.lint.extend-per-file-ignores] +"**__init__.py" = ["F401", "F811"] +"distributed/shuffle/tests/test_shuffle.py" = ["F601"] +"distributed/utils.py" = ["F821"] + [tool.isort] sections = ["FUTURE", "STDLIB", "THIRDPARTY", "DISTRIBUTED", "FIRSTPARTY", "LOCALFOLDER"] profile = "black" From f19c2ac7d85367f5ec8f054feaf6e86bc4769fd0 Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Sat, 15 Nov 2025 01:13:59 +0100 Subject: [PATCH 02/11] Apply ruff/flake8-bugbear rule B007 B007 Loop control variable not used within loop body --- distributed/_concurrent_futures_thread.py | 4 ++-- distributed/tests/test_client.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/_concurrent_futures_thread.py b/distributed/_concurrent_futures_thread.py index d3c05251b9..35efba0411 100644 --- a/distributed/_concurrent_futures_thread.py +++ b/distributed/_concurrent_futures_thread.py @@ -41,9 +41,9 @@ def _python_exit(): global _shutdown _shutdown = True items = list(_threads_queues.items()) - for t, q in items: + for _t, q in items: q.put(None) - for t, q in items: + for t, _q in items: t.join() diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 7dce7717d7..da2eb533b6 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6428,7 +6428,7 @@ async def test_as_completed_async_for_results(c, s, a, b): results = [] async def f(): - async for future, result in ac: + async for _future, result in ac: results.append(result) await f() @@ -8199,7 +8199,7 @@ async def test_client_disconnect_exception_on_cancelled_futures(c, s, a, b): with pytest.raises(FuturesCancelledError, match="connection to the scheduler"): futures_of(fut, client=c) - async for fut, res in as_completed([fut], with_results=True): + async for _fut, res in as_completed([fut], with_results=True): assert isinstance(res, FutureCancelledError) assert "connection to the scheduler" in res.msg From 14ae7b00f7e33429477dda588ec8945a55e552e9 Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Sat, 15 Nov 2025 01:20:47 +0100 Subject: [PATCH 03/11] Normalize `noqa` directives warning: Invalid rule code provided to `# noqa` --- distributed/cfexecutor.py | 2 +- distributed/tests/test_client.py | 2 +- distributed/utils.py | 2 +- distributed/worker.py | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/cfexecutor.py b/distributed/cfexecutor.py index 13998708cc..1eadfcac19 100644 --- a/distributed/cfexecutor.py +++ b/distributed/cfexecutor.py @@ -30,7 +30,7 @@ def _cascade_future(future, cf_future): try: typ, exc, tb = result raise exc.with_traceback(tb) - except BaseException as exc: # noqa: B036 + except BaseException as exc: cf_future.set_exception(exc) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index da2eb533b6..d7157278c1 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6390,7 +6390,7 @@ async def test_profile_server_disabled(c, s, a, b): async def test_await_future(c, s, a, b): future = c.submit(inc, 1) - async def f(): # flake8: noqa + async def f(): result = await future assert result == 2 diff --git a/distributed/utils.py b/distributed/utils.py index 36baa5dc53..b02bd8aaa8 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -468,7 +468,7 @@ def __init__(self, target: Callable[[], None], daemon: bool, name: str): def wrapper() -> None: try: target() - except BaseException as e: # noqa: B036 + except BaseException as e: self._exception = e self._thread = thread = threading.Thread( diff --git a/distributed/worker.py b/distributed/worker.py index 4c2c1ca6c0..38843ffc5c 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2994,7 +2994,7 @@ def _run_task_simple( # Any other `BaseException` types would ultimately be ignored by asyncio if # raised here, after messing up the worker state machine along their way. raise - except BaseException as e: # noqa: B036 + except BaseException as e: # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they # aren't a reason to shut down the whole system (since we allow the # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) @@ -3038,7 +3038,7 @@ async def _run_task_async( # Any other `BaseException` types would ultimately be ignored by asyncio if # raised here, after messing up the worker state machine along their way. raise - except BaseException as e: # noqa: B036 + except BaseException as e: # NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ # a reason to shut down the worker. # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they From 8a9cb9ac2d6e46db62a56903811f79a549ffc1db Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Sat, 15 Nov 2025 01:24:04 +0100 Subject: [PATCH 04/11] Apply ruff rule RUF100 RUF100 Unused `noqa` directive --- distributed/client.py | 2 +- distributed/protocol/tests/test_pickle.py | 2 +- distributed/tests/test_parse_stdout.py | 1 - distributed/worker.py | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index e778e5da29..2626a2d178 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -2368,7 +2368,7 @@ async def _gather(self, futures, errors="raise", direct=None, local_worker=None) "Cannot gather Futures created by another client. " f"These are the {len(mismatched_futures)} (out of {len(futures)}) " f"mismatched Futures and their client IDs (this client is {self.id}): " - f"{ {f: f.client.id for f in mismatched_futures} }" # noqa: E201, E202 + f"{ {f: f.client.id for f in mismatched_futures} }" ) keys = [future.key for future in future_set] bad_data = dict() diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index 3d9ef38de7..40931c0f11 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -203,7 +203,7 @@ def test_pickle_by_value_when_registered(): module = f"{d}/mymodule.py" with open(module, "w") as f: f.write("def myfunc(x):\n return x + 1") - import mymodule # noqa + import mymodule assert dumps(mymodule.myfunc) == pickle.dumps( mymodule.myfunc, protocol=HIGHEST_PROTOCOL diff --git a/distributed/tests/test_parse_stdout.py b/distributed/tests/test_parse_stdout.py index c2d3dac341..3a180a2766 100644 --- a/distributed/tests/test_parse_stdout.py +++ b/distributed/tests/test_parse_stdout.py @@ -15,7 +15,6 @@ # Note: test_timeout below ends with a whitespace! -# flake8: noqa: W291 stdout = """ Unrelated row, must ignore distributed/tests/test1.py::test_fail FAILED [ 10%] diff --git a/distributed/worker.py b/distributed/worker.py index 38843ffc5c..4bc3d379b9 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3233,7 +3233,7 @@ async def rmm_metric(worker): # avoid importing cuDF unless explicitly enabled if dask.config.get("distributed.diagnostics.cudf"): try: - import cudf as _cudf # noqa: F401 + import cudf as _cudf except Exception: pass else: From 84d53ff63a88ee1b90d445001844f0a0e95d5731 Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Sat, 15 Nov 2025 10:00:31 +0100 Subject: [PATCH 05/11] Apply ruff/pyupgrade rule UP007 UP007 Use `X | Y` for type annotations --- distributed/worker_memory.py | 16 ++++++++-------- distributed/worker_state_machine.py | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/distributed/worker_memory.py b/distributed/worker_memory.py index 2ac4bcf4e7..112f09704c 100644 --- a/distributed/worker_memory.py +++ b/distributed/worker_memory.py @@ -29,7 +29,7 @@ from collections.abc import Callable, Container, Hashable, MutableMapping from contextlib import suppress from functools import partial -from typing import TYPE_CHECKING, Any, Literal, Union, cast +from typing import TYPE_CHECKING, Any, Literal, cast import psutil @@ -54,18 +54,18 @@ from distributed.nanny import Nanny from distributed.worker import Worker -WorkerDataParameter: TypeAlias = Union[ +WorkerDataParameter: TypeAlias = ( # pre-initialized - MutableMapping[Key, object], + MutableMapping[Key, object] # constructor - Callable[[], MutableMapping[Key, object]], + | Callable[[], MutableMapping[Key, object]] # constructor, passed worker.local_directory - Callable[[str], MutableMapping[Key, object]], + | Callable[[str], MutableMapping[Key, object]] # (constructor, kwargs to constructor) - tuple[Callable[..., MutableMapping[Key, object]], dict[str, Any]], + | tuple[Callable[..., MutableMapping[Key, object]], dict[str, Any]] # initialize internally - None, -] + | None +) worker_logger = logging.getLogger("distributed.worker.memory") worker_logger.addFilter(RateLimiterFilter(r"Unmanaged memory use is high", rate="300s")) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 6cde3886c8..0380a06606 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -25,7 +25,7 @@ from dataclasses import dataclass, field from functools import lru_cache, partial, singledispatchmethod, wraps from itertools import chain -from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict, Union, cast +from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict, cast from tlz import peekn @@ -1022,7 +1022,7 @@ class SecedeEvent(StateMachineEvent): # {TaskState -> finish: TaskStateState | (finish: TaskStateState, transition *args)} # Not to be confused with distributed.scheduler.Recs -Recs: TypeAlias = dict[TaskState, Union[TaskStateState, tuple]] +Recs: TypeAlias = dict[TaskState, TaskStateState | tuple] Instructions: TypeAlias = list[Instruction] RecsInstrs: TypeAlias = tuple[Recs, Instructions] From d241cb4575bf97678800231d1de940cd61181904 Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Sat, 15 Nov 2025 10:04:34 +0100 Subject: [PATCH 06/11] Apply ruff/pyupgrade rule UP032 UP032 Use f-string instead of `format` call Contains manual changes on top of automatic changes. --- distributed/cli/dask_ssh.py | 6 +---- distributed/client.py | 13 ++++------- distributed/dashboard/components/nvml.py | 5 ++-- distributed/deploy/old_ssh.py | 23 +++++-------------- distributed/deploy/ssh.py | 16 ++++--------- distributed/http/proxy.py | 9 +++----- distributed/scheduler.py | 29 +++++++++--------------- distributed/utils.py | 3 +-- 8 files changed, 32 insertions(+), 72 deletions(-) diff --git a/distributed/cli/dask_ssh.py b/distributed/cli/dask_ssh.py index f449774537..919f1911b8 100755 --- a/distributed/cli/dask_ssh.py +++ b/distributed/cli/dask_ssh.py @@ -189,11 +189,7 @@ def main( import distributed print("\n---------------------------------------------------------------") - print( - " Dask.distributed v{version}\n".format( - version=distributed.__version__ - ) - ) + print(f" Dask.distributed v{distributed.__version__}\n") print(f"Worker nodes: {len(hostnames)}") for i, host in enumerate(hostnames): print(f" {i}: {host}") diff --git a/distributed/client.py b/distributed/client.py index 2626a2d178..eca4441177 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -1121,9 +1121,7 @@ def __init__( security = getattr(self.cluster, "security", None) elif address is not None and not isinstance(address, str): raise TypeError( - "Scheduler address must be a string or a Cluster instance, got {}".format( - type(address) - ) + f"Scheduler address must be a string or a Cluster instance, got {type(address)}" ) # If connecting to an address and no explicit security is configured, attempt @@ -1375,10 +1373,7 @@ def __repr__(self): return text elif self.scheduler is not None: - return "<{}: scheduler={!r}>".format( - self.__class__.__name__, - self.scheduler.address, - ) + return f"<{self.__class__.__name__}: scheduler={self.scheduler.address!r}>" else: return f"<{self.__class__.__name__}: No scheduler connected>" @@ -5892,8 +5887,8 @@ def count(self): return len(self.futures) + len(self.queue.queue) def __repr__(self): - return "".format( - len(self.futures), len(self.queue.queue) + return ( + f"" ) def __iter__(self): diff --git a/distributed/dashboard/components/nvml.py b/distributed/dashboard/components/nvml.py index 92e4353354..51f70ed97c 100644 --- a/distributed/dashboard/components/nvml.py +++ b/distributed/dashboard/components/nvml.py @@ -151,9 +151,8 @@ def update(self): "escaped_worker": [url_escape(w) for w in worker], } - self.memory_figure.title.text = "GPU Memory: {} / {}".format( - format_bytes(sum(memory)), - format_bytes(memory_total), + self.memory_figure.title.text = ( + f"GPU Memory: {format_bytes(sum(memory))} / {format_bytes(memory_total)}" ) self.memory_figure.x_range.end = memory_max diff --git a/distributed/deploy/old_ssh.py b/distributed/deploy/old_ssh.py index b2eb8efad6..a8ea5c3e58 100644 --- a/distributed/deploy/old_ssh.py +++ b/distributed/deploy/old_ssh.py @@ -208,16 +208,12 @@ def communicate(): def start_scheduler( logdir, addr, port, ssh_username, ssh_port, ssh_private_key, remote_python=None ): - cmd = "{python} -m distributed.cli.dask_scheduler --port {port}".format( - python=remote_python or sys.executable, port=port - ) + cmd = f"{remote_python or sys.executable} -m distributed.cli.dask_scheduler --port {port}" # Optionally re-direct stdout and stderr to a logfile if logdir is not None: cmd = f"mkdir -p {logdir} && {cmd}" - cmd += "&> {logdir}/dask_scheduler_{addr}:{port}.log".format( - addr=addr, port=port, logdir=logdir - ) + cmd += f"&> {logdir}/dask_scheduler_{addr}:{port}.log" # Format output labels we can prepend to each line of output, and create # a 'status' key to keep track of jobs that terminate prematurely. @@ -297,16 +293,11 @@ def start_worker( ) if local_directory is not None: - cmd += " --local-directory {local_directory}".format( - local_directory=local_directory - ) + cmd += f" --local-directory {local_directory}" # Optionally redirect stdout and stderr to a logfile if logdir is not None: - cmd = f"mkdir -p {logdir} && {cmd}" - cmd += "&> {logdir}/dask_scheduler_{addr}.log".format( - addr=worker_addr, logdir=logdir - ) + cmd = f"mkdir -p {logdir} && {cmd}&> {logdir}/dask_scheduler_{worker_addr}.log" label = f"worker {worker_addr}" @@ -401,10 +392,8 @@ def __init__( "dask-ssh_" + datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S"), ) print( - bcolors.WARNING + "Output will be redirected to logfiles " - 'stored locally on individual worker nodes under "{logdir}".'.format( - logdir=logdir - ) + bcolors.WARNING + + f'Output will be redirected to logfiles stored locally on individual worker nodes under "{logdir}".' + bcolors.ENDC ) self.logdir = logdir diff --git a/distributed/deploy/ssh.py b/distributed/deploy/ssh.py index 481745d139..1d500ff9b6 100644 --- a/distributed/deploy/ssh.py +++ b/distributed/deploy/ssh.py @@ -137,15 +137,11 @@ async def start(self): result = await self.connection.run("uname") if result.exit_status == 0: - set_env = 'env DASK_INTERNAL_INHERIT_CONFIG="{}"'.format( - dask.config.serialize(dask.config.global_config) - ) + set_env = f'env DASK_INTERNAL_INHERIT_CONFIG="{dask.config.serialize(dask.config.global_config)}"' else: result = await self.connection.run("cmd /c ver") if result.exit_status == 0: - set_env = "set DASK_INTERNAL_INHERIT_CONFIG={} &&".format( - dask.config.serialize(dask.config.global_config) - ) + set_env = f"set DASK_INTERNAL_INHERIT_CONFIG={dask.config.serialize(dask.config.global_config)} &&" else: raise Exception( "Worker failed to set DASK_INTERNAL_INHERIT_CONFIG variable " @@ -237,15 +233,11 @@ async def start(self): result = await self.connection.run("uname") if result.exit_status == 0: - set_env = 'env DASK_INTERNAL_INHERIT_CONFIG="{}"'.format( - dask.config.serialize(dask.config.global_config) - ) + set_env = f'env DASK_INTERNAL_INHERIT_CONFIG="{dask.config.serialize(dask.config.global_config)}"' else: result = await self.connection.run("cmd /c ver") if result.exit_status == 0: - set_env = "set DASK_INTERNAL_INHERIT_CONFIG={} &&".format( - dask.config.serialize(dask.config.global_config) - ) + set_env = f"set DASK_INTERNAL_INHERIT_CONFIG={dask.config.serialize(dask.config.global_config)} &&" else: raise Exception( "Scheduler failed to set DASK_INTERNAL_INHERIT_CONFIG variable " diff --git a/distributed/http/proxy.py b/distributed/http/proxy.py index 29ca16e5fb..5ba4d4ff61 100644 --- a/distributed/http/proxy.py +++ b/distributed/http/proxy.py @@ -97,8 +97,8 @@ def initialize(self, dask_server=None, extra=None): def get(self, port, host, proxied_path): worker_url = f"{host}:{port}/{proxied_path}" - msg = """ -

Try navigating to {} for your worker dashboard

+ msg = f""" +

Try navigating to {worker_url} for your worker dashboard

Dask tried to proxy you to that page through your @@ -116,10 +116,7 @@ def get(self, port, host, proxied_path): but less common in production clusters. Your IT administrators will know more

- """.format( - worker_url, - worker_url, - ) + """ self.write(msg) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2d5ee2c8cf..615b45a48c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -8552,44 +8552,37 @@ def profile_to_figure(state: object) -> object: import distributed # HTML - html = """ + duration = format_time(stop - start) + nworkers = len(self.workers) + threads = sum(ws.nthreads for ws in self.workers.values()) + memory = format_bytes(sum(ws.memory_limit for ws in self.workers.values())) + html = f"""

Dask Performance Report

Select different tabs on the top for additional information -

Duration: {time}

+

Duration: {duration}

Tasks Information

Scheduler Information

Calling Code

 {code}
         
- """.format( - time=format_time(stop - start), - ntasks=total_tasks, - tasks_timings=tasks_timings, - address=self.address, - nworkers=len(self.workers), - threads=sum(ws.nthreads for ws in self.workers.values()), - memory=format_bytes(sum(ws.memory_limit for ws in self.workers.values())), - code=code, - dask_version=dask.__version__, - distributed_version=distributed.__version__, - ) + """ html = Div(text=html, styles=_STYLES) html = TabPanel(child=html, title="Summary") diff --git a/distributed/utils.py b/distributed/utils.py index b02bd8aaa8..cc98894581 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -246,8 +246,7 @@ def get_ip_interface(ifname): if ifname not in net_if_addrs: allowed_ifnames = list(net_if_addrs.keys()) raise ValueError( - "{!r} is not a valid network interface. " - "Valid network interfaces are: {}".format(ifname, allowed_ifnames) + f"{ifname!r} is not a valid network interface. Valid network interfaces are: {allowed_ifnames}" ) for info in net_if_addrs[ifname]: From b178f88deada0abf1897722ba83d63d15f0d415f Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Sat, 15 Nov 2025 10:08:22 +0100 Subject: [PATCH 07/11] Apply ruff/pyupgrade rule UP035 UP035 Import from `collections.abc` instead UP035 Import from `typing` instead --- distributed/active_memory_manager.py | 2 +- distributed/client.py | 12 +++++++++--- distributed/deploy/adaptive.py | 6 +++--- distributed/diagnostics/plugin.py | 4 ++-- distributed/diagnostics/progressbar.py | 2 +- distributed/gc.py | 3 ++- distributed/protocol/compression.py | 2 +- distributed/scheduler.py | 4 ++-- distributed/shuffle/_core.py | 4 +++- distributed/shuffle/_memory.py | 3 ++- distributed/shuffle/_merge.py | 6 +++--- distributed/shuffle/_rechunk.py | 3 ++- distributed/shuffle/_shuffle.py | 6 +++--- distributed/worker_memory.py | 2 +- distributed/worker_state_machine.py | 4 +++- 15 files changed, 38 insertions(+), 25 deletions(-) diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index 4f7c472127..d03e1382d5 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -437,7 +437,7 @@ class Suggestion(NamedTuple): if TYPE_CHECKING: # TODO import from typing (requires Python >=3.10) - from typing_extensions import TypeAlias + from typing import TypeAlias SuggestionGenerator: TypeAlias = Generator[ Suggestion, Union["scheduler_module.WorkerState", None], None diff --git a/distributed/client.py b/distributed/client.py index eca4441177..3858c576b5 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -17,7 +17,14 @@ import warnings import weakref from collections import defaultdict -from collections.abc import Collection, Coroutine, Iterable, Iterator, Sequence +from collections.abc import ( + Callable, + Collection, + Coroutine, + Iterable, + Iterator, + Sequence, +) from concurrent.futures import ThreadPoolExecutor from concurrent.futures._base import DoneAndNotDoneFutures from contextlib import asynccontextmanager, contextmanager, suppress @@ -29,7 +36,6 @@ from typing import ( TYPE_CHECKING, Any, - Callable, ClassVar, Generic, Literal, @@ -125,7 +131,7 @@ from distributed.worker import get_client, get_worker, secede if TYPE_CHECKING: - from typing_extensions import TypeAlias + from typing import TypeAlias logger = logging.getLogger(__name__) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index ff2a2f5ec9..406928b1b0 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -1,10 +1,10 @@ from __future__ import annotations import logging -from collections.abc import Hashable +from collections.abc import Callable, Hashable from datetime import timedelta from inspect import isawaitable -from typing import TYPE_CHECKING, Any, Callable, Literal, cast +from typing import TYPE_CHECKING, Any, Literal, cast from tornado.ioloop import IOLoop @@ -18,7 +18,7 @@ from distributed.utils import log_errors if TYPE_CHECKING: - from typing_extensions import TypeAlias + from typing import TypeAlias import distributed from distributed.deploy.cluster import Cluster diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index 7bc2220c64..54bea73cef 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -11,8 +11,8 @@ import tempfile import uuid import zipfile -from collections.abc import Awaitable -from typing import TYPE_CHECKING, Any, Callable, ClassVar +from collections.abc import Awaitable, Callable +from typing import TYPE_CHECKING, Any, ClassVar from dask.typing import Key from dask.utils import _deprecated_kwarg, funcname, tmpfile diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py index b4774e343a..8c1e7e20e4 100644 --- a/distributed/diagnostics/progressbar.py +++ b/distributed/diagnostics/progressbar.py @@ -5,9 +5,9 @@ import sys import warnings import weakref +from collections.abc import Callable from contextlib import suppress from timeit import default_timer -from typing import Callable from tlz import valmap from tornado.ioloop import IOLoop diff --git a/distributed/gc.py b/distributed/gc.py index 5a02411ae3..67fc1ec17b 100644 --- a/distributed/gc.py +++ b/distributed/gc.py @@ -4,7 +4,8 @@ import logging import threading from collections import deque -from typing import Callable, Final +from collections.abc import Callable +from typing import Final import psutil diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 5b8eac78fb..ed51bd6e90 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -23,7 +23,7 @@ if TYPE_CHECKING: # TODO import from typing (requires Python >=3.10) - from typing_extensions import TypeAlias + from typing import TypeAlias # TODO remove quotes (requires Python >=3.10) AnyBytes: TypeAlias = "bytes | bytearray | memoryview" diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 615b45a48c..a10a4255e1 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -148,9 +148,9 @@ if TYPE_CHECKING: # TODO import from typing (requires Python >=3.10) # TODO import from typing (requires Python >=3.11) - from typing import TypeVar + from typing import TypeAlias, TypeVar - from typing_extensions import Self, TypeAlias + from typing_extensions import Self from dask._expr import Expr diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py index a3f9ce327d..db4d3abbe6 100644 --- a/distributed/shuffle/_core.py +++ b/distributed/shuffle/_core.py @@ -49,7 +49,9 @@ if TYPE_CHECKING: # TODO import from typing (requires Python >=3.10) - from typing_extensions import ParamSpec, TypeAlias + from typing import TypeAlias + + from typing_extensions import ParamSpec _P = ParamSpec("_P") diff --git a/distributed/shuffle/_memory.py b/distributed/shuffle/_memory.py index 106052cc75..73ac261193 100644 --- a/distributed/shuffle/_memory.py +++ b/distributed/shuffle/_memory.py @@ -1,7 +1,8 @@ from __future__ import annotations from collections import defaultdict, deque -from typing import Any, Callable +from collections.abc import Callable +from typing import Any from dask.sizeof import sizeof diff --git a/distributed/shuffle/_merge.py b/distributed/shuffle/_merge.py index e8e5a8f173..630f1a5f1e 100644 --- a/distributed/shuffle/_merge.py +++ b/distributed/shuffle/_merge.py @@ -10,12 +10,12 @@ from distributed.shuffle._shuffle import shuffle_transfer if TYPE_CHECKING: + # TODO import from typing (requires Python >=3.10) + from typing import TypeAlias + import pandas as pd from pandas._typing import IndexLabel, MergeHow, Suffixes - # TODO import from typing (requires Python >=3.10) - from typing_extensions import TypeAlias - _T_LowLevelGraph: TypeAlias = dict[Key, GraphNode] diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py index f096b10821..c85985f25c 100644 --- a/distributed/shuffle/_rechunk.py +++ b/distributed/shuffle/_rechunk.py @@ -148,8 +148,9 @@ from distributed.sizeof import sizeof if TYPE_CHECKING: + from typing import TypeAlias + import numpy as np - from typing_extensions import TypeAlias import dask.array as da diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index e936b9d7c6..02a45aefec 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -41,12 +41,12 @@ logger = logging.getLogger("distributed.shuffle") if TYPE_CHECKING: + # TODO import from typing (requires Python >=3.10) + from typing import TypeAlias + import pandas as pd import pyarrow as pa - # TODO import from typing (requires Python >=3.10) - from typing_extensions import TypeAlias - def shuffle_transfer( input: pd.DataFrame, diff --git a/distributed/worker_memory.py b/distributed/worker_memory.py index 112f09704c..9771506168 100644 --- a/distributed/worker_memory.py +++ b/distributed/worker_memory.py @@ -48,7 +48,7 @@ if TYPE_CHECKING: # TODO import from typing (requires Python >=3.10) - from typing_extensions import TypeAlias + from typing import TypeAlias # Circular imports from distributed.nanny import Nanny diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 0380a06606..b7c53e2753 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -49,7 +49,9 @@ if TYPE_CHECKING: # TODO import from typing (ParamSpec and TypeAlias requires Python >=3.10) # TODO import from typing (NotRequired requires Python >=3.11) - from typing_extensions import NotRequired, ParamSpec, TypeAlias + from typing import TypeAlias + + from typing_extensions import NotRequired, ParamSpec P = ParamSpec("P") From 21ab1d0cc881cef6a6f37d292759befad2b31a56 Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Sat, 15 Nov 2025 10:09:43 +0100 Subject: [PATCH 08/11] Apply ruff/pyupgrade rule UP041 UP041 Replace aliased errors with `TimeoutError` --- distributed/deploy/old_ssh.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/deploy/old_ssh.py b/distributed/deploy/old_ssh.py index a8ea5c3e58..c3527e4e80 100644 --- a/distributed/deploy/old_ssh.py +++ b/distributed/deploy/old_ssh.py @@ -2,7 +2,6 @@ import logging import os -import socket import sys import traceback import warnings @@ -136,7 +135,7 @@ def read_from_stdout(): ) ) line = stdout.readline() - except (PipeTimeout, socket.timeout): + except (TimeoutError, PipeTimeout): pass def read_from_stderr(): @@ -155,7 +154,7 @@ def read_from_stderr(): + bcolors.ENDC ) line = stderr.readline() - except (PipeTimeout, socket.timeout): + except (TimeoutError, PipeTimeout): pass def communicate(): From f799b85e7d1e26775a1774fa64564db05c12354b Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Sat, 15 Nov 2025 10:12:27 +0100 Subject: [PATCH 09/11] Ignore ruff/pyupgrade rule UP031 This rule will be applied in a PR of its own. --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index e45b54ff35..4937965834 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,6 +92,7 @@ ignore = [ "E714", # Test for object identity should be `is not` "E721", # Use `is` and `is not` for type comparisons, or `isinstance()` for isinstance checks "E741", # Ambiguous variable name: `l` + "UP031", # TODO: apply this rule ] [tool.ruff.lint.extend-per-file-ignores] From dde1794df35a2f52eacd1a01f24c025e67f699f9 Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos Orfanos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Mon, 17 Nov 2025 22:54:02 +0100 Subject: [PATCH 10/11] Use `_` for unused variables Co-authored-by: Jacob Tomlinson --- distributed/_concurrent_futures_thread.py | 4 ++-- distributed/tests/test_client.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/_concurrent_futures_thread.py b/distributed/_concurrent_futures_thread.py index 35efba0411..78399d51ad 100644 --- a/distributed/_concurrent_futures_thread.py +++ b/distributed/_concurrent_futures_thread.py @@ -41,9 +41,9 @@ def _python_exit(): global _shutdown _shutdown = True items = list(_threads_queues.items()) - for _t, q in items: + for _, q in items: q.put(None) - for t, _q in items: + for t, _ in items: t.join() diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index d7157278c1..abb774a2a9 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6428,7 +6428,7 @@ async def test_as_completed_async_for_results(c, s, a, b): results = [] async def f(): - async for _future, result in ac: + async for _, result in ac: results.append(result) await f() @@ -8199,7 +8199,7 @@ async def test_client_disconnect_exception_on_cancelled_futures(c, s, a, b): with pytest.raises(FuturesCancelledError, match="connection to the scheduler"): futures_of(fut, client=c) - async for _fut, res in as_completed([fut], with_results=True): + async for _, res in as_completed([fut], with_results=True): assert isinstance(res, FutureCancelledError) assert "connection to the scheduler" in res.msg From 279c6cb784dc03ff8ce5c1ebee9103838582af0c Mon Sep 17 00:00:00 2001 From: Dimitri Papadopoulos <3234522+DimitriPapadopoulos@users.noreply.github.com> Date: Mon, 17 Nov 2025 23:04:18 +0100 Subject: [PATCH 11/11] Remove obsolete comment Co-authored-by: Jacob Tomlinson --- distributed/active_memory_manager.py | 1 - distributed/protocol/compression.py | 1 - distributed/scheduler.py | 3 +-- distributed/shuffle/_core.py | 1 - distributed/shuffle/_merge.py | 1 - distributed/shuffle/_shuffle.py | 1 - distributed/shuffle/_worker_plugin.py | 1 - distributed/worker_memory.py | 1 - distributed/worker_state_machine.py | 7 +++---- 9 files changed, 4 insertions(+), 13 deletions(-) diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index d03e1382d5..bb918a24ff 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -436,7 +436,6 @@ class Suggestion(NamedTuple): if TYPE_CHECKING: - # TODO import from typing (requires Python >=3.10) from typing import TypeAlias SuggestionGenerator: TypeAlias = Generator[ diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index ed51bd6e90..5f520335f8 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -22,7 +22,6 @@ from distributed.utils import ensure_memoryview, nbytes if TYPE_CHECKING: - # TODO import from typing (requires Python >=3.10) from typing import TypeAlias # TODO remove quotes (requires Python >=3.10) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a10a4255e1..dedf0d6085 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -146,10 +146,9 @@ from distributed.variable import VariableExtension if TYPE_CHECKING: - # TODO import from typing (requires Python >=3.10) - # TODO import from typing (requires Python >=3.11) from typing import TypeAlias, TypeVar + # TODO import from typing (requires Python >=3.11) from typing_extensions import Self from dask._expr import Expr diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py index db4d3abbe6..31121979dc 100644 --- a/distributed/shuffle/_core.py +++ b/distributed/shuffle/_core.py @@ -48,7 +48,6 @@ from distributed.utils_comm import retry if TYPE_CHECKING: - # TODO import from typing (requires Python >=3.10) from typing import TypeAlias from typing_extensions import ParamSpec diff --git a/distributed/shuffle/_merge.py b/distributed/shuffle/_merge.py index 630f1a5f1e..41eee0000b 100644 --- a/distributed/shuffle/_merge.py +++ b/distributed/shuffle/_merge.py @@ -10,7 +10,6 @@ from distributed.shuffle._shuffle import shuffle_transfer if TYPE_CHECKING: - # TODO import from typing (requires Python >=3.10) from typing import TypeAlias import pandas as pd diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index 02a45aefec..aec5d75947 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -41,7 +41,6 @@ logger = logging.getLogger("distributed.shuffle") if TYPE_CHECKING: - # TODO import from typing (requires Python >=3.10) from typing import TypeAlias import pandas as pd diff --git a/distributed/shuffle/_worker_plugin.py b/distributed/shuffle/_worker_plugin.py index 88f65af82a..e8e27d7f77 100644 --- a/distributed/shuffle/_worker_plugin.py +++ b/distributed/shuffle/_worker_plugin.py @@ -20,7 +20,6 @@ from distributed.utils import log_errors, sync if TYPE_CHECKING: - # TODO import from typing (requires Python >=3.10) import pandas as pd from distributed.worker import Worker diff --git a/distributed/worker_memory.py b/distributed/worker_memory.py index 9771506168..e0ac5dde12 100644 --- a/distributed/worker_memory.py +++ b/distributed/worker_memory.py @@ -47,7 +47,6 @@ from distributed.utils import RateLimiterFilter, has_arg, log_errors if TYPE_CHECKING: - # TODO import from typing (requires Python >=3.10) from typing import TypeAlias # Circular imports diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index b7c53e2753..02b6538432 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -47,11 +47,10 @@ logger = logging.getLogger("distributed.worker.state_machine") if TYPE_CHECKING: - # TODO import from typing (ParamSpec and TypeAlias requires Python >=3.10) - # TODO import from typing (NotRequired requires Python >=3.11) - from typing import TypeAlias + from typing import ParamSpec, TypeAlias - from typing_extensions import NotRequired, ParamSpec + # TODO import from typing (NotRequired requires Python >=3.11) + from typing_extensions import NotRequired P = ParamSpec("P")