diff --git a/merlin/core/dispatch.py b/merlin/core/dispatch.py
index e17f8b562..f09ab341b 100644
--- a/merlin/core/dispatch.py
+++ b/merlin/core/dispatch.py
@@ -28,13 +28,13 @@
 from merlin.core.compat import HAS_GPU  # pylint: disable=unused-import # noqa: F401
 from merlin.core.compat import cudf
 from merlin.core.compat import cupy as cp
+from merlin.core.compat import dask_cudf
 from merlin.core.protocols import DataFrameLike, DictLike, SeriesLike
 
 rmm = None
 
 if cudf:
     try:
-        import dask_cudf
         import rmm  # type: ignore[no-redef]
 
         from cudf.core.column import as_column, build_column
@@ -311,6 +311,27 @@ def series_has_nulls(s):
     return s.has_nulls
 
 
+def columnwise_explode(series):
+    """Explode a list column along the column axis"""
+    if isinstance(series, pd.Series):
+        df = pd.DataFrame(series.tolist())
+    else:
+        df = cudf.DataFrame(series.to_pandas().tolist())
+    df.columns = [f"{series.name}_{c}" for c in df.columns]
+    return df
+
+
+def dataframe_columnwise_explode(dataframe):
+    """Explode all list columns in a dataframe along the column axis"""
+    columns_in_dataframe = dataframe.columns
+    for col in columns_in_dataframe:
+        if is_list_dtype(dataframe[col]):
+            col_df = columnwise_explode(dataframe[col])
+            dataframe = concat_columns([dataframe, col_df])
+            dataframe = dataframe.drop(labels=col, axis=1)
+    return dataframe
+
+
 def list_val_dtype(ser: SeriesLike) -> Optional[np.dtype]:
     """
     Return the dtype of the leaves from a list or nested list
@@ -356,6 +377,10 @@ def is_list_dtype(ser):
         return pd.api.types.is_list_like(ser.values[0])
     elif cudf and isinstance(ser, (cudf.Series, cudf.ListDtype)):
         return cudf_is_list_dtype(ser)
+    elif dask_cudf and isinstance(ser, dask_cudf.Series):
+        return cudf_is_list_dtype(ser.head())
+    elif isinstance(ser, dd.core.Series):
+        return pd.api.types.is_list_like(ser.head()[0])
     elif isinstance(ser, np.ndarray) or (cp and isinstance(ser, cp.ndarray)):
         return len(ser.shape) > 1
     return pd.api.types.is_list_like(ser)
diff --git a/merlin/io/dataset.py b/merlin/io/dataset.py
index 288976c73..50aada285 100644
--- a/merlin/io/dataset.py
+++ b/merlin/io/dataset.py
@@ -29,10 +29,12 @@
 from dask.utils import natural_sort_key, parse_bytes
 from fsspec.core import get_fs_token_paths
 from fsspec.utils import stringify_path
+from npy_append_array import NpyAppendArray
 
 from merlin.core.compat import HAS_GPU, cudf, device_mem_size
 from merlin.core.dispatch import (
     convert_data,
+    dataframe_columnwise_explode,
     hex_to_int,
     is_dataframe_object,
     is_list_dtype,
@@ -1067,6 +1069,41 @@ def to_hugectr(
             schema=self.schema,
         )
 
+    def to_npy(
+        self,
+        output_file: str,
+        append: bool = False,
+    ):
+        """Converts a dataset into an npy file; append mode supports larger-than-memory data
+
+        Parameters
+        ----------
+        output_file : str
+            The output file path for the resulting npy file
+        append : bool, optional
+            Enables append mode for larger-than-memory data, by default False
+        """
+        data = self.to_ddf()
+        if append:
+            data = Dataset(data)
+            itr = iter(data.to_iter())
+            with NpyAppendArray(output_file) as nf:
+                for df in itr:
+                    to_write = dataframe_columnwise_explode(df)
+                    # after the explode there may no longer be any object-dtype series
+                    if "object" in to_write.dtypes.values and append:
+                        raise TypeError("Cannot append object columns")
+                    if (to_write.isnull()).any().any():
+                        raise ValueError("Cannot convert data because null values were detected")
+                    nf.append(to_write.to_numpy())
+        else:
+            to_write = dataframe_columnwise_explode(data.compute())
+            if "object" in to_write.dtypes.values and append:
+                raise TypeError("Cannot append object columns")
+            if (to_write.isnull()).any().any():
+                raise ValueError("Cannot convert data because null values were detected")
+            np.save(output_file, to_write.to_numpy())
+
     @property
     def num_rows(self):
         return self.engine.num_rows
diff --git a/requirements.txt b/requirements.txt
index 42bffc38e..0f1e8cb99 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,6 +11,7 @@
 tqdm>=4.0
 tensorflow-metadata>=1.2.0
 betterproto<2.0.0
 packaging
+npy-append-array
 # pynvml==11.5.0 is incompatible with distributed<2023.2.1
 pynvml>=11.0.0,<11.5
diff --git a/tests/unit/io/test_dataset.py b/tests/unit/io/test_dataset.py
index a8452fb8c..d3ec43bd0 100644
--- a/tests/unit/io/test_dataset.py
+++ b/tests/unit/io/test_dataset.py
@@ -13,11 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import glob
+
+import numpy as np
 import pandas as pd
 import pytest
 
 from merlin.core.compat import HAS_GPU, cudf
-from merlin.core.dispatch import make_df
+from merlin.core.dispatch import dataframe_columnwise_explode, make_df
 from merlin.io import Dataset
 
 
@@ -55,3 +58,49 @@ def test_infer_list_dtype_unknown():
     df = pd.DataFrame({"col": [[], []]})
     dataset = Dataset(df, cpu=True)
     assert dataset.schema["col"].dtype.element_type.value == "unknown"
+
+
+@pytest.mark.parametrize("engine", ["csv", "parquet"])
+def test_dask_df_array_npy(tmpdir, datasets, engine):
+    paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
+    # cannot have any null/NA entries
+    dataset = Dataset(Dataset(paths).to_ddf().compute().fillna(method="ffill"))
+    path = str(tmpdir / "result.npy")
+    dataset.to_npy(path)
+    nparr = np.load(path, allow_pickle=True)
+    numpy_arr = dataset.to_ddf().compute().to_numpy()
+    assert (nparr == numpy_arr).all()
+
+
+@pytest.mark.parametrize("append", [True, False])
+@pytest.mark.parametrize("engine", ["csv", "parquet"])
+def test_dask_df_array_npy_append(tmpdir, datasets, engine, append):
+    df = make_df(
+        {
+            "id": [1, 2, 3, 4, 5, 6],
+            "embed_1": [1, 2, 3, 4, 5, 6],
+            "embed_2": [1, 2, 3, 4, 5, 6],
+            "embed_3": [1, 2, 3, 4, 5, 6],
+        }
+    )
+    dataset = Dataset(df)
+    path = str(tmpdir / "result.npy")
+    dataset.to_npy(path, append=append)
+    nparr = np.load(path, allow_pickle=True)
+    numpy_arr = dataset.to_ddf().compute().to_numpy()
+    assert (nparr == numpy_arr).all()
+
+
+@pytest.mark.parametrize("append", [True, False])
+@pytest.mark.parametrize("engine", ["csv", "parquet"])
+def test_dask_df_array_npy_append_list(tmpdir, datasets, engine, append):
+    df = make_df(
+        {"id": [1, 2, 3, 4], "embeddings": [[1, 1, 1, 1], [2, 2, 2, 2], [3, 3, 3, 3], [4, 4, 4, 4]]}
+    )
+    dataset = Dataset(df, cpu=True)
+    path = str(tmpdir / "result.npy")
+    dataset.to_npy(path, append=append)
+    nparr = np.load(path, allow_pickle=True)
+    ddf = dataset.to_ddf().compute()
+    numpy_arr = dataframe_columnwise_explode(ddf).to_numpy()
+    assert (nparr == numpy_arr).all()