Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions .azure-pipelines/multi-nodes-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,37 @@ jobs:
remoteScript: |
bash /root/mscclpp/test/deploy/run_tests.sh mscclpp-test

- template: templates/run-remote-task.yml
parameters:
name: RunMultiNodeUnitTest
displayName: Run multi-nodes unit tests
runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser'
remoteScript: |
bash /root/mscclpp/test/deploy/run_tests.sh mp-ut
# - template: templates/run-remote-task.yml
# parameters:
# name: RunMultiNodeUnitTest
# displayName: Run multi-nodes unit tests
# runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser'
# remoteScript: |
# bash /root/mscclpp/test/deploy/run_tests.sh mp-ut

- template: templates/run-remote-task.yml
parameters:
name: RunMultiNodePythonTests
displayName: Run multi-nodes python tests
runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser'
remoteScript: |
bash /root/mscclpp/test/deploy/run_tests.sh pytests
# - template: templates/run-remote-task.yml
# parameters:
# name: RunMultiNodePythonTests
# displayName: Run multi-nodes python tests
# runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser'
# remoteScript: |
# bash /root/mscclpp/test/deploy/run_tests.sh pytests

# - template: templates/run-remote-task.yml
# parameters:
# name: RunMultiNodePythonBenchmark
# displayName: Run multi-nodes python benchmark
# runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser'
# remoteScript: |
# bash /root/mscclpp/test/deploy/run_tests.sh py-benchmark

- template: templates/run-remote-task.yml
parameters:
name: RunMultiNodePythonBenchmark
displayName: Run multi-nodes python benchmark
name: RunMultiNodeExecutorTests
displayName: Run multi-nodes executor tests
runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser'
remoteScript: |
bash /root/mscclpp/test/deploy/run_tests.sh py-benchmark
bash /root/mscclpp/test/deploy/run_tests.sh executor-tests

- template: templates/stop.yml
parameters:
Expand Down
23 changes: 22 additions & 1 deletion test/deploy/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,25 @@ function run_py_benchmark()
-x MSCCLPP_HOME=/root/mscclpp -npernode 8 python3 /root/mscclpp/python/mscclpp_benchmark/allreduce_bench.py
}

# Generate the multi-node executor execution plans and run each of them with
# the executor test script across two nodes (one process per node).
#
# Reads MPI_ARGS and MSCCLPP_ENV, which are expected to be defined elsewhere in
# this script; both are expanded unquoted on purpose so that they split into
# individual mpirun arguments.
#
# Returns non-zero as soon as any step fails, so a failing plan generation or a
# failing first test cannot be hidden by a later command that succeeds.
function run_executor_tests()
{
  echo "==================Run multi-node executor tests======================"
  ALGOS_DIR=/root/mscclpp/test/executor-tests/algos
  PLANS_DIR=/root/mscclpp/test/executor-tests/execution-plans
  TEST_SCRIPT=/root/mscclpp/python/test/executor_test.py
  PYTHON_BIN=/root/venv/bin/python3

  local plan

  # The plans are written through shell redirection, which fails when the
  # target directory does not exist yet.
  mkdir -p "${PLANS_DIR}" || return 1

  echo "Generating execution plans"
  for plan in multi_node_transfer multi_node_transfer_pkt; do
    "${PYTHON_BIN}" "${ALGOS_DIR}/${plan}.py" --name "${plan}" > "${PLANS_DIR}/${plan}.json" || return 1
  done

  echo "Running multi-node transfer test with in-place buffers"
  for plan in multi_node_transfer multi_node_transfer_pkt; do
    mpirun ${MPI_ARGS} -np 2 -npernode 1 ${MSCCLPP_ENV} "${PYTHON_BIN}" "${TEST_SCRIPT}" -path "${PLANS_DIR}/${plan}.json" --size 1M --in_place || return 1
  done
}

