16
16
# cython: embedsignature = True
17
17
# cython: language_level = 3
18
18
19
- import numpy as np
20
19
from libc.stdint cimport uintptr_t
21
20
from libcpp.utility cimport move
22
21
23
- from rmm.pylibrmm.device_buffer cimport DeviceBuffer
24
- from cudf.core.buffer import as_buffer
22
+ from pylibcudf cimport Column, DataType, type_id
25
23
import cudf
26
24
27
25
28
26
cdef move_device_buffer_to_column(
    unique_ptr[device_buffer] device_buffer_unique_ptr,
    DataType dtype,
    size_t itemsize,
):
    """
    Transfers ownership of device_buffer_unique_ptr into a pylibcudf Column
    and returns that column, or None when the buffer is empty.

    Parameters
    ----------
    device_buffer_unique_ptr : unique_ptr[device_buffer]
        Buffer whose ownership is moved into the resulting column.
    dtype : DataType
        pylibcudf data type passed to Column.from_rmm_buffer.
    itemsize : size_t
        Size in bytes of one element; the column length is the buffer size
        in bytes integer-divided by this value.

    Returns
    -------
    Column or None
        The constructed column, or None if the buffer holds zero bytes. The
        buffer is transferred in both cases.
    """
    # The byte count is read first, before ownership is moved out of the
    # unique_ptr in the call below.
    cdef size_t nbytes = device_buffer_unique_ptr.get().size()
    cdef size_t n_elements = nbytes // itemsize
    cdef Column col = Column.from_rmm_buffer(
        move(device_buffer_unique_ptr),
        dtype,
        n_elements,
        [],
    )
    if nbytes == 0:
        return None
    return col
42
48
43
49
44
50
cdef move_device_buffer_to_series(
    unique_ptr[device_buffer] device_buffer_unique_ptr,
    DataType dtype,
    size_t itemsize,
    series_name
):
    """
    Transfers ownership of device_buffer_unique_ptr to a column (via
    move_device_buffer_to_column) which is used to construct a cudf.Series
    object with name series_name, which is then returned.

    Parameters
    ----------
    device_buffer_unique_ptr : unique_ptr[device_buffer]
        Buffer whose ownership is moved into the resulting column.
    dtype : DataType
        pylibcudf data type forwarded to move_device_buffer_to_column.
    itemsize : size_t
        Size in bytes of one element, forwarded to
        move_device_buffer_to_column.
    series_name
        Name to give the returned series.

    Returns
    -------
    cudf.Series or None
        None when move_device_buffer_to_column returns None (empty
        intermediate buffer); the device_buffer_unique_ptr is still
        transferred in that case.
    """
    column = move_device_buffer_to_column(
        move(device_buffer_unique_ptr),
        dtype,
        itemsize,
    )
    if column is None:
        return None
    # The metadata key is written as "name" with no surrounding whitespace;
    # a padded key such as " name" is a different dictionary key.
    # NOTE(review): confirm against the cudf from_pylibcudf metadata contract.
    return cudf.Series.from_pylibcudf(column, metadata={"name": series_name})
57
70
58
71
59
72
cdef coo_to_df(GraphCOOPtrType graph):
60
73
# FIXME: this function assumes columns named "src" and "dst" and can only
61
74
# be used for SG graphs due to that assumption.
62
75
contents = move(graph.get()[0 ].release())
63
- src = move_device_buffer_to_column(move(contents.src_indices), " int32" )
64
- dst = move_device_buffer_to_column(move(contents.dst_indices), " int32" )
76
+ src = move_device_buffer_to_column(
77
+ move(contents.src_indices),
78
+ DataType(type_id.INT32),
79
+ 4 ,
80
+ )
81
+ dst = move_device_buffer_to_column(
82
+ move(contents.dst_indices),
83
+ DataType(type_id.INT32),
84
+ 4 ,
85
+ )
65
86
66
87
if GraphCOOPtrType is GraphCOOPtrFloat:
67
- weight_type = " float32"
88
+ weight_type = DataType(type_id.FLOAT32)
89
+ itemsize = 4
68
90
elif GraphCOOPtrType is GraphCOOPtrDouble:
69
- weight_type = " float64"
91
+ weight_type = DataType(type_id.FLOAT64)
92
+ itemsize = 8
70
93
else :
71
94
raise TypeError (" Invalid GraphCOOPtrType" )
72
95
73
- wgt = move_device_buffer_to_column(move(contents.edge_data), weight_type)
96
+ wgt = move_device_buffer_to_column(
97
+ move(contents.edge_data),
98
+ weight_type,
99
+ itemsize
100
+ )
74
101
75
102
df = cudf.DataFrame()
76
103
df[' src' ] = src
@@ -83,20 +110,34 @@ cdef coo_to_df(GraphCOOPtrType graph):
83
110
84
111
cdef csr_to_series(GraphCSRPtrType graph):
    """
    Releases the contents of the CSR graph pointed to by graph and returns
    them as a tuple (csr_offsets, csr_indices, csr_weights) of cudf.Series.

    The offsets and indices buffers are wrapped as INT32 series; the edge
    data buffer is wrapped as FLOAT32 or FLOAT64 depending on
    GraphCSRPtrType. Any element of the tuple is None when
    move_device_buffer_to_series returns None for it (empty buffer).

    Raises
    ------
    TypeError
        If GraphCSRPtrType is neither GraphCSRPtrFloat nor GraphCSRPtrDouble.
    """
    contents = move(graph.get()[0].release())
    csr_offsets = move_device_buffer_to_series(
        move(contents.offsets),
        DataType(type_id.INT32),
        4,
        "csr_offsets"
    )
    csr_indices = move_device_buffer_to_series(
        move(contents.indices),
        DataType(type_id.INT32),
        4,
        "csr_indices"
    )

    # The weight dtype and its element size in bytes must agree, because the
    # column length is derived from buffer bytes divided by itemsize.
    if GraphCSRPtrType is GraphCSRPtrFloat:
        weight_type = DataType(type_id.FLOAT32)
        itemsize = 4
    elif GraphCSRPtrType is GraphCSRPtrDouble:
        weight_type = DataType(type_id.FLOAT64)
        itemsize = 8
    else:
        raise TypeError("Invalid GraphCSRPtrType")

    csr_weights = move_device_buffer_to_series(
        move(contents.edge_data),
        weight_type,
        itemsize,
        "csr_weights"
    )

    return (csr_offsets, csr_indices, csr_weights)
102
143
0 commit comments