
Deadlock when deserializing a key fails #7060

Open
@gjoseph92

Description

If a key received from another worker fails to deserialize, it will:

  1. deadlock the task that needed that key
  2. break future communication between the workers by throwing off the order of their requests and responses.
import asyncio

import pytest

from distributed import Nanny
from distributed.utils_test import gen_cluster, inc


@gen_cluster(
    client=True,
    nthreads=[("", 1), ("", 1)],
    Worker=Nanny,  # Nanny necessary to force serialization, not sure why
)
async def test_deserialization_failure_receiving_key(c, s, a, b):
    class CantDeserialize:
        def __getstate__(self):
            return None

        def __setstate__(self, state):
            raise RuntimeError("can't deserialize me")

    f1 = c.submit(CantDeserialize, key="f1", workers=[a.address])
    f2 = c.submit(CantDeserialize, key="f2", workers=[b.address])

    # `f1` must be transferred to worker `b`
    f3 = c.submit(lambda x, y: None, f1, f2, workers=[b.address])

    # Currently deadlocks.
    # If you comment this out, and don't wait for `f3`, then the test will
    # deadlock at `assert await g3 == 3`, because worker<->worker comms have been messed up.
    with pytest.raises(asyncio.CancelledError):  # what error should this actually be?
        await f3

    del f1, f2, f3
    # Now, try normal tasks that should work
    g1 = c.submit(inc, 0, key="g1", workers=[a.address])
    g2 = c.submit(inc, 1, key="g2", workers=[b.address])

    g3 = c.submit(sum, [g1, g2], key="g3", workers=[b.address])

    assert await g3 == 3

I haven't looked into problem 1 much yet. I'm not sure why the dependent task doesn't get transitioned to error as well. Here's what a worker task looked like in a dump of a real cluster that experienced this issue:

('sub-341a2315e88b3b0847ea02cb2582f5d3', 2, 1):
  dependencies:
  - <TaskState "('rechunk-merge-3b908056696e32b9928bfb6c26433bd7', 2, 1)" memory>
  - <TaskState "('rechunk-merge-c50a2a505f3a9d80321c8bb9188b08de', 2, 1)" error>
  duration: 0.00014734268188476562
  key: ('sub-341a2315e88b3b0847ea02cb2582f5d3', 2, 1)
  priority:
  - 0
  - 1
  - 24
  - -24
  state: waiting
  waiting_for_data:
  - <TaskState "('rechunk-merge-c50a2a505f3a9d80321c8bb9188b08de', 2, 1)" error>

Clearly, if a dependency is in state error, the task should not be in waiting anymore. This feels like a state machine bug to me?
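
As a purely illustrative sketch of the invariant that seems to be violated in the dump above (the attribute names mirror the fields in the dump, but this is not distributed's actual validation code):

# Hypothetical invariant check, not distributed's real API: a task should never
# remain in "waiting" on a dependency that has already transitioned to "error".
def check_not_waiting_on_errored_dep(ts) -> None:
    if ts.state == "waiting":
        for dep in ts.waiting_for_data:
            # The dump above shows exactly this: a dependency in "error" while
            # the dependent is still "waiting" for its data.
            assert dep.state != "error", (ts.key, dep.key)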


Problem 2 is kinda funny. On my real cluster, I saw a traceback like this on the worker requesting a key:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2029, in gather_dep
    response = await get_data_from_worker(
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2818, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/usr/local/lib/python3.9/site-packages/distributed/utils_comm.py", line 383, in retry_operation
    return await retry(
  File "/usr/local/lib/python3.9/site-packages/distributed/utils_comm.py", line 368, in retry
    return await coro()
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2798, in _get_data
    response = await send_recv(
  File "/usr/local/lib/python3.9/site-packages/distributed/core.py", line 944, in send_recv
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.9/site-packages/distributed/core.py", line 770, in _handle_comm
    result = await result
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 1737, in get_data
    assert response == "OK", response
AssertionError: {'op': 'get_data', 'keys': {"('rechunk-merge-c50a2a505f3a9d80321c8bb9188b08de', 2, 1)"}, 'who': 'tls://10.0.1.172:44309', 'max_connections': None, 'reply': True}

Notice the raise exc.with_traceback(tb). This is actually the RPC system re-raising an error that occurred on the other worker (the sender), which is coming from here in get_data (the handler for the RPC):

# On the sending worker, inside get_data: after writing the payload, it waits
# for the receiver to acknowledge with "OK".
compressed = await comm.write(msg, serializers=serializers)
response = await comm.read(deserializers=serializers)
assert response == "OK", response

What's happening is that in get_data_from_worker (which, despite the name, is the caller of the RPC, not the handler):

response = await send_recv(
    comm,
    serializers=serializers,
    deserializers=deserializers,
    op="get_data",
    keys=keys,
    who=who,
    max_connections=max_connections,
)
try:
    status = response["status"]
except KeyError:  # pragma: no cover
    raise ValueError("Unexpected response", response)
else:
    if status == "OK":
        await comm.write("OK")

send_recv raises because it fails to deserialize the data, so we never reach the await comm.write("OK").
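
One possible shape of a mitigation, sketched here under the assumption that the bytes were read off the comm and only deserialization failed (so the comm itself is still usable). This is not distributed's actual fix, and a real patch would need to distinguish deserialization errors from genuine comm failures:

try:
    response = await send_recv(
        comm,
        serializers=serializers,
        deserializers=deserializers,
        op="get_data",
        keys=keys,
        who=who,
        max_connections=max_connections,
    )
except Exception:
    # The sender has already written its payload and is blocked in get_data on
    # comm.read() waiting for "OK". Complete the exchange before re-raising so
    # the comm isn't left one message out of step for the next request.
    await comm.write("OK")  # or an explicit error marker the sender understands
    raise

Even with something like this, problem 1 still needs the worker state machine to move the dependent task out of waiting when the dependency errors.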

On the other end, the worker sending the data has no idea we received it and failed to deserialize. It's still waiting for us to say OK.

Which means that the next time we ask that worker for another piece of data, we send an 'op': 'get_data' message over the same comm, the very comm on which the sender's get_data is still waiting to hear "OK". An 'op': 'get_data' dict doesn't match "OK", so that subsequent data transfer request fails as well.
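
To make the desync concrete, here's a self-contained toy, plain asyncio queues rather than anything from distributed, that models one reused comm where the receiver skips its ack once:

import asyncio


async def sender(requests: asyncio.Queue, responses: asyncio.Queue) -> None:
    # Plays the role of Worker.get_data: answer each request, then block until
    # the other side acknowledges with "OK".
    while True:
        msg = await requests.get()
        await responses.put({"status": "OK", "request": msg})
        ack = await requests.get()
        assert ack == "OK", ack  # mirrors `assert response == "OK", response`


async def receiver(requests: asyncio.Queue, responses: asyncio.Queue) -> None:
    # Plays the role of gather_dep / get_data_from_worker.
    # First exchange: stand-in for the failed transfer. We read the payload but
    # (as if deserialization had raised) never send the "OK" ack back.
    await requests.put({"op": "get_data", "keys": ["f1"]})
    await responses.get()

    # Later exchange over the same "comm" (a new gather_dep in real life): the
    # sender is still waiting for "OK", reads this dict instead, and fails.
    await requests.put({"op": "get_data", "keys": ["g1"]})


async def main() -> None:
    # One queue per direction stands in for the single reused comm.
    requests: asyncio.Queue = asyncio.Queue()
    responses: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(sender(requests, responses), receiver(requests, responses))


asyncio.run(main())

Running it raises AssertionError with the second get_data request dict as the message, the same shape of failure as the traceback above: the sender's assert response == "OK" trips over the next request instead of the ack it was owed.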

cc @crusaderky @fjetter

Labels: bug, deadlock, stability