# Require the test name as the first argument. The names listed in the usage
# text are the literal values to pass on the command line (the pipeline calls
# this script with e.g. "pytests", "py-benchmark" and "executor-tests").
if [ $# -lt 1 ]; then
  echo "Usage: $0 <mscclpp-test/mp-ut/pytests/py-benchmark/executor-tests>"
  exit 1
fi
test_name=$1
Expand All @@ -118,6 +135,10 @@ case $test_name in
echo "==================Run python benchmark================================"
run_py_benchmark
;;
executor-tests)
echo "==================Run executor tests================================="
run_executor_tests
;;
*)
echo "Unknown test name: $test_name"
exit 1
Expand Down
85 changes: 85 additions & 0 deletions test/executor-tests/algos/multi_node_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
Multi-Node Transfer Test

This file tests the SIGNAL, WAIT, PUT, PUT_WITH_SIGNAL and
PUT_WITH_SIGNAL_AND_FLUSH operations on PortChannels in a multi-node
environment. It implements a 2-GPU allgather using the Simple protocol,
exercising the different port-channel synchronization primitives.
"""

import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *


def multi_node_transfer(name, num_threads_per_block, min_message_size, max_message_size):
    """Declare the two-rank port-channel transfer program and print it as JSON.

    The program is an AllGather over two ranks with two chunks per rank, using
    the "Simple" protocol. It drives two port channels per rank through
    signal/wait, put, put_with_signal, flush and put_with_signal_and_flush.

    Args:
        name: Program name handed to CollectiveProgram.
        num_threads_per_block: Forwarded to CollectiveProgram.
        min_message_size: Forwarded to CollectiveProgram.
        max_message_size: Forwarded to CollectiveProgram.
    """
    num_ranks = 2
    chunks_per_rank = 2
    with CollectiveProgram(
        name,
        # Third positional argument is presumably the in-place flag -- confirm
        # against the AllGather signature.
        AllGather(num_ranks, chunks_per_rank, True),
        num_ranks,
        protocol="Simple",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # Ranks, two port channels in each direction, and the output buffers.
        # NOTE(review): PortChannel arguments look like (peer rank, own rank);
        # verify against the PortChannel constructor.
        rank0 = Rank(0)
        rank1 = Rank(1)
        ch_a_rank0 = PortChannel(1, 0)
        ch_a_rank1 = PortChannel(0, 1)
        ch_b_rank0 = PortChannel(1, 0)
        ch_b_rank1 = PortChannel(0, 1)
        out0 = rank0.get_output_buffer()
        out1 = rank1.get_output_buffer()

        # Handshake: on each channel pair both sides signal, then both sides
        # wait, before any data is moved. Pair A runs on thread block 0 and
        # pair B on thread block 1.
        for block, pair in ((0, (ch_a_rank0, ch_a_rank1)), (1, (ch_b_rank0, ch_b_rank1))):
            for channel in pair:
                channel.signal(tb=block)
            for channel in pair:
                channel.wait(tb=block)

        # Rank 0, channel A: plain put, then separate signal and flush.
        ch_a_rank0.put(out1[0:1], out0[0:1], tb=0)
        ch_a_rank0.signal(tb=0)
        ch_a_rank0.flush(tb=0)

        # Rank 0, channel B: put fused with the signal, then a separate flush.
        ch_b_rank0.put_with_signal(out1[1:2], out0[1:2], tb=1)
        ch_b_rank0.flush(tb=1)

        # Rank 1, channel A: put, signal and flush fused into one operation.
        ch_a_rank1.put_with_signal_and_flush(out0[2:4], out1[2:4], tb=0)

        # One closing wait per transfer issued above.
        ch_a_rank0.wait(tb=0)
        ch_a_rank1.wait(tb=0)
        ch_b_rank1.wait(tb=1)

        print(JSON())


# Command-line entry point: parse the program options and emit the plan.
parser = argparse.ArgumentParser()

parser.add_argument(
    "--name",
    type=str,
    help="name of the program",
)
parser.add_argument(
    "--num_threads_per_block",
    type=int,
    default=1024,
    help="number of threads per block",
)
parser.add_argument(
    "--min_message_size",
    type=int,
    default=0,
    help="minimum message size",
)
parser.add_argument(
    "--max_message_size",
    type=int,
    default=2**64 - 1,
    help="maximum message size",
)

args = parser.parse_args()

multi_node_transfer(
    name=args.name,
    num_threads_per_block=args.num_threads_per_block,
    min_message_size=args.min_message_size,
    max_message_size=args.max_message_size,
)
70 changes: 70 additions & 0 deletions test/executor-tests/algos/multi_node_transfer_pkt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
Multi-Node Transfer Packet Test

This file tests the PUT_PACKETS and READ_PUT_PACKETS operations on
PortChannels in a multi-node environment. It implements a 2-GPU allgather
with the LL (low-latency) packet protocol, using port channels for inter-node
communication.
"""

