Description
(Not sure if this should go here or dask-jobqueue)
I'm trying to debug problems with an adaptive deployment on an LSF cluster.
I have ~0.5 TB of data saved on disk as a .zarr file that I use to generate a dask array with the following properties:

```
shape=(11087, 2500, 10000)
chunksize=(40, 1024, 1024)
dtype='uint16'
```
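In code, the dummy stand-in used in the snippets below is roughly the following (the imports are shown here once and assumed throughout):

```python
import dask.array as da
from distributed import Client

# Same shape, chunking, and dtype as the real ~0.5 TB array, but filled with
# lazily-generated zeros rather than data read from the zarr store.
dummy = da.zeros((11087, 2500, 10000), chunks=(40, 1024, 1024), dtype='uint16')
```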
Working with this dataset on a statically scaled cluster works fine, but the adaptive cluster fails on the same operations. I'm trying to debug these problems using that dummy dataset (i.e., `da.zeros()` with the same shape, chunks, and dtype as my original data). With the dummy data and an `LSFCluster` scaled to 10 workers, the following code runs without problems:
```python
client_regular = Client(get_jobqueue_cluster())
client_regular.cluster.scale(10)
result = client_regular.compute([dummy.min(), dummy.max()], sync=True)
```
(`get_jobqueue_cluster` is a deployment-specific function for creating an `LSFCluster`: source.)
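Roughly, that function boils down to something like this (a sketch only: the resource values below are placeholders, not the actual deployment settings):

```python
from dask_jobqueue import LSFCluster

def get_jobqueue_cluster(**kwargs):
    # Placeholder resources; the real function fills in the site-specific
    # queue, memory, and walltime options for our LSF deployment.
    return LSFCluster(cores=1, memory="16 GB", walltime="01:00", **kwargs)
```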
But the same computation on an adaptive cluster fails:
```python
client_adaptive = Client(get_jobqueue_cluster())
client_adaptive.cluster.adapt(minimum_jobs=10)
result = client_adaptive.compute([dummy.min(), dummy.max()], sync=True)
```
I see a lot of errors, e.g. lots of these:
```
distributed.core - ERROR - 'tcp://10.36.60.31:46145'
Traceback (most recent call last):
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 473, in handle_stream
    handler(**merge(extra, msg))
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2359, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.36.60.31:46145'
distributed.utils - ERROR - 'tcp://10.36.60.31:46145'
Traceback (most recent call last):
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/utils.py", line 663, in log_errors
    yield
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 1529, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2446, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 473, in handle_stream
    handler(**merge(extra, msg))
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2359, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.36.60.31:46145'
```
and lots of these `AssertionError`s from line 2923 of `scheduler.py`:
```
distributed.utils - ERROR -
Traceback (most recent call last):
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/utils.py", line 663, in log_errors
    yield
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 3161, in retire_workers
    delete=False,
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2923, in replicate
    assert count > 0
AssertionError
```
None of these errors is fatal, so the computation on the adaptive cluster just blocks indefinitely while emitting a constant stream of them. Even if I interrupt it with a keyboard interrupt, the adaptive cluster keeps running futile tasks in the background, which isn't great. The worker logs haven't revealed anything very illuminating, just lots of errors from workers failing to find data (presumably because some worker upstream has been killed).
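Presumably the cleanest escape at that point is to tear things down by hand with the standard client / cluster calls (a sketch, not a fix):

```python
# Drop all pending and in-flight tasks and restart the workers...
client_adaptive.restart()

# ...or give up entirely and tear the cluster down.
client_adaptive.close()
client_adaptive.cluster.close()
```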
If I reduce the size of the data (e.g., to ~100 GB), I can use the adaptive cluster without issues, but at some fraction of the full data size these errors start appearing and the whole thing breaks.
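For example, something like this runs to completion on the adaptive cluster (the exact shape is just an illustration of a ~100 GB cut of the same array):

```python
# ~100 GB instead of ~0.5 TB: same chunking and dtype, just fewer planes
# along the first axis (2000 * 2500 * 10000 * 2 bytes ≈ 100 GB).
small = da.zeros((2000, 2500, 10000), chunks=(40, 1024, 1024), dtype='uint16')
result = client_adaptive.compute([small.min(), small.max()], sync=True)
```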
So, does anyone have ideas for debugging or fixing this?