diff --git a/pykokkos/interface/parallel_dispatch.py b/pykokkos/interface/parallel_dispatch.py
index 3b79d755..eb68c447 100644
--- a/pykokkos/interface/parallel_dispatch.py
+++ b/pykokkos/interface/parallel_dispatch.py
@@ -8,11 +8,13 @@
 from pykokkos.core.cppast import BuiltinType
 
 from .execution_policy import ExecutionPolicy, RangePolicy
-from .execution_space import ExecutionSpace, is_host_execution_space
+from .execution_space import ExecutionSpace, DeviceExecutionSpace
 from .views import ViewType, array
 from .interface_util import generic_error, get_filename, get_lineno
+
+from .memory_space import get_default_memory_space
 
 import inspect
 
 workunit_cache: Dict[int, Callable] = {}
 
@@ -172,17 +174,21 @@ def check_workunit(workunit: Any) -> None:
         raise TypeError(f"ERROR: {workunit} is not a valid workunit")
 
 
-def convert_arrays(kwargs: Dict[str, Any], workunit: Optional[Callable] = None) -> None:
+def convert_arrays(kwargs: Dict[str, Any], workunit: Callable, execution_space) -> None:
     """
     Convert all numpy, cupy and pytorch ndarray objects into pk Views
 
     :param kwargs: the list of keyword arguments passed to the workunit
     :param workunit: the workunit function (used to infer types for Python lists)
+    :param execution_space: the execution space of the workunit
+        (used to convert arrays to the correct memory space)
     """
 
     cp_available: bool
     torch_available: bool
 
+    memory_space = get_default_memory_space(execution_space)
+
     try:
         import cupy as cp
 
@@ -226,28 +232,25 @@ def convert_arrays(kwargs: Dict[str, Any], workunit: Optional[Callable] = None)
 
             depth, dtype = parse_list_annotation(annotation)
             # Convert Python list to numpy array, then to View
-            kwargs[k] = array(np.array(v, dtype=dtype))
+            kwargs[k] = array(np.array(v, dtype=dtype), space=memory_space)
         elif isinstance(v, np.ndarray):
-            default_space: ExecutionSpace = km.get_default_space()
-            _gpu_spaces = {ExecutionSpace.Cuda, ExecutionSpace.HIP}
-            if default_space in _gpu_spaces:
+            if execution_space in DeviceExecutionSpace:
                 raise TypeError(
                     f"Argument '{k}' is a numpy array, which cannot be accessed "
-                    f"from the {default_space.value} execution space. "
+                    f"from the {execution_space.value} execution space. "
                     f"Use a pk.View (e.g. pk.View([...], dtype)) or a CuPy array instead."
                 )
-            kwargs[k] = array(v)
+            kwargs[k] = array(v, space=memory_space)
         elif cp_available and isinstance(v, cp.ndarray):
-            default_space = km.get_default_space()
-            if is_host_execution_space(default_space):
+            if execution_space not in DeviceExecutionSpace:
                 raise TypeError(
                     f"Argument '{k}' is a CuPy array, which cannot be accessed "
-                    f"from the {default_space.value} (host) execution space. "
+                    f"from the {execution_space.value} (host) execution space. "
                     f"Convert it to a numpy array or pk.View in host memory first."
                 )
-            kwargs[k] = array(v)
+            kwargs[k] = array(v, space=memory_space)
         elif torch_available and torch.is_tensor(v):
-            kwargs[k] = array(v)
+            kwargs[k] = array(v, space=memory_space)
         elif (
             hasattr(v, "__array__")
             or hasattr(v, "__cuda_array_interface__")
@@ -279,7 +282,7 @@ def parallel_for(*args, **kwargs) -> None:
 
     kwargs = dict(kwargs)
     handled_args: HandledArgs = handle_args(True, args)
-    convert_arrays(kwargs, handled_args.workunit)
+    convert_arrays(kwargs, handled_args.workunit, handled_args.policy.space.space)
 
     runtime_singleton.runtime.run_workunit(
         handled_args.name, handled_args.policy, handled_args.workunit, "for", **kwargs
@@ -296,7 +299,7 @@ def reduce_body(operation: str, *args, **kwargs) -> Union[float, int]:
 
     kwargs = dict(kwargs)
    handled_args: HandledArgs = handle_args(True, args)
-    convert_arrays(kwargs, handled_args.workunit)
+    convert_arrays(kwargs, handled_args.workunit, handled_args.policy.space.space)
 
     args_to_hash: List = []
     args_not_to_hash: Dict = {}