2 changes: 1 addition & 1 deletion distributed/actor.py

@@ -141,7 +141,7 @@
 
     def __getattr__(self, key):
         if self._future and self._future.status not in ("finished", "pending"):
-            raise ValueError(
+            raise RuntimeError(
                 "Worker holding Actor was lost. Status: " + self._future.status
             )
         self._try_bind_worker_client()
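
One practical consequence of the distributed/actor.py change above is that user code which caught the old exception type around actor calls now needs to catch RuntimeError instead. A minimal illustrative sketch, assuming a Counter-style actor like the one used in this PR's tests; the bump helper and its fallback behaviour are hypothetical, not part of the diff:

from __future__ import annotations

from distributed import Actor


def bump(counter: Actor) -> int | None:
    """Call Counter.increment on an actor, tolerating a lost host worker."""
    try:
        return counter.increment().result()
    except RuntimeError as exc:  # was ValueError before this change
        if "Worker holding Actor was lost" in str(exc):
            return None  # the actor's worker is gone; caller decides how to recover
        raise
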
50 changes: 46 additions & 4 deletions distributed/scheduler.py

@@ -2599,7 +2599,7 @@
         if ts.who_wants:
             ts.exception_blame = ts
             ts.exception = Serialized(
-                *serialize(ValueError("Worker holding Actor was lost"))
+                *serialize(RuntimeError("Worker holding Actor was lost"))
             )
             return {ts.key: "erred"}, {}, {}  # don't try to recreate

@@ -2652,7 +2652,7 @@
 
         if self.validate:
             assert ts.exception_blame
-            assert not ts.who_has
+            assert not ts.who_has or ts.actor
             assert not ts.waiting_on
 
         failing_ts = ts.exception_blame

@@ -2772,8 +2772,8 @@
         self,
         key: Key,
         stimulus_id: str,
-        worker: str,
         *,
+        worker: str | None = None,
         cause: Key | None = None,
         exception: Serialized | None = None,
         traceback: Serialized | None = None,

@@ -2988,6 +2988,45 @@
             ts.exception_blame = ts.exception = ts.traceback = None
         self.task_metadata.pop(key, None)
 
+    def _transition_memory_erred(self, key: Key, stimulus_id: str) -> RecsMsgs:
+        ts = self.tasks[key]
+        if self.validate:
+            assert ts.actor
+        recommendations: Recs = {}
+        client_msgs: Msgs = {}
+        worker_msgs: Msgs = {}
+
+        # XXX factor this out?
+        worker_msg = {
+            "op": "free-keys",
+            "keys": [key],
+            "stimulus_id": stimulus_id,
+        }
+        for ws in ts.who_has or ():
+            worker_msgs[ws.address] = [worker_msg]
+        self.remove_all_replicas(ts)
+
+        for dts in ts.dependents:
+            if not dts.who_has:
+                dts.exception_blame = ts
+                recommendations[dts.key] = "erred"
+        exception = Serialized(
+            *serialize(RuntimeError("Worker holding Actor was lost"))
+        )
+        report_msg = {
+            "op": "task-erred",
+            "key": key,
+            "exception": exception,
+        }
+        for cs in ts.who_wants or ():
+            client_msgs[cs.client_key] = [report_msg]
+
+        ts.state = "erred"
+        return self._propagate_erred(
+            ts,
+            cause=ts.key,
+            exception=exception,
+        )
+
     def _transition_memory_forgotten(self, key: Key, stimulus_id: str) -> RecsMsgs:
         ts = self.tasks[key]
 

@@ -3078,6 +3117,7 @@
         ("no-worker", "processing"): _transition_no_worker_processing,
         ("no-worker", "erred"): _transition_no_worker_erred,
         ("released", "forgotten"): _transition_released_forgotten,
+        ("memory", "erred"): _transition_memory_erred,
         ("memory", "forgotten"): _transition_memory_forgotten,
         ("erred", "released"): _transition_erred_released,
         ("memory", "released"): _transition_memory_released,

@@ -5521,7 +5561,9 @@
 
         for ts in list(ws.has_what):
             self.remove_replica(ts, ws)
-            if not ts.who_has:
+            if ts in ws.actors:
+                recommendations[ts.key] = "erred"
+            elif not ts.who_has:
                 if ts.run_spec:
                     recompute_keys.add(ts.key)
                     recommendations[ts.key] = "released"
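
The new ("memory", "erred") entry above plugs into the scheduler's transition table, where handlers are looked up by a (start_state, finish_state) pair and return recommendations for follow-up transitions. Below is a simplified, self-contained sketch of that dispatch pattern with toy names and types; the real handlers, such as _transition_memory_erred above, also produce client and worker messages:

from __future__ import annotations

from typing import Callable

Recs = dict[str, str]  # task key -> recommended next state (simplified)


def transition_memory_erred(key: str) -> Recs:
    # Toy stand-in: the real handler also frees replicas, reports the
    # "Worker holding Actor was lost" error to clients, and propagates
    # the error to dependent tasks.
    return {key: "erred"}


def transition_memory_released(key: str) -> Recs:
    return {key: "released"}


# Handlers keyed by (start, finish), mirroring the table extended in this diff.
TRANSITIONS: dict[tuple[str, str], Callable[[str], Recs]] = {
    ("memory", "erred"): transition_memory_erred,
    ("memory", "released"): transition_memory_released,
}


def transition(key: str, start: str, finish: str) -> Recs:
    try:
        handler = TRANSITIONS[start, finish]
    except KeyError:
        raise RuntimeError(f"Impossible transition {start!r} -> {finish!r} for {key!r}")
    return handler(key)


# remove_worker can now recommend "erred" for an actor's key that lived in
# memory only on the lost worker:
assert transition("counter-abc123", "memory", "erred") == {"counter-abc123": "erred"}
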
70 changes: 68 additions & 2 deletions distributed/tests/test_actor.py

@@ -13,6 +13,7 @@
     Actor,
     BaseActorFuture,
     Client,
+    Event,
     Future,
     Nanny,
     as_completed,

@@ -23,7 +24,7 @@
 from distributed.metrics import time
 from distributed.utils import LateLoopEvent
 from distributed.utils_test import cluster, double, gen_cluster, inc
-from distributed.worker import get_worker
+from distributed.worker import Worker, get_worker
 
 
 class Counter:

@@ -290,7 +291,7 @@ async def test_failed_worker(c, s, a, b):
 
     await a.close()
 
-    with pytest.raises(ValueError, match="Worker holding Actor was lost"):
+    with pytest.raises(RuntimeError, match="Worker holding Actor was lost"):
         await counter.increment()
 
 

@@ -824,3 +825,68 @@ def demo(self):
 
     actor = await c.submit(Actor, actor=True, workers=[a.address])
     assert await actor.demo() == a.address
+
+
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_actor_worker_host_leaves_gracefully(c, s, a):
+    # see also test_actor_worker_host_dies
+    async with Worker(s.address, nthreads=1) as b:
+        counter = await c.submit(
+            Counter, actor=True, workers=[b.address], allow_other_workers=True
+        )
+
+        enter_ev = Event()
+        wait_ev = Event()
+
+        def foo(counter, enter_ev, wait_ev):
+            enter_ev.set()
+            wait_ev.wait()
+
+        fut = c.submit(
+            foo,
+            counter,
+            enter_ev,
+            wait_ev,
+            workers=[a.address],
+            allow_other_workers=True,
+        )
+
+        await enter_ev.wait()
+
+    # Leaving the `async with` block closes worker b gracefully
+    await wait_ev.set()
+    with pytest.raises(RuntimeError, match="Worker holding Actor was lost"):
+        await fut.result()
+
+
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_actor_worker_host_dies(c, s, a):
+    # see also test_actor_worker_host_leaves_gracefully
+    async with Worker(s.address, nthreads=1) as b:
+        counter = await c.submit(
+            Counter, actor=True, workers=[b.address], allow_other_workers=True
+        )
+
+        enter_ev = Event()
+        wait_ev = Event()
+
+        def foo(counter, enter_ev, wait_ev):
+            enter_ev.set()
+            wait_ev.wait()
+
+        fut = c.submit(
+            foo,
+            counter,
+            enter_ev,
+            wait_ev,
+            workers=[a.address],
+            allow_other_workers=True,
+        )
+
+        await enter_ev.wait()
+        # Simulate the worker going down
+        s.stream_comms[b.address].close()
+        await b.finished()
+        await wait_ev.set()
+        with pytest.raises(RuntimeError, match="Worker holding Actor was lost"):
+            await fut.result()