Skip to content

Commit d187ab1

Browse files
authored
Clean up deprecations in distributed.deploy (#9244)
1 parent dc82b5b commit d187ab1

9 files changed

Lines changed: 20 additions & 243 deletions

File tree

distributed/deploy/__init__.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
from __future__ import annotations
22

3-
from contextlib import suppress
4-
53
from distributed.deploy.adaptive import Adaptive
64
from distributed.deploy.cluster import Cluster
75
from distributed.deploy.local import LocalCluster
86
from distributed.deploy.spec import ProcessInterface, SpecCluster
97
from distributed.deploy.ssh import SSHCluster
108
from distributed.deploy.subprocess import SubprocessCluster
11-
12-
with suppress(ImportError):
13-
from distributed.deploy.ssh import SSHCluster

distributed/deploy/cluster.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from tornado.ioloop import IOLoop, PeriodicCallback
1313

1414
import dask.config
15-
from dask.utils import _deprecated, format_bytes, parse_timedelta, typename
15+
from dask.utils import format_bytes, parse_timedelta, typename
1616
from dask.widgets import get_template
1717

1818
from distributed.core import Status
@@ -347,10 +347,6 @@ def get_logs(self, cluster=True, scheduler=True, workers=True):
347347
self._get_logs, cluster=cluster, scheduler=scheduler, workers=workers
348348
)
349349

