
Failed tasks persist in the Client even if the Future object falls out of scope, preventing Future deallocation/task cleanup from the cluster #8790

@bcalvert-graft

Description

The issue

Note: I think this issue is very similar to #8789, but it's even simpler to reproduce since it only needs a local Client. Given the similarity, I'm fairly certain the outcome of the discussion here will largely subsume the discussion in #8789, but I'm submitting it as its own issue just in case there are pertinent differences. If I should have just edited #8789 instead, please share that feedback and I'll adjust going forward.

Hi Dask Maintainers,

Thank you in advance for reading through this bug report. Please consider the following code snippet:

from typing import Any, Callable
from dask.distributed import Client

def submit_other_func_to_dask(dask_client: Client, other_func: Callable[[], Any]):
    other_func_fut = dask_client.submit(other_func)
    # Since the Future was created in this function, after we return the result,
    # nominally, it should fall out of scope
    return other_func_fut.result()

# Define some functions to feed into the above function
def return_1():
    return 1

def always_fails():
    raise ValueError("Always fails")

# Make a Client (+ LocalCluster)
client = Client(n_workers=1)
# No futures associated with the client to start
assert len(client.futures) == 0
# No tasks on the cluster to start
assert len(client.cluster.scheduler.tasks) == 0
submit_other_func_to_dask(client, return_1)  # returns 1
# The prior two assertions both hold, since the Future fell out of scope in the body of submit_other_func_to_dask
assert len(client.futures) == 0
assert len(client.cluster.scheduler.tasks) == 0
# But if we submit a task that fails
submit_other_func_to_dask(client, always_fails)
# That task gets stuck in the client.futures
assert len(client.futures) == 1
# And thus there is at least one reference to it, so the task stays on the cluster as well
assert len(client.cluster.scheduler.tasks) == 1

The problematic bits are the last two assertions. If I create a Future that is self-contained within the scope of a function, retrieve its result, and then exit that scope, the Future is deallocated from local memory and, nominally, the task should be removed from the cluster once it finishes computing (matching the documentation here, for example). That is exactly what happens with the submit_other_func_to_dask(client, return_1) call.

However, if the computation behind that function-scoped Future fails, retrieving the result raises the exception (as expected), but the Future sticks around in the client.futures collection. It therefore never gets GC'ed in the local Python process, and the corresponding task is never cleaned up from the Scheduler.

This can be acceptable if the Client we're using is transient. Unfortunately, it's not so great with explicitly created long-lived Clients (as in our application) or with the implicitly long-lived Clients associated with individual workers (say, when submitting a task from another task, as shown in #8789). In those cases, as mentioned in #8789, this is at bare minimum a memory leak, and it can also lead to a "poisoned cache" if the function-scoped Future has a fixed task key and represents a computation we might submit multiple times over the lifecycle of our application; a sketch of that scenario follows.
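
To make the "poisoned cache" scenario concrete, here is a minimal sketch against the client from the snippet above. It assumes, as I understand it, that Client.submit deduplicates submissions on the task key; flaky and "fixed-key" are purely illustrative names.

def flaky():
    # Stand-in for a computation that failed once but might succeed on retry
    raise ValueError("transient failure")

# First submission with a fixed key: result() raises, the local Future falls
# out of scope, but (per this issue) the errored Future is still held in
# client.futures.
try:
    client.submit(flaky, key="fixed-key").result()
except ValueError:
    pass

# Resubmitting under the same key is deduplicated against that lingering
# errored Future, so we get the old failure back instead of a fresh run.
retry_fut = client.submit(flaky, key="fixed-key")
print(retry_fut.status)  # expected: "error", without flaky() running again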

A temporary fix?

As mentioned in #8789, one way I've considered to resolve this is to wrap the Future.result call in a try/except block:

def submit_other_func_to_dask(dask_client: Client, other_func: Callable[[], Any]):
    other_func_fut = dask_client.submit(other_func)
    # Since the Future was created in this function, after we return the result,
    # nominally, it should fall out of scope
    try:
        return other_func_fut.result()
    except BaseException:
        # Cancel so the errored task doesn't linger in client.futures / on the scheduler
        other_func_fut.cancel()
        raise

Other than negatively impacting the code ergonomics of using Dask, I don't see anything glaringly bad with this option, but I'm keen to hear if there's a better pattern and/or API I should use to streamline this.
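
For completeness, the boilerplate could also be pulled out of the call sites into a small helper. The sketch below is purely illustrative (cancel_on_error is my own hypothetical wrapper, not an existing Dask API):

from contextlib import contextmanager
from typing import Any, Callable, Iterator

from dask.distributed import Client, Future

@contextmanager
def cancel_on_error(future: Future) -> Iterator[Future]:
    # Hypothetical helper: cancel the Future if retrieving its result raises,
    # so an errored task doesn't linger in client.futures / on the scheduler.
    try:
        yield future
    except BaseException:
        future.cancel()
        raise

def submit_other_func_to_dask(dask_client: Client, other_func: Callable[[], Any]):
    with cancel_on_error(dask_client.submit(other_func)) as fut:
        return fut.result()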

Environment

  • Python Version: 3.10
  • OS: Mac/Linux
  • Dask Version: 2024.1.1
  • Install method: conda, but tried pip as well
