# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from functools import wraps
from typing import Any, Callable, List, Literal, Optional, Union

import dask.dataframe as dd

from nemo_curator.utils.distributed_utils import read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.import_utils import gpu_only_import

dask_cudf = gpu_only_import("dask_cudf")


class DocumentDataset:
    """
    A collection of documents and document metadata.
    Internally it may be distributed across multiple nodes, and may be on GPUs.
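
    Example:
        A minimal usage sketch; the paths and column names below are placeholders,
        not part of the library.

            # Read a directory of JSONL files, peek at a few rows, then write it back out.
            dataset = DocumentDataset.read_json("books_jsonl/", backend="pandas")
            print(dataset.head(3))
            dataset.to_json("books_out/")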
"""
def __init__(self, dataset_df: dd.DataFrame):
if not hasattr(dataset_df, "npartitions"):
raise RuntimeError(
"Please use DocumentDataset.from_pandas or DocumentDataset.from_cudf "
"to initialize your Pandas/cuDF DataFrame to a DocumentDataset."
)
self.df = dataset_df
def __len__(self) -> int:
return len(self.df)
# `def persist(self) -> Self` requires Python 3.11 or higher
def persist(self) -> "DocumentDataset":
return DocumentDataset(self.df.persist())
@wraps(dd.DataFrame.repartition)
def repartition(self, *args, **kwargs) -> "DocumentDataset":
return self.__class__(self.df.repartition(*args, **kwargs))
def head(self, n: int = 5) -> Any:
return self.df.head(n)

    @classmethod
    def read_json(
        cls,
        input_files: Union[str, List[str]],
        backend: Literal["pandas", "cudf"] = "pandas",
        files_per_partition: Optional[int] = None,
        blocksize: Optional[str] = "1gb",
        add_filename: Union[bool, str] = False,
        input_meta: Optional[Union[str, dict]] = None,
        columns: Optional[List[str]] = None,
        **kwargs,
    ) -> "DocumentDataset":
        """
        Read JSONL file(s).

        Args:
            input_files: The path of the input file(s).
            backend: The backend to use for reading the data.
            files_per_partition: The number of files to read per partition.
            blocksize: The desired size of each partition when reading, e.g. "1gb".
            add_filename: Whether to add a filename column to the DataFrame.
                If True, a new column named `file_name` is added to the DataFrame.
                If a str, that string is used as the column name. Default is False.
            input_meta: A dictionary, or a string formatted as a dictionary, which outlines
                the field names and their respective data types within the JSONL input file.
            columns: If not None, only these columns will be read from the file.
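
        Example:
            An illustrative sketch; "books_jsonl/" is a placeholder directory of JSONL files,
            and the column names are likewise illustrative.

                dataset = DocumentDataset.read_json(
                    "books_jsonl/",
                    backend="pandas",
                    add_filename=True,
                    columns=["text", "id"],
                )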
"""
return cls(
_read_json_or_parquet(
input_files=input_files,
file_type="jsonl",
backend=backend,
add_filename=add_filename,
files_per_partition=files_per_partition,
blocksize=blocksize,
input_meta=input_meta,
columns=columns,
**kwargs,
)
)

    @classmethod
    def read_parquet(
        cls,
        input_files: Union[str, List[str]],
        backend: Literal["pandas", "cudf"] = "pandas",
        files_per_partition: Optional[int] = None,
        blocksize: Optional[str] = "1gb",
        add_filename: Union[bool, str] = False,
        columns: Optional[List[str]] = None,
        **kwargs,
    ) -> "DocumentDataset":
        """
        Read Parquet file(s).

        Args:
            input_files: The path of the input file(s).
            backend: The backend to use for reading the data.
            files_per_partition: The number of files to read per partition.
            blocksize: The desired size of each partition when reading, e.g. "1gb".
            add_filename: Whether to add a filename column to the DataFrame.
                If True, a new column named `file_name` is added to the DataFrame.
                If a str, that string is used as the column name. Default is False.
            columns: If not None, only these columns will be read from the file.
                There is a significant performance gain when specifying columns for Parquet files.
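
        Example:
            An illustrative sketch; "embeddings_parquet/" is a placeholder directory and
            the column names are likewise illustrative.

                dataset = DocumentDataset.read_parquet(
                    "embeddings_parquet/",
                    backend="cudf",
                    columns=["id", "text"],
                )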
"""
return cls(
_read_json_or_parquet(
input_files=input_files,
file_type="parquet",
backend=backend,
add_filename=add_filename,
files_per_partition=files_per_partition,
blocksize=blocksize,
columns=columns,
**kwargs,
)
)

    @classmethod
    def read_pickle(
        cls,
        input_files: Union[str, List[str]],
        backend: Literal["pandas", "cudf"] = "pandas",
        columns: Optional[List[str]] = None,
        **kwargs,
    ) -> "DocumentDataset":
        """
        Read Pickle file(s).

        Args:
            input_files: The path of the input file(s).
            backend: The backend to use for reading the data.
            columns: If not None, only these columns will be read from the file.
        """
        return cls(
            read_data(
                input_files=input_files,
                file_type="pickle",
                backend=backend,
                columns=columns,
                **kwargs,
            )
        )

    @classmethod
    def read_custom(
        cls,
        input_files: Union[str, List[str]],
        file_type: str,
        read_func_single_partition: Callable[
            [List[str], str, bool, Union[str, dict], dict],
            Union["cudf.DataFrame", "pd.DataFrame"],
        ],
        files_per_partition: Optional[int] = None,
        backend: Optional[Literal["pandas", "cudf"]] = None,
        add_filename: Union[bool, str] = False,
        columns: Optional[List[str]] = None,
        input_meta: Optional[Union[str, dict]] = None,
    ) -> "DocumentDataset":
        """
        Read custom data from a file or directory based on a custom read function.

        Args:
            input_files: The path of the input file(s).
                If input_files is a single string that ends with the file_type, we consider it a single file.
                If input_files is a single string that does not end with the file_type, we consider it a directory
                and read all files under the directory.
                If input_files is a list of strings, we assume each string is a file path.
            file_type: The type of the file to read.
            read_func_single_partition: A function that reads a single file or a list of files in a single Dask partition.
                The function should take the following arguments:
                - files: A list of file paths.
                - file_type: The type of the file to read (in case you want to handle different file types differently).
                - backend: See below.
                - add_filename: See below.
                - columns: See below.
                - input_meta: See below.
            backend: The backend to use for reading the data, in case you want to handle pd.DataFrame or cudf.DataFrame.
            files_per_partition: The number of files to read per partition.
            add_filename: Whether to add a filename column to the DataFrame.
                If True, a new column named `file_name` is added to the DataFrame.
                If a str, that string is used as the column name. Default is False.
            columns: If not None, only these columns will be returned from the output of the read_func_single_partition function.
            input_meta: A dictionary, or a string formatted as a dictionary, which outlines
                the field names and their respective data types within the JSONL input file.
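
        Example:
            An illustrative sketch of a custom reader that turns each plain-text file into one
            document row. `read_txt_files` and "my_text_data/" are hypothetical names used only
            for this example; they are not part of the library.

                import os
                import pandas as pd

                def read_txt_files(files, file_type, backend, add_filename, columns, input_meta):
                    # Each text file becomes a single row holding its filename and contents.
                    rows = []
                    for path in files:
                        with open(path, "r") as f:
                            rows.append({"file_name": os.path.basename(path), "text": f.read()})
                    df = pd.DataFrame(rows)
                    return df[columns] if columns is not None else df

                dataset = DocumentDataset.read_custom(
                    input_files="my_text_data/",
                    file_type="txt",
                    read_func_single_partition=read_txt_files,
                    files_per_partition=4,
                    backend="pandas",
                )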
"""
# Single file
if file_type and input_files.endswith(file_type):
files = [input_files]
# Directory of filetype files
else:
files = get_all_files_paths_under(
root=input_files,
recurse_subdirectories=False,
keep_extensions=[file_type],
)
return cls(
read_data(
input_files=files,
backend=backend,
files_per_partition=files_per_partition,
blocksize=None,
add_filename=add_filename,
columns=columns,
input_meta=input_meta,
read_func_single_partition=read_func_single_partition,
)
)

    def to_json(
        self,
        output_path: str,
        write_to_filename: Union[bool, str] = False,
        keep_filename_column: bool = False,
        partition_on: Optional[str] = None,
    ):
        """
        Writes the dataset to the specified path in JSONL format.

        If `write_to_filename` is True, the DataFrame is expected to have a column
        that specifies the filename for each document. This column can be named
        `file_name` by default, or a custom name if `write_to_filename` is a string.

        Args:
            output_path (str): The directory or file path where the dataset will be written.
            write_to_filename (Union[bool, str]): Determines how filenames are handled.
                - If True, uses the `file_name` column in the DataFrame to determine filenames.
                - If a string, uses that string as the column name for filenames.
                - If False, writes all data to the specified `output_path`.
            keep_filename_column (bool): If True, retains the filename column in the output.
                If False, the filename column is dropped from the output.
            partition_on (Optional[str]): The column name used to partition the data.
                If specified, data is partitioned based on unique values in this column,
                with each partition written to a separate directory.

        For more details, refer to the `write_to_disk` function in
        `nemo_curator.utils.distributed_utils`.
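
        Example:
            An illustrative sketch; "curated_jsonl/" is a placeholder output directory, and the
            dataset is assumed to carry a `file_name` column (e.g. from `read_json(..., add_filename=True)`).

                dataset.to_json(
                    "curated_jsonl/",
                    write_to_filename=True,
                    keep_filename_column=False,
                )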
"""
write_to_disk(
df=self.df,
output_path=output_path,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="jsonl",
)

    def to_parquet(
        self,
        output_path: str,
        write_to_filename: Union[bool, str] = False,
        keep_filename_column: bool = False,
        partition_on: Optional[str] = None,
    ):
        """
        Writes the dataset to the specified path in Parquet format.

        If `write_to_filename` is True, the DataFrame is expected to have a column
        that specifies the filename for each document. This column can be named
        `file_name` by default, or a custom name if `write_to_filename` is a string.

        Args:
            output_path (str): The directory or file path where the dataset will be written.
            write_to_filename (Union[bool, str]): Determines how filenames are handled.
                - If True, uses the `file_name` column in the DataFrame to determine filenames.
                - If a string, uses that string as the column name for filenames.
                - If False, writes all data to the specified `output_path`.
            keep_filename_column (bool): If True, retains the filename column in the output.
                If False, the filename column is dropped from the output.
            partition_on (Optional[str]): The column name used to partition the data.
                If specified, data is partitioned based on unique values in this column,
                with each partition written to a separate directory.

        For more details, refer to the `write_to_disk` function in
        `nemo_curator.utils.distributed_utils`.
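
        Example:
            An illustrative sketch; "curated_parquet/" is a placeholder output directory and
            "language" is an assumed column used only to demonstrate `partition_on`.

                dataset.to_parquet("curated_parquet/", partition_on="language")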
"""
write_to_disk(
df=self.df,
output_path=output_path,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="parquet",
)
def to_pickle(
self,
output_path: str,
write_to_filename: Union[bool, str] = False,
):
raise NotImplementedError("DocumentDataset does not support to_pickle yet")

    @classmethod
    def from_pandas(
        cls,
        data,
        npartitions: Optional[int] = 1,
        chunksize: Optional[int] = None,
        sort: Optional[bool] = True,
        name: Optional[str] = None,
    ):
        """
        Creates a document dataset from a Pandas DataFrame.

        For more information on the arguments see Dask's from_pandas documentation
        https://docs.dask.org/en/stable/generated/dask.dataframe.from_pandas.html

        Args:
            data: A Pandas DataFrame

        Returns:
            A DocumentDataset with a Pandas backend (on the CPU).
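
        Example:
            An illustrative sketch using a small in-memory DataFrame.

                import pandas as pd

                pdf = pd.DataFrame({"id": [0, 1], "text": ["hello", "world"]})
                dataset = DocumentDataset.from_pandas(pdf, npartitions=1)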
"""
return cls(
dd.from_pandas(
data=data,
npartitions=npartitions,
chunksize=chunksize,
sort=sort,
)
)
def to_pandas(self):
"""
Creates a Pandas DataFrame from a DocumentDataset
Returns:
A Pandas DataFrame (on the CPU)
"""
return self.df.to_backend("pandas").compute()

    @classmethod
    def from_cudf(
        cls,
        data,
        npartitions: Optional[int] = 1,
        chunksize: Optional[int] = None,
        sort: Optional[bool] = True,
        name: Optional[str] = None,
    ):
        """
        Creates a document dataset from a cuDF DataFrame.

        For more information on the arguments see Dask-cuDF's from_cudf documentation
        https://docs.rapids.ai/api/dask-cudf/legacy/api/

        Args:
            data: A cuDF DataFrame

        Returns:
            A DocumentDataset with a cuDF backend (on the GPU).
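
        Example:
            An illustrative sketch; requires a GPU environment with RAPIDS cuDF installed.

                import cudf

                gdf = cudf.DataFrame({"id": [0, 1], "text": ["hello", "world"]})
                dataset = DocumentDataset.from_cudf(gdf, npartitions=1)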
"""
return cls(
dask_cudf.from_cudf(
data=data,
npartitions=npartitions,
chunksize=chunksize,
sort=sort,
)
)
def to_cudf(self):
"""
Creates a cuDF DataFrame from a DocumentDataset
Returns:
A cuDF DataFrame (on the GPU)
"""
return self.df.to_backend("cudf").compute()


def _read_json_or_parquet(
    input_files: Union[str, List[str]],
    file_type: str,
    backend: Literal["cudf", "pandas"],
    add_filename: Union[bool, str] = False,
    files_per_partition: Optional[int] = None,
    blocksize: Optional[str] = None,
    input_meta: Optional[Union[str, dict]] = None,
    columns: Optional[List[str]] = None,
    **kwargs,
):
    """
    `input_files` may be a list or a string type.

    If `input_files` is a list, it may be a list of JSONL or Parquet files,
    e.g., `input_files = ["file1.jsonl", "file2.jsonl", "file3.jsonl"]`,
    or a list of directories containing JSONL or Parquet files,
    e.g., `input_files = ["dir1", "dir2", "dir3"]`,
    where each of `dir1`, `dir2`, and `dir3` contains only JSONL or Parquet files.

    If `input_files` is a string, it may be a single JSONL or Parquet file,
    such as `input_files = "my_file.jsonl"`,
    or a single directory containing JSONL or Parquet files,
    such as `input_files = "my_directory"`.

    See the nemo_curator.utils.distributed_utils.read_data docstring for other parameters.

    Returns a DataFrame to be used in initializing a DocumentDataset.
    """
    file_ext = "." + file_type

    if isinstance(input_files, list):
        # List of files
        if all(os.path.isfile(f) for f in input_files):
            raw_data = read_data(
                input_files,
                file_type=file_type,
                backend=backend,
                files_per_partition=files_per_partition,
                blocksize=blocksize,
                add_filename=add_filename,
                input_meta=input_meta,
                columns=columns,
                **kwargs,
            )
        # List of directories
        else:
            dfs = []
            for data_path in input_files:
                files = get_all_files_paths_under(
                    root=data_path, recurse_subdirectories=False
                )
                df = read_data(
                    files,
                    file_type=file_type,
                    backend=backend,
                    files_per_partition=files_per_partition,
                    blocksize=blocksize,
                    add_filename=add_filename,
                    input_meta=input_meta,
                    columns=columns,
                    **kwargs,
                )
                dfs.append(df)
            raw_data = dd.concat(dfs, ignore_unknown_divisions=True)

    elif isinstance(input_files, str):
        # Single file
        if input_files.endswith(file_ext):
            files = [input_files]
        # Directory of JSONL or Parquet files
        else:
            files = get_all_files_paths_under(
                root=input_files, recurse_subdirectories=False
            )
        raw_data = read_data(
            input_files=files,
            file_type=file_type,
            backend=backend,
            files_per_partition=files_per_partition,
            blocksize=blocksize,
            add_filename=add_filename,
            input_meta=input_meta,
            columns=columns,
            **kwargs,
        )
    else:
        raise TypeError("File input must be a string or list.")

    return raw_data