Multi Device Execution API

William L. Ruys edited this page Mar 3, 2023 · 1 revision

This page presents an extended Parla API that uses locals, a task-local execution context, to provide access to details of the current task environment. This is a work in progress and is not intended to be a final API.

We demonstrate how this API can be used to support multi-device libraries and user-written applications.

Parla API

locals is the handle to the current ExecutionContext. It contains a list of the active devices and streams in the current execution context. For each device it initializes a DeviceContext object that can be used to set the current device and stream. These are stored in locals.device and can be accessed or sliced by the local (context-relative) device id. A slice of locals.device returns a locals.context over the devices in the slice.
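As a rough illustration of the indexing and slicing behaviour described above, here is a pure-Python sketch with no GPU calls. SketchDevice, SketchContext, and DeviceList are hypothetical names invented for this example and are not Parla classes; the global ids 4 to 7 are made-up values.

```python
from dataclasses import dataclass, field


@dataclass
class SketchDevice:
    """Hypothetical stand-in for a Parla DeviceContext."""
    local_id: int  # id relative to the enclosing execution context
    id: int        # global device id


@dataclass
class SketchContext:
    """Hypothetical stand-in for an ExecutionContext over several devices."""
    members: list = field(default_factory=list)

    @property
    def device(self):
        return DeviceList(self.members)


class DeviceList:
    """Indexable view of the devices in a context (assumed behaviour)."""

    def __init__(self, members):
        self._members = list(members)

    def __getitem__(self, key):
        if isinstance(key, slice):
            # A slice yields a new context over the selected devices.
            return SketchContext(self._members[key])
        # An integer yields the single device with that local id.
        return self._members[key]


# A task placed on global GPUs 4..7 sees them as local ids 0..3.
sketch_locals = SketchContext([SketchDevice(i, 4 + i) for i in range(4)])
first = sketch_locals.device[0]
front_half = sketch_locals.device[:2]
```

Whether local ids are renumbered inside a sliced context is left open here, as the text above does not specify it.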

locals does not need to be passed as an argument to the task body. It could instead be defined as a global threading.local() object. I pass it explicitly in this document because it gives us some syntax highlighting.
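The threading.local() alternative can be sketched with the standard library alone. In this minimal example, task_locals and run_task are hypothetical names, and the string stored per thread stands in for a real execution context.

```python
import threading

# Hypothetical module-level handle; each thread sees its own attributes.
task_locals = threading.local()


def run_task(name, results):
    # The worker thread installs its own context before running the body.
    task_locals.context = f"context-for-{name}"
    results[name] = task_locals.context


results = {}
threads = [threading.Thread(target=run_task, args=(n, results)) for n in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never set a context, so the attribute is absent here.
main_has_context = hasattr(task_locals, "context")
```

Each worker thread reads back only the value it stored, which is the property a global locals handle would rely on.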

A Parla DeviceContext is a class derived from ExecutionContext that sets the current device and stream on __enter__. It has a local_id attribute that is the id of the device in the current execution context, an id attribute that is the global id of the device, a device attribute that is the device object for the device (typically a cupy device object), and a stream attribute that defaults to the first stream object associated with the device.
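The attributes and the __enter__ behaviour described above could look roughly like the following sketch. SketchDeviceContext is a hypothetical stand-in that records the "current" device in a class attribute instead of calling the CUDA runtime, and restoring the previous device on __exit__ is an assumption of this example.

```python
class SketchDeviceContext:
    """Hypothetical stand-in for a Parla DeviceContext (no real GPU calls)."""

    # Class-level record of what is "current": (global id, stream) or None.
    current = None

    def __init__(self, local_id, global_id, streams):
        self.local_id = local_id        # id within the execution context
        self.id = global_id             # global device id
        self.streams = list(streams)
        self.stream = self.streams[0]   # defaults to the first stream
        self._previous = None

    def __enter__(self):
        # Remember what was active, then make this device and stream current.
        self._previous = SketchDeviceContext.current
        SketchDeviceContext.current = (self.id, self.stream)
        return self

    def __exit__(self, exc_type, exc, tb):
        # Assumption: leaving the block restores the previous device.
        SketchDeviceContext.current = self._previous
        return False


dev = SketchDeviceContext(local_id=0, global_id=3, streams=["s0", "s1"])
with dev as active:
    inside = SketchDeviceContext.current
outside = SketchDeviceContext.current
```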

The following are members of an ExecutionContext:

get_active_devices() returns a list of all active Parla DeviceContext objects in the current execution context.

get_active_devices(architecture) returns a list of the active Parla DeviceContext objects of a given architecture in the current execution context.

get_active_cupy_devices() is shorthand for [device.device for device in locals.get_active_devices(gpu)].

get_active_streams() returns a dictionary of the active Parla streams in the current execution context. The dictionary is keyed by the device id of the device the stream is associated with. A device may have multiple streams associated with it.

add_active_streams(streams) adds the given streams to the current execution context. The streams object here is a dictionary keyed by the local device ids. The streams must be associated with a device in the current execution context.
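The stream bookkeeping can be pictured as a dictionary from device id to a list of streams. The sketch below assumes keys are local device ids and uses plain strings in place of stream objects; add_streams is a hypothetical helper, not the Parla implementation.

```python
def add_streams(active, new_streams, local_ids):
    """Merge new_streams into active; both map local device id -> list of streams.

    Hypothetical helper: rejects streams for devices outside the context.
    """
    for local_id, streams in new_streams.items():
        if local_id not in local_ids:
            raise KeyError(f"device {local_id} is not in this execution context")
        active.setdefault(local_id, []).extend(streams)
    return active


# Two devices, each starting with one stream owned by the runtime.
active_streams = {0: ["parla-stream-0"], 1: ["parla-stream-1"]}

# A library created an extra stream on local device 1 and hands it back.
add_streams(active_streams, {1: ["user-stream-a"]}, local_ids={0, 1})
```

Once a stream is registered this way, a later synchronize() over the context can include it.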

with device enters the device context manager. This sets the active CUDA device on the current thread and the active stream to the first stream associated with the device in the current execution context.

with locals.context(list of device contexts) as context: creates an ExecutionContext with the given list of devices and streams.

for context in locals.generator([list of execution contexts]) is a generator that loops over the given list of execution contexts and enters each sequentially.

locals.devices(architecture) returns locals.generator(locals.get_active_devices(architecture))
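A generator that enters each context in turn can be written in a few lines. In this sketch, LoggingContext and sketch_generator are hypothetical names, and the contexts only record enter and exit events so the ordering is visible.

```python
class LoggingContext:
    """Hypothetical context manager that records when it is entered and exited."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def __enter__(self):
        self.log.append(("enter", self.name))
        return self

    def __exit__(self, exc_type, exc, tb):
        self.log.append(("exit", self.name))
        return False


def sketch_generator(contexts):
    # Enter one context, hand it to the loop body, exit it, then move on.
    for ctx in contexts:
        with ctx:
            yield ctx


log = []
contexts = [LoggingContext("gpu0", log), LoggingContext("gpu1", log)]
for ctx in sketch_generator(contexts):
    log.append(("body", ctx.name))
```

Each context is exited when the loop asks for the next item, so the loop body always runs with exactly one context active.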

synchronize() synchronizes all streams in the current execution context.

By default, ExecutionContext objects do not synchronize on __exit__. This allows kernels to be dispatched serially across GPUs without blocking between devices. If a user wants to synchronize on exit, they can pass non_blocking=False when creating the ExecutionContext object.
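The effect of the non_blocking flag on __exit__ can be sketched as follows. SketchContext is a hypothetical stand-in whose synchronize() only records which streams it would have waited on.

```python
class SketchContext:
    """Hypothetical stand-in for an ExecutionContext; streams are plain strings."""

    def __init__(self, streams, non_blocking=True):
        self.streams = list(streams)
        self.non_blocking = non_blocking
        self.synchronized = []  # records which streams were waited on

    def synchronize(self):
        # A real implementation would block on each stream here.
        self.synchronized.extend(self.streams)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Only wait on exit when the caller asked for blocking behaviour.
        if not self.non_blocking:
            self.synchronize()
        return False


with SketchContext(["s0", "s1"]) as lazy:
    pass  # kernels would be queued here; exit returns immediately

with SketchContext(["s0", "s1"], non_blocking=False) as eager:
    pass  # exit waits for both streams
```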

@locals.parallel([list of execution contexts])
def body(context, barrier):
    with context:
        #do stuff
    barrier.wait()
    with context:
        #do stuff

locals.parallel is a decorator that creates and launches a thread for each execution context in the list. If no list is given, it uses the list of active devices in the current execution context.
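One way such a decorator could be built on Python's threading module is sketched below. The parallel function here is a hypothetical stand-in, not the Parla implementation: it takes plain strings as contexts, always requires an explicit list, and joins the threads before returning, which is an assumption of this example.

```python
import threading


def parallel(contexts):
    """Hypothetical decorator: run body(context, barrier) on one thread per context."""

    def decorator(body):
        # All threads share one barrier sized to the number of contexts.
        barrier = threading.Barrier(len(contexts))
        threads = [
            threading.Thread(target=body, args=(ctx, barrier)) for ctx in contexts
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()  # assumption: the decorator waits for every thread
        return body

    return decorator


events = []
lock = threading.Lock()


@parallel(["ctx0", "ctx1", "ctx2"])
def body(context, barrier):
    with lock:
        events.append(("phase1", context))
    barrier.wait()  # no thread continues until all have finished phase 1
    with lock:
        events.append(("phase2", context))
```

The barrier guarantees that every phase 1 entry is recorded before any phase 2 entry, regardless of thread scheduling.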

Function variants always dispatch to the implementation defined for the active device or execution context.
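A minimal sketch of variant dispatch, under simplifying assumptions: the specialize class below is a hypothetical stand-in that keys variants on an integer device count rather than on a placement such as gpu*2, and a context is just a list of device names.

```python
class specialize:
    """Hypothetical sketch of a function with per-device-count variants."""

    def __init__(self, default):
        self._default = default
        self._variants = {}

    def variant(self, num_devices):
        # Register an implementation for contexts with this many devices.
        def register(func):
            self._variants[num_devices] = func
            return func

        return register

    def __call__(self, context, *args):
        # Pick the variant matching the active context, else the default.
        impl = self._variants.get(len(context), self._default)
        return impl(context, *args)


@specialize
def scale(context, values):
    return list(values)


@scale.variant(2)
def scale_two_devices(context, values):
    return [2 * v for v in values]


one = scale(["gpu0"], [1, 2])
two = scale(["gpu0", "gpu1"], [1, 2])
```

The caller always invokes scale; which body runs depends only on the context passed in.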

Example 1: Call to an External Multi-Device Library (no known streams)

External libraries must be able to take a list of CuPy devices or global CUDA device ids to specify the device set they run on. We cannot control external libraries with any process level configuration without VLCs.

Demonstrates get_active_devices(gpu)

#Define a task that can run on 1, 2, or 4 gpus.
@spawn(placement=[gpu, gpu*2, gpu*4], in=[A, B])
def multidevice_application(locals):

    # Get list of active Parla device contexts from current execution context
    # Returns list[DeviceContext]
    devices = locals.get_active_devices(gpu)

    lA = repartition(A, devices) #dummy functions to get the data on the right devices
    lB = repartition(B, devices) #NOT THE API I'm proposing. Just example user code. 
    #Can assume data is already on the correct devices for now

    #Get list of device ids from current execution context
    device_ids = [device.id for device in devices]

    # Launch a library kernel on the real devices
    # Assume A,B already correctly distributed
    C = NVIDIA_MULTIGPU_GEMM(device_ids, lA, lB)

    for device in devices:
        device.synchronize()

Example 2: Call to an External Multi-Device Library (uses Parla streams)

Demonstrates get_active_devices() and get_active_streams()

#Define a task that can run on 1, 2, or 4 gpus.
@spawn(placement=[gpu, gpu*2, gpu*4], in=[A, B])
def multidevice_application(locals):

    # Get list of active Parla device contexts from current execution context
    # Returns list[DeviceContext]
    devices = locals.get_active_devices(gpu)

    lA = repartition(A, devices) #dummy functions to get the data on the right devices
    lB = repartition(B, devices) #NOT THE API I'm proposing. Just example user code. 
    #Can assume data is already on the correct devices for now

    #Get list of device ids from current execution context
    device_ids = [device.id for device in devices]

    #Get list of streams from current execution context
    #Returns Dict[int, list[cupy.cuda.Stream]]
    streams = locals.get_active_streams()

    # Launch a library kernel on the real devices
    # Assume A,B already correctly distributed
    C = NVIDIA_MULTIGPU_GEMM(device_ids, streams, lA, lB)

    #As parla is aware of these streams synchronization will be automatic

Example 3: Call to an External Multi-Device Library (uses known user streams)

Demonstrates get_active_cupy_devices() and add_active_streams().

#Define a task that can run on 1, 2, or 4 gpus.
@spawn(placement=[gpu, gpu*2, gpu*4], in=[A, B])
def multidevice_application(locals):

    # Get list of active cupy devices from current execution context
    # Returns list[cupy.Device]
    devices = locals.get_active_cupy_devices()

    lA = repartition(A, devices) #dummy functions to get the data on the right devices
    lB = repartition(B, devices) #NOT THE API I'm proposing. Just example user code. 
    #Can assume data is already on the correct devices for now

    #Get list of device ids from current execution context
    device_ids = [device.id for device in devices]

    # Launch a library kernel on the real devices
    # Assume A,B already correctly distributed
    C, streams = NVIDIA_MULTIGPU_GEMM(device_ids, lA, lB)

    #Add the streams to the current execution context
    locals.add_active_streams(streams)

    #Now parla will handle their synchronization

Example 4: User Written Application (Parla Aware)

Demonstrates with locals.device[local_id]:

#Define a 2 GPU task
@spawn(placement=[gpu*2], in=[A, B])
def task(locals):
    #Assume A & B already distributed. 
    mid = len(A) // 2
    
    #single_gpu_kernel is a user written async CUDA kernel

    with locals.device[0]: #Set the current CUDA device to the first device in the current execution context
                           #Sets the current stream to the stream associated with the current device in this task
        B[:mid] =  single_gpu_kernel(A[:mid])

    with locals.device[1]: #Set the current CUDA device to the second device in the current execution context
                           #Sets the current stream to the stream associated with the current device in this task
        B[mid:] =  single_gpu_kernel(A[mid:])

    #Parla will handle synchronization as the streams are known by the execution context

#Define a 4 GPU task
@spawn(placement=[gpu*4], in=[A, B])
def task(locals):
    #Assume A & B already distributed. 
    
    #two_gpu_kernel is a user written async CUDA multi-device kernel

    num_devices = len(locals.get_active_devices(gpu))
    half = num_devices // 2
    mid = len(A) // 2

    with locals.device[:half] as context: #Set the current context to the first two devices in the current execution context
        devices = context.get_active_devices(gpu)
        streams = context.get_active_streams()
        B[:mid] =  two_gpu_kernel(devices, streams, A[:mid])

    with locals.device[half:] as context: #Set the current context to the second two devices in the current execution context
        devices = context.get_active_devices(gpu)
        streams = context.get_active_streams()
        B[mid:] =  two_gpu_kernel(devices, streams, A[mid:])

    #Parla will handle synchronization as the streams are known by the outer execution context

The above is the same as:

#Define a 4 GPU task
@spawn(placement=[gpu*4], in=[A, B])
def task(locals):
    #Assume A & B already distributed. 
    
    #two_gpu_kernel is a user written async CUDA multi-device kernel

    devices = locals.get_active_devices(gpu) #Parla DeviceContext objects, as expected by locals.context
    device_0 = devices[0]
    device_1 = devices[1]
    device_2 = devices[2]
    device_3 = devices[3]

    mid = len(A) // 2

    with locals.context([device_0, device_1]) as context: #Set the current context to the first two devices in the current execution context
        devices = context.get_active_devices(gpu)
        streams = context.get_active_streams()
        B[:mid] =  two_gpu_kernel(devices, streams, A[:mid])

    with locals.context([device_2, device_3]) as context: #Set the current context to the second two devices in the current execution context
        devices = context.get_active_devices(gpu)
        streams = context.get_active_streams()
        B[mid:] =  two_gpu_kernel(devices, streams, A[mid:])

    #Parla will handle synchronization as the streams are known by the outer execution context

Example: Variant Dispatch

@specialize
def example_kernel(context, A):
    pass

@example_kernel.variant(gpu*2)
def two_gpu_handle(context, A):
    devices = context.get_active_devices(gpu)
    streams = context.get_active_streams()
    return two_gpu_kernel(devices, streams, A)

@example_kernel.variant(gpu*1)
def one_gpu_handle(context, A):
    device = context.get_active_devices(gpu)[0]
    stream = device.stream #Defaults to the first stream associated with the device
    return one_gpu_kernel(device, stream, A)


#Define a 2 or 4 GPU task
@spawn(placement=[gpu*2, gpu*4], in=[A, B])
def task(locals):
    #Assume A & B already distributed correctly. 
    
    #example_kernel dispatches to the one or two GPU variant defined above

    num_devices = len(locals.get_active_devices(gpu))
    half = num_devices // 2
    mid = len(A) // 2

    with locals.device[:half] as context: #Set the current context to the first half of the devices in the current execution context
        #Either a 1 or 2 GPU kernel will be called
        B[:mid] =  example_kernel(context, A[:mid])

    with locals.device[half:] as context: #Set the current context to the second half of the devices in the current execution context
        #Either a 1 or 2 GPU kernel will be called
        B[mid:] =  example_kernel(context, A[mid:])

    #Parla will handle synchronization as the streams are known by the outer execution context

Example 5: User Written Application (Parla Aware), Contexts with a Loop

Demonstrates writing a loop over the devices in the current execution context.

#Define a 2 or 4 GPU task
@spawn(placement=[gpu*2, gpu*4], in=[A, B])
def task(locals):
    #Assume A & B already distributed. 
    
    #single_gpu_kernel is a user written async CUDA kernel

    devices = locals.get_active_devices() #List of DeviceContext objects in the current execution context
    block_size = len(A) // len(devices) #Assumes len(A) divides evenly across the devices

    for device in devices: #Iterate over the devices in the current execution context
        with device: #Set the current CUDA device to the current device in the current execution context
                     #Sets the current stream to the stream associated with the current device in this task
            i = device.local_id
            idx = slice(i*block_size, (i+1)*block_size)
            B[idx] =  single_gpu_kernel(A[idx])

    #This runs all on the same 1 worker thread, so the loop body must be non-blocking to get concurrent execution.

    #Parla will handle synchronization as the streams are known by the outer execution context

Note that the with device statement can also be hidden behind a generator that yields each device after setting the current CUDA device and stream.

#Define a 2 or 4 GPU task
@spawn(placement=[gpu*2, gpu*4], in=[A, B])
def task(locals):
    #Assume A & B already distributed.

    #single_gpu_kernel is a user written async CUDA kernel

    block_size = len(A) // len(locals.get_active_devices(gpu)) #Assumes len(A) divides evenly across the devices

    #This yields the device and sets the current CUDA device and stream before running the loop body in each iteration
    for device in locals.devices(gpu):
        i = device.local_id
        idx = slice(i*block_size, (i+1)*block_size)
        B[idx] =  single_gpu_kernel(A[idx])

    #Parla will handle synchronization as the streams are known by the execution context

Note that the loop body must be completely non-blocking to get concurrent execution.
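The per-device index arithmetic used in these loops can be checked on the CPU. The sketch below builds one slice per device with Python's built-in slice(start, stop); block_slices is a hypothetical helper, and giving any remainder to the last device is an assumption of this example.

```python
def block_slices(length, num_devices):
    """Return one slice per device covering range(length) in contiguous blocks."""
    block_size = length // num_devices
    slices = []
    for i in range(num_devices):
        start = i * block_size
        # Assumption: the last device also takes any leftover elements.
        stop = length if i == num_devices - 1 else (i + 1) * block_size
        slices.append(slice(start, stop))
    return slices


data = list(range(10))
pieces = [data[idx] for idx in block_slices(len(data), 4)]
```

Indexing with device.local_id into such a list gives each device its own block of A and B.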

Example 6: User Written Application (Parla Aware), Multi-threading.

Demonstrates writing a parallel loop over the devices in the current execution context. I don't know if this is a good idea, but it's possible. I'm not sure how to make this have better syntax without running AST transformations on the user code at runtime.

#Define a 2 or 4 GPU task
#multithreaded_launch is dummy shorthand for requiring nthreads=ngpus on the CPU device. 
@spawn(placement=[(cpu, gpu*2), (cpu, gpu*4)], in=[A, B], multithreaded_launch=True)
def task(locals):
    #Assume A & B already distributed. 
    
    #single_gpu_kernel is a user written async CUDA kernel
    num_devices = len(locals.get_active_devices(gpu))
    block_size = len(A) // num_devices #Assumes len(A) divides evenly across the devices

    #This runs on multiple internal threads, so the loop body can be blocking without blocking the other gpus from starting.
    #.... well aside from the GIL
    #Launches a thread for each device with Python's threading module
    @locals.parallel()
    def loop_body(device, barrier):
        i = device.local_id
        idx = slice(i*block_size, (i+1)*block_size)
        B[idx] =  single_gpu_kernel(A[idx])

    #Parla will handle synchronization as the streams are known by the outer execution context

#Define a 4 GPU task
#multithreaded_launch is dummy shorthand for requiring nthreads=ngpus on the CPU device. 
@spawn(placement=[(cpu, gpu*4)], in=[A, B], multithreaded_launch=True)
def task(locals):
    #Assume A & B already distributed. 
    
    #two_gpu_kernel is a user written async CUDA multi-device kernel
    num_devices = len(locals.get_active_devices(gpu))
    half = num_devices // 2
    block_size = len(A) // 2 #One block per two-GPU context

    #This runs on multiple internal threads, so the loop body can be blocking without blocking the other threads.
    #.... well aside from the GIL
    #Launches a thread for each listed context with Python's threading module
    @locals.parallel([locals.device[:half], locals.device[half:]])
    def loop_body(context, barrier):
        devices = context.get_active_devices(gpu)
        streams = context.get_active_streams()
        i = context.local_id
        idx = slice(i*block_size, (i+1)*block_size)
        B[idx] =  two_gpu_kernel(devices, streams, A[idx])

    #Parla will handle synchronization as the streams are known by the outer execution context

Example 7: Alternative User Written Application (Data Driven API)

Instead of being aware of the current context, the syntax could also be more driven by the data partitioning itself.

#Define a 2 or 4 GPU task
#multithreaded_launch is dummy shorthand for requiring nthreads=ngpus on the CPU device. 
@spawn(placement=[(cpu, gpu*2), (cpu, gpu*4)], in=[A, B], multithreaded_launch=True)
def task(locals):
    
    #Get the active Parla device contexts
    devices = locals.get_active_devices(gpu)
    lA = repartition(A, devices) #dummy function to get the data on the right devices
    lB = repartition(B, devices) #NOT THE API I'm proposing. Just example user code.

    #Launch a library kernel on the real devices
    # Assume A,B already correctly distributed

    #This applies the kernel to each partition of the data on the device
    #Can be async sequential or done with parallel thread dispatch.
    distributed_map(single_gpu_kernel, lB, (lA,))

    #Parla will handle synchronization as the streams are known by the outer execution context

This also opens up the possibility of other data-driven primitives, such as distributed_reduce(). But this ties into what we want the data model itself to support.
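To make the data-driven idea concrete, here is a sequential, CPU-only sketch of what a distributed_map could do. It assumes partitioned arrays are plain lists of per-device blocks and that the kernel returns a new block; a real version would enter each device context and dispatch asynchronously.

```python
def distributed_map(kernel, outputs, inputs):
    """Apply kernel to matching partitions and store each result in place.

    outputs is a list of per-device blocks; inputs is a tuple of such lists.
    """
    for i, out in enumerate(outputs):
        # Gather the i-th block of every input array.
        args = [partitions[i] for partitions in inputs]
        out[:] = kernel(*args)


def double(block):
    # Stand-in for single_gpu_kernel.
    return [2 * x for x in block]


lA = [[1, 2], [3, 4]]  # two partitions of A
lB = [[0, 0], [0, 0]]  # two partitions of B, written in place
distributed_map(double, lB, (lA,))
```

A distributed_reduce could follow the same pattern, combining the per-partition results instead of storing them.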