diff --git a/ucp/_libs/arr.pxd b/ucp/_libs/arr.pxd
index baa6a98e..e1daf93e 100644
--- a/ucp/_libs/arr.pxd
+++ b/ucp/_libs/arr.pxd
@@ -20,6 +20,8 @@ cdef class Array:
 
     cdef readonly bint cuda
 
+    cdef readonly list _blocks
+
     cpdef bint _c_contiguous(self)
     cpdef bint _f_contiguous(self)
     cpdef bint _contiguous(self)
diff --git a/ucp/_libs/arr.pyx b/ucp/_libs/arr.pyx
index 1937e7bb..c07c2b28 100644
--- a/ucp/_libs/arr.pyx
+++ b/ucp/_libs/arr.pyx
@@ -132,6 +132,29 @@ cdef class Array:
             else:
                 self.shape_mv = None
                 self.strides_mv = None
+
+            # Multi-block VMM property
+            self._blocks = None
+            try:
+                from dask_cuda.vmm_pool import rmm_get_current_vmm_pool
+
+                try:
+                    vmm_pool = rmm_get_current_vmm_pool()
+                    try:
+                        from ucp._libs.vmm import build_slices
+                        blocks = vmm_pool._allocs[self.ptr].blocks
+                        try:
+                            # self._blocks = build_slices(blocks, obj.shape[0])
+                            self._blocks = build_slices(blocks, self.nbytes)
+                        except AttributeError:
+                            print(f"AttributeError: {type(obj)}, {obj}")
+                    except KeyError:
+                        pass
+                except ValueError:
+                    pass
+            except ImportError:
+                if hasattr(obj, "get_blocks"):
+                    self._blocks = obj.get_blocks()
         else:
             mv = PyMemoryView_FromObject(obj)
             pybuf = PyMemoryView_GET_BUFFER(mv)
@@ -237,6 +260,10 @@ cdef class Array:
                 s *= self.shape_mv[i]
         return strides
 
+    @property
+    def blocks(self):
+        return self._blocks
+
 
 @boundscheck(False)
 @initializedcheck(False)
diff --git a/ucp/_libs/vmm.py b/ucp/_libs/vmm.py
new file mode 100644
index 00000000..1add5164
--- /dev/null
+++ b/ucp/_libs/vmm.py
@@ -0,0 +1,139 @@
+from functools import partial
+from typing import List, Tuple, Union
+
+import numpy as np
+from cuda import cuda
+
+from dask_cuda.rmm_vmm_block_pool import VmmBlockPool
+from dask_cuda.rmm_vmm_pool import checkCudaErrors
+from dask_cuda.vmm_pool import VmmBlock, VmmPool
+
+
+def get_vmm_allocator(vmm):
+    if vmm:
+        if isinstance(vmm, (VmmBlockPool, VmmPool)):
+            vmm_allocator = VmmBlockPoolArray
+        else:
+            vmm_allocator = VmmSingleArray
+        return partial(vmm_allocator, vmm)
+
+    return None
+
+
+def copy_to_host(
+    dst: np.ndarray,
+    src: Union[int, cuda.CUdeviceptr],
+    size: int,
+    stream: cuda.CUstream = cuda.CUstream(0),
+):
+    if isinstance(src, int):
+        src = cuda.CUdeviceptr(src)
+    assert isinstance(src, cuda.CUdeviceptr)
+    assert isinstance(dst, np.ndarray)
+    assert isinstance(size, int)
+    assert size > 0
+    # print(
+    #     f"copy_to_host src: {hex(int(src))}, dst: {hex(int(dst.ctypes.data))}",
+    #     flush=True
+    # )
+    checkCudaErrors(cuda.cuMemcpyDtoHAsync(dst.ctypes.data, src, size, stream))
+    checkCudaErrors(cuda.cuStreamSynchronize(stream))
+
+
+def copy_to_device(
+    dst: Union[int, cuda.CUdeviceptr],
+    src: np.ndarray,
+    size: int,
+    stream: cuda.CUstream = cuda.CUstream(0),
+):
+    assert isinstance(src, np.ndarray)
+    if isinstance(dst, int):
+        dst = cuda.CUdeviceptr(dst)
+    assert isinstance(dst, cuda.CUdeviceptr)
+    assert isinstance(size, int)
+    assert size > 0
+    # print(
+    #     f"copy_to_device src: {hex(int(src.ctypes.data))}, dst: {hex(int(dst))}",
+    #     flush=True
+    # )
+    checkCudaErrors(cuda.cuMemcpyHtoDAsync(dst, src.ctypes.data, size, stream))
+    checkCudaErrors(cuda.cuStreamSynchronize(stream))
+
+
+class VmmAllocBase:
+    def __init__(self, ptr, size):
+        self.ptr = cuda.CUdeviceptr(ptr)
+        self.shape = (size,)
+
+    def __repr__(self) -> str:
+        return f"<{type(self).__name__} ptr={int(self.ptr)} size={self.shape[0]}>"
+
+    @property
+    def __cuda_array_interface__(self):
+        return {
+            "shape": self.shape,
+            "typestr": "u1",
+            "data": (int(self.ptr), False),
+            "version": 2,
+        }
+
+    @property
+    def nbytes(self):
+        return self.shape[0]
+
+
+class VmmArraySlice(VmmAllocBase):
+    pass
+
+
+class VmmSingleArray(VmmAllocBase):
+    def __init__(self, vmm_allocator, size):
+        ptr = cuda.CUdeviceptr(vmm_allocator.allocate(size))
+        super().__init__(ptr, size)
+
+        self.vmm_allocator = vmm_allocator
+
+    def __del__(self):
+        self.vmm_allocator.deallocate(int(self.ptr), self.shape[0])
+
+
+class VmmBlockPoolArray(VmmAllocBase):
+    def __init__(self, vmm_block_pool_allocator, size):
+        ptr = cuda.CUdeviceptr(vmm_block_pool_allocator.allocate(size))
+        super().__init__(ptr, size)
+
+        self.vmm_allocator = vmm_block_pool_allocator
+
+    def __del__(self):
+        self.vmm_allocator.deallocate(int(self.ptr), self.shape[0])
+
+    def get_blocks(self):
+        if isinstance(self.vmm_allocator, VmmBlockPool):
+            blocks = self.vmm_allocator.get_allocation_blocks(int(self.ptr))
+        else:
+            blocks = self.vmm_allocator._allocs[int(self.ptr)].blocks
+        return build_slices(blocks, self.shape[0])
+
+
+def build_slices(
+    blocks: List[Union[Tuple, VmmBlock]], alloc_size: int
+) -> List[VmmArraySlice]:
+    assert len(blocks) > 0
+
+    cur_size = 0
+    ret = []
+    if isinstance(blocks[0], VmmBlock):
+        for block in blocks:
+            block_size = min(alloc_size - cur_size, block.size)
+            ret.append(VmmArraySlice(block._ptr, block_size))
+            cur_size += block.size
+            if cur_size >= alloc_size:
+                break
+    else:
+        for block in blocks:
+            block_size = min(alloc_size - cur_size, block[1])
+            ret.append(VmmArraySlice(block[0], block_size))
+            cur_size += block[1]
+            if cur_size >= alloc_size:
+                break
+    return ret
diff --git a/ucp/benchmarks/backends/ucp_async.py b/ucp/benchmarks/backends/ucp_async.py
index 20246b02..4b7bff3c 100644
--- a/ucp/benchmarks/backends/ucp_async.py
+++ b/ucp/benchmarks/backends/ucp_async.py
@@ -7,6 +7,7 @@
 import ucp
 from ucp._libs.arr import Array
 from ucp._libs.utils import print_key_value
+from ucp._libs.vmm import copy_to_device, copy_to_host, get_vmm_allocator
 from ucp.benchmarks.backends.base import BaseClient, BaseServer
 
 
@@ -46,16 +47,24 @@ def __init__(self, args: Namespace, xp: Any, queue: Queue):
         self.args = args
         self.xp = xp
         self.queue = queue
+        self.vmm = None
 
     async def run(self):
         ucp.init()
 
         register_am_allocators(self.args)
 
+        vmm_allocator = get_vmm_allocator(self.vmm)
+
         async def server_handler(ep):
+            recv_msg_vmm = None
             if not self.args.enable_am:
                 if self.args.reuse_alloc:
-                    recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
+                    if self.vmm:
+                        recv_msg_vmm = vmm_allocator(self.args.n_bytes)
+                        recv_msg = Array(recv_msg_vmm)
+                    else:
+                        recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
 
                     assert recv_msg.nbytes == self.args.n_bytes
 
@@ -65,10 +74,20 @@ async def server_handler(ep):
                     await ep.am_send(recv)
                 else:
                     if not self.args.reuse_alloc:
-                        recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
+                        if self.vmm:
+                            recv_msg = Array(vmm_allocator(self.args.n_bytes))
+                        else:
+                            recv_msg = Array(
+                                self.xp.empty(self.args.n_bytes, dtype="u1")
+                            )
 
                     await ep.recv(recv_msg)
                     await ep.send(recv_msg)
+
+                    if self.vmm and self.args.vmm_debug:
+                        h_recv_msg = self.xp.empty(self.args.n_bytes, dtype="u1")
+                        copy_to_host(h_recv_msg, recv_msg.ptr, self.args.n_bytes)
+                        print(f"Server recv msg: {h_recv_msg}")
             await ep.close()
             lf.close()
 
@@ -88,6 +107,7 @@ def __init__(
         self.args = args
         self.xp = xp
         self.queue = queue
+        self.vmm = None
         self.server_address = server_address
         self.port = port
 
@@ -96,17 +116,29 @@ async def run(self):
 
         register_am_allocators(self.args)
 
+        vmm_allocator = get_vmm_allocator(self.vmm)
+
         ep = await ucp.create_endpoint(self.server_address, self.port)
 
         if self.args.enable_am:
             msg = self.xp.arange(self.args.n_bytes, dtype="u1")
         else:
-            send_msg = Array(self.xp.arange(self.args.n_bytes, dtype="u1"))
-            if self.args.reuse_alloc:
-                recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
+            if self.vmm:
+                h_send_msg = self.xp.arange(self.args.n_bytes, dtype="u1")
+                print(f"Client send: {h_send_msg}")
+                send_msg = Array(vmm_allocator(self.args.n_bytes))
+                copy_to_device(send_msg.ptr, h_send_msg, send_msg.shape[0])
+                if self.args.reuse_alloc:
+                    recv_msg = Array(vmm_allocator(self.args.n_bytes))
+
+                    assert recv_msg.nbytes == self.args.n_bytes
+            else:
+                send_msg = Array(self.xp.arange(self.args.n_bytes, dtype="u1"))
+                if self.args.reuse_alloc:
+                    recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
 
-                assert send_msg.nbytes == self.args.n_bytes
-                assert recv_msg.nbytes == self.args.n_bytes
+                    assert send_msg.nbytes == self.args.n_bytes
+                    assert recv_msg.nbytes == self.args.n_bytes
 
         if self.args.cuda_profile:
             self.xp.cuda.profiler.start()
@@ -118,11 +150,23 @@ async def run(self):
                 await ep.am_recv()
             else:
                 if not self.args.reuse_alloc:
-                    recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
+                    if self.vmm:
+                        recv_msg = Array(vmm_allocator(self.args.n_bytes))
+                    else:
+                        recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
 
                 await ep.send(send_msg)
                 await ep.recv(recv_msg)
             stop = monotonic()
+
+            if self.vmm and self.args.vmm_debug:
+                import numpy as np
+
+                h_recv_msg = self.xp.empty(self.args.n_bytes, dtype="u1")
+                copy_to_host(h_recv_msg, recv_msg.ptr, recv_msg.shape[0])
+                print(f"Client recv: {h_recv_msg}")
+                np.testing.assert_equal(h_recv_msg, h_send_msg)
+
             if i >= self.args.n_warmup_iter:
                 times.append(stop - start)
             if self.args.cuda_profile:
diff --git a/ucp/benchmarks/send_recv.py b/ucp/benchmarks/send_recv.py
index 8ede2577..84060c8d 100644
--- a/ucp/benchmarks/send_recv.py
+++ b/ucp/benchmarks/send_recv.py
@@ -55,17 +55,39 @@ def _get_backend_implementation(backend):
     raise ValueError(f"Unknown backend {backend}")
 
 
-def server(queue, args):
-    if args.server_cpu_affinity >= 0:
-        os.sched_setaffinity(0, [args.server_cpu_affinity])
+def _get_vmm_allocator(object_type):
+    if object_type.startswith("vmm"):
+        if object_type == "vmm-block-pool":
+            from dask_cuda.rmm_vmm_block_pool import VmmBlockPool
 
-    if args.object_type == "numpy":
+            return VmmBlockPool()
+        elif object_type == "vmm-pool":
+            from dask_cuda.vmm_pool import VmmPool
+
+            return VmmPool()
+        elif object_type == "vmm-default-pool":
+            from dask_cuda.rmm_vmm_pool import VmmAllocPool
+
+            return VmmAllocPool()
+        elif object_type == "vmm-default":
+            from dask_cuda.rmm_vmm_pool import VmmAlloc
+
+            return VmmAlloc()
+        else:
+            raise ValueError(f"Unknown VMM type {object_type}")
+    return None
+
+
+def _initialize_object_type_allocator(
+    object_type: str, device: int, rmm_init_pool_size: int
+) -> object:
+    if object_type == "numpy":
         import numpy as xp
-    elif args.object_type == "cupy":
+    elif object_type == "cupy":
         import cupy as xp
 
-        xp.cuda.runtime.setDevice(args.server_dev)
-    else:
+        xp.cuda.runtime.setDevice(device)
+    elif object_type == "rmm":
         import cupy as xp
 
         import rmm
@@ -73,13 +95,45 @@ def server(queue, args):
         rmm.reinitialize(
             pool_allocator=True,
             managed_memory=False,
-            initial_pool_size=args.rmm_init_pool_size,
-            devices=[args.server_dev],
+            initial_pool_size=rmm_init_pool_size,
+            devices=[device],
         )
-        xp.cuda.runtime.setDevice(args.server_dev)
+        xp.cuda.runtime.setDevice(device)
         xp.cuda.set_allocator(rmm.rmm_cupy_allocator)
+    elif object_type == "rmm-vmm-pool":
+        import cupy as xp
+
+        from dask_cuda.vmm_pool import rmm_set_current_vmm_pool
+
+        import rmm
+
+        xp.cuda.runtime.setDevice(device)
+
+        rmm_set_current_vmm_pool()
+
+        xp.cuda.set_allocator(rmm.rmm_cupy_allocator)
+
+    elif object_type.startswith("vmm"):
+        import numpy as xp
+        from cuda import cudart
+
+        cudart.cudaSetDevice(device)
+    else:
+        raise ValueError(f"Unknown object type {object_type}")
+
+    return xp
+
+
+def server(queue, args):
+    if args.server_cpu_affinity >= 0:
+        os.sched_setaffinity(0, [args.server_cpu_affinity])
+
+    xp = _initialize_object_type_allocator(
+        args.object_type, args.server_dev, args.rmm_init_pool_size
+    )
 
     server = _get_backend_implementation(args.backend)["server"](args, xp, queue)
+    server.vmm = _get_vmm_allocator(args.object_type)
 
     if asyncio.iscoroutinefunction(server.run):
         loop = get_event_loop()
@@ -94,29 +148,14 @@ def client(queue, port, server_address, args):
 
     import numpy as np
 
-    if args.object_type == "numpy":
-        import numpy as xp
-    elif args.object_type == "cupy":
-        import cupy as xp
-
-        xp.cuda.runtime.setDevice(args.client_dev)
-    else:
-        import cupy as xp
-
-        import rmm
-
-        rmm.reinitialize(
-            pool_allocator=True,
-            managed_memory=False,
-            initial_pool_size=args.rmm_init_pool_size,
-            devices=[args.client_dev],
-        )
-        xp.cuda.runtime.setDevice(args.client_dev)
-        xp.cuda.set_allocator(rmm.rmm_cupy_allocator)
+    xp = _initialize_object_type_allocator(
+        args.object_type, args.client_dev, args.rmm_init_pool_size
+    )
 
     client = _get_backend_implementation(args.backend)["client"](
         args, xp, queue, server_address, port
     )
+    client.vmm = _get_vmm_allocator(args.object_type)
 
     if asyncio.iscoroutinefunction(client.run):
         loop = get_event_loop()
@@ -224,7 +263,16 @@
         "-o",
         "--object_type",
         default="numpy",
-        choices=["numpy", "cupy", "rmm"],
+        choices=[
+            "numpy",
+            "cupy",
+            "rmm",
+            "rmm-vmm-pool",
+            "vmm-default",
+            "vmm-default-pool",
+            "vmm-block-pool",
+            "vmm-pool",
+        ],
         help="In-memory array type.",
     )
     parser.add_argument(
@@ -336,6 +384,12 @@
         help="Only applies to 'ucp-core' backend: number of maximum outstanding "
         "operations, see --delay-progress. (Default: 32)",
     )
+    parser.add_argument(
+        "--vmm-debug",
+        default=False,
+        action="store_true",
+        help="Activate verbose debug prints for VMM and result checking.",
+    )
 
     args = parser.parse_args()
diff --git a/ucp/core.py b/ucp/core.py
index 6f5ddf3c..039b67e7 100644
--- a/ucp/core.py
+++ b/ucp/core.py
@@ -635,24 +635,31 @@ async def send(self, buffer, tag=None, force_tag=False):
             tag = self._tags["msg_send"]
         elif not force_tag:
             tag = hash64bits(self._tags["msg_send"], hash(tag))
-        nbytes = buffer.nbytes
-        log = "[Send #%03d] ep: %s, tag: %s, nbytes: %d, type: %s" % (
-            self._send_count,
-            hex(self.uid),
-            hex(tag),
-            nbytes,
-            type(buffer.obj),
-        )
-        logger.debug(log)
-        self._send_count += 1
+        if buffer.blocks:
+            # `asyncio.gather` cannot be used here, as ordering must be guaranteed
+            ret = []
+            for block in buffer.blocks:
+                ret.append(await self.send(block))
+            return ret
+        else:
+            nbytes = buffer.nbytes
+            log = "[Send #%03d] ep: %s, tag: %s, nbytes: %d, type: %s" % (
+                self._send_count,
+                hex(self.uid),
+                hex(tag),
+                nbytes,
+                type(buffer.obj),
+            )
+            logger.debug(log)
+            self._send_count += 1
 
-        try:
-            return await comm.tag_send(self._ep, buffer, nbytes, tag, name=log)
-        except UCXCanceled as e:
-            # If self._ep has already been closed and destroyed, we reraise the
-            # UCXCanceled exception.
-            if self._ep is None:
-                raise e
+            try:
+                return await comm.tag_send(self._ep, buffer, nbytes, tag, name=log)
+            except UCXCanceled as e:
+                # If self._ep has already been closed and destroyed, we reraise the
+                # UCXCanceled exception.
+                if self._ep is None:
+                    raise e
 
     @ucx_api.nvtx_annotate("UCXPY_AM_SEND", color="green", domain="ucxpy")
     async def am_send(self, buffer):
@@ -711,26 +718,34 @@ async def recv(self, buffer, tag=None, force_tag=False):
 
         if not isinstance(buffer, Array):
             buffer = Array(buffer)
-        nbytes = buffer.nbytes
-        log = "[Recv #%03d] ep: %s, tag: %s, nbytes: %d, type: %s" % (
-            self._recv_count,
-            hex(self.uid),
-            hex(tag),
-            nbytes,
-            type(buffer.obj),
-        )
-        logger.debug(log)
-        self._recv_count += 1
-        ret = await comm.tag_recv(self._ep, buffer, nbytes, tag, name=log)
+        if buffer.blocks:
+            # `asyncio.gather` cannot be used here, as ordering must be guaranteed
+            ret = []
+            for block in buffer.blocks:
+                ret.append(await self.recv(block))
+            return ret
+        else:
+            nbytes = buffer.nbytes
+            log = "[Recv #%03d] ep: %s, tag: %s, nbytes: %d, type: %s" % (
+                self._recv_count,
+                hex(self.uid),
+                hex(tag),
+                nbytes,
+                type(buffer.obj),
+            )
+            logger.debug(log)
+            self._recv_count += 1
+
+            ret = await comm.tag_recv(self._ep, buffer, nbytes, tag, name=log)
 
-        self._finished_recv_count += 1
-        if (
-            self._close_after_n_recv is not None
-            and self._finished_recv_count >= self._close_after_n_recv
-        ):
-            self.abort()
-        return ret
+            self._finished_recv_count += 1
+            if (
+                self._close_after_n_recv is not None
+                and self._finished_recv_count >= self._close_after_n_recv
+            ):
+                self.abort()
+            return ret
 
     @ucx_api.nvtx_annotate("UCXPY_AM_RECV", color="red", domain="ucxpy")
     async def am_recv(self):
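Note: the sketch below is not part of the patch; it is a minimal illustration of how `build_slices` (added in ucp/_libs/vmm.py above) is expected to split a single VMM allocation into per-block slices, assuming `ucp._libs.vmm` is importable (which requires the dask-cuda VMM pool modules and the cuda-python bindings). The block pointers and sizes are made up for illustration only.

# Hypothetical illustration of ucp._libs.vmm.build_slices with blocks given
# as (ptr, size) tuples; pointer values and sizes are fabricated.
from ucp._libs.vmm import build_slices

MiB = 1024 * 1024

# Two 2 MiB blocks backing a single 3 MiB allocation.
blocks = [(0x7F0000000000, 2 * MiB), (0x7F0000200000, 2 * MiB)]

slices = build_slices(blocks, 3 * MiB)

# The final slice is truncated so the slices cover exactly the allocation
# size: a 2 MiB slice followed by a 1 MiB slice.
for s in slices:
    print(hex(int(s.ptr)), s.nbytes)
assert sum(s.nbytes for s in slices) == 3 * MiB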
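A second hedged sketch of the allocator path the benchmark exercises, assuming a CUDA GPU and the dask-cuda branch providing `dask_cuda.vmm_pool.VmmPool`: it round-trips host data through a VMM allocation with `copy_to_device`/`copy_to_host` and lists the per-block slices that `Endpoint.send()`/`recv()` iterate over when `Array(...).blocks` is populated.

# Hypothetical usage sketch; requires a CUDA GPU, cuda-python, and the
# dask-cuda VMM pool branch this PR depends on.
import numpy as np
from dask_cuda.vmm_pool import VmmPool

from ucp._libs.vmm import copy_to_device, copy_to_host, get_vmm_allocator

vmm = VmmPool()
alloc = get_vmm_allocator(vmm)  # partial(VmmBlockPoolArray, vmm)

nbytes = 8 * 1024 * 1024
buf = alloc(nbytes)  # VmmBlockPoolArray exposing __cuda_array_interface__

# Round-trip some host data through the VMM allocation.
h_send = (np.arange(nbytes) % 256).astype("u1")
copy_to_device(buf.ptr, h_send, nbytes)
h_recv = np.empty(nbytes, dtype="u1")
copy_to_host(h_recv, buf.ptr, nbytes)
np.testing.assert_array_equal(h_send, h_recv)

# Per-block slices of the allocation, as consumed by Endpoint.send()/recv().
for s in buf.get_blocks():
    print(hex(int(s.ptr)), s.nbytes)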