When doing shuffle operations on non-pandas dataframes, we run into issues in partd due to its pandas-specific serialization logic. For example, a groupby.apply operation with dask-cudf fails because cuDF does not implement a private pandas method:
```python
import cudf
import dask_cudf
df = cudf.DataFrame()
df['key'] = [0,0,1,1,1]
df['val']= range(5)
ddf = dask_cudf.from_cudf(df, npartitions=1)
ddf.groupby("key").val.apply(lambda x: x.sum()).compute()Details

```text
/tmp/ipykernel_61671/1646734033.py:10: UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
Before: .apply(func)
After: .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
or: .apply(func, meta=('x', 'f8')) for series result
ddf.groupby("key").val.apply(lambda x: x.sum()).compute()
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/tmp/ipykernel_61671/1646734033.py in <module>
8 ddf = dask_cudf.from_cudf(df, npartitions=1)
9
---> 10 ddf.groupby("key").val.apply(lambda x: x.sum()).compute()
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
286 dask.base.compute
287 """
--> 288 (result,) = compute(self, traverse=False, **kwargs)
289 return result
290
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
569 postcomputes.append(x.__dask_postcompute__())
570
--> 571 results = schedule(dsk, keys, **kwargs)
572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
573
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
551 """
552 kwargs.pop("num_workers", None) # if num_workers present, remove it
--> 553 return get_async(
554 synchronous_executor.submit,
555 synchronous_executor._max_workers,
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
494 while state["waiting"] or state["ready"] or state["running"]:
495 fire_tasks(chunksize)
--> 496 for key, res_info, failed in queue_get(queue).result():
497 if failed:
498 exc, tb = loads(res_info)
~/conda/envs/rapids-21.12/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
435 raise CancelledError()
436 elif self._state == FINISHED:
--> 437 return self.__get_result()
438
439 self._condition.wait(timeout)
~/conda/envs/rapids-21.12/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
387 if self._exception:
388 try:
--> 389 raise self._exception
390 finally:
391 # Break a reference cycle with the exception in self._exception
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
536 fut = Future()
537 try:
--> 538 fut.set_result(fn(*args, **kwargs))
539 except BaseException as e:
540 fut.set_exception(e)
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
232 Batch computing of multiple tasks with `execute_task`
233 """
--> 234 return [execute_task(*a) for a in it]
235
236
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
232 Batch computing of multiple tasks with `execute_task`
233 """
--> 234 return [execute_task(*a) for a in it]
235
236
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
223 failed = False
224 except BaseException as e:
--> 225 result = pack_exception(e, dumps)
226 failed = True
227 return key, result, failed
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
218 try:
219 task, data = loads(task_info)
--> 220 result = _execute_task(task, data)
221 id = get_id()
222 result = dumps((result, id))
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/dataframe/shuffle.py in shuffle_group_3(df, col, npartitions, p)
916 g = df.groupby(col)
917 d = {i: g.get_group(i) for i in g.groups}
--> 918 p.append(d, fsync=True)
919
920
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/partd/encode.py in append(self, data, **kwargs)
21
22 def append(self, data, **kwargs):
---> 23 data = valmap(self.encode, data)
24 data = valmap(frame, data)
25 self.partd.append(data, **kwargs)
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/toolz/dicttoolz.py in valmap(func, d, factory)
81 """
82 rv = factory()
---> 83 rv.update(zip(d.keys(), map(func, d.values())))
84 return rv
85
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/partd/pandas.py in serialize(df)
179 """
180 col_header, col_bytes = index_to_header_bytes(df.columns)
--> 181 ind_header, ind_bytes = index_to_header_bytes(df.index)
182 headers = [col_header, ind_header]
183 bytes = [col_bytes, ind_bytes]
~/conda/envs/rapids-21.12/lib/python3.8/site-packages/partd/pandas.py in index_to_header_bytes(ind)
111 values = ind.values
112
--> 113 header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)
114 bytes = pnp.compress(pnp.serialize(values), values.dtype)
115 return header, bytes
AttributeError: 'Int64Index' object has no attribute '_get_attributes_dict'
```

It also looks like this has caused issues with pandas dataframe subclasses not being maintained through serialization roundtrips, as noted in #52.
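As a rough illustration of that subclass issue (assuming `partd.pandas.deserialize` exists as the counterpart to the `serialize` function shown in the traceback above), a round trip through the current manual header/bytes path comes back as a plain `DataFrame`:

```python
import pandas as pd
from partd.pandas import serialize, deserialize  # deserialize assumed to mirror serialize


class MyFrame(pd.DataFrame):
    """Minimal pandas subclass, following the pandas subclassing docs."""

    @property
    def _constructor(self):
        return MyFrame


df = MyFrame({"x": [1, 2, 3]})
result = deserialize(serialize(df))

# The manual header/bytes reconstruction builds a plain pandas DataFrame,
# so the subclass type is expected to be lost (the behaviour reported in #52).
print(type(df))      # <class '__main__.MyFrame'>
print(type(result))  # expected: <class 'pandas.core.frame.DataFrame'>
```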
Looking through the serialization code itself, it seems like things haven't been modified significantly in a few years, which raises the question of whether we can do things in a different (ideally more flexible) way today. For example, we currently only use `pickle.dumps` for a small subset of pandas Index subclasses:
Lines 98 to 102 in 236a44b:

```python
# These have special `__reduce__` methods, just use pickle
if isinstance(ind, (pd.DatetimeIndex,
                    pd.MultiIndex,
                    pd.RangeIndex)):
    return None, dumps(ind)
```
However, it appears that all Index subclasses should support serialization through pickle. Additionally, it looks like we're opting to manually construct header/bytes from pandas-like objects during serialization, when many of them already implement serialization/deserialization functions that could be used for this purpose.
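For instance, a quick (non-exhaustive) check over the common Index types suggests they all round-trip through plain pickle:

```python
import pickle

import pandas as pd

# A sample of Index subclasses, including the ones partd currently
# special-cases (DatetimeIndex, MultiIndex, RangeIndex) as well as
# ones it serializes manually.
indexes = [
    pd.RangeIndex(5),
    pd.Index([1, 2, 3]),
    pd.Index(["a", "b", "c"]),
    pd.CategoricalIndex(["a", "b", "a"]),
    pd.DatetimeIndex(["2021-01-01", "2021-01-02"]),
    pd.MultiIndex.from_tuples([("a", 1), ("b", 2)]),
]

for ind in indexes:
    roundtripped = pickle.loads(pickle.dumps(ind))
    assert type(roundtripped) is type(ind) and roundtripped.equals(ind)
```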
Essentially, I'm wondering if we could refactor the serialization methods here to (rough sketch below):

- check for `serialize`/`deserialize` methods and use them if available
- if not, fall back on `pickle.dumps` if `__reduce__` is available
- fall back on manual serialization as a last resort
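A minimal sketch of that fallback chain, purely illustrative (`manual_serialize` is a hypothetical stand-in for partd's existing header/bytes code, and the pickle step is approximated with a try/except rather than inspecting `__reduce__` directly):

```python
import pickle


def encode(obj):
    """Pick the most specific serialization strategy available for obj."""
    # 1. Prefer an object-provided serialize hook if one exists.
    if hasattr(obj, "serialize"):
        return obj.serialize()
    # 2. Fall back on pickle for anything with working __reduce__ support.
    try:
        return pickle.dumps(obj)
    except Exception:
        pass
    # 3. Last resort: the existing manual header/bytes construction.
    return manual_serialize(obj)  # hypothetical stand-in for partd's current code
```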
Some other related goals could be to examine whether something similar can be done for the NumPy serialization methods, and potentially to add testing for non-pandas / pandas-subclass dataframes.
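As a starting point for that testing, and assuming `partd.PandasBlocks` over a `partd.File` backend round-trips frames via `append`/`get` (the path the traceback above goes through), a parametrized test expressing the desired behaviour might look like:

```python
import pandas as pd
import partd
import pytest


class MyFrame(pd.DataFrame):
    """Minimal pandas subclass used only for testing."""

    @property
    def _constructor(self):
        return MyFrame


@pytest.mark.parametrize("df", [
    pd.DataFrame({"x": [1, 2, 3]}),
    MyFrame({"x": [1, 2, 3]}),
    # a cudf.DataFrame case could be added here when cuDF is available
])
def test_dataframe_roundtrip(tmp_path, df):
    p = partd.PandasBlocks(partd.File(str(tmp_path / "store")))
    try:
        p.append({"a": df})
        result = p.get("a")
    finally:
        p.drop()
    # Desired behaviour: the concrete dataframe type survives the round trip.
    assert type(result) is type(df)
    pd.testing.assert_frame_equal(pd.DataFrame(result), pd.DataFrame(df))
```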