Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions pykokkos/interface/parallel_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
from pykokkos.core.cppast import BuiltinType

from .execution_policy import ExecutionPolicy, RangePolicy
from .execution_space import ExecutionSpace, is_host_execution_space
from .execution_space import ExecutionSpace, DeviceExecutionSpace
from .views import ViewType, array

from .interface_util import generic_error, get_filename, get_lineno

from .memory_space import get_default_memory_space

import inspect

workunit_cache: Dict[int, Callable] = {}
Expand Down Expand Up @@ -172,17 +174,21 @@ def check_workunit(workunit: Any) -> None:
raise TypeError(f"ERROR: {workunit} is not a valid workunit")


def convert_arrays(kwargs: Dict[str, Any], workunit: Optional[Callable] = None) -> None:
def convert_arrays(kwargs: Dict[str, Any], workunit: Callable, execution_space) -> None:
"""
Convert all numpy, cupy and pytorch ndarray objects into pk Views

:param kwargs: the list of keyword arguments passed to the workunit
:param workunit: the workunit function (used to infer types for Python lists)
:param execution_space: the execution space of the workunit
(used to convert arrays to the correct memory space)
"""

cp_available: bool
torch_available: bool

memory_space = get_default_memory_space(execution_space)

try:
import cupy as cp

Expand Down Expand Up @@ -226,28 +232,25 @@ def convert_arrays(kwargs: Dict[str, Any], workunit: Optional[Callable] = None)
depth, dtype = parse_list_annotation(annotation)

# Convert Python list to numpy array, then to View
kwargs[k] = array(np.array(v, dtype=dtype))
kwargs[k] = array(np.array(v, dtype=dtype), space=memory_space)
elif isinstance(v, np.ndarray):
default_space: ExecutionSpace = km.get_default_space()
_gpu_spaces = {ExecutionSpace.Cuda, ExecutionSpace.HIP}
if default_space in _gpu_spaces:
if execution_space in DeviceExecutionSpace:
raise TypeError(
f"Argument '{k}' is a numpy array, which cannot be accessed "
f"from the {default_space.value} execution space. "
f"from the {execution_space.value} execution space. "
f"Use a pk.View (e.g. pk.View([...], dtype)) or a CuPy array instead."
)
kwargs[k] = array(v)
kwargs[k] = array(v, space=memory_space)
elif cp_available and isinstance(v, cp.ndarray):
default_space = km.get_default_space()
if is_host_execution_space(default_space):
if execution_space not in DeviceExecutionSpace:
raise TypeError(
f"Argument '{k}' is a CuPy array, which cannot be accessed "
f"from the {default_space.value} (host) execution space. "
f"from the {execution_space.value} (host) execution space. "
f"Convert it to a numpy array or pk.View in host memory first."
)
kwargs[k] = array(v)
kwargs[k] = array(v, space=memory_space)
elif torch_available and torch.is_tensor(v):
kwargs[k] = array(v)
kwargs[k] = array(v, space=memory_space)
elif (
hasattr(v, "__array__")
or hasattr(v, "__cuda_array_interface__")
Expand Down Expand Up @@ -279,7 +282,7 @@ def parallel_for(*args, **kwargs) -> None:

kwargs = dict(kwargs)
handled_args: HandledArgs = handle_args(True, args)
convert_arrays(kwargs, handled_args.workunit)
convert_arrays(kwargs, handled_args.workunit, handled_args.policy.space.space)

runtime_singleton.runtime.run_workunit(
handled_args.name, handled_args.policy, handled_args.workunit, "for", **kwargs
Expand All @@ -296,7 +299,7 @@ def reduce_body(operation: str, *args, **kwargs) -> Union[float, int]:

kwargs = dict(kwargs)
handled_args: HandledArgs = handle_args(True, args)
convert_arrays(kwargs, handled_args.workunit)
convert_arrays(kwargs, handled_args.workunit, handled_args.policy.space.space)

args_to_hash: List = []
args_not_to_hash: Dict = {}
Expand Down