Description
What happened:
When a dependency is broken, e.g. because it cannot be deserialized, the workers are no longer able to detect the failure. They retry indefinitely instead of raising an exception.
What you expected to happen:
The computation terminates with an appropriate exception.
Minimal Complete Verifiable Example:
import pytest

from distributed import Nanny
from distributed.utils_test import gen_cluster


@gen_cluster(client=True, Worker=Nanny, timeout=4600)
async def test_get_data_faulty_dep(c, s, a, b):
    """This test creates a broken dependency and forces serialization by
    requiring it to be submitted to another worker. The computation should
    eventually finish by flagging the dep as bad and raising an appropriate
    exception.
    """

    class BrokenDeserialization:
        # Deserialization always fails on the receiving worker.
        def __setstate__(self, *state):
            raise AttributeError()

        def __getstate__(self, *args):
            return ""

    def create():
        return BrokenDeserialization()

    def collect(*args):
        return args

    # Pin the tasks to different workers so the dependency must be transferred.
    fut1 = c.submit(create, workers=[a.name])
    fut2 = c.submit(collect, fut1, workers=[b.name])

    with pytest.raises(RuntimeError, match="Could not find dependencies for collect-"):
        await fut2.result()
Anything else we need to know?:
I tried addressing this issue in #4360, but I believe it is difficult to resolve without a more fundamental rework of the way task states are managed.
I explained this infinite loop in #4360 (comment): it is a race condition between the worker-internal missing dep handler and the scheduler escalation. The scheduler escalation triggers a release, and the worker forgets the dependency. TaskState.suspicious_count is therefore reset, and the exception is never triggered.
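To make the reset easier to see, here is a toy model of the race. It is only a sketch and not the actual worker code: ToyWorker, ToyTaskState, the limit threshold, and the error message are made up for illustration. Only the idea that a release drops the per-task metadata, including suspicious_count, is taken from the description above.

```python
from dataclasses import dataclass


@dataclass
class ToyTaskState:
    """Hypothetical stand-in for the worker's per-task metadata."""

    key: str
    suspicious_count: int = 0


class ToyWorker:
    """Toy model (not distributed's Worker) that tracks dependencies by key."""

    def __init__(self, limit=3):
        self.tasks = {}
        self.limit = limit  # assumed threshold for flagging a dep as bad

    def failed_fetch(self, key):
        # Missing dep handler: count one more failed attempt for this key.
        ts = self.tasks.setdefault(key, ToyTaskState(key))
        ts.suspicious_count += 1
        if ts.suspicious_count > self.limit:
            raise RuntimeError(f"Dependency {key} flagged as bad")
        return ts.suspicious_count

    def release(self, key):
        # Scheduler escalation: the worker forgets the task, counter included.
        self.tasks.pop(key, None)


def run_rounds(worker, key, rounds):
    """Alternate failed fetches with scheduler-triggered releases."""
    seen = []
    for _ in range(rounds):
        seen.append(worker.failed_fetch(key))
        worker.release(key)
    return seen


counts = run_rounds(ToyWorker(limit=3), "create-123", rounds=10)
print(counts)  # the counter restarts on every round and never passes the limit
```

In this model the exception is only reachable if several failed fetches happen without a release in between, which is exactly what the race prevents.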
I believe resolving this requires introducing additional task states on the worker side (xref #4413; similar to released/forgotten on the scheduler), so that the meta information of a task is only forgotten once the scheduler allows it. Alternatively, the scheduler must keep track of these suspicious tasks itself, relieving the worker of the responsibility to track them.
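A rough sketch of the first option, under the same caveats: this is a toy model, and the state names "fetch" and "released", the forget method, and the limit are assumptions for illustration, not an existing API. The point is only that a release keeps the metadata around, so the counter survives repeated escalations and the error eventually fires.

```python
from dataclasses import dataclass


@dataclass
class ToyTaskState:
    """Hypothetical per-task metadata with an explicit state field."""

    key: str
    state: str = "fetch"  # assumed states: "fetch" and "released"
    suspicious_count: int = 0


class ToyWorker:
    """Toy model (not distributed's Worker) with a released/forgotten split."""

    def __init__(self, limit=3):
        self.tasks = {}
        self.limit = limit  # assumed threshold for flagging a dep as bad

    def failed_fetch(self, key):
        ts = self.tasks.setdefault(key, ToyTaskState(key))
        ts.state = "fetch"
        ts.suspicious_count += 1
        if ts.suspicious_count > self.limit:
            raise RuntimeError(f"Dependency {key} flagged as bad")

    def release(self, key):
        # Keep the metadata and only mark the task as released.
        if key in self.tasks:
            self.tasks[key].state = "released"

    def forget(self, key):
        # Meant to be called only once the scheduler allows forgetting the task.
        self.tasks.pop(key, None)


def attempts_until_error(worker, key, max_rounds=100):
    """Return the attempt on which the worker raises, or None if it never does."""
    for attempt in range(1, max_rounds + 1):
        try:
            worker.failed_fetch(key)
        except RuntimeError:
            return attempt
        worker.release(key)  # scheduler escalation between attempts
    return None


attempts = attempts_until_error(ToyWorker(limit=3), "create-123")
print(attempts)
```

With limit=3 the fourth failed fetch raises, even though a release happens after every attempt.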