Skip to content

Clean up Pandas, cuDF, Dask, and Dask-cuDF DocumentDataset type logic #494

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions nemo_curator/datasets/doc_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

from nemo_curator.utils.distributed_utils import read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.import_utils import gpu_only_import

dask_cudf = gpu_only_import("dask_cudf")


class DocumentDataset:
Expand All @@ -29,6 +32,11 @@ class DocumentDataset:
"""

def __init__(self, dataset_df: dd.DataFrame):
    """
    Wrap an existing Dask (or Dask-cuDF) DataFrame in a DocumentDataset.

    Args:
        dataset_df: A partitioned Dask collection. Eager DataFrames
            (plain Pandas or cuDF) are rejected; use the dedicated
            constructors for those instead.
    Raises:
        RuntimeError: If `dataset_df` is not a partitioned Dask collection.
    """
    # Dask collections expose `npartitions`; eager Pandas/cuDF frames do not,
    # which is how we tell the two apart without importing GPU libraries here.
    is_partitioned = hasattr(dataset_df, "npartitions")
    if not is_partitioned:
        raise RuntimeError(
            "Please use DocumentDataset.from_pandas or DocumentDataset.from_cudf "
            "to initialize your Pandas/cuDF DataFrame to a DocumentDataset."
        )
    self.df = dataset_df

def __len__(self) -> int:
Expand Down Expand Up @@ -315,14 +323,14 @@ def from_pandas(
name: Optional[str] = None,
):
"""
Creates a document dataset from a pandas data frame.
Creates a document dataset from a Pandas DataFrame.
For more information on the arguments see Dask's from_pandas documentation
https://docs.dask.org/en/stable/generated/dask.dataframe.from_pandas.html

