Description
I'm new to dask and trying to parallel some calculations on HPC cluster using SLURMCluster
. Apparently, I faced a memory leak similar to that discussed in #5960. There is a reproducer of code I run (the only difference is that in real workflow I submit scipy.optimize.minimize function with some args, then get result and save it to file).
from memory_profiler import profile
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, as_completed
@profile
def big_arr():
return [5]*(10**5)
@profile
def go():
extra_directives = [f"-J check_mem", f"-o daskout", f"-e daskerr"]
cluster = SLURMCluster(
queue='mpi', # your queue
account="tguskov", # your account
cores=1,
processes=1,
memory="1GB",
job_name='check_mem',
walltime='06:00:00',
job_extra_directives=extra_directives, #_directives
)
client = Client(cluster)
client.cluster.scale(5)
futures = []
for i in range(10):
futures.append(client.submit(big_arr))
for future in as_completed(futures):
sol = future.result()
del future, sol
client.cluster.scale(0)
del futures, client, cluster
for i in range(4):
go()
There is also a graph of mprof
memory usage by the script above:
It's clear for me from that graph that smth's wrong with submitting a new future (10 peaks corresponding to 10 submitted futures on each iteration). CreatingSLURMCluster
outside of the function go()
doesn't make any difference. The lines with aggressive deletion are also doesn't really affect the memory usage.
Here I used dask 2022.1.1
and distributed 2022.1.1
according to temporary solution from #5960, but that didn't work out for me either. The same situation with the latest(2024.8.0
) version of dask
and distributed
.
The memory leak is around a few GB per iteration in my original case where I try to submit around ~500 futures on 100 workers.
I'm also providing my bash script just in case:
#!/bin/sh
#SBATCH -p mpi
#SBATCH -N 1
#SBATCH -n 1
#SBATCH --mem=1GB
#SBATCH -o output
#SBATCH -e errors
ls -l|grep mprofile|xargs rm -f --
conda activate dask
mprof run --multiprocess --python python check.py # --include-children
Env
dask==2022.01.1/2024.8.0
distributed==2022.01.1/2024.8.0
dask_jobqueue==0.8.5