Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions continuous_integration/recipes/distributed/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ build:
string: py_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
noarch: python
script: {{ PYTHON }} -m pip install . -vv --no-deps
entry_points:
# Old style CLI
- dask-scheduler = distributed.cli.dask_scheduler:main
- dask-ssh = distributed.cli.dask_ssh:main
- dask-worker = distributed.cli.dask_worker:main

requirements:
host:
Expand Down Expand Up @@ -61,9 +56,6 @@ test:
- distributed.protocol
commands:
- pip check
- dask-scheduler --help
- dask-ssh --help
- dask-worker --help
- dask scheduler --help
- dask ssh --help
- dask worker --help
Expand Down
20 changes: 7 additions & 13 deletions distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import os
import re
import sys
import warnings

import click

from distributed import Scheduler
from distributed._signals import wait_for_signals
from distributed.cli.utils import deprecated_option
from distributed.compatibility import asyncio_run
from distributed.config import get_loop_factory
from distributed.preloading import validate_preload_argv
Expand Down Expand Up @@ -81,11 +81,13 @@
"it possible for anyone with access to your dashboard address to run"
"Python code",
)
@click.option("--show/--no-show", default=False, help="Show web UI [default: --show]")
@deprecated_option(
"--show/--no-show", default=False, help="Show web UI [default: --show]"
)
@click.option(
"--dashboard-prefix", type=str, default="", help="Prefix for the dashboard app"
)
@click.option(
@deprecated_option(
"--use-xheaders",
type=bool,
default=False,
Expand Down Expand Up @@ -124,10 +126,10 @@ def main(
port,
protocol,
interface,
show,
show, # deprecated
dashboard,
dashboard_prefix,
use_xheaders,
use_xheaders, # deprecated
pid_file,
tls_ca_file,
tls_cert,
Expand All @@ -137,14 +139,6 @@ def main(
**kwargs,
):
"""Launch a Dask scheduler."""

if "dask-scheduler" in sys.argv[0]:
warnings.warn(
"dask-scheduler is deprecated and will be removed in a future release; use `dask scheduler` instead",
FutureWarning,
stacklevel=1,
)

g0, g1, g2 = gc.get_threshold() # https://github.com/dask/distributed/issues/1653
gc.set_threshold(g0 * 3, g1 * 3, g2 * 3)

Expand Down
9 changes: 0 additions & 9 deletions distributed/cli/dask_ssh.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from __future__ import annotations

import logging
import sys
import warnings
from textwrap import dedent

import click
Expand Down Expand Up @@ -146,13 +144,6 @@ def main(
remote_dask_worker,
local_directory,
):
if "dask-ssh" in sys.argv[0]:
warnings.warn(
"dask-ssh is deprecated and will be removed in a future release; use `dask ssh` instead",
FutureWarning,
stacklevel=1,
)

try:
hostnames = list(hostnames)
if hostfile:
Expand Down
18 changes: 7 additions & 11 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import logging
import os
import sys
import warnings
from collections.abc import Iterator
from contextlib import suppress
from typing import Any
Expand All @@ -20,6 +19,7 @@

from distributed import Nanny
from distributed._signals import wait_for_signals
from distributed.cli.utils import deprecated_option
from distributed.comm import get_address_host_port
from distributed.compatibility import asyncio_run
from distributed.config import get_loop_factory
Expand Down Expand Up @@ -180,8 +180,11 @@
default=None,
help="Seconds to wait for a scheduler before closing",
)
@click.option(
"--dashboard-prefix", type=str, default="", help="Prefix for the dashboard"
@deprecated_option( # type: ignore[untyped-decorator]
"--dashboard-prefix",
type=str,
default=None,
help="Prefix for the dashboard",
)
@click.option(
"--lifetime",
Expand Down Expand Up @@ -251,7 +254,7 @@ def main( # type: ignore[no-untyped-def]
resources,
dashboard,
scheduler_file,
dashboard_prefix,
dashboard_prefix, # deprecated
tls_ca_file,
tls_cert,
tls_key,
Expand All @@ -262,13 +265,6 @@ def main( # type: ignore[no-untyped-def]
):
"""Launch a Dask worker attached to an existing scheduler"""

if "dask-worker" in sys.argv[0]:
warnings.warn(
"dask-worker is deprecated and will be removed in a future release; use `dask worker` instead",
FutureWarning,
stacklevel=1,
)

g0, g1, g2 = gc.get_threshold() # https://github.com/dask/distributed/issues/1653
gc.set_threshold(g0 * 3, g1 * 3, g2 * 3)

Expand Down
20 changes: 1 addition & 19 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ async def f():
assert _get_dashboard_port(c) == 8787


@pytest.mark.slow
def test_hostport(loop):
port = open_port()
with popen(
Expand Down Expand Up @@ -686,22 +687,3 @@ def test_signal_handling(loop, sig):
assert scheduler.returncode == 0
assert "scheduler closing" in logs
assert "end scheduler" in logs


@pytest.mark.skipif(WINDOWS, reason="POSIX only")
def test_single_executable_deprecated(loop):
port = open_port()
with popen(
[
"dask-scheduler",
"--no-dashboard",
f"--port={port}",
],
capture_output=True,
) as scheduler:
with Client(f"127.0.0.1:{port}", loop=loop) as c:
pass
scheduler.send_signal(signal.SIGTERM)
stdout, stderr = scheduler.communicate()
logs = stdout.decode()
assert "FutureWarning: dask-scheduler is deprecated" in logs
18 changes: 0 additions & 18 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from distributed.utils import get_ip, open_port
from distributed.utils_test import (
gen_cluster,
inc,
popen,
requires_ipv6,
wait_for_log_line,
Expand Down Expand Up @@ -812,20 +811,3 @@ def test_error_during_startup(monkeypatch, nanny, loop):
],
) as worker:
assert worker.wait(10) == 1


def test_single_executable_deprecated():
assert (
b"FutureWarning: dask-worker is deprecated"
in subprocess.run(["dask-worker"], capture_output=True).stderr
)


@pytest.mark.slow
@gen_cluster(nthreads=[], client=True)
async def test_single_executable_works(c, s):
with popen(["dask-worker", s.address]):
# make sure the worker still works
await c.wait_for_workers(1)
results = await c.submit(inc, 1).result()
assert results == 2
3 changes: 3 additions & 0 deletions distributed/cli/tests/test_tls_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import sys
from time import sleep

import pytest

from distributed import Client
from distributed.metrics import time
from distributed.utils import open_port
Expand Down Expand Up @@ -75,6 +77,7 @@ def test_sni(loop):
wait_for_cores(c)


@pytest.mark.slow
def test_nanny(loop):
port = open_port()
with popen(
Expand Down
43 changes: 11 additions & 32 deletions distributed/cli/utils.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,17 @@
from __future__ import annotations

import warnings
import importlib.metadata
from functools import wraps
from typing import Any

from tornado.ioloop import IOLoop
import click

warnings.warn(
"the distributed.cli.utils module is deprecated", DeprecationWarning, stacklevel=2
)
CLICK_VERSION = tuple(map(int, importlib.metadata.version("click").split(".")[:2]))


def install_signal_handlers(loop=None, cleanup=None):
"""
Install global signal handlers to halt the Tornado IOLoop in case of
a SIGINT or SIGTERM. *cleanup* is an optional callback called,
before the loop stops, with a single signal number argument.
"""
import signal

loop = loop or IOLoop.current()

old_handlers = {}

def handle_signal(sig, frame):
async def cleanup_and_stop():
try:
if cleanup is not None:
await cleanup(sig)
finally:
loop.stop()

loop.add_callback_from_signal(cleanup_and_stop)
# Restore old signal handler to allow for a quicker exit
# if the user sends the signal again.
signal.signal(sig, old_handlers[sig])

for sig in [signal.SIGINT, signal.SIGTERM]:
old_handlers[sig] = signal.signal(sig, handle_signal)
@wraps(click.option)
def deprecated_option(*args: Any, **kwargs: Any) -> Any:
if CLICK_VERSION >= (8, 2):
return click.option(*args, **kwargs, deprecated=True)
help = kwargs.pop("help", "") + "(DEPRECATED)"
return click.option(*args, help=help, **kwargs)
2 changes: 1 addition & 1 deletion distributed/deploy/old_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


# These are handy for creating colorful terminal output to enhance readability
# of the output generated by dask-ssh.
# of the output generated by dask ssh.
class bcolors:
HEADER = "\033[95m"
OKBLUE = "\033[94m"
Expand Down
2 changes: 1 addition & 1 deletion distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class Nanny(ServerNode):
For the same reason, be warned that changing
``distributed.worker.multiprocessing-method`` from ``spawn`` to ``fork`` or
``forkserver`` may inhibit some environment variables; if you do, you should
set the variables yourself in the shell before you start ``dask-worker``.
set the variables yourself in the shell before you start ``dask worker``.

See Also
--------
Expand Down
10 changes: 6 additions & 4 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,12 +1202,14 @@ def popen(
if not os.path.isabs(executable_path):
executable_path = os.path.join(sysconfig.get_path("scripts"), executable_path)

# On Windows, it's valid to start a process using only '{program-name}' and Windows will
# automatically find and execute '{program-name}.exe'.
# On Windows, it's valid to start a process using only '{program-name}' and Windows
# will automatically find and execute '{program-name}.exe'.
#
# That allows e.g. `popen(["dask-worker"])` to work despite the installed file being called 'dask-worker.exe'.
# That allows e.g. `popen(["dask", "worker"])` to work despite the installed file
# being called 'dask.exe'.
#
# docs: https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-createprocessw
# docs:
# https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-createprocessw
#
if WINDOWS:
executable_exists = os.path.isfile(executable_path) or os.path.isfile(
Expand Down
5 changes: 0 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ worker = "distributed.cli.dask_worker:main"
ssh = "distributed.cli.dask_ssh:main"
spec = "distributed.cli.dask_spec:main"

[project.scripts]
dask-ssh = "distributed.cli.dask_ssh:main"
dask-scheduler = "distributed.cli.dask_scheduler:main"
dask-worker = "distributed.cli.dask_worker:main"

[tool.setuptools.packages.find]
exclude = ["*tests*"]
namespaces = false
Expand Down
Loading