.. _datasets-ml-preprocessing:

ML Preprocessing
=======================

Datasets supports data preprocessing transformations commonly performed just before model training and model inference, which we refer to as **last-mile preprocessing**. These transformations are carried out via a few key operations: mapping, groupbys and aggregations, and random shuffling.

Mapping
-------

Many common preprocessing transformations, such as:

- adding new columns
- transforming existing columns
- dropping columns
- dropping nulls
- one-hot encoding

can be efficiently applied to a ``Dataset`` using Pandas DataFrame UDFs and ``.map_batches()``. This executes the transformations in parallel over the ``Dataset`` blocks and lets you apply vectorized Pandas operations to the block columns within the UDF.

.. code-block:: python

    import collections

    import pandas as pd
    import ray

    # A Pandas DataFrame UDF for transforming the underlying blocks of a Dataset in parallel.
    def transform_batch(df: pd.DataFrame):
        # Drop nulls.
        df = df.dropna(subset=["feature_1"])
        # Add new column.
        df["new_col"] = df["feature_1"] - 2 * df["feature_2"] + df["feature_3"] / 3
        # Transform existing column.
        df["feature_1"] = 2 * df["feature_1"] + 1
        # Drop column.
        df.drop(columns="feature_2", inplace=True)
        # One-hot encoding.
        categories = ["cat_1", "cat_2", "cat_3"]
        for category in categories:
            df[f"category_{category}"] = df["category"].map(
                collections.defaultdict(int, **{category: 1}))
        return df

    # batch_format="pandas" tells Datasets to provide the transformer with blocks
    # represented as Pandas DataFrames.
    ds = ds.map_batches(transform_batch, batch_format="pandas")
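
    # A quick sanity check of the UDF on a tiny in-memory dataset. This is a
    # sketch: the feature/category column names here are assumptions about the
    # schema, chosen to match the UDF above.
    toy_ds = ray.data.from_items([
        {"feature_1": 1.0, "feature_2": 2.0, "feature_3": 3.0, "category": "cat_2"},
        {"feature_1": None, "feature_2": 4.0, "feature_3": 6.0, "category": "cat_1"},
    ])
    toy_ds.map_batches(transform_batch, batch_format="pandas").take(2)
    # -> the row with a null feature_1 is dropped, feature_2 is gone, and the
    #    one-hot category_* columns are added.
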

Groupbys and aggregations
-------------------------

Other preprocessing steps require global operations, such as groupbys and grouped/global aggregations. Just like other transformations, grouped/global aggregations are executed *eagerly* and block until the aggregation has been computed.

.. code-block:: python

    ds: ray.data.Dataset = ray.data.from_items([
        {"A": x % 3, "B": 2 * x, "C": 3 * x}
        for x in range(10)])

    # Group by the A column and calculate the per-group mean for B and C columns.
    agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"])
    # -> Sort Sample: 100%|███████████████████████████████████████| 10/10 [00:01<00:00, 9.04it/s]
    # -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 23.66it/s]
    # -> GroupBy Reduce: 100%|████████████████████████████████████| 10/10 [00:00<00:00, 937.21it/s]
    # -> Dataset(num_blocks=10, num_rows=3, schema={})
    agg_ds.to_pandas()
    # ->
    #    A  mean(B)  mean(C)
    # 0  0      9.0     13.5
    # 1  1      8.0     12.0
    # 2  2     10.0     15.0

    # Global mean on B column.
    ds.mean("B")
    # -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 2851.91it/s]
    # -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 319.69it/s]
    # -> 9.0

    # Global mean on multiple columns.
    ds.mean(["B", "C"])
    # -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 1730.32it/s]
    # -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 231.41it/s]
    # -> {'mean(B)': 9.0, 'mean(C)': 13.5}

    # Multiple global aggregations on multiple columns.
    from ray.data.aggregate import Mean, Std
    ds.aggregate(Mean("B"), Std("B", ddof=0), Mean("C"), Std("C", ddof=0))
    # -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 1568.73it/s]
    # -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 133.51it/s]
    # -> {'mean(B)': 9.0, 'std(B)': 5.744562646538029, 'mean(C)': 13.5, 'std(C)': 8.616843969807043}
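
    # Multiple aggregations per group follow the same pattern (a sketch;
    # GroupedDataset.aggregate takes the same aggregation objects as the
    # global Dataset.aggregate above).
    ds.groupby("A").aggregate(Mean("B"), Std("B"), Mean("C"), Std("C")).to_pandas()
    # -> one row per value of A, with per-group mean and std for B and C.
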

These aggregations can be combined with batch mapping to transform a dataset using computed statistics. For example, you can efficiently standardize feature columns and impute missing values with calculated column means.

.. code-block:: python

    # Impute missing values with the column mean.
    b_mean = ds.mean("B")
    # -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 4054.03it/s]
    # -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 359.22it/s]
    # -> 9.0

    def impute_b(df: pd.DataFrame):
        df["B"] = df["B"].fillna(b_mean)
        return df

    ds = ds.map_batches(impute_b, batch_format="pandas")
    # -> Map Progress: 100%|██████████████████████████████████████| 10/10 [00:00<00:00, 132.66it/s]
    # -> Dataset(num_blocks=10, num_rows=10, schema={A: int64, B: int64, C: int64})

    # Standard scaling of all feature columns.
    stats = ds.aggregate(Mean("B"), Std("B"), Mean("C"), Std("C"))
    # -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 1260.99it/s]
    # -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 128.77it/s]
    # -> {'mean(B)': 9.0, 'std(B)': 6.0553007081949835, 'mean(C)': 13.5, 'std(C)': 9.082951062292475}

    def batch_standard_scaler(df: pd.DataFrame):
        def column_standard_scaler(s: pd.Series):
            s_mean = stats[f"mean({s.name})"]
            s_std = stats[f"std({s.name})"]
            return (s - s_mean) / s_std

        cols = df.columns.difference(["A"])
        df.loc[:, cols] = df.loc[:, cols].transform(column_standard_scaler)
        return df

    ds = ds.map_batches(batch_standard_scaler, batch_format="pandas")
    # -> Map Progress: 100%|██████████████████████████████████████| 10/10 [00:00<00:00, 144.79it/s]
    # -> Dataset(num_blocks=10, num_rows=10, schema={A: int64, B: double, C: double})
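
    # The same pattern works for other scalers. A sketch of min-max
    # normalization; Min and Max are assumed to be the analogous built-in
    # aggregations, producing "min(B)"-style keys like mean/std above.
    from ray.data.aggregate import Max, Min

    bounds = ds.aggregate(Min("B"), Max("B"), Min("C"), Max("C"))

    def batch_min_max_scaler(df: pd.DataFrame):
        def column_min_max_scaler(s: pd.Series):
            s_min = bounds[f"min({s.name})"]
            s_max = bounds[f"max({s.name})"]
            return (s - s_min) / (s_max - s_min)

        cols = df.columns.difference(["A"])
        df.loc[:, cols] = df.loc[:, cols].transform(column_min_max_scaler)
        return df

    normalized_ds = ds.map_batches(batch_min_max_scaler, batch_format="pandas")
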

Random shuffle
--------------

Randomly shuffling data is an important part of training machine learning models: it decorrelates samples, preventing overfitting and improving generalization. For many models, even shuffling between epochs can drastically improve the accuracy gained per step/epoch. Datasets has a hyper-scalable distributed random shuffle that allows you to realize the model accuracy benefits of per-epoch shuffling without sacrificing training throughput, even at large data scales and even when doing distributed data-parallel training across multiple GPUs/nodes.

.. code-block:: python

    ds = ray.data.range(10)
    # -> [0, 1, ..., 9]

    # Global random shuffle.
    ds = ds.random_shuffle()
    # -> Shuffle Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 12.35it/s]
    # -> Shuffle Reduce: 100%|████████████████████████████████████| 10/10 [00:00<00:00, 45.54it/s]
    # -> [7, 1, ..., 3]

    # Scales to terabytes of data with the same simple API.
    ds = ray.data.read_parquet("s3://ursa-labs-taxi-data")  # open, tabular, NYC taxi dataset
    # -> Dataset(num_blocks=125, num_rows=1547741381, schema={
    #        vendor_id: string, pickup_at: timestamp[us], dropoff_at: timestamp[us],
    #        passenger_count: int8, trip_distance: float, ...})

    # Don't run this next one on your laptop; it will probably crash since it will
    # try to read and shuffle ~99 GB of data!
    ds = ds.random_shuffle()
    # -> Shuffle Map: 100%|███████████████████████████████████████| 125/125 [00:00<00:00, 5021.94it/s]
    # -> Shuffle Reduce: 100%|████████████████████████████████████| 125/125 [00:00<00:00, 4034.33it/s]
    # -> Dataset(num_blocks=125, num_rows=1547741381, schema={
    #        vendor_id: string, pickup_at: timestamp[us], dropoff_at: timestamp[us],
    #        passenger_count: int8, trip_distance: float, ...})

    # Per-epoch shuffling is as simple as changing where we invoke the shuffle:
    # - Before repeating => dataset is shuffled once.
    # - After repeating => dataset is shuffled on every epoch.
    num_epochs = 20

    # Shuffle once, then repeat this once-shuffled dataset for num_epochs epochs.
    ds.random_shuffle().repeat(num_epochs)
    # -> Shuffle Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 13.43it/s]
    # -> Shuffle Reduce: 100%|████████████████████████████████████| 10/10 [00:00<00:00, 42.70it/s]
    # -> DatasetPipeline(num_windows=10, num_stages=1)

    # Shuffle repeatedly, where the original dataset is shuffled into a different
    # order at the beginning of each epoch.
    ds.repeat(num_epochs).random_shuffle_each_window()
    # -> DatasetPipeline(num_windows=10, num_stages=2)
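
    # Consuming the per-epoch shuffled pipeline (a sketch; iter_epochs() yields
    # the data for one epoch at a time, which can then be read out in batches).
    pipe = ds.repeat(num_epochs).random_shuffle_each_window()
    for epoch_data in pipe.iter_epochs():
        for batch in epoch_data.iter_batches(batch_format="pandas"):
            pass  # feed the shuffled batch to your training loop here
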

See the `large-scale ML ingest example <examples/big_data_ingestion.html>`__ for an end-to-end example of per-epoch shuffled data loading for distributed training.