Description
Describe the issue:
We were seeing some of our jobs enter a state where not all of the tasks had completed, but the workers were sitting idle. This was happening fairly consistently on a particular set of jobs we were running.
Looking in the worker logs, I saw the following error:
File "/.pyenv/versions/3.8.10/lib/python3.8/site-packages/distributed/protocol/core.py", line 100, in _encode_default
frames.extend(create_serialized_sub_frames(obj))
File "/.pyenv/versions/3.8.10/lib/python3.8/site-packages/distributed/protocol/core.py", line 67, in create_serialized_sub_frames
_inplace_compress_frames(sub_header, sub_frames)
File "/.pyenv/versions/3.8.10/lib/python3.8/site-packages/distributed/protocol/core.py", line 50, in _inplace_compress_frames
compression[i], frames[i] = maybe_compress(
File "/.pyenv/versions/3.8.10/lib/python3.8/site-packages/distributed/protocol/compression.py", line 188, in maybe_compress
compressed = compress(mv)
_block.LZ4BlockError: Compression failed
It seems that at this point Dask had attempted to transfer data between workers, failed, and then ended up in a stalemate. Looking into the LZ4 code, I believe we were hitting the case where the input data compresses so poorly that the compressed output exceeds the original size. Why this doesn't get caught by the "subset test" in maybe_compress, I'm not sure.
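For context, my rough understanding of the flow (paraphrased, not the actual distributed source; names, sizes, and thresholds here are illustrative) is that maybe_compress first compresses a small sample of the payload to decide whether compressing the whole buffer is worthwhile, and only then compresses the full payload. A sketch of that shape shows how the two steps can disagree: the sample can look compressible while the full-buffer compression is the call that blows up.

import lz4.block

def sketch_maybe_compress(payload: bytes,
                          sample_size: int = 10_000,
                          min_ratio: float = 0.9):
    """Sketch of a sample-then-compress heuristic; not the real
    distributed.protocol.compression.maybe_compress."""
    # 1. Compress a small sample to estimate compressibility cheaply
    #    (the "subset test").
    sample = payload[:sample_size]
    if len(lz4.block.compress(sample)) > min_ratio * len(sample):
        return None, payload  # looks incompressible, send as-is

    # 2. The sample looked compressible, so compress the full payload.
    #    This second call is the one raising LZ4BlockError in our case;
    #    the sample passing step 1 doesn't guarantee step 2 succeeds.
    compressed = lz4.block.compress(payload)
    if len(compressed) > min_ratio * len(payload):
        return None, payload
    return "lz4", compressed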
From the perspective of the LZ4 library, raising an error here seems reasonable: the compression did, technically, fail, even though there was nothing inherently wrong or corrupt with the data.
What are your thoughts on catching compression errors and returning the uncompressed data as a fallback (sketched below)? I can see issues with (a) deciding what to catch, since we don't want to swallow errors that indicate a bad system state, and (b) handling errors from all the different compression libraries. However, I feel some of this is mitigated by the fact that this code is in full control of the compress-decompress lifecycle, and compression can reasonably be expected to fail. Decompression errors should definitely not be handled.
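To make the proposal concrete, here is a minimal sketch of the kind of fallback I have in mind around the compress call in maybe_compress. The helper name and the exception tuple are hypothetical; in practice each registered compressor would presumably need to declare which exceptions mean "could not compress this payload" as opposed to "something is broken".

import lz4.block

# Hypothetical: exceptions that mean "this payload could not be compressed",
# as opposed to a corrupted buffer or a bad system state.
RECOVERABLE_COMPRESSION_ERRORS = (lz4.block.LZ4BlockError,)

def compress_or_passthrough(compression, compress, payload):
    """Sketch of the proposed fallback: if compression itself fails,
    send the payload uncompressed rather than erroring the transfer."""
    try:
        compressed = compress(payload)
    except RECOVERABLE_COMPRESSION_ERRORS:
        # Compression failed, but the data is intact; fall back to raw bytes.
        return None, payload
    if len(compressed) >= len(payload):
        # Not worth it; I believe maybe_compress already falls back to the
        # raw payload when the compressed result isn't meaningfully smaller.
        return None, payload
    return compression, compressed

The main design question is then just curating that exception tuple per compressor; decompression would stay strict, since a failure there really does indicate corruption.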
Thoughts here?
Environment:
- Dask version: 2022.11.1
- Python version: 3.8.10
- Operating System: Ubuntu 16.04.7 LTS
- Install method (conda, pip, source): Pip