Commit fb2933a

Revert "Revert "Revert "[Datasets] [Tensor Story - 1/2] Automatically provide tensor views to UDFs and infer tensor blocks for pure-tensor datasets."" (ray-project#25031)" (ray-project#25057)
Reverts ray-project#25031. It looks to still be somewhat flaky.
1 parent: b2d41fc · commit: fb2933a

17 files changed: +167 −517 lines

doc/source/data/dataset-tensor-support.rst

Lines changed: 11 additions & 46 deletions
@@ -15,57 +15,22 @@ Automatic conversion between the Pandas and Arrow extension types/arrays keeps t
 Single-column tensor datasets
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-The most basic case is when a dataset only has a single column, which is of tensor
-type. This kind of dataset can be:
+The most basic case is when a dataset only has a single column, which is of tensor type. This kind of dataset can be created with ``.range_tensor()``, and can be read from and written to ``.npy`` files. Here are some examples:
 
-* created with :func:`range_tensor() <ray.data.range_tensor>`
-  or :func:`from_numpy() <ray.data.from_numpy>`,
-* transformed with NumPy UDFs via
-  :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`,
-* consumed with :meth:`ds.iter_rows() <ray.data.Dataset.iter_rows>` and
-  :meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>`, and
-* can be read from and written to ``.npy`` files.
+.. code-block:: python
 
-Here is an end-to-end example:
+    # Create a Dataset of tensor-typed values.
+    ds = ray.data.range_tensor(10000, shape=(3, 5))
+    # -> Dataset(num_blocks=200, num_rows=10000,
+    #            schema={value: <ArrowTensorType: shape=(3, 5), dtype=int64>})
 
-.. code-block:: python
+    # Save to storage.
+    ds.write_numpy("/tmp/tensor_out", column="value")
 
-    # Create a synthetic pure-tensor Dataset.
-    ds = ray.data.range_tensor(10, shape=(3, 5))
-    # -> Dataset(num_blocks=10, num_rows=10,
-    #            schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=int64>})
-
-    # Create a pure-tensor Dataset from an existing NumPy ndarray.
-    arr = np.arange(10 * 3 * 5).reshape((10, 3, 5))
-    ds = ray.data.from_numpy(arr)
-    # -> Dataset(num_blocks=1, num_rows=10,
-    #            schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=int64>})
-
-    # Transform the tensors. Datasets will automatically unpack the single-column Arrow
-    # table into a NumPy ndarray, provide that ndarray to your UDF, and then repack it
-    # into a single-column Arrow table; this will be a zero-copy conversion in both
-    # cases.
-    ds = ds.map_batches(lambda arr: arr / arr.max())
-    # -> Dataset(num_blocks=1, num_rows=10,
-    #            schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=double>})
-
-    # Consume the tensor. This will yield the underlying (3, 5) ndarrays.
-    for arr in ds.iter_rows():
-        assert isinstance(arr, np.ndarray)
-        assert arr.shape == (3, 5)
-
-    # Consume the tensor in batches.
-    for arr in ds.iter_batches(batch_size=2):
-        assert isinstance(arr, np.ndarray)
-        assert arr.shape == (2, 3, 5)
-
-    # Save to storage. This will write out the blocks of the tensor column as NPY files.
-    ds.write_numpy("/tmp/tensor_out")
-
-    # Read back from storage.
+    # Read from storage.
     ray.data.read_numpy("/tmp/tensor_out")
-    # -> Dataset(num_blocks=1, num_rows=?,
-    #            schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=double>})
+    # -> Dataset(num_blocks=200, num_rows=?,
+    #            schema={value: <ArrowTensorType: shape=(3, 5), dtype=int64>})
 
 Reading existing serialized tensor columns
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
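For reference, the restored (right-hand side) doc example assembles into a runnable script roughly as follows. This is a sketch, assuming a Ray installation of the same era as this commit; the expected-output comments are copied from the diff above:

    import ray

    ray.init()

    # Create a Dataset of tensor-typed values. After this revert, the tensor
    # lives under an explicit "value" column rather than the implicit
    # __value__ column introduced by the reverted change.
    ds = ray.data.range_tensor(10000, shape=(3, 5))
    # -> Dataset(num_blocks=200, num_rows=10000,
    #            schema={value: <ArrowTensorType: shape=(3, 5), dtype=int64>})

    # Save to storage as .npy files; the column must now be named explicitly.
    ds.write_numpy("/tmp/tensor_out", column="value")

    # Read back from storage; row count is unknown until the files are read.
    ds2 = ray.data.read_numpy("/tmp/tensor_out")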

python/ray/data/block.py

Lines changed: 3 additions & 23 deletions
@@ -3,7 +3,6 @@
 from typing import (
     TypeVar,
     List,
-    Dict,
     Generic,
     Iterator,
     Tuple,
@@ -83,10 +82,6 @@ def _validate_key_fn(ds: "Dataset", key: KeyFn) -> None:
 # ``SimpleBlockAccessor`` and ``ArrowBlockAccessor``.
 Block = Union[List[T], "pyarrow.Table", "pandas.DataFrame", bytes]
 
-# User-facing data batch type. This is the data type for data that is supplied to and
-# returned from batch UDFs.
-DataBatch = Union[Block, np.ndarray]
-
 # A list of block references pending computation by a single task. For example,
 # this may be the output of a task reading a file.
 BlockPartition = List[Tuple[ObjectRef[Block], "BlockMetadata"]]
@@ -215,13 +210,11 @@ def to_pandas(self) -> "pandas.DataFrame":
         """Convert this block into a Pandas dataframe."""
         raise NotImplementedError
 
-    def to_numpy(
-        self, columns: Optional[Union[str, List[str]]] = None
-    ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
-        """Convert this block (or columns of block) into a NumPy ndarray.
+    def to_numpy(self, column: str = None) -> np.ndarray:
+        """Convert this block (or column of block) into a NumPy ndarray.
 
         Args:
-            columns: Name of columns to convert, or None if converting all columns.
+            column: Name of column to convert, or None.
         """
         raise NotImplementedError
 
@@ -233,10 +226,6 @@ def to_block(self) -> Block:
         """Return the base block that this accessor wraps."""
         raise NotImplementedError
 
-    def to_native(self) -> Block:
-        """Return the native data format for this accessor."""
-        return self.to_block()
-
     def size_bytes(self) -> int:
         """Return the approximate size in bytes of this block."""
         raise NotImplementedError
@@ -266,15 +255,6 @@ def builder() -> "BlockBuilder[T]":
         """Create a builder for this block type."""
         raise NotImplementedError
 
-    @staticmethod
-    def batch_to_block(batch: DataBatch) -> Block:
-        """Create a block from user-facing data formats."""
-        if isinstance(batch, np.ndarray):
-            from ray.data.impl.arrow_block import ArrowBlockAccessor
-
-            return ArrowBlockAccessor.numpy_to_block(batch)
-        return batch
-
     @staticmethod
     def for_block(block: Block) -> "BlockAccessor[T]":
         """Create a block accessor for the given block."""

python/ray/data/dataset.py

Lines changed: 17 additions & 23 deletions
@@ -68,7 +68,6 @@
 from ray.data.row import TableRow
 from ray.data.aggregate import AggregateFn, Sum, Max, Min, Mean, Std
 from ray.data.random_access_dataset import RandomAccessDataset
-from ray.data.impl.table_block import VALUE_COL_NAME
 from ray.data.impl.remote_fn import cached_remote_fn
 from ray.data.impl.block_batching import batch_blocks, BatchType
 from ray.data.impl.plan import ExecutionPlan, OneToOneStage, AllToAllStage
@@ -235,8 +234,8 @@ def map(
 
         def transform(block: Block) -> Iterable[Block]:
             DatasetContext._set_current(context)
-            output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
             block = BlockAccessor.for_block(block)
+            output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
             for row in block.iter_rows():
                 output_buffer.add(fn(row))
                 if output_buffer.has_next():
@@ -261,9 +260,6 @@ def map_batches(
     ) -> "Dataset[Any]":
         """Apply the given function to batches of records of this dataset.
 
-        The format of the data batch provided to ``fn`` can be controlled via the
-        ``batch_format`` argument, and the output of the UDF can be any batch type.
-
        This is a blocking operation.
 
         Examples:
@@ -310,9 +306,10 @@ def map_batches(
                 blocks as batches. Defaults to a system-chosen batch size.
             compute: The compute strategy, either "tasks" (default) to use Ray
                 tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
-            batch_format: Specify "native" to use the native block format (promotes
-                tables to Pandas and tensors to NumPy), "pandas" to select
-                ``pandas.DataFrame``, or "pyarrow" to select `pyarrow.Table``.
+            batch_format: Specify "native" to use the native block format
+                (promotes Arrow to pandas), "pandas" to select
+                ``pandas.DataFrame`` as the batch format,
+                or "pyarrow" to select ``pyarrow.Table``.
             ray_remote_args: Additional resource requirements to request from
                 ray (e.g., num_gpus=1 to request GPUs for the map tasks).
         """
@@ -341,7 +338,9 @@ def transform(block: Block) -> Iterable[Block]:
                 # bug where we include the entire base view on serialization.
                 view = block.slice(start, end, copy=batch_size is not None)
                 if batch_format == "native":
-                    view = BlockAccessor.for_block(view).to_native()
+                    # Always promote Arrow blocks to pandas for consistency.
+                    if isinstance(view, pa.Table) or isinstance(view, bytes):
+                        view = BlockAccessor.for_block(view).to_pandas()
                 elif batch_format == "pandas":
                     view = BlockAccessor.for_block(view).to_pandas()
                 elif batch_format == "pyarrow":
@@ -356,7 +355,6 @@ def transform(block: Block) -> Iterable[Block]:
                 if not (
                     isinstance(applied, list)
                     or isinstance(applied, pa.Table)
-                    or isinstance(applied, np.ndarray)
                     or isinstance(applied, pd.core.frame.DataFrame)
                 ):
                     raise ValueError(
@@ -366,7 +364,7 @@ def transform(block: Block) -> Iterable[Block]:
                         "The return type must be either list, "
                         "pandas.DataFrame, or pyarrow.Table"
                     )
-                output_buffer.add_batch(applied)
+                output_buffer.add_block(applied)
                 if output_buffer.has_next():
                     yield output_buffer.next()
 
@@ -703,8 +701,6 @@ def process_batch(batch):
             )
             if isinstance(batch, pd.DataFrame):
                 return batch.sample(frac=fraction)
-            if isinstance(batch, np.ndarray):
-                return np.array([row for row in batch if random.random() <= fraction])
             raise ValueError(f"Unsupported batch type: {type(batch)}")
 
         return self.map_batches(process_batch)
@@ -2075,7 +2071,7 @@ def write_numpy(
         self,
         path: str,
         *,
-        column: str = VALUE_COL_NAME,
+        column: str = "value",
         filesystem: Optional["pyarrow.fs.FileSystem"] = None,
         try_create_dir: bool = True,
         arrow_open_stream_args: Optional[Dict[str, Any]] = None,
@@ -2103,8 +2099,7 @@ def write_numpy(
             path: The path to the destination root directory, where npy
                 files will be written to.
             column: The name of the table column that contains the tensor to
-                be written. The default is ``"__value__"``, the column name that
-                Datasets uses for storing tensors in single-column tables.
+                be written. This defaults to "value".
             filesystem: The filesystem implementation to write to.
             try_create_dir: Try to create all directories in destination path
                 if True. Does nothing if all directories already exist.
@@ -2251,10 +2246,10 @@ def iter_batches(
                 current block during the scan.
             batch_size: Record batch size, or None to let the system pick.
             batch_format: The format in which to return each batch.
-                Specify "native" to use the native block format (promoting
-                tables to Pandas and tensors to NumPy), "pandas" to select
-                ``pandas.DataFrame``, or "pyarrow" to select ``pyarrow.Table``. Default
-                is "native".
+                Specify "native" to use the current block format (promoting
+                Arrow to pandas automatically), "pandas" to
+                select ``pandas.DataFrame`` or "pyarrow" to select
+                ``pyarrow.Table``. Default is "native".
             drop_last: Whether to drop the last batch if it's incomplete.
 
         Returns:
@@ -2776,9 +2771,8 @@ def to_numpy_refs(
         Time complexity: O(dataset size / parallelism)
 
         Args:
-            column: The name of the column to convert to numpy, or None to specify the
-                entire row. If not specified for Arrow or Pandas blocks, each returned
-                future will represent a dict of column ndarrays.
+            column: The name of the column to convert to numpy, or None to
+                specify the entire row. Required for Arrow tables.
 
         Returns:
             A list of remote NumPy ndarrays created from this dataset.
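To illustrate the restored "native" batch semantics (a sketch under the same version assumptions as above, not part of the commit): tabular blocks are promoted to pandas, and a UDF must return a list, ``pandas.DataFrame``, or ``pyarrow.Table``, no longer a bare ndarray:

    import pandas as pd
    import ray

    ds = ray.data.from_pandas(pd.DataFrame({"x": list(range(8))}))

    # With batch_format="native", Arrow blocks arrive in the UDF as pandas
    # DataFrames; returning a DataFrame keeps the output a valid block type.
    out = ds.map_batches(
        lambda df: df.assign(x2=df["x"] ** 2), batch_format="native"
    )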

python/ray/data/datasource/datasource.py

Lines changed: 14 additions & 7 deletions
@@ -193,11 +193,14 @@ def make_block(start: int, count: int) -> Block:
         elif block_format == "tensor":
             import pyarrow as pa
 
-            tensor = np.ones(tensor_shape, dtype=np.int64) * np.expand_dims(
-                np.arange(start, start + count),
-                tuple(range(1, 1 + len(tensor_shape))),
+            tensor = TensorArray(
+                np.ones(tensor_shape, dtype=np.int64)
+                * np.expand_dims(
+                    np.arange(start, start + count),
+                    tuple(range(1, 1 + len(tensor_shape))),
+                )
             )
-            return BlockAccessor.batch_to_block(tensor)
+            return pa.Table.from_pydict({"value": tensor})
         else:
             return list(builtins.range(start, start + count))
 
@@ -211,12 +214,16 @@ def make_block(start: int, count: int) -> Block:
             schema = pa.Table.from_pydict({"value": [0]}).schema
         elif block_format == "tensor":
             _check_pyarrow_version()
+            from ray.data.extensions import TensorArray
             import pyarrow as pa
 
-            tensor = np.ones(tensor_shape, dtype=np.int64) * np.expand_dims(
-                np.arange(0, 10), tuple(range(1, 1 + len(tensor_shape)))
+            tensor = TensorArray(
+                np.ones(tensor_shape, dtype=np.int64)
+                * np.expand_dims(
+                    np.arange(0, 10), tuple(range(1, 1 + len(tensor_shape)))
+                )
             )
-            schema = BlockAccessor.batch_to_block(tensor).schema
+            schema = pa.Table.from_pydict({"value": tensor}).schema
         elif block_format == "list":
             schema = int
         else:
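The pattern restored here — wrapping an ndarray in a ``TensorArray`` and storing it under an explicit "value" column — can be reproduced standalone. A sketch, assuming ``ray`` and ``pyarrow`` of the same era are installed; the shape and fill values are illustrative:

    import numpy as np
    import pyarrow as pa
    from ray.data.extensions import TensorArray

    # Build a (count, 3, 5) tensor whose i-th row element is filled with i,
    # mirroring the np.ones * np.expand_dims(np.arange(...)) trick above.
    count = 4
    tensor = TensorArray(
        np.ones((3, 5), dtype=np.int64)
        * np.expand_dims(np.arange(0, count), (1, 2))
    )

    # TensorArray implements __arrow_array__, so pyarrow can ingest it
    # directly into a single-column table with an ArrowTensorType column.
    block = pa.Table.from_pydict({"value": tensor})
    print(block.schema)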

python/ray/data/datasource/numpy_datasource.py

Lines changed: 6 additions & 1 deletion
@@ -26,13 +26,18 @@ class NumpyDatasource(FileBasedDatasource):
     """
 
     def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args):
+        from ray.data.extensions import TensorArray
+        import pyarrow as pa
+
         # TODO(ekl) Ideally numpy can read directly from the file, but it
         # seems like it requires the file to be seekable.
         buf = BytesIO()
         data = f.readall()
         buf.write(data)
         buf.seek(0)
-        return BlockAccessor.batch_to_block(np.load(buf, allow_pickle=True))
+        return pa.Table.from_pydict(
+            {"value": TensorArray(np.load(buf, allow_pickle=True))}
+        )
 
     def _write_block(
         self,
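The reverted ``_read_file`` body is easy to exercise standalone. A sketch: a ``BytesIO`` buffer stands in for the ``pyarrow.NativeFile`` that Datasets would pass, and the schema print is only an approximation of the real repr:

    from io import BytesIO

    import numpy as np
    import pyarrow as pa
    from ray.data.extensions import TensorArray

    # Simulate the buffered read: .npy bytes land in a seekable buffer, are
    # loaded with NumPy, then wrapped into a single-column Arrow table.
    buf = BytesIO()
    np.save(buf, np.arange(12).reshape(3, 4))
    buf.seek(0)
    block = pa.Table.from_pydict(
        {"value": TensorArray(np.load(buf, allow_pickle=True))}
    )
    print(block.schema)  # roughly: value: extension<...ArrowTensorType...>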
