3 changes: 2 additions & 1 deletion .github/workflows/test.yml
@@ -207,12 +207,13 @@ jobs:
with:
environment-name: test-env
init-shell: bash
# TODO: run the tests with cudf again when it supports numba-cuda>=0.23.0
create-args: >-
-c rapidsai
-c nvidia
python=3.13
pandas>=0.24.0
cccl-python
cudf
cupy
cuda-version=${{ matrix.cuda-version }}
cuda-toolkit
2 changes: 1 addition & 1 deletion requirements-test-gpu.txt
@@ -6,7 +6,7 @@
# cuda-cccl[cu12]
fsspec>=2022.11.0
numba>=0.60
numba-cuda
numba-cuda>=0.23.0
pyarrow
pytest>=6
uproot>=5
121 changes: 36 additions & 85 deletions src/awkward/_connect/cuda/_compute.py
@@ -2,13 +2,21 @@

from __future__ import annotations

from cuda.compute import ZipIterator, gpu_struct, segmented_reduce
# TODO: delete these after modifying argmin
from cuda.compute import (
CountingIterator,
unary_transform,
)

from awkward._nplikes.cupy import Cupy
from awkward._nplikes.numpy import Numpy

cupy_nplike = Cupy.instance()
cp = cupy_nplike._module

numpy_nplike = Numpy.instance()
np = numpy_nplike._module

# Cache for cuda.compute availability
_cuda_compute_available: bool | None = None

@@ -113,56 +121,28 @@ def awkward_reduce_argmax(
parents_length,
outlength,
):
ak_array = gpu_struct(
{
"data": input_data.dtype.type,
"local_index": cp.int64,
}
)

# compare the values of the arrays
def max_op(a: ak_array, b: ak_array):
return a if a.data > b.data else b

# use a helper function to get the local indices
# local_indices = local_idx_from_parents(parents_data, parents_length)

# use global indices instead
global_indices = cp.arange(0, parents_length + 1, dtype=cp.int64)
index_dtype = parents_data.dtype

# Combine data and their indices into a single structure
input_struct = ZipIterator(input_data, global_indices)
# alternative way
# input_struct = cp.stack((input_data, global_indices), axis=1).view(ak_array.dtype)
def segment_reduce_argmax(segment_id):
start_idx = start_o[segment_id]
end_idx = end_o[segment_id]
segment = input_data[start_idx:end_idx]
if len(segment) == 0:
return -1
# return a global index
return np.argmax(segment) + start_idx

# Prepare the start and end offsets
offsets = parents_to_offsets(parents_data, parents_length)
start_o = offsets[:-1]
end_o = offsets[1:]

# Prepare the output array
_result = result
_result = cp.concatenate((result, result))
_result = _result.view(ak_array.dtype)

# alternative way
# _result = cp.zeros([outlength], dtype= ak_array.dtype)

# Initial value for the reduction
# min value gets transformed to input_data.dtype automatically?
min = cp.iinfo(cp.int64).min
h_init = ak_array(min, min)

# Perform the segmented reduce
segmented_reduce(input_struct, _result, start_o, end_o, max_op, h_init, outlength)

# TODO: here converts float to int too, fix this?
_result = _result.view(cp.int64).reshape(-1, 2)
_result = _result[:, 1]

# pass the result outside the function
result_v = result.view()
result_v[...] = _result
# type_wrapper is the scalar type of index_dtype (e.g. cp.int64)
type_wrapper = cp.dtype(index_dtype).type
segment_ids = CountingIterator(type_wrapper(0))
# TODO: try using segmented_reduce instead when https://github.com/NVIDIA/cccl/issues/6171 is fixed
unary_transform(segment_ids, result, segment_reduce_argmax, outlength)
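# --- illustration, not part of the diff ----------------------------------
# A minimal CPU sketch of the segmented-argmax logic that segment_reduce_argmax
# applies per segment id, assuming parents_to_offsets builds CSR-style offsets
# from the parents array (e.g. parents [0, 0, 2, 2, 2] with outlength 3 gives
# offsets [0, 2, 2, 5]): empty segments yield -1, non-empty segments yield a
# global index into the flat data.
import numpy as np

def cpu_segmented_argmax(data, offsets):
    out = np.empty(len(offsets) - 1, dtype=np.int64)
    for i in range(len(offsets) - 1):
        start, end = offsets[i], offsets[i + 1]
        out[i] = -1 if start == end else np.argmax(data[start:end]) + start
    return out

# cpu_segmented_argmax(np.array([3.0, 7.0, 1.0, 9.0, 2.0]), np.array([0, 2, 2, 5]))
# expected: [1, -1, 3]  (7.0 wins segment 0, segment 1 is empty, 9.0 sits at global index 3)
# --------------------------------------------------------------------------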


# this function is called from ~/awkward/src/awkward/_reducers.py:161 (ArgMin.apply())
@@ -174,53 +154,24 @@ def awkward_reduce_argmin(
outlength,
):
index_dtype = parents_data.dtype
ak_array = gpu_struct(
{
"data": input_data.dtype.type,
"local_index": index_dtype,
}
)

# compare the values of the arrays
def min_op(a: ak_array, b: ak_array):
return a if a.data < b.data else b

# use a helper function to get the local indices
# local_indices = local_idx_from_parents(parents_data, parents_length)

# use global indices instead
global_indices = cp.arange(0, parents_length + 1, dtype=cp.int64)

# Combine data and their indices into a single structure
input_struct = ZipIterator(input_data, global_indices)
# alternative way
# input_struct = cp.stack((input_data, global_indices), axis=1).view(ak_array.dtype)
def segment_reduce_argmin(segment_id):
start_idx = start_o[segment_id]
end_idx = end_o[segment_id]
segment = input_data[start_idx:end_idx]
if len(segment) == 0:
return -1
# return a global index
return np.argmin(segment) + start_idx

# Prepare the start and end offsets
offsets = parents_to_offsets(parents_data, parents_length)
start_o = offsets[:-1]
end_o = offsets[1:]

# Prepare the output array
_result = result
_result = cp.concatenate((result, result))
_result = _result.view(ak_array.dtype)

# alternative way
# _result = cp.zeros([outlength], dtype= ak_array.dtype)

# Initial value for the reduction
# max value gets transformed to input_data.dtype automatically?
max = cp.iinfo(index_dtype).max
h_init = ak_array(max, max)

# Perform the segmented reduce
segmented_reduce(input_struct, _result, start_o, end_o, min_op, h_init, outlength)

# TODO: here converts float to int too, fix this?
_result = _result.view(index_dtype).reshape(-1, 2)
_result = _result[:, 1]

# pass the result outside the function
result_v = result.view()
result_v[...] = _result
# type_wrapper is the scalar type of index_dtype (e.g. cp.int64)
type_wrapper = cp.dtype(index_dtype).type
segment_ids = CountingIterator(type_wrapper(0))
# TODO: try using segmented_reduce instead when https://github.com/NVIDIA/cccl/issues/6171 is fixed
unary_transform(segment_ids, result, segment_reduce_argmin, outlength)
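
Both reducers now lean on the same cuda.compute pattern: a CountingIterator of segment ids is fed through unary_transform, which evaluates the per-segment callable for each id and writes the result into the output array. Below is a minimal sketch of that pattern, assuming the names behave as they are used in this diff (CountingIterator starting from a typed scalar, and unary_transform(d_in, d_out, op, num_items)); the square function and array size are made up purely for illustration.

import cupy as cp
from cuda.compute import CountingIterator, unary_transform

def square(i):
    # evaluated for each value produced by the counting iterator
    return i * i

n = 8
out = cp.empty(n, dtype=cp.int64)
ids = CountingIterator(cp.int64(0))   # yields 0, 1, 2, ... as int64
unary_transform(ids, out, square, n)  # out[i] = square(i) for i in range(n)
print(out)                            # expected: [ 0  1  4  9 16 25 36 49]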