27 changes: 26 additions & 1 deletion merlin/core/dispatch.py
@@ -28,13 +28,13 @@
from merlin.core.compat import HAS_GPU # pylint: disable=unused-import # noqa: F401
from merlin.core.compat import cudf
from merlin.core.compat import cupy as cp
from merlin.core.compat import dask_cudf
from merlin.core.protocols import DataFrameLike, DictLike, SeriesLike

rmm = None

if cudf:
    try:
        import dask_cudf
        import rmm  # type: ignore[no-redef]
        from cudf.core.column import as_column, build_column

@@ -311,6 +311,27 @@ def series_has_nulls(s):
    return s.has_nulls


def columnwise_explode(series):
    """Explode a list column along the column axis"""
    if isinstance(series, pd.Series):
        df = pd.DataFrame(series.tolist())
    else:
        # cudf list columns round-trip through pandas to build the exploded frame
        df = cudf.DataFrame(series.to_pandas().tolist())
    df.columns = [f"{series.name}_{c}" for c in df.columns]
    return df


def dataframe_columnwise_explode(dataframe):
    """Explode all list columns in a dataframe along the column axis"""
    columns_in_dataframe = dataframe.columns
    for col in columns_in_dataframe:
        if is_list_dtype(dataframe[col]):
            col_df = columnwise_explode(dataframe[col])
            dataframe = concat_columns([dataframe, col_df])
            dataframe = dataframe.drop(labels=col, axis=1)
    return dataframe
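
# Illustrative usage sketch (not part of this diff): with a rectangular list
# column (equal-length lists), dataframe_columnwise_explode widens each list
# into one scalar column per position and drops the original list column.
#
#   df = pd.DataFrame({"id": [1, 2], "emb": [[0.1, 0.2], [0.3, 0.4]]})
#   dataframe_columnwise_explode(df)
#   #    id  emb_0  emb_1
#   # 0   1    0.1    0.2
#   # 1   2    0.3    0.4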


def list_val_dtype(ser: SeriesLike) -> Optional[np.dtype]:
    """
    Return the dtype of the leaves from a list or nested list
@@ -356,6 +377,10 @@ def is_list_dtype(ser):
        return pd.api.types.is_list_like(ser.values[0])
    elif cudf and isinstance(ser, (cudf.Series, cudf.ListDtype)):
        return cudf_is_list_dtype(ser)
    elif dask_cudf and isinstance(ser, dask_cudf.Series):
        return cudf_is_list_dtype(ser.head())
    elif isinstance(ser, dd.core.Series):
        return pd.api.types.is_list_like(ser.head()[0])
    elif isinstance(ser, np.ndarray) or (cp and isinstance(ser, cp.ndarray)):
        return len(ser.shape) > 1
    return pd.api.types.is_list_like(ser)
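
# Illustrative sketch (not part of this diff): the two new dask branches above
# detect list columns on lazy collections by peeking at a small head() sample
# instead of materializing the whole series.
#
#   import dask.dataframe as dask_dd  # alias chosen for the example
#   pdf = pd.DataFrame({"col": [[1, 2], [3, 4]]})
#   lazy = dask_dd.from_pandas(pdf, npartitions=1)
#   is_list_dtype(lazy["col"])  # -> True, via pd.api.types.is_list_like(head()[0])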
37 changes: 37 additions & 0 deletions merlin/io/dataset.py
@@ -29,10 +29,12 @@
from dask.utils import natural_sort_key, parse_bytes
from fsspec.core import get_fs_token_paths
from fsspec.utils import stringify_path
from npy_append_array import NpyAppendArray

from merlin.core.compat import HAS_GPU, cudf, device_mem_size
from merlin.core.dispatch import (
    convert_data,
    dataframe_columnwise_explode,
    hex_to_int,
    is_dataframe_object,
    is_list_dtype,
@@ -1067,6 +1069,41 @@ def to_hugectr(
            schema=self.schema,
        )

    def to_npy(
        self,
        output_file: str,
        append: bool = False,
    ):
        """Convert a dataset into an npy file; append mode writes one batch
        at a time, so the data can be larger than memory

        Parameters
        ----------
        output_file : str
            The output file path for the resulting npy file
        append : bool, optional
            Enables append mode for larger-than-memory data, by default False
        """
        data = self.to_ddf()
        if append:
            data = Dataset(data)
            itr = iter(data.to_iter())
            with NpyAppendArray(output_file) as nf:
                for df in itr:
                    to_write = dataframe_columnwise_explode(df)
                    # list columns were exploded above, but any remaining
                    # object series cannot be appended as a fixed-width dtype
                    if "object" in to_write.dtypes.values:
                        raise TypeError("Cannot append object columns")
                    if to_write.isnull().any().any():
                        raise ValueError("Cannot convert data because null values were detected")
                    nf.append(to_write.to_numpy())
        else:
            to_write = dataframe_columnwise_explode(data.compute())
            if to_write.isnull().any().any():
                raise ValueError("Cannot convert data because null values were detected")
            np.save(output_file, to_write.to_numpy())
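
    # Illustrative usage sketch (not part of this diff), assuming a Dataset
    # built from a parquet directory. Append mode streams one partition at a
    # time through NpyAppendArray, so the full array never has to fit in
    # memory:
    #
    #   ds = Dataset("/path/to/parquet")
    #   ds.to_npy("/tmp/out.npy", append=True)
    #   arr = np.load("/tmp/out.npy")  # 2-D array, one row per dataset row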

    @property
    def num_rows(self):
        return self.engine.num_rows
1 change: 1 addition & 0 deletions requirements.txt
@@ -11,6 +11,7 @@ tqdm>=4.0
tensorflow-metadata>=1.2.0
betterproto<2.0.0
packaging
npy-append-array
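
# npy-append-array provides NpyAppendArray, which Dataset.to_npy uses to
# append batches to a .npy file on disk. A minimal sketch of the library's
# documented API (not part of this diff):
#
#   from npy_append_array import NpyAppendArray
#   import numpy as np
#   with NpyAppendArray("out.npy") as npaa:
#       npaa.append(np.zeros((2, 3)))
#       npaa.append(np.ones((2, 3)))  # out.npy now holds a (4, 3) array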

# pynvml==11.5.0 is incompatible with distributed<2023.2.1
pynvml>=11.0.0,<11.5
51 changes: 50 additions & 1 deletion tests/unit/io/test_dataset.py
@@ -13,11 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import glob

import numpy as np
import pandas as pd
import pytest

from merlin.core.compat import HAS_GPU, cudf
from merlin.core.dispatch import make_df
from merlin.core.dispatch import dataframe_columnwise_explode, make_df
from merlin.io import Dataset


@@ -55,3 +58,49 @@ def test_infer_list_dtype_unknown():
    df = pd.DataFrame({"col": [[], []]})
    dataset = Dataset(df, cpu=True)
    assert dataset.schema["col"].dtype.element_type.value == "unknown"


@pytest.mark.parametrize("engine", ["csv", "parquet"])
def test_dask_df_array_npy(tmpdir, datasets, engine):
    paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
    # to_npy rejects null/NA entries, so forward-fill them first
    dataset = Dataset(Dataset(paths).to_ddf().compute().fillna(method="ffill"))
    path = str(tmpdir / "result.npy")
    dataset.to_npy(path)
    nparr = np.load(path, allow_pickle=True)
    numpy_arr = dataset.to_ddf().compute().to_numpy()
    assert (nparr == numpy_arr).all()


@pytest.mark.parametrize("append", [True, False])
@pytest.mark.parametrize("engine", ["csv", "parquet"])
def test_dask_df_array_npy_append(tmpdir, datasets, engine, append):
    df = make_df(
        {
            "id": [1, 2, 3, 4, 5, 6],
            "embed_1": [1, 2, 3, 4, 5, 6],
            "embed_2": [1, 2, 3, 4, 5, 6],
            "embed_3": [1, 2, 3, 4, 5, 6],
        }
    )
    dataset = Dataset(df)
    path = str(tmpdir / "result.npy")
    dataset.to_npy(path, append=append)
    nparr = np.load(path, allow_pickle=True)
    numpy_arr = dataset.to_ddf().compute().to_numpy()
    assert (nparr == numpy_arr).all()


@pytest.mark.parametrize("append", [True, False])
@pytest.mark.parametrize("engine", ["csv", "parquet"])
def test_dask_df_array_npy_append_list(tmpdir, datasets, engine, append):
    df = make_df(
        {"id": [1, 2, 3, 4], "embeddings": [[1, 1, 1, 1], [2, 2, 2, 2], [3, 3, 3, 3], [4, 4, 4, 4]]}
    )
    dataset = Dataset(df, cpu=True)
    path = str(tmpdir / "result.npy")
    dataset.to_npy(path, append=append)
    nparr = np.load(path, allow_pickle=True)
    ddf = dataset.to_ddf().compute()
    numpy_arr = dataframe_columnwise_explode(ddf).to_numpy()
    assert (nparr == numpy_arr).all()