Description
Describe the issue:
The issue is observed in an application that creates a Dask array from an HDF5 dataset (the HDF5 file is opened in read-only mode) and then uses the array for computations in the workers. After the computations are completed, the file is closed in the main process. The application then attempts to open the same file for writing, in order to save the results of the computations, and fails with the error message `Failed to open file for writing: Unable to synchronously open file (file is already open for read-only)`.
The issue is not new. It was first observed in Dask 2021.7.0, but a workaround that involved clearing memory and running a small task on the workers kept the application running. Unfortunately, starting with Dask 2023.9.3 the workarounds no longer work, and even restarting the client does not help.
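For context, the failure does not seem to occur when the dataset is materialized into a NumPy array before it is handed to `da.from_array`, because the task graph then never holds a reference to an h5py object. A minimal sketch (assuming the data fits in memory; the threads scheduler is used here for brevity, the same pattern applies with a distributed client):

```python
import os
import tempfile

import dask.array as da
import h5py
import numpy as np

fln = os.path.join(tempfile.mkdtemp(), "test.h5")
with h5py.File(fln, "w") as f:
    f.create_dataset("data", data=np.random.random((100, 100)), chunks=(10, 10))

with h5py.File(fln, "r") as f:
    # f["data"][...] materializes the dataset as a NumPy array, so the Dask
    # graph holds plain NumPy data rather than an h5py Dataset object
    data = da.from_array(f["data"][...], chunks=(10, 10))

sm = data.sum(axis=0).compute(scheduler="threads")

# Reopening for writing succeeds: no process retained a read-only handle
with h5py.File(fln, "r+") as f:
    pass
```

This is of course not always practical, which is why the application relies on `da.from_array(f["data"], ...)` as in the reproducer below.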
Minimal Complete Verifiable Example:
```python
import logging
import platform
import time as ttime

import dask
import dask.array as da
import distributed
import h5py
import numpy as np
from dask.distributed import Client, wait

logger = logging.getLogger(__name__)


def _dask_release_file_descriptors(*, client):
    """
    A series of hacks that used to force Dask to close the file,
    but don't work with Dask >= v2023.9.3.
    """
    # Run a small task on the Dask client (required after Dask v2021.7.0)
    rfut = da.sum(da.random.random((1000,), chunks=(10,))).persist(scheduler=client)
    rfut.compute(scheduler=client)

    current_os = platform.system()
    if current_os == "Linux":
        # Starting with Dask/Distributed version 2022.2.0 the following step is required:
        # https://distributed.dask.org/en/stable/worker-memory.html#manually-trim-memory
        # (works for Linux only, there are different solutions for other OS if needed)
        import ctypes

        def trim_memory() -> int:
            libc = ctypes.CDLL("libc.so.6")
            return libc.malloc_trim(0)

        client.run(trim_memory)


if __name__ == "__main__":
    print(f"Version of Dask: {dask.__version__}")
    print(f"Version of Distributed: {distributed.__version__}")
    print("===============================")

    # Create HDF5 file
    print("Creating HDF5 file")
    fln = "test.h5"
    with h5py.File(fln, "w") as f:
        dset = f.create_dataset(
            "data", data=np.random.random(size=(100, 100)), chunks=(10, 10), dtype="float64"
        )

    print("Creating client")
    client = Client()

    # Process the file
    print("Loading and processing data")
    with h5py.File(fln, "r") as f:
        data = da.from_array(f["data"], chunks=(10, 10))
        sm_fut = da.sum(data, axis=0).persist(scheduler=client)
        sm = sm_fut.compute(scheduler=client)
    print(f"sm={sm}")

    # Try to make Dask release the resources.
    del sm_fut
    _dask_release_file_descriptors(client=client)

    # Try to open the file for writing
    print("Attempting to open file for writing")
    try:
        with h5py.File(fln, "r+") as f:
            print("File was opened for writing !!!")
    except OSError as ex:
        logger.exception("Failed to open file for writing: %s", ex)

    print("Closing client")
    client.close()
```
Output
```
Version of Dask: 2023.9.3
Version of Distributed: 2023.9.3
===============================
Creating HDF5 file
Creating client
Loading and processing data
sm=[49.07035905 50.7408857  53.22938829 55.0860977  50.46002873 49.18274641
 52.53514242 50.04221161 54.25822461 47.82321329 48.67929025 47.69840206
 51.18568597 48.38903195 44.22137585 48.95708402 48.77094894 57.81744073
 45.50887884 47.15333875 46.63276952 52.13236937 44.1608373  45.80125009
 48.12951925 48.53631765 50.17888398 49.46190444 48.83796292 49.14353978
 49.09835558 51.024579   51.69941642 55.73760187 52.54068909 53.56511402
 48.27140464 48.64539501 49.80548893 52.26121039 50.93010051 50.11377153
 48.4494458  52.92684851 51.54249341 45.7196064  55.96786986 51.77469759
 47.93116461 51.04749181 52.48865301 50.71329317 46.77027536 48.32817114
 48.32709842 54.64513028 48.0574896  50.1096233  54.39883198 50.53579316
 47.80453158 46.38863046 47.36735421 51.10020424 51.76401106 52.116815
 45.30900869 51.74240129 48.35973624 48.35493965 53.87065311 50.66255308
 52.25417077 52.89681257 54.71724155 50.8693448  55.21182029 50.15425953
 46.99193573 43.59772513 50.98128225 51.07151455 50.67648636 53.04815651
 46.03783866 54.09294989 52.93293196 52.11284659 45.12975364 47.60484428
 47.41706809 47.73139106 48.77933056 49.94992988 46.41711489 50.53056979
 51.99010759 50.60218304 44.9712397  53.21132181]
Attempting to open file for writing
Failed to open file for writing: Unable to synchronously open file (file is already open for read-only)
Traceback (most recent call last):
  File "/home/dmgav/Projects/tmp/pyxrf_dask_issue/dask_issue.py", line 67, in <module>
    with h5py.File(fln, "r+") as f:
         ^^^^^^^^^^^^^^^^^^^^
  File "/home/dmgav/miniconda3/envs/pyxrf-dev-test/lib/python3.11/site-packages/h5py/_hl/files.py", line 562, in __init__
    fid = make_fid(name, mode, userblock_size, fapl, fcpl, swmr=swmr)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/dmgav/miniconda3/envs/pyxrf-dev-test/lib/python3.11/site-packages/h5py/_hl/files.py", line 237, in make_fid
    fid = h5f.open(name, h5f.ACC_RDWR, fapl=fapl)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "h5py/_objects.pyx", line 54, in h5py._objects.with_phil.wrapper
  File "h5py/_objects.pyx", line 55, in h5py._objects.with_phil.wrapper
  File "h5py/h5f.pyx", line 102, in h5py.h5f.open
OSError: Unable to synchronously open file (file is already open for read-only)
Closing client
```
Anything else we need to know?:
Environment:
- Dask version: >= 2023.9.3
- Python version: 3.11
- Operating System: Ubuntu (also observed on Windows)
- Install method (conda, pip, source): pip/conda-forge
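Possibly useful for triage: HDF5's advisory file locking can be disabled via an environment variable (HDF5 >= 1.10). This only masks the symptom, since the read-only handle is presumably still leaked somewhere in the workers, but it may help isolate the problem:

```shell
# Disable HDF5 file locking before starting Python
export HDF5_USE_FILE_LOCKING=FALSE
python dask_issue.py
```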