350-
@_deprecated(use_instead="get_logs")
351-
def logs(self, *args, **kwargs):
352-
return self.get_logs(*args, **kwargs)
353-
354350
def get_client(self):
355351
"""Return client for the cluster
356352

distributed/deploy/local.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ class LocalCluster(SpecCluster):
5555
Use a falsey value like False or None for no change.
5656
host: string
5757
Host address on which the scheduler will listen, defaults to only localhost
58-
ip: string
59-
Deprecated. See ``host`` above.
6058
dashboard_address: str
6159
Address on which to listen for the Bokeh diagnostics server like
6260
'localhost:8787' or '0.0.0.0:8787'. Defaults to ':8787'.
@@ -69,8 +67,6 @@ class LocalCluster(SpecCluster):
6967
Address on which to listen for the Bokeh worker diagnostics server like
7068
'localhost:8787' or '0.0.0.0:8787'. Defaults to None which disables the dashboard.
7169
Use ':0' for a random port.
72-
diagnostics_port: int
73-
Deprecated. See dashboard_address.
7470
asynchronous: bool (False by default)
7571
Set to True if using this cluster within async/await functions or within
7672
Tornado gen.coroutines. This should remain False for normal use.
@@ -120,14 +116,13 @@ def __init__(
120116
threads_per_worker=None,
121117
processes=None,
122118
loop=None,
123-
start=None,
119+
start=None, # deprecated
124120
host=None,
125-
ip=None,
121+
ip=None, # deprecated
126122
scheduler_port=0,
127123
silence_logs=logging.WARN,
128124
dashboard_address=":8787",
129125
worker_dashboard_address=None,
130-
diagnostics_port=None,
131126
services=None,
132127
worker_services=None,
133128
service_kwargs=None,
@@ -142,23 +137,19 @@ def __init__(
142137
**worker_kwargs,
143138
):
144139
if ip is not None:
145-
# In the future we should warn users about this move
146-
# warnings.warn("The ip keyword has been moved to host")
147-
host = ip
148-
149-
if diagnostics_port is not None:
150140
warnings.warn(
151-
"diagnostics_port has been deprecated. "
152-
"Please use `dashboard_address=` instead"
141+
"The `ip` parameter has been deprecated. Please use `host` instead",
142+
DeprecationWarning,
143+
stacklevel=2,
153144
)
154-
dashboard_address = diagnostics_port
145+
host = ip
155146

156-
if threads_per_worker == 0:
147+
if start is not None:
157148
warnings.warn(
158-
"Setting `threads_per_worker` to 0 has been deprecated. "
159-
"Please set to None or to a specific int."
149+
"The `start` parameter has been deprecated and has no effect.",
150+
DeprecationWarning,
151+
stacklevel=2,
160152
)
161-
threads_per_worker = None
162153

163154
if "dashboard" in worker_kwargs:
164155
warnings.warn(

distributed/deploy/old_ssh.py

Lines changed: 2 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from __future__ import annotations
22

3+
import datetime
34
import logging
45
import os
56
import sys
67
import traceback
7-
import warnings
88
from queue import Queue
99
from threading import Thread
1010
from time import sleep
@@ -330,7 +330,7 @@ def __init__(
330330
scheduler_port,
331331
worker_addrs,
332332
nthreads=0,
333-
n_workers=None,
333+
n_workers=1,
334334
ssh_username=None,
335335
ssh_port=22,
336336
ssh_private_key=None,
@@ -342,49 +342,23 @@ def __init__(
342342
nanny_port=None,
343343
remote_dask_worker="distributed.cli.dask_worker",
344344
local_directory=None,
345-
**kwargs,
346345
):
347346
self.scheduler_addr = scheduler_addr
348347
self.scheduler_port = scheduler_port
349348
self.nthreads = nthreads
350-
nprocs = kwargs.pop("nprocs", None)
351-
if kwargs:
352-
raise TypeError(
353-
f"__init__() got an unexpected keyword argument {', '.join(kwargs.keys())}"
354-
)
355-
if nprocs is not None and n_workers is not None:
356-
raise ValueError(
357-
"Both nprocs and n_workers were specified. Use n_workers only."
358-
)
359-
elif nprocs is not None:
360-
warnings.warn(
361-
"The nprocs argument will be removed in a future release. It has been "
362-
"renamed to n_workers.",
363-
FutureWarning,
364-
)
365-
n_workers = nprocs
366-
elif n_workers is None:
367-
n_workers = 1
368-
369349
self.n_workers = n_workers
370-
371350
self.ssh_username = ssh_username
372351
self.ssh_port = ssh_port
373352
self.ssh_private_key = ssh_private_key
374-
375353
self.nohost = nohost
376-
377354
self.remote_python = remote_python
378-
379355
self.memory_limit = memory_limit
380356
self.worker_port = worker_port
381357
self.nanny_port = nanny_port
382358
self.remote_dask_worker = remote_dask_worker
383359
self.local_directory = local_directory
384360

385361
# Generate a universal timestamp to use for log files
386-
import datetime
387-
388362
if logdir is not None:
389363
logdir = os.path.join(
390364
logdir,
@@ -420,24 +394,6 @@ def __init__(
420394
def _start(self):
421395
pass
422396

423-
@property
424-
def nprocs(self):
425-
warnings.warn(
426-
"The nprocs attribute will be removed in a future release. It has been "
427-
"renamed to n_workers.",
428-
FutureWarning,
429-
)
430-
return self.n_workers
431-
432-
@nprocs.setter
433-
def nprocs(self, value):
434-
warnings.warn(
435-
"The nprocs attribute will be removed in a future release. It has been "
436-
"renamed to n_workers.",
437-
FutureWarning,
438-
)
439-
self.n_workers = value
440-
441397
@property
442398
def scheduler_address(self):
443399
return f"{self.scheduler_addr}:{self.scheduler_port}"

distributed/deploy/ssh.py

Lines changed: 7 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -71,58 +71,28 @@ def __init__( # type: ignore[no-untyped-def]
7171
address: str,
7272
connect_options: dict,
7373
kwargs: dict,
74-
worker_module="deprecated",
7574
worker_class="distributed.Nanny",
7675
remote_python=None,
7776
loop=None,
7877
name=None,
7978
):
8079
super().__init__()
8180

82-
if worker_module != "deprecated":
83-
raise ValueError(
84-
"worker_module has been deprecated in favor of worker_class. "
85-
"Please specify a Python class rather than a CLI module."
81+
if loop is not None:
82+
warnings.warn(
83+
"The `loop` parameter has been deprecated and has no effect.",
84+
DeprecationWarning,
85+
stacklevel=2,
8686
)
8787

8888
self.address = address
8989
self.scheduler = scheduler
9090
self.worker_class = worker_class
9191
self.connect_options = connect_options
92-
self.kwargs = copy.copy(kwargs)
9392
self.name = name
9493
self.remote_python = remote_python
95-
if kwargs.get("nprocs") is not None and kwargs.get("n_workers") is not None:
96-
raise ValueError(
97-
"Both nprocs and n_workers were specified. Use n_workers only."
98-
)
99-
elif kwargs.get("nprocs") is not None:
100-
warnings.warn(
101-
"The nprocs argument will be removed in a future release. It has been "
102-
"renamed to n_workers.",
103-
FutureWarning,
104-
)
105-
self.n_workers = self.kwargs.pop("nprocs", 1)
106-
else:
107-
self.n_workers = self.kwargs.pop("n_workers", 1)
108-
109-
@property
110-
def nprocs(self):
111-
warnings.warn(
112-
"The nprocs attribute will be removed in a future release. It has been "
113-
"renamed to n_workers.",
114-
FutureWarning,
115-
)
116-
return self.n_workers
117-
118-
@nprocs.setter
119-
def nprocs(self, value):
120-
warnings.warn(
121-
"The nprocs attribute will be removed in a future release. It has been "
122-
"renamed to n_workers.",
123-
FutureWarning,
124-
)
125-
self.n_workers = value
94+
self.kwargs = copy.copy(kwargs)
95+
self.n_workers = self.kwargs.pop("n_workers", 1)
12696

12797
async def start(self):
12898
try:
@@ -291,7 +261,6 @@ def SSHCluster(
291261
connect_options: dict | list[dict] | None = None,
292262
worker_options: dict | None = None,
293263
scheduler_options: dict | None = None,
294-
worker_module: str = "deprecated",
295264
worker_class: str = "distributed.Nanny",
296265
remote_python: str | list[str] | None = None,
297266
**kwargs: Any,
@@ -387,12 +356,6 @@ def SSHCluster(
387356
worker_options = worker_options or {}
388357
scheduler_options = scheduler_options or {}
389358

390-
if worker_module != "deprecated":
391-
raise ValueError(
392-
"worker_module has been deprecated in favor of worker_class. "
393-
"Please specify a Python class rather than a CLI module."
394-
)
395-
396359
if set(kwargs) & old_cluster_kwargs:
397360
from distributed.deploy.old_ssh import SSHCluster as OldSSHCluster
398361

distributed/deploy/tests/test_local.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,22 +1060,6 @@ async def test_repr(memory_limit):
10601060
assert "memory" not in text
10611061

10621062

1063-
@gen_test()
1064-
async def test_threads_per_worker_set_to_0():
1065-
with pytest.warns(
1066-
Warning, match="Setting `threads_per_worker` to 0 has been deprecated."
1067-
):
1068-
async with LocalCluster(
1069-
n_workers=2,
1070-
processes=False,
1071-
threads_per_worker=0,
1072-
asynchronous=True,
1073-
dashboard_address=":0",
1074-
) as cluster:
1075-
assert len(cluster.workers) == 2
1076-
assert all(w.state.nthreads < CPU_COUNT for w in cluster.workers.values())
1077-
1078-
10791063
@pytest.mark.parametrize("temporary", [True, False])
10801064
@gen_test()
10811065
async def test_capture_security(temporary):

distributed/deploy/tests/test_old_ssh.py

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -36,50 +36,3 @@ def test_cluster(loop):
3636
while len(e.ncores()) != 3:
3737
sleep(0.01)
3838
assert time() < start + 5
39-
40-
41-
def test_old_ssh_nprocs_renamed_to_n_workers():
42-
with pytest.warns(FutureWarning, match="renamed to n_workers"):
43-
with SSHCluster(
44-
scheduler_addr="127.0.0.1",
45-
scheduler_port=8687,
46-
worker_addrs=["127.0.0.1", "127.0.0.1"],
47-
nprocs=2,
48-
) as c:
49-
assert c.n_workers == 2
50-
51-
52-
def test_nprocs_attribute_is_deprecated():
53-
with SSHCluster(
54-
scheduler_addr="127.0.0.1",
55-
scheduler_port=8687,
56-
worker_addrs=["127.0.0.1", "127.0.0.1"],
57-
) as c:
58-
assert c.n_workers == 1
59-
with pytest.warns(FutureWarning, match="renamed to n_workers"):
60-
assert c.nprocs == 1
61-
with pytest.warns(FutureWarning, match="renamed to n_workers"):
62-
c.nprocs = 3
63-
64-
assert c.n_workers == 3
65-
66-
67-
def test_old_ssh_n_workers_with_nprocs_is_an_error():
68-
with pytest.raises(ValueError, match="Both nprocs and n_workers"):
69-
SSHCluster(
70-
scheduler_addr="127.0.0.1",
71-
scheduler_port=8687,
72-
worker_addrs=(),
73-
nprocs=2,
74-
n_workers=2,
75-
)
76-
77-
78-
def test_extra_kwargs_is_an_error():
79-
with pytest.raises(TypeError, match="unexpected keyword argument"):
80-
SSHCluster(
81-
scheduler_addr="127.0.0.1",
82-
scheduler_port=8687,
83-
worker_addrs=["127.0.0.1", "127.0.0.1"],
84-
unknown_kwarg=None,
85-
)

distributed/deploy/tests/test_spec_cluster.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -304,14 +304,6 @@ async def test_get_logs():
304304
assert set(logs) == {w}
305305

306306

307-
@gen_test()
308-
async def test_logs_deprecated():
309-
async with SpecCluster(asynchronous=True, scheduler=scheduler) as cluster:
310-
with pytest.warns(FutureWarning, match="get_logs"):
311-
logs = await cluster.logs()
312-
assert logs["Scheduler"]
313-
314-
315307
@gen_test()
316308
async def test_scheduler_info():
317309
async with SpecCluster(

0 commit comments

Comments
 (0)