Description
Describe the issue:

During P2P shuffling, the following error occurs:

```
ArrowInvalid: These dictionaries cannot be combined. The unified dictionary requires a larger index type.
```
Minimal Complete Verifiable Example:
To reproduce the error, we created a repository containing the dataset, the code that reproduces the error, and a docker-compose file that starts the Dask cluster and a Dask notebook.

How to reproduce the bug

To start the containers, run the following command:

```
docker compose up
```

The logs of the notebook container will contain a link to the notebook in the following format:

```
http://127.0.0.1:8888/lab?token=c5ff9594e94ae9556be22c1c385152d8b30e0b11ae7b0689
```

Two notebooks will be available in JupyterLab: distributed.ipynb and local-cluster.ipynb.
An example of code that throws the error:

```python
import dask.dataframe as dd

df = dd.read_parquet('/data/test_dataset.parquet')


def _calculate_time(df):
    df = df.sort_values('sd')
    df['time_diff'] = df['sd'].diff().dt.total_seconds()
    return df


df = (
    df.groupby(['date', 'fn', 'a', 'wa', 'wp'], observed=True)
    .apply(_calculate_time, meta=_calculate_time(df._meta))
    .reset_index(drop=True)
)
df.groupby(['a'], observed=True).count().compute()
```
Anything else we need to know?:

The dataset for which this error occurs:

```
 #   Column  Non-Null Count  Dtype
---  ------  --------------  -----
 0   date    5 non-null      datetime64[us]
 1   fn      5 non-null      category
 2   a       5 non-null      category
 3   st      5 non-null      datetime64[ns]
 4   pc      5 non-null      category
 5   sd      5 non-null      datetime64[ns]
 6   it      0 non-null      category
 7   ps      5 non-null      category
 8   ss      5 non-null      category
 9   lb      5 non-null      category
 10  ri      5 non-null      category
 11  watc    5 non-null      category
 12  wat     5 non-null      category
 13  wa      5 non-null      category
 14  wp      5 non-null      category
```
```
pyarrow.Table
date: timestamp[us]
fn: dictionary<values=string, indices=int32, ordered=0>
a: dictionary<values=string, indices=int32, ordered=0>
st: timestamp[ns]
pc: dictionary<values=string, indices=int32, ordered=0>
sd: timestamp[ns]
it: dictionary<values=string, indices=int32, ordered=0>
ps: dictionary<values=string, indices=int32, ordered=0>
ss: dictionary<values=string, indices=int32, ordered=0>
lb: dictionary<values=string, indices=int32, ordered=0>
ri: dictionary<values=string, indices=int32, ordered=0>
watc: dictionary<values=string, indices=int32, ordered=0>
wat: dictionary<values=string, indices=int32, ordered=0>
wa: dictionary<values=string, indices=int32, ordered=0>
wp: dictionary<values=string, indices=int32, ordered=0>
```
The error goes away if some fields are cast to str before the groupby:

```python
df['ps'] = df['ps'].astype(str)
df['ss'] = df['ss'].astype(str)
df['lb'] = df['lb'].astype(str)
df['ri'] = df['ri'].astype(str)
```
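A more general form of this workaround (our own sketch, not part of the repository) casts every categorical column to string before the shuffle-heavy groupby; we assume `select_dtypes` on a dask DataFrame behaves as it does in pandas:

```python
# Sketch: drop the dictionary encoding entirely so no dictionary
# unification has to happen during the P2P shuffle.
for col in df.select_dtypes(include='category').columns:
    df[col] = df[col].astype(str)
```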
We suspect that during P2P shuffling the original dictionary index type (int32, as seen in the pyarrow.Table schema above) is not preserved. Consequently, the error seems to occur while combining dictionaries whose int8 and int16 indices are too small to address the unified dictionary.
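The suspected mechanism can be illustrated with a minimal standalone sketch (our assumption, independent of dask): pandas stores categorical codes as int8 while there are fewer than 128 categories, the Arrow conversion keeps that index type, and unifying the dictionaries of several chunks can then overflow it:

```python
import pandas as pd
import pyarrow as pa

# Each chunk has fewer than 128 categories, so pandas uses int8 codes
# and the Arrow conversion produces int8 dictionary indices.
s1 = pd.Series(pd.Categorical([str(i) for i in range(100)]))
s2 = pd.Series(pd.Categorical([str(i) for i in range(100, 200)]))

chunked = pa.chunked_array([pa.Array.from_pandas(s1), pa.Array.from_pandas(s2)])
print(chunked.type)  # dictionary<values=string, indices=int8, ordered=0>

# The unified dictionary would need 200 entries, which int8 cannot index,
# so this raises: ArrowInvalid: These dictionaries cannot be combined.
# The unified dictionary requires a larger index type.
chunked.combine_chunks()
```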
The stack trace can be viewed in distributed.ipynb and in local-cluster.ipynb.
Feel free to ask any additional questions.
Environment:
- Dask version: 2024.1.0
- Python version: 3.10.12.final.0
- Operating System: macOS Sonoma 14.2.1
- Install method (conda, pip, source): docker