Skip to content
This repository was archived by the owner on Sep 18, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions tests/test_multiple_processes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import asyncio
import random
import multiprocessing

import numpy as np
import pytest

import ucp


def listener(ports):
ucp.init()

async def _listener(ports):
async def write(ep):
close_msg = np.empty(1, dtype=np.int64)
msg2send = np.arange(10)
msg2recv = np.empty_like(msg2send)

msgs = [ep.recv(close_msg), ep.send(msg2send), ep.recv(msg2recv)]
await asyncio.gather(*msgs, loop=asyncio.get_event_loop())

if close_msg[0] != 0:
await ep.close()
listeners[close_msg[0]].close()

listeners = {}
for port in ports:
listeners[port] = ucp.create_listener(write, port=port)

try:
while not all(listener.closed() for listener in listeners.values()):
await asyncio.sleep(0.1)
except ucp.UCXCloseError:
pass

asyncio.get_event_loop().run_until_complete(_listener(ports))


def client(listener_ports):
ucp.init()

async def _client(listener_ports):
async def read(port, close):
close_msg = (
np.array(port, dtype=np.int64) if close else np.array(0, dtype=np.int64)
)
msg2send = np.arange(10)
msg2recv = np.empty_like(msg2send)

ep = await ucp.create_endpoint(ucp.get_address(), port)
msgs = [ep.send(close_msg), ep.send(msg2send), ep.recv(msg2recv)]
await asyncio.gather(*msgs, loop=asyncio.get_event_loop())

close_after = 100
clients = []
for i in range(close_after):
for port in listener_ports:
close = i == close_after - 1
clients.append(read(port, close=close))

await asyncio.gather(*clients, loop=asyncio.get_event_loop())

asyncio.get_event_loop().run_until_complete(_client(listener_ports))


@pytest.mark.parametrize("num_listeners", [1, 2, 4, 8])
def test_send_recv_cu(num_listeners):
ports = [random.randint(13000, 15500) for n in range(num_listeners)]

ctx = multiprocessing.get_context("spawn")
listener_process = ctx.Process(name="listener", target=listener, args=[ports])
client_process = ctx.Process(name="client", target=client, args=[ports])

listener_process.start()
client_process.start()

listener_process.join()
client_process.join()

assert listener_process.exitcode == 0
assert client_process.exitcode == 0
23 changes: 6 additions & 17 deletions tests/test_multiple_nodes.py → tests/test_single_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,22 @@ async def client_node(port):


@pytest.mark.asyncio
async def test_multiple_nodes():
lf1 = ucp.create_listener(server_node)
lf2 = ucp.create_listener(server_node)
assert lf1.port != lf2.port

nodes = []
for _ in range(10):
nodes.append(client_node(lf1.port))
nodes.append(client_node(lf2.port))
await asyncio.gather(*nodes, loop=asyncio.get_event_loop())


@pytest.mark.asyncio
async def test_one_server_many_clients():
async def test_one_listener_many_clients():
lf = ucp.create_listener(server_node)
clients = []
for _ in range(100):
for _ in range(50):
clients.append(client_node(lf.port))
await asyncio.gather(*clients, loop=asyncio.get_event_loop())


@pytest.mark.asyncio
async def test_two_servers_many_clients():
async def test_two_listeners_many_clients():
lf1 = ucp.create_listener(server_node)
lf2 = ucp.create_listener(server_node)
assert lf1.port != lf2.port

clients = []
for _ in range(100):
for _ in range(25):
clients.append(client_node(lf1.port))
clients.append(client_node(lf2.port))
await asyncio.gather(*clients, loop=asyncio.get_event_loop())