Skip to content

Commit c212174

Browse files
authored
Allow pylibcudf.Column to consume objects exposing __arrow_c_stream__ (#18712)
closes #18573 unblocks #18564 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) URL: #18712
1 parent 57c4052 commit c212174

File tree

8 files changed

+172
-34
lines changed

8 files changed

+172
-34
lines changed

cpp/include/cudf/interop.hpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,21 @@ class arrow_column {
282282
rmm::cuda_stream_view stream = cudf::get_default_stream(),
283283
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
284284

285+
/**
286+
* @brief Construct a new arrow column object
287+
*
288+
* The stream will be released after the column is created, so it is no longer
289+
* suitable for use afterwards. This is done for consistency with other constructors of
290+
* arrow_column even though the source data is always host data.
291+
*
292+
* @param input ArrowArrayStream data for the column
293+
* @param stream CUDA stream used for device memory operations and kernel launches
294+
* @param mr Device memory resource used for any allocations during conversion
295+
*/
296+
arrow_column(ArrowArrayStream&& input,
297+
rmm::cuda_stream_view stream = cudf::get_default_stream(),
298+
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
299+
285300
/**
286301
* @brief Convert the column to an ArrowSchema
287302
*
@@ -745,6 +760,24 @@ std::unique_ptr<table> from_arrow_stream(
745760
rmm::cuda_stream_view stream = cudf::get_default_stream(),
746761
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
747762

763+
/**
764+
* @brief Create `cudf::column` from given ArrowArrayStream input
765+
*
766+
* @throws std::invalid_argument if input is NULL
767+
*
768+
* The conversion WILL release the input ArrayArrayStream and its constituent
769+
* arrays or schema since Arrow streams are not suitable for multiple reads.
770+
*
771+
* @param input `ArrowArrayStream` pointer to object that will produce ArrowArray data
772+
* @param stream CUDA stream used for device memory operations and kernel launches
773+
* @param mr Device memory resource used to perform cuda allocation
774+
* @return cudf column generated from the given Arrow data
775+
*/
776+
std::unique_ptr<column> from_arrow_stream_column(
777+
ArrowArrayStream* input,
778+
rmm::cuda_stream_view stream = cudf::get_default_stream(),
779+
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
780+
748781
/**
749782
* @brief Create `cudf::column` from given ArrowDeviceArray input
750783
*

cpp/src/interop/arrow_data_structures.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,17 @@ arrow_column::arrow_column(ArrowSchema&& schema,
276276
cached_view = tmp.cached_view;
277277
}
278278

279+
arrow_column::arrow_column(ArrowArrayStream&& input,
280+
rmm::cuda_stream_view stream,
281+
rmm::device_async_resource_ref mr)
282+
{
283+
auto col = from_arrow_stream_column(&input, stream, mr);
284+
auto tmp = arrow_column(std::move(*col), get_column_metadata(col->view()), stream, mr);
285+
container = tmp.container;
286+
view_columns = std::move(tmp.view_columns);
287+
cached_view = tmp.cached_view;
288+
}
289+
279290
void arrow_column::to_arrow_schema(ArrowSchema* output,
280291
rmm::cuda_stream_view stream,
281292
rmm::device_async_resource_ref mr) const

cpp/src/interop/from_arrow_stream.cu

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,53 @@ std::unique_ptr<table> from_arrow_stream(ArrowArrayStream* input,
131131
return cudf::detail::concatenate(chunk_views, stream, mr);
132132
}
133133

134+
std::unique_ptr<column> from_arrow_stream_column(ArrowArrayStream* input,
135+
rmm::cuda_stream_view stream,
136+
rmm::device_async_resource_ref mr)
137+
{
138+
CUDF_EXPECTS(input != nullptr, "input ArrowArrayStream must not be NULL", std::invalid_argument);
139+
140+
// Potential future optimization: Since the from_arrow API accepts an
141+
// ArrowSchema we're allocating one here instead of using a view, which we
142+
// could avoid with a different underlying implementation.
143+
ArrowSchema schema;
144+
NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetSchema(input, &schema, nullptr));
145+
146+
std::vector<std::unique_ptr<cudf::column>> chunks;
147+
ArrowArray chunk;
148+
while (true) {
149+
NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetNext(input, &chunk, nullptr));
150+
if (chunk.release == nullptr) { break; }
151+
chunks.push_back(from_arrow_column(&schema, &chunk, stream, mr));
152+
chunk.release(&chunk);
153+
}
154+
input->release(input);
155+
156+
if (chunks.empty()) {
157+
if (schema.n_children == 0) {
158+
schema.release(&schema);
159+
return std::make_unique<cudf::column>();
160+
}
161+
162+
// If there are no chunks but the schema has children, we need to construct a suitable empty
163+
// column.
164+
auto empty_column = make_empty_column_from_schema(&schema, stream, mr);
165+
schema.release(&schema);
166+
return empty_column;
167+
}
168+
169+
schema.release(&schema);
170+
171+
if (chunks.size() == 1) { return std::move(chunks[0]); }
172+
auto chunk_views = std::vector<column_view>{};
173+
chunk_views.reserve(chunks.size());
174+
std::transform(
175+
chunks.begin(), chunks.end(), std::back_inserter(chunk_views), [](auto const& chunk) {
176+
return chunk->view();
177+
});
178+
return cudf::detail::concatenate(chunk_views, stream, mr);
179+
}
180+
134181
} // namespace detail
135182

136183
std::unique_ptr<table> from_arrow_stream(ArrowArrayStream* input,
@@ -140,4 +187,13 @@ std::unique_ptr<table> from_arrow_stream(ArrowArrayStream* input,
140187
CUDF_FUNC_RANGE();
141188
return detail::from_arrow_stream(input, stream, mr);
142189
}
190+
191+
std::unique_ptr<column> from_arrow_stream_column(ArrowArrayStream* input,
192+
rmm::cuda_stream_view stream,
193+
rmm::device_async_resource_ref mr)
194+
{
195+
CUDF_FUNC_RANGE();
196+
return detail::from_arrow_stream_column(input, stream, mr);
197+
}
198+
143199
} // namespace cudf

python/pylibcudf/pylibcudf/_interop_helpers.pyx

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,22 @@ from pylibcudf.libcudf.interop cimport (
1515
from dataclasses import dataclass, field
1616

1717

18+
class _ArrowLikeMeta(type):
19+
# We cannot separate these types via singledispatch because the dispatch
20+
# will often be ambiguous when objects expose multiple protocols.
21+
def __subclasscheck__(cls, other):
22+
return (
23+
hasattr(other, "__arrow_c_stream__")
24+
or hasattr(other, "__arrow_c_device_stream__")
25+
or hasattr(other, "__arrow_c_array__")
26+
or hasattr(other, "__arrow_c_device_array__")
27+
)
28+
29+
30+
class ArrowLike(metaclass=_ArrowLikeMeta):
31+
pass
32+
33+
1834
@dataclass
1935
class ColumnMetadata:
2036
"""Metadata associated with a column.

python/pylibcudf/pylibcudf/column.pyx

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ from pylibcudf.libcudf.column.column cimport column, column_contents
1717
from pylibcudf.libcudf.column.column_factories cimport make_column_from_scalar
1818
from pylibcudf.libcudf.interop cimport (
1919
ArrowArray,
20+
ArrowArrayStream,
2021
ArrowSchema,
2122
ArrowDeviceArray,
2223
arrow_column,
@@ -53,28 +54,14 @@ from .null_mask cimport bitmask_allocation_size_bytes
5354
from .utils cimport _get_stream
5455

5556
from .gpumemoryview import _datatype_from_dtype_desc
56-
from ._interop_helpers import ColumnMetadata
57+
from ._interop_helpers import ArrowLike, ColumnMetadata
5758

5859
import functools
5960
import operator
6061

6162
__all__ = ["Column", "ListColumnView", "is_c_contiguous"]
6263

6364

64-
class _ArrowLikeMeta(type):
65-
def __subclasscheck__(cls, other):
66-
# We cannot separate these types via singledispatch because the dispatch
67-
# will often be ambiguous when objects expose multiple protocols.
68-
return (
69-
hasattr(other, "__arrow_c_array__")
70-
or hasattr(other, "__arrow_c_device_array__")
71-
)
72-
73-
74-
class _ArrowLike(metaclass=_ArrowLikeMeta):
75-
pass
76-
77-
7865
cdef class _ArrowColumnHolder:
7966
"""A holder for an Arrow column for gpumemoryview lifetime management."""
8067
cdef unique_ptr[arrow_column] col
@@ -276,7 +263,7 @@ cdef class Column:
276263
self._children = children
277264
self._num_children = len(children)
278265

279-
@_init.register(_ArrowLike)
266+
@_init.register(ArrowLike)
280267
def _(self, arrow_like):
281268
cdef ArrowSchema* c_schema
282269
cdef ArrowArray* c_array
@@ -329,6 +316,34 @@ cdef class Column:
329316
tmp.offset(),
330317
tmp.children(),
331318
)
319+
elif hasattr(arrow_like, "__arrow_c_stream__"):
320+
stream = arrow_like.__arrow_c_stream__()
321+
c_stream = (
322+
<ArrowArrayStream*>PyCapsule_GetPointer(stream, "arrow_array_stream")
323+
)
324+
325+
result = _ArrowColumnHolder()
326+
with nogil:
327+
c_result = make_unique[arrow_column](
328+
move(dereference(c_stream))
329+
)
330+
result.col.swap(c_result)
331+
332+
tmp = Column.from_column_view_of_arbitrary(result.col.get().view(), result)
333+
self._init(
334+
tmp.type(),
335+
tmp.size(),
336+
tmp.data(),
337+
tmp.null_mask(),
338+
tmp.null_count(),
339+
tmp.offset(),
340+
tmp.children(),
341+
)
342+
elif hasattr(arrow_like, "__arrow_c_device_stream__"):
343+
# TODO: When we add support for this case, it should be moved above
344+
# the __arrow_c_array__ case since we should prioritize device
345+
# data if possible.
346+
raise NotImplementedError("Device streams not yet supported")
332347
else:
333348
raise ValueError("Invalid Arrow-like object")
334349

python/pylibcudf/pylibcudf/libcudf/interop.pxd

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ cdef extern from "cudf/interop.hpp" namespace "cudf::interop" \
5858
ArrowSchema&& schema,
5959
ArrowDeviceArray&& array
6060
) except +libcudf_exception_handler
61+
arrow_column(
62+
ArrowArrayStream&& stream,
63+
) except +libcudf_exception_handler
6164
column_view view() except +libcudf_exception_handler
6265

6366
cdef cppclass arrow_table:

python/pylibcudf/pylibcudf/table.pyx

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,29 +35,13 @@ from pylibcudf._interop_helpers cimport (
3535
_release_device_array,
3636
_metadata_to_libcudf,
3737
)
38-
from ._interop_helpers import ColumnMetadata
38+
from ._interop_helpers import ArrowLike, ColumnMetadata
3939

4040
from functools import singledispatchmethod
4141

4242
__all__ = ["Table"]
4343

4444

45-
class _ArrowLikeMeta(type):
46-
# We cannot separate these types via singledispatch because the dispatch
47-
# will often be ambiguous when objects expose multiple protocols.
48-
def __subclasscheck__(cls, other):
49-
return (
50-
hasattr(other, "__arrow_c_stream__")
51-
or hasattr(other, "__arrow_c_device_stream__")
52-
or hasattr(other, "__arrow_c_array__")
53-
or hasattr(other, "__arrow_c_device_array__")
54-
)
55-
56-
57-
class _ArrowLike(metaclass=_ArrowLikeMeta):
58-
pass
59-
60-
6145
cdef class _ArrowTableHolder:
6246
"""A holder for an Arrow table for gpumemoryview lifetime management."""
6347
cdef unique_ptr[arrow_table] tbl
@@ -86,7 +70,7 @@ cdef class Table:
8670
raise ValueError("All columns must be pylibcudf Column objects")
8771
self._columns = columns
8872

89-
@_init.register(_ArrowLike)
73+
@_init.register(ArrowLike)
9074
def _(self, arrow_like):
9175
cdef ArrowSchema* c_schema
9276
cdef ArrowDeviceArray* c_array

python/pylibcudf/pylibcudf/tests/test_interop.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import numpy as np
77
import pyarrow as pa
88
import pytest
9+
from packaging.version import parse
910
from utils import assert_column_eq, assert_table_eq
1011

1112
import pylibcudf as plc
@@ -163,3 +164,22 @@ def test_device_interop_table():
163164

164165
new_tbl = plc.Table(na_arr)
165166
assert_table_eq(pa_tbl, new_tbl)
167+
168+
169+
@pytest.mark.skipif(
170+
parse(pa.__version__) < parse("16.0.0"),
171+
reason="https://github.com/apache/arrow/pull/39985",
172+
)
173+
@pytest.mark.parametrize(
174+
"data",
175+
[
176+
[[1, 2, 3], [4, 5, 6]],
177+
[[1, 2, 3], [None, 5, 6]],
178+
[[[1]], [[2]]],
179+
[[{"a": 1}], [{"b": 2}]],
180+
],
181+
)
182+
def test_column_from_arrow_stream(data):
183+
pa_arr = pa.chunked_array(data)
184+
col = plc.Column(pa_arr)
185+
assert_column_eq(pa_arr, col)

0 commit comments

Comments
 (0)