Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def __getattr__(self, key):
and getattr(thread_state, "actor", False)
):
# actor calls actor on same worker
actor = self._worker.actors[self.key]
actor = self._worker.state.actors[self.key]
attr = getattr(actor, key)

if iscoroutinefunction(attr):
Expand Down
11 changes: 1 addition & 10 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@
wait_for,
)
from distributed.worker import Worker, run
from distributed.worker_memory import (
DeprecatedMemoryManagerAttribute,
DeprecatedMemoryMonitor,
NannyMemoryManager,
)
from distributed.worker_memory import NannyMemoryManager

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -286,11 +282,6 @@ def __init__( # type: ignore[no-untyped-def]
self._listen_address = listen_address
Nanny._instances.add(self)

# Deprecated attributes; use Nanny.memory_manager.<name> instead
memory_limit = DeprecatedMemoryManagerAttribute()
memory_terminate_fraction = DeprecatedMemoryManagerAttribute()
memory_monitor = DeprecatedMemoryMonitor()

def __repr__(self):
return "<Nanny: %s, threads: %d>" % (self.worker_address, self.nthreads)

Expand Down
21 changes: 0 additions & 21 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
from dask.core import istask, validate_key
from dask.typing import Key, no_default
from dask.utils import (
_deprecated,
_deprecated_kwarg,
format_bytes,
format_time,
Expand Down Expand Up @@ -376,16 +375,6 @@ def unmanaged_recent(self) -> int:
def optimistic(self) -> int:
return self.managed + self.unmanaged_old

@property
def managed_in_memory(self) -> int:
warnings.warn("managed_in_memory has been renamed to managed", FutureWarning)
return self.managed

@property
def managed_spilled(self) -> int:
warnings.warn("managed_spilled has been renamed to spilled", FutureWarning)
return self.spilled

def __repr__(self) -> str:
return (
f"Process memory (RSS) : {format_bytes(self.process)}\n"
Expand Down Expand Up @@ -1069,21 +1058,11 @@ def remove_group(self, tg: TaskGroup) -> None:
if self._types[typename] == 0:
del self._types[typename]

@property
@_deprecated(use_instead="groups") # type: ignore[untyped-decorator]
def active(self) -> Set[TaskGroup]:
return self.groups

@property
def groups(self) -> Set[TaskGroup]:
"""Insertion-sorted set-like of groups associated to this prefix"""
return self._groups.keys()

@property
@_deprecated(use_instead="states") # type: ignore[untyped-decorator]
def active_states(self) -> dict[TaskStateState, int]:
return self.states

def __repr__(self) -> str:
return (
"<"
Expand Down
6 changes: 3 additions & 3 deletions distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ def check(counter, blanks):
assert result == 0 + 1 + 2 + 3 + 4

def check(dask_worker):
return len(dask_worker.data) + len(dask_worker.actors)
return len(dask_worker.data) + len(dask_worker.state.actors)

start = time()
while any(client.run(check).values()):
Expand Down Expand Up @@ -630,7 +630,7 @@ def test_worker_actor_handle_is_weakref_sync(client):
del counter

def check(dask_worker):
return len(dask_worker.data) + len(dask_worker.actors)
return len(dask_worker.data) + len(dask_worker.state.actors)

start = time()
while any(client.run(check).values()):
Expand All @@ -651,7 +651,7 @@ def test_worker_actor_handle_is_weakref_from_compute_sync(client):
final.compute(actors=counter, optimize_graph=False)

def worker_tasks_running(dask_worker):
return len(dask_worker.data) + len(dask_worker.actors)
return len(dask_worker.data) + len(dask_worker.state.actors)

start = time()
while any(client.run(worker_tasks_running).values()):
Expand Down
34 changes: 0 additions & 34 deletions distributed/tests/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,40 +463,6 @@ async def test_full_collections(c, s, a, b):
assert not b.state.log


@pytest.mark.parametrize(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test had to be fixed as it was accessing worker.log.
However, I decided to remove it entirely as it was 100% redundant with the other resource tests.

"optimize_graph",
[
pytest.param(
True,
marks=pytest.mark.xfail(
reason="don't track resources through optimization"
),
),
False,
],
)
def test_collections_get(client, optimize_graph, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")

async def f(dask_worker):
await dask_worker.set_resources(**{"A": 1})

client.run(f, workers=[a["address"]])

with dask.annotate(resources={"A": 1}):
x = da.random.random(100, chunks=(10,)) + 1

x.compute(optimize_graph=optimize_graph)

def g(dask_worker):
return len(dask_worker.log)

logs = client.run(g)
assert logs[a["address"]]
assert not logs[b["address"]]


@gen_cluster(config={"distributed.worker.resources.my_resources": 1}, client=True)
async def test_resources_from_config(c, s, a, b):
info = c.scheduler_info()
Expand Down
21 changes: 0 additions & 21 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2930,13 +2930,9 @@ async def test_task_group_and_prefix_statistics(c, s, a, b, no_time_resync):

assert tg.prefix is tp
assert tp.groups == {tg}
with pytest.warns(FutureWarning, match="active"):
assert tp.groups == tp.active
# these must be true since in this simple case there is a 1to1 mapping
# between prefix and group
assert tg.states == tp.states
with pytest.warns(FutureWarning, match="active_states"):
assert tp.states == tp.active_states
assert tg.duration == tp.duration
assert tg.all_durations == tp.all_durations
assert tg.nbytes_total == tp.nbytes_total
Expand All @@ -2952,13 +2948,9 @@ async def test_task_group_and_prefix_statistics(c, s, a, b, no_time_resync):
tp = s.task_prefixes["add"]
assert tg.prefix is tp
assert tp.groups == {tg}
with pytest.warns(FutureWarning, match="active"):
assert tp.groups == tp.active
# these must be true since in this simple case there is a 1to1 mapping
# between prefix and group
assert tg.states == tp.states
with pytest.warns(FutureWarning, match="active_states"):
assert tp.states == tp.active_states
assert tg.duration == tp.duration
assert tg.all_durations == tp.all_durations
assert tg.nbytes_total == tp.nbytes_total
Expand All @@ -2980,17 +2972,13 @@ async def test_task_group_and_prefix_statistics(c, s, a, b, no_time_resync):

assert tg.prefix is tp
assert tp.groups == {tg}
with pytest.warns(FutureWarning, match="active"):
assert tp.groups == tp.active
assert tg.states["forgotten"] == 4
assert tg.states["released"] == 1
assert sum(tg.states.values()) == 5
assert len(tg) == 5
assert len(tp) == 5

assert tg.states == tp.states
with pytest.warns(FutureWarning, match="active_states"):
assert tp.states == tp.active_states
assert tg.duration == tp.duration
assert tg.all_durations == tp.all_durations
assert tg.nbytes_total == tp.nbytes_total
Expand All @@ -3016,11 +3004,7 @@ async def test_task_group_and_prefix_statistics(c, s, a, b, no_time_resync):
# these must be zero because we remove fully-forgotten task groups
# from the prefixes
assert tp.groups == set()
with pytest.warns(FutureWarning, match="active"):
assert tp.groups == tp.active
assert all(count == 0 for count in tp.states.values())
with pytest.warns(FutureWarning, match="active_states"):
assert tp.states == tp.active_states
assert len(tp) == 0
assert tp.duration == 0
assert tp.nbytes_total == 0
Expand Down Expand Up @@ -3457,11 +3441,6 @@ def test_memorystate():
assert m.unmanaged_recent == 17
assert m.optimistic == 83

with pytest.warns(FutureWarning):
assert m.managed_spilled == m.spilled
with pytest.warns(FutureWarning):
assert m.managed_in_memory == m.managed

assert repr(m) == dedent("""
Process memory (RSS) : 100 B
- managed by Dask : 68 B
Expand Down
34 changes: 0 additions & 34 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import tempfile
import threading
import traceback
import warnings
import weakref
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from concurrent.futures.process import BrokenProcessPool
Expand Down Expand Up @@ -3483,22 +3482,6 @@ async def close():
await asyncio.gather(block(), close(), set_future())


@gen_cluster(nthreads=[])
async def test_reconnect_argument_deprecated(s):
with pytest.deprecated_call(match="`reconnect` argument"):
async with Worker(s.address, reconnect=False):
pass
with pytest.raises(ValueError, match="reconnect=True"):
async with Worker(s.address, reconnect=True):
pass

with warnings.catch_warnings():
# No argument should not warn or raise
warnings.simplefilter("error")
async with Worker(s.address):
pass


@gen_cluster(client=True, nthreads=[])
async def test_worker_running_before_running_plugins(c, s, caplog):
class InitWorkerNewThread(WorkerPlugin):
Expand Down Expand Up @@ -3570,23 +3553,6 @@ async def test_execute_preamble_abort_retirement(c, s):
assert await y == 2


@gen_cluster()
async def test_deprecation_of_renamed_worker_attributes(s, a, b):
msg = (
"The `Worker.outgoing_count` attribute has been renamed to "
"`Worker.transfer_outgoing_count_total`"
)
with pytest.warns(DeprecationWarning, match=msg):
assert a.outgoing_count == a.transfer_outgoing_count_total

msg = (
"The `Worker.outgoing_current_count` attribute has been renamed to "
"`Worker.transfer_outgoing_count`"
)
with pytest.warns(DeprecationWarning, match=msg):
assert a.outgoing_current_count == a.transfer_outgoing_count


@gen_cluster(client=True, Worker=Nanny)
async def test_forward_output(c, s, a, b, capsys):
def print_stdout(*args, **kwargs):
Expand Down
44 changes: 0 additions & 44 deletions distributed/tests/test_worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,50 +1047,6 @@ def __reduce__(self):
assert not any(v for k, v in c.items() if k >= 2.0), dict(c)


@pytest.mark.parametrize(
"cls,name,value",
[
(Worker, "memory_limit", 123e9),
(Worker, "memory_target_fraction", 0.789),
(Worker, "memory_spill_fraction", 0.789),
(Worker, "memory_pause_fraction", 0.789),
(Nanny, "memory_limit", 123e9),
(Nanny, "memory_terminate_fraction", 0.789),
],
)
@gen_cluster(nthreads=[])
async def test_deprecated_attributes(s, cls, name, value):
async with cls(s.address) as a:
with pytest.warns(FutureWarning, match=name):
setattr(a, name, value)
with pytest.warns(FutureWarning, match=name):
assert getattr(a, name) == value
assert getattr(a.memory_manager, name) == value


@gen_cluster(nthreads=[("", 1)])
async def test_deprecated_memory_monitor_method_worker(s, a):
with pytest.warns(FutureWarning, match="memory_monitor"):
await a.memory_monitor()


@gen_cluster(nthreads=[("", 1)], Worker=Nanny)
async def test_deprecated_memory_monitor_method_nanny(s, a):
with pytest.warns(FutureWarning, match="memory_monitor"):
a.memory_monitor()


@pytest.mark.parametrize(
"name",
["memory_target_fraction", "memory_spill_fraction", "memory_pause_fraction"],
)
@gen_cluster(nthreads=[])
async def test_deprecated_params(s, name):
with pytest.warns(FutureWarning, match=name):
async with Worker(s.address, **{name: 0.789}) as a:
assert getattr(a.memory_manager, name) == 0.789


@gen_cluster(config={"distributed.worker.memory.monitor-interval": "10ms"})
async def test_pause_while_idle(s, a, b):
sa = s.workers[a.address]
Expand Down
28 changes: 0 additions & 28 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1018,34 +1018,6 @@ async def get_data(self, comm, *args, **kwargs):
await wait_for_state("y", "missing", a)


@gen_cluster()
async def test_deprecated_worker_attributes(s, a, b):
n = a.state.generation
msg = (
"The `Worker.generation` attribute has been moved to "
"`Worker.state.generation`"
)
with pytest.warns(FutureWarning, match=msg):
assert a.generation == n
with pytest.warns(FutureWarning, match=msg):
a.generation -= 1
assert a.generation == n - 1
assert a.state.generation == n - 1

# Old and new names differ
msg = (
"The `Worker.in_flight_tasks` attribute has been moved to "
"`Worker.state.in_flight_tasks_count`"
)
with pytest.warns(FutureWarning, match=msg):
assert a.in_flight_tasks == 0

with pytest.warns(FutureWarning, match="attribute has been removed"):
assert a.data_needed == set()
with pytest.warns(FutureWarning, match="attribute has been removed"):
assert a.waiting_for_data_count == 0


@pytest.mark.parametrize("n_remote_workers", [1, 2])
@pytest.mark.parametrize(
"nbytes,n_in_flight_per_worker",
Expand Down
Loading
Loading