import argparse
from mscclpp.language.channel import *
from mscclpp.language.rank import *
from mscclpp.language.general import *
from mscclpp.language.program import *
from mscclpp.language.collectives import *


def multi_node_transfer_pkt(name, num_threads_per_block, min_message_size, max_message_size):
    """Declare the two-rank packet transfer program and print it as JSON.

    The program is an AllGather over two ranks with one chunk per rank, using
    the "LL" protocol with the double scratch buffer enabled. It exercises
    copy_packets, put_packets, read_put_packets and unpack_packets.

    Args:
        name: Program name handed to CollectiveProgram.
        num_threads_per_block: Forwarded to CollectiveProgram.
        min_message_size: Forwarded to CollectiveProgram.
        max_message_size: Forwarded to CollectiveProgram.
    """
    num_ranks = 2
    chunks_per_rank = 1
    with CollectiveProgram(
        name,
        # Third positional argument is presumably the in-place flag -- confirm
        # against the AllGather signature.
        AllGather(num_ranks, chunks_per_rank, True),
        num_ranks,
        protocol="LL",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=True,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # Ranks, one port channel per direction, output and scratch buffers.
        # NOTE(review): Buffer arguments look like (rank, size in chunks);
        # verify against the Buffer constructor.
        rank0 = Rank(0)
        rank1 = Rank(1)
        ch_rank0 = PortChannel(1, 0)
        ch_rank1 = PortChannel(0, 1)
        out0 = rank0.get_output_buffer()
        out1 = rank1.get_output_buffer()
        scratch0 = Buffer(0, 2)
        scratch1 = Buffer(1, 2)

        # Stage each rank's own chunk from its output buffer into its own
        # scratch buffer as packets.
        rank0.copy_packets(scratch0[0:1], out0[0:1], tb=0)
        rank1.copy_packets(scratch1[1:2], out1[1:2], tb=0)

        # Rank 0's channel: put_packets from scratch0[0:1] into scratch1[0:1].
        ch_rank0.put_packets(scratch1[0:1], scratch0[0:1], tb=0)
        # Rank 1's channel: read_put_packets from scratch1[1:2] into
        # scratch0[1:2].
        ch_rank1.read_put_packets(scratch0[1:2], scratch1[1:2], tb=1)

        # Unpack the slot each rank received from its scratch buffer into its
        # output buffer.
        rank0.unpack_packets(out0[1:2], scratch0[1:2], tb=1)
        rank1.unpack_packets(out1[0:1], scratch1[0:1], tb=2)

        print(JSON())


# Command-line entry point: parse the program options and emit the plan.
parser = argparse.ArgumentParser()

parser.add_argument(
    "--name",
    type=str,
    help="name of the program",
)
parser.add_argument(
    "--num_threads_per_block",
    type=int,
    default=1024,
    help="number of threads per block",
)
parser.add_argument(
    "--min_message_size",
    type=int,
    default=0,
    help="minimum message size",
)
parser.add_argument(
    "--max_message_size",
    type=int,
    default=2**64 - 1,
    help="maximum message size",
)

args = parser.parse_args()

multi_node_transfer_pkt(
    name=args.name,
    num_threads_per_block=args.num_threads_per_block,
    min_message_size=args.min_message_size,
    max_message_size=args.max_message_size,
)
Loading
Loading