Skip to content

Allow pylibcudf.Column to consume objects exposing __arrow_c_stream__ #18712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions cpp/include/cudf/interop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,21 @@ class arrow_column {
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Construct a new arrow column object
*
* The stream will be released after the column is created, so it is no longer
* suitable for use afterwards. This is done for consistency with other constructors of
* arrow_column even though the source data is always host data.
*
* @param input ArrowArrayStream data for the column
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used for any allocations during conversion
*/
arrow_column(ArrowArrayStream&& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Convert the column to an ArrowSchema
*
Expand Down Expand Up @@ -745,6 +760,24 @@ std::unique_ptr<table> from_arrow_stream(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Create `cudf::column` from given ArrowArrayStream input
*
* @throws std::invalid_argument if input is NULL
*
* The conversion WILL release the input ArrayArrayStream and its constituent
* arrays or schema since Arrow streams are not suitable for multiple reads.
*
* @param input `ArrowArrayStream` pointer to object that will produce ArrowArray data
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to perform cuda allocation
* @return cudf column generated from the given Arrow data
*/
std::unique_ptr<column> from_arrow_stream_column(
ArrowArrayStream* input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Create `cudf::column` from given ArrowDeviceArray input
*
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/interop/arrow_data_structures.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,17 @@ arrow_column::arrow_column(ArrowSchema&& schema,
cached_view = tmp.cached_view;
}

arrow_column::arrow_column(ArrowArrayStream&& input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto col = from_arrow_stream_column(&input, stream, mr);
auto tmp = arrow_column(std::move(*col), get_column_metadata(col->view()), stream, mr);
container = tmp.container;
view_columns = std::move(tmp.view_columns);
cached_view = tmp.cached_view;
}

void arrow_column::to_arrow_schema(ArrowSchema* output,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
Expand Down
56 changes: 56 additions & 0 deletions cpp/src/interop/from_arrow_stream.cu
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,53 @@ std::unique_ptr<table> from_arrow_stream(ArrowArrayStream* input,
return cudf::detail::concatenate(chunk_views, stream, mr);
}

std::unique_ptr<column> from_arrow_stream_column(ArrowArrayStream* input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(input != nullptr, "input ArrowArrayStream must not be NULL", std::invalid_argument);

// Potential future optimization: Since the from_arrow API accepts an
// ArrowSchema we're allocating one here instead of using a view, which we
// could avoid with a different underlying implementation.
ArrowSchema schema;
NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetSchema(input, &schema, nullptr));

std::vector<std::unique_ptr<cudf::column>> chunks;
ArrowArray chunk;
while (true) {
NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetNext(input, &chunk, nullptr));
if (chunk.release == nullptr) { break; }
chunks.push_back(from_arrow_column(&schema, &chunk, stream, mr));
chunk.release(&chunk);
}
input->release(input);

if (chunks.empty()) {
if (schema.n_children == 0) {
schema.release(&schema);
return std::make_unique<cudf::column>();
}

// If there are no chunks but the schema has children, we need to construct a suitable empty
// column.
auto empty_column = make_empty_column_from_schema(&schema, stream, mr);
schema.release(&schema);
return empty_column;
}

schema.release(&schema);

if (chunks.size() == 1) { return std::move(chunks[0]); }
auto chunk_views = std::vector<column_view>{};
chunk_views.reserve(chunks.size());
std::transform(
chunks.begin(), chunks.end(), std::back_inserter(chunk_views), [](auto const& chunk) {
return chunk->view();
});
return cudf::detail::concatenate(chunk_views, stream, mr);
}

} // namespace detail

std::unique_ptr<table> from_arrow_stream(ArrowArrayStream* input,
Expand All @@ -140,4 +187,13 @@ std::unique_ptr<table> from_arrow_stream(ArrowArrayStream* input,
CUDF_FUNC_RANGE();
return detail::from_arrow_stream(input, stream, mr);
}

std::unique_ptr<column> from_arrow_stream_column(ArrowArrayStream* input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::from_arrow_stream_column(input, stream, mr);
}

} // namespace cudf
16 changes: 16 additions & 0 deletions python/pylibcudf/pylibcudf/_interop_helpers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ from pylibcudf.libcudf.interop cimport (
from dataclasses import dataclass, field


class _ArrowLikeMeta(type):
# We cannot separate these types via singledispatch because the dispatch
# will often be ambiguous when objects expose multiple protocols.
def __subclasscheck__(cls, other):
return (
hasattr(other, "__arrow_c_stream__")
or hasattr(other, "__arrow_c_device_stream__")
or hasattr(other, "__arrow_c_array__")
or hasattr(other, "__arrow_c_device_array__")
)


class ArrowLike(metaclass=_ArrowLikeMeta):
pass


@dataclass
class ColumnMetadata:
"""Metadata associated with a column.
Expand Down
47 changes: 31 additions & 16 deletions python/pylibcudf/pylibcudf/column.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ from pylibcudf.libcudf.column.column cimport column, column_contents
from pylibcudf.libcudf.column.column_factories cimport make_column_from_scalar
from pylibcudf.libcudf.interop cimport (
ArrowArray,
ArrowArrayStream,
ArrowSchema,
ArrowDeviceArray,
arrow_column,
Expand Down Expand Up @@ -52,27 +53,13 @@ from .null_mask cimport bitmask_allocation_size_bytes
from .utils cimport _get_stream

from .gpumemoryview import _datatype_from_dtype_desc
from ._interop_helpers import ColumnMetadata
from ._interop_helpers import ArrowLike, ColumnMetadata

import functools

__all__ = ["Column", "ListColumnView", "is_c_contiguous"]


class _ArrowLikeMeta(type):
def __subclasscheck__(cls, other):
# We cannot separate these types via singledispatch because the dispatch
# will often be ambiguous when objects expose multiple protocols.
return (
hasattr(other, "__arrow_c_array__")
or hasattr(other, "__arrow_c_device_array__")
)


class _ArrowLike(metaclass=_ArrowLikeMeta):
pass


cdef class _ArrowColumnHolder:
"""A holder for an Arrow column for gpumemoryview lifetime management."""
cdef unique_ptr[arrow_column] col
Expand Down Expand Up @@ -216,7 +203,7 @@ cdef class Column:
self._children = children
self._num_children = len(children)

@_init.register(_ArrowLike)
@_init.register(ArrowLike)
def _(self, arrow_like):
cdef ArrowSchema* c_schema
cdef ArrowArray* c_array
Expand Down Expand Up @@ -269,6 +256,34 @@ cdef class Column:
tmp.offset(),
tmp.children(),
)
elif hasattr(arrow_like, "__arrow_c_stream__"):
stream = arrow_like.__arrow_c_stream__()
c_stream = (
<ArrowArrayStream*>PyCapsule_GetPointer(stream, "arrow_array_stream")
)

result = _ArrowColumnHolder()
with nogil:
c_result = make_unique[arrow_column](
move(dereference(c_stream))
)
result.col.swap(c_result)

tmp = Column.from_column_view_of_arbitrary(result.col.get().view(), result)
self._init(
tmp.type(),
tmp.size(),
tmp.data(),
tmp.null_mask(),
tmp.null_count(),
tmp.offset(),
tmp.children(),
)
elif hasattr(arrow_like, "__arrow_c_device_stream__"):
# TODO: When we add support for this case, it should be moved above
# the __arrow_c_array__ case since we should prioritize device
# data if possible.
raise NotImplementedError("Device streams not yet supported")
else:
raise ValueError("Invalid Arrow-like object")

Expand Down
3 changes: 3 additions & 0 deletions python/pylibcudf/pylibcudf/libcudf/interop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ cdef extern from "cudf/interop.hpp" namespace "cudf::interop" \
ArrowSchema&& schema,
ArrowDeviceArray&& array
) except +libcudf_exception_handler
arrow_column(
ArrowArrayStream&& stream,
) except +libcudf_exception_handler
column_view view() except +libcudf_exception_handler

cdef cppclass arrow_table:
Expand Down
20 changes: 2 additions & 18 deletions python/pylibcudf/pylibcudf/table.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,13 @@ from pylibcudf._interop_helpers cimport (
_release_device_array,
_metadata_to_libcudf,
)
from ._interop_helpers import ColumnMetadata
from ._interop_helpers import ArrowLike, ColumnMetadata

from functools import singledispatchmethod

__all__ = ["Table"]


class _ArrowLikeMeta(type):
# We cannot separate these types via singledispatch because the dispatch
# will often be ambiguous when objects expose multiple protocols.
def __subclasscheck__(cls, other):
return (
hasattr(other, "__arrow_c_stream__")
or hasattr(other, "__arrow_c_device_stream__")
or hasattr(other, "__arrow_c_array__")
or hasattr(other, "__arrow_c_device_array__")
)


class _ArrowLike(metaclass=_ArrowLikeMeta):
pass


cdef class _ArrowTableHolder:
"""A holder for an Arrow table for gpumemoryview lifetime management."""
cdef unique_ptr[arrow_table] tbl
Expand Down Expand Up @@ -86,7 +70,7 @@ cdef class Table:
raise ValueError("All columns must be pylibcudf Column objects")
self._columns = columns

@_init.register(_ArrowLike)
@_init.register(ArrowLike)
def _(self, arrow_like):
cdef ArrowSchema* c_schema
cdef ArrowDeviceArray* c_array
Expand Down
20 changes: 20 additions & 0 deletions python/pylibcudf/pylibcudf/tests/test_interop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import numpy as np
import pyarrow as pa
import pytest
from packaging.version import parse
from utils import assert_column_eq, assert_table_eq

import pylibcudf as plc
Expand Down Expand Up @@ -163,3 +164,22 @@ def test_device_interop_table():

new_tbl = plc.Table(na_arr)
assert_table_eq(pa_tbl, new_tbl)


@pytest.mark.skipif(
parse(pa.__version__) < parse("16.0.0"),
reason="https://github.com/apache/arrow/pull/39985",
)
@pytest.mark.parametrize(
"data",
[
[[1, 2, 3], [4, 5, 6]],
[[1, 2, 3], [None, 5, 6]],
[[[1]], [[2]]],
[[{"a": 1}], [{"b": 2}]],
],
)
def test_column_from_arrow_stream(data):
pa_arr = pa.chunked_array(data)
col = plc.Column(pa_arr)
assert_column_eq(pa_arr, col)
Loading