Args:
data: A pandas dataframe
data: A Pandas DataFrame
Returns:
A document dataset with a pandas backend (on the CPU).
A DocumentDataset with a Pandas backend (on the CPU).
"""
return cls(
dd.from_pandas(
Expand All @@ -335,13 +343,50 @@ def from_pandas(

def to_pandas(self):
    """
    Creates a Pandas DataFrame from a DocumentDataset

    Returns:
        A Pandas DataFrame (on the CPU)
    """
    # Move the collection to the Pandas backend first, then materialize it.
    cpu_backed = self.df.to_backend("pandas")
    return cpu_backed.compute()

@classmethod
def from_cudf(
    cls,
    data,
    npartitions: Optional[int] = 1,
    chunksize: Optional[int] = None,
    sort: Optional[bool] = True,
    name: Optional[str] = None,
):
    """
    Creates a document dataset from a cuDF DataFrame.
    For more information on the arguments see Dask-cuDF's from_cudf documentation
    https://docs.rapids.ai/api/dask-cudf/legacy/api/

    Args:
        data: A cuDF DataFrame
        npartitions: Number of partitions for the resulting collection.
        chunksize: Rows per partition (alternative to npartitions).
        sort: Whether to sort the input by index first.
        name: Optional key-prefix for the resulting Dask collection.
    Returns:
        A DocumentDataset with a cuDF backend (on the GPU).
    """
    kwargs = {
        "npartitions": npartitions,
        "chunksize": chunksize,
        "sort": sort,
    }
    # Bug fix: `name` was previously accepted but never forwarded to
    # dask_cudf.from_cudf, so callers could not control the collection name.
    # Forward it only when explicitly provided, which keeps the default call
    # identical to the old behavior.
    if name is not None:
        kwargs["name"] = name
    return cls(dask_cudf.from_cudf(data=data, **kwargs))

def to_cudf(self):
    """
    Creates a cuDF DataFrame from a DocumentDataset

    Returns:
        A cuDF DataFrame (on the GPU)
    """
    # Move the collection to the cuDF backend first, then materialize it.
    gpu_backed = self.df.to_backend("cudf")
    return gpu_backed.compute()


def _read_json_or_parquet(
input_files: Union[str, List[str]],
Expand Down
79 changes: 74 additions & 5 deletions tests/test_dataset.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,79 @@
import dask.dataframe as dd
import pandas as pd
import pytest

from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.import_utils import gpu_only_import

cudf = gpu_only_import("cudf")
dask_cudf = gpu_only_import("dask_cudf")

def test_to_from_pandas():
    """Round-tripping a Pandas frame through DocumentDataset preserves it."""
    original_df = pd.DataFrame({"first_col": [1, 2, 3], "second_col": ["a", "b", "c"]})
    round_tripped = DocumentDataset.from_pandas(original_df).to_pandas()
    pd.testing.assert_frame_equal(original_df, round_tripped)

def all_equal(left_result: pd.DataFrame, right_result: pd.DataFrame, gpu=True):
    """
    Assert that two DataFrames hold the same columns with equal values.

    Args:
        left_result: First frame to compare.
        right_result: Second frame to compare.
        gpu: When True, treat inputs as cuDF objects and move each column
            to host memory before comparing.
    """
    assert set(left_result.columns) == set(right_result.columns)

    for col in left_result.columns:
        left, right = (
            left_result[col].reset_index(drop=True),
            right_result[col].reset_index(drop=True),
        )

        # The `all` function expects an iterable, so cuDF Series are moved
        # to Pandas (host memory) before the element-wise comparison.
        if gpu:
            left, right = left.to_pandas(), right.to_pandas()

        assert all(left == right), f"Mismatch in {col} column.\n{left}\n{right}\n"


class TestDocumentDataset:
    """Constructor-validation and round-trip tests for DocumentDataset."""

    def test_to_from_pandas(self):
        # from_pandas -> to_pandas must be a lossless round trip.
        original_df = pd.DataFrame(
            {"first_col": [1, 2, 3], "second_col": ["a", "b", "c"]}
        )
        dataset = DocumentDataset.from_pandas(original_df)
        converted_df = dataset.to_pandas()
        pd.testing.assert_frame_equal(original_df, converted_df)

    def test_init_pandas(self):
        # Raw Pandas DataFrames must be rejected by the constructor.
        original_df = pd.DataFrame(
            {"first_col": [1, 2, 3], "second_col": ["a", "b", "c"]}
        )
        with pytest.raises(RuntimeError):
            DocumentDataset(dataset_df=original_df)

    def test_init_dask(self):
        original_df = pd.DataFrame(
            {"first_col": [1, 2, 3], "second_col": ["a", "b", "c"]}
        )
        ddf = dd.from_pandas(original_df, npartitions=1)
        dataset = DocumentDataset(dataset_df=ddf)
        # Bug fix: was `assert type(dataset.df == dd.DataFrame)`, which is
        # always truthy (the type of *anything* is truthy) and so never
        # actually checked the backing collection's type.
        assert isinstance(dataset.df, dd.DataFrame)
        pd.testing.assert_frame_equal(original_df, dataset.df.compute())

    @pytest.mark.gpu
    def test_to_from_cudf(self):
        # from_cudf -> to_cudf must be a lossless round trip on the GPU.
        original_df = cudf.DataFrame(
            {"first_col": [1, 2, 3], "second_col": ["a", "b", "c"]}
        )
        dataset = DocumentDataset.from_cudf(original_df)
        converted_df = dataset.to_cudf()
        all_equal(original_df, converted_df, gpu=True)

    @pytest.mark.gpu
    def test_init_cudf(self):
        # Raw cuDF DataFrames must be rejected by the constructor.
        original_df = cudf.DataFrame(
            {"first_col": [1, 2, 3], "second_col": ["a", "b", "c"]}
        )
        with pytest.raises(RuntimeError):
            DocumentDataset(dataset_df=original_df)

    @pytest.mark.gpu
    def test_init_dask_cudf(self):
        original_df = cudf.DataFrame(
            {"first_col": [1, 2, 3], "second_col": ["a", "b", "c"]}
        )
        ddf = dask_cudf.from_cudf(original_df, npartitions=1)
        dataset = DocumentDataset(dataset_df=ddf)
        # Bug fix: same vacuous `type(... == ...)` assertion as above;
        # isinstance performs the intended type check.
        assert isinstance(dataset.df, dask_cudf.DataFrame)
        all_equal(original_df, dataset.df.compute(), gpu